Documentation ¶
Index ¶
- Constants
- Variables
- func Marshal(v interface{}) ([]byte, error)
- func ReadAll(b Bytes) ([]byte, error)
- func TCP(address ...string) net.Addr
- func Unmarshal(b []byte, v interface{}) error
- type ACLDescription
- type ACLEntry
- type ACLFilter
- type ACLOperationType
- type ACLPermissionType
- type ACLResource
- type AddOffsetsToTxnRequest
- type AddOffsetsToTxnResponse
- type AddPartitionToTxn
- type AddPartitionToTxnPartition
- type AddPartitionsToTxnRequest
- type AddPartitionsToTxnResponse
- type AlterClientQuotaEntity
- type AlterClientQuotaEntry
- type AlterClientQuotaOps
- type AlterClientQuotaResponseQuotas
- type AlterClientQuotasRequest
- type AlterClientQuotasResponse
- type AlterConfigRequestConfig
- type AlterConfigRequestResource
- type AlterConfigsRequest
- type AlterConfigsResponse
- type AlterConfigsResponseResource
- type AlterPartitionReassignmentsRequest
- type AlterPartitionReassignmentsRequestAssignment
- type AlterPartitionReassignmentsResponse
- type AlterPartitionReassignmentsResponsePartitionResult
- type AlterUserScramCredentialsRequest
- type AlterUserScramCredentialsResponse
- type AlterUserScramCredentialsResponseUser
- type ApiVersion
- type ApiVersionsRequest
- type ApiVersionsResponse
- type ApiVersionsResponseApiKey
- type Balancer
- type BalancerFunc
- type Batch
- func (batch *Batch) Close() error
- func (batch *Batch) Err() error
- func (batch *Batch) HighWaterMark() int64
- func (batch *Batch) Offset() int64
- func (batch *Batch) Partition() int
- func (batch *Batch) Read(b []byte) (int, error)
- func (batch *Batch) ReadMessage() (Message, error)
- func (batch *Batch) Throttle() time.Duration
- type Broker
- type BrokerResolver
- type Bytes
- type CRC32Balancer
- type Client
- func (c *Client) AddOffsetsToTxn(ctx context.Context, req *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error)
- func (c *Client) AddPartitionsToTxn(ctx context.Context, req *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error)
- func (c *Client) AlterClientQuotas(ctx context.Context, req *AlterClientQuotasRequest) (*AlterClientQuotasResponse, error)
- func (c *Client) AlterConfigs(ctx context.Context, req *AlterConfigsRequest) (*AlterConfigsResponse, error)
- func (c *Client) AlterPartitionReassignments(ctx context.Context, req *AlterPartitionReassignmentsRequest) (*AlterPartitionReassignmentsResponse, error)
- func (c *Client) AlterUserScramCredentials(ctx context.Context, req *AlterUserScramCredentialsRequest) (*AlterUserScramCredentialsResponse, error)
- func (c *Client) ApiVersions(ctx context.Context, req *ApiVersionsRequest) (*ApiVersionsResponse, error)
- func (c *Client) ConsumerOffsets(ctx context.Context, tg TopicAndGroup) (map[int]int64, error)
- func (c *Client) CreateACLs(ctx context.Context, req *CreateACLsRequest) (*CreateACLsResponse, error)
- func (c *Client) CreatePartitions(ctx context.Context, req *CreatePartitionsRequest) (*CreatePartitionsResponse, error)
- func (c *Client) CreateTopics(ctx context.Context, req *CreateTopicsRequest) (*CreateTopicsResponse, error)
- func (c *Client) DeleteACLs(ctx context.Context, req *DeleteACLsRequest) (*DeleteACLsResponse, error)
- func (c *Client) DeleteGroups(ctx context.Context, req *DeleteGroupsRequest) (*DeleteGroupsResponse, error)
- func (c *Client) DeleteTopics(ctx context.Context, req *DeleteTopicsRequest) (*DeleteTopicsResponse, error)
- func (c *Client) DescribeACLs(ctx context.Context, req *DescribeACLsRequest) (*DescribeACLsResponse, error)
- func (c *Client) DescribeClientQuotas(ctx context.Context, req *DescribeClientQuotasRequest) (*DescribeClientQuotasResponse, error)
- func (c *Client) DescribeConfigs(ctx context.Context, req *DescribeConfigsRequest) (*DescribeConfigsResponse, error)
- func (c *Client) DescribeGroups(ctx context.Context, req *DescribeGroupsRequest) (*DescribeGroupsResponse, error)
- func (c *Client) DescribeUserScramCredentials(ctx context.Context, req *DescribeUserScramCredentialsRequest) (*DescribeUserScramCredentialsResponse, error)
- func (c *Client) ElectLeaders(ctx context.Context, req *ElectLeadersRequest) (*ElectLeadersResponse, error)
- func (c *Client) EndTxn(ctx context.Context, req *EndTxnRequest) (*EndTxnResponse, error)
- func (c *Client) Fetch(ctx context.Context, req *FetchRequest) (*FetchResponse, error)
- func (c *Client) FindCoordinator(ctx context.Context, req *FindCoordinatorRequest) (*FindCoordinatorResponse, error)
- func (c *Client) Heartbeat(ctx context.Context, req *HeartbeatRequest) (*HeartbeatResponse, error)
- func (c *Client) IncrementalAlterConfigs(ctx context.Context, req *IncrementalAlterConfigsRequest) (*IncrementalAlterConfigsResponse, error)
- func (c *Client) InitProducerID(ctx context.Context, req *InitProducerIDRequest) (*InitProducerIDResponse, error)
- func (c *Client) JoinGroup(ctx context.Context, req *JoinGroupRequest) (*JoinGroupResponse, error)
- func (c *Client) LeaveGroup(ctx context.Context, req *LeaveGroupRequest) (*LeaveGroupResponse, error)
- func (c *Client) ListGroups(ctx context.Context, req *ListGroupsRequest) (*ListGroupsResponse, error)
- func (c *Client) ListOffsets(ctx context.Context, req *ListOffsetsRequest) (*ListOffsetsResponse, error)
- func (c *Client) ListPartitionReassignments(ctx context.Context, req *ListPartitionReassignmentsRequest) (*ListPartitionReassignmentsResponse, error)
- func (c *Client) Metadata(ctx context.Context, req *MetadataRequest) (*MetadataResponse, error)
- func (c *Client) OffsetCommit(ctx context.Context, req *OffsetCommitRequest) (*OffsetCommitResponse, error)
- func (c *Client) OffsetDelete(ctx context.Context, req *OffsetDeleteRequest) (*OffsetDeleteResponse, error)
- func (c *Client) OffsetFetch(ctx context.Context, req *OffsetFetchRequest) (*OffsetFetchResponse, error)
- func (c *Client) Produce(ctx context.Context, req *ProduceRequest) (*ProduceResponse, error)
- func (c *Client) RawProduce(ctx context.Context, req *RawProduceRequest) (*ProduceResponse, error)
- func (c *Client) SyncGroup(ctx context.Context, req *SyncGroupRequest) (*SyncGroupResponse, error)
- func (c *Client) TxnOffsetCommit(ctx context.Context, req *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error)
- type Compression
- type CompressionCodec
- type ConfigEntry
- type ConfigOperation
- type Conn
- func Dial(network string, address string) (*Conn, error)
- func DialContext(ctx context.Context, network string, address string) (*Conn, error)
- func DialLeader(ctx context.Context, network string, address string, topic string, ...) (*Conn, error)
- func DialPartition(ctx context.Context, network string, address string, partition Partition) (*Conn, error)
- func NewConn(conn net.Conn, topic string, partition int) *Conn
- func NewConnWith(conn net.Conn, config ConnConfig) *Conn
- func (c *Conn) ApiVersions() ([]ApiVersion, error)
- func (c *Conn) Broker() Broker
- func (c *Conn) Brokers() ([]Broker, error)
- func (c *Conn) Close() error
- func (c *Conn) Controller() (broker Broker, err error)
- func (c *Conn) CreateTopics(topics ...TopicConfig) error
- func (c *Conn) DeleteTopics(topics ...string) error
- func (c *Conn) LocalAddr() net.Addr
- func (c *Conn) Offset() (offset int64, whence int)
- func (c *Conn) Read(b []byte) (int, error)
- func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch
- func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch
- func (c *Conn) ReadFirstOffset() (int64, error)
- func (c *Conn) ReadLastOffset() (int64, error)
- func (c *Conn) ReadMessage(maxBytes int) (Message, error)
- func (c *Conn) ReadOffset(t time.Time) (int64, error)
- func (c *Conn) ReadOffsets() (first, last int64, err error)
- func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err error)
- func (c *Conn) RemoteAddr() net.Addr
- func (c *Conn) Seek(offset int64, whence int) (int64, error)
- func (c *Conn) SetDeadline(t time.Time) error
- func (c *Conn) SetReadDeadline(t time.Time) error
- func (c *Conn) SetRequiredAcks(n int) error
- func (c *Conn) SetWriteDeadline(t time.Time) error
- func (c *Conn) Write(b []byte) (int, error)
- func (c *Conn) WriteCompressedMessages(codec CompressionCodec, msgs ...Message) (nbytes int, err error)
- func (c *Conn) WriteCompressedMessagesAt(codec CompressionCodec, msgs ...Message) (nbytes int, partition int32, offset int64, appendTime time.Time, err error)
- func (c *Conn) WriteMessages(msgs ...Message) (int, error)
- type ConnConfig
- type ConsumerGroup
- type ConsumerGroupConfig
- type CoordinatorKeyType
- type CreateACLsRequest
- type CreateACLsResponse
- type CreatePartitionsRequest
- type CreatePartitionsResponse
- type CreateTopicsRequest
- type CreateTopicsResponse
- type DeleteACLsFilter
- type DeleteACLsMatchingACLs
- type DeleteACLsRequest
- type DeleteACLsResponse
- type DeleteACLsResult
- type DeleteGroupsRequest
- type DeleteGroupsResponse
- type DeleteTopicsRequest
- type DeleteTopicsResponse
- type DescribeACLsRequest
- type DescribeACLsResponse
- type DescribeClientQuotasEntity
- type DescribeClientQuotasRequest
- type DescribeClientQuotasRequestComponent
- type DescribeClientQuotasResponse
- type DescribeClientQuotasResponseQuotas
- type DescribeClientQuotasValue
- type DescribeConfigRequestResource
- type DescribeConfigResponseConfigEntry
- type DescribeConfigResponseConfigSynonym
- type DescribeConfigResponseResource
- type DescribeConfigsRequest
- type DescribeConfigsResponse
- type DescribeGroupsRequest
- type DescribeGroupsResponse
- type DescribeGroupsResponseAssignments
- type DescribeGroupsResponseGroup
- type DescribeGroupsResponseMember
- type DescribeGroupsResponseMemberMetadata
- type DescribeGroupsResponseMemberMetadataOwnedPartition
- type DescribeUserScramCredentialsCredentialInfo
- type DescribeUserScramCredentialsRequest
- type DescribeUserScramCredentialsResponse
- type DescribeUserScramCredentialsResponseResult
- type Dialer
- func (d *Dialer) Dial(network string, address string) (*Conn, error)
- func (d *Dialer) DialContext(ctx context.Context, network string, address string) (*Conn, error)
- func (d *Dialer) DialLeader(ctx context.Context, network string, address string, topic string, ...) (*Conn, error)
- func (d *Dialer) DialPartition(ctx context.Context, network string, address string, partition Partition) (*Conn, error)
- func (d *Dialer) LookupLeader(ctx context.Context, network string, address string, topic string, ...) (Broker, error)
- func (d *Dialer) LookupPartition(ctx context.Context, network string, address string, topic string, ...) (Partition, error)
- func (d *Dialer) LookupPartitions(ctx context.Context, network string, address string, topic string) ([]Partition, error)
- type DurationStats
- type ElectLeadersRequest
- type ElectLeadersResponse
- type ElectLeadersResponsePartitionResult
- type EndTxnRequest
- type EndTxnResponse
- type Error
- type FetchRequest
- type FetchResponse
- type FindCoordinatorRequest
- type FindCoordinatorResponse
- type FindCoordinatorResponseCoordinator
- type Generation
- type GroupBalancer
- type GroupMember
- type GroupMemberAssignments
- type GroupMemberTopic
- type GroupProtocol
- type GroupProtocolAssignment
- type GroupProtocolSubscription
- type Hash
- type Header
- type HeartbeatRequest
- type HeartbeatResponse
- type IncrementalAlterConfigsRequest
- type IncrementalAlterConfigsRequestConfig
- type IncrementalAlterConfigsRequestResource
- type IncrementalAlterConfigsResponse
- type IncrementalAlterConfigsResponseResource
- type InitProducerIDRequest
- type InitProducerIDResponse
- type IsolationLevel
- type JoinGroupRequest
- type JoinGroupResponse
- type JoinGroupResponseMember
- type LeastBytes
- type LeaveGroupRequest
- type LeaveGroupRequestMember
- type LeaveGroupResponse
- type LeaveGroupResponseMember
- type ListGroupsRequest
- type ListGroupsResponse
- type ListGroupsResponseGroup
- type ListOffsetsRequest
- type ListOffsetsResponse
- type ListPartitionReassignmentsRequest
- type ListPartitionReassignmentsRequestTopic
- type ListPartitionReassignmentsResponse
- type ListPartitionReassignmentsResponsePartition
- type ListPartitionReassignmentsResponseTopic
- type Logger
- type LoggerFunc
- type Message
- type MessageTooLargeError
- type MetadataRequest
- type MetadataResponse
- type Murmur2Balancer
- type OffsetCommit
- type OffsetCommitPartition
- type OffsetCommitRequest
- type OffsetCommitResponse
- type OffsetDelete
- type OffsetDeletePartition
- type OffsetDeleteRequest
- type OffsetDeleteResponse
- type OffsetFetchPartition
- type OffsetFetchRequest
- type OffsetFetchResponse
- type OffsetRequest
- type Partition
- type PartitionAssignment
- type PartitionOffsets
- type PatternType
- type ProduceRequest
- type ProduceResponse
- type ProducerSession
- type RackAffinityGroupBalancer
- type RangeGroupBalancer
- type RawProduceRequest
- type ReadBatchConfig
- type Reader
- func (r *Reader) Close() error
- func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error
- func (r *Reader) Config() ReaderConfig
- func (r *Reader) FetchMessage(ctx context.Context) (Message, error)
- func (r *Reader) Lag() int64
- func (r *Reader) Offset() int64
- func (r *Reader) ReadLag(ctx context.Context) (lag int64, err error)
- func (r *Reader) ReadMessage(ctx context.Context) (Message, error)
- func (r *Reader) SetOffset(offset int64) error
- func (r *Reader) SetOffsetAt(ctx context.Context, t time.Time) error
- func (r *Reader) Stats() ReaderStats
- type ReaderConfig
- type ReaderStats
- type Record
- type RecordReader
- type ReferenceHash
- type ReplicaAssignment
- type Request
- type RequiredAcks
- type Resolver
- type ResourceType
- type Response
- type RoundRobin
- type RoundRobinGroupBalancer
- type RoundTripper
- type ScramMechanism
- type SummaryStats
- type SyncGroupRequest
- type SyncGroupRequestAssignment
- type SyncGroupResponse
- type Topic
- type TopicAndGroup
- type TopicConfig
- type TopicPartitionAssignment
- type TopicPartitionsConfig
- type Transport
- type TxnOffsetCommit
- type TxnOffsetCommitPartition
- type TxnOffsetCommitRequest
- type TxnOffsetCommitResponse
- type UserScramCredentialsDeletion
- type UserScramCredentialsUpsertion
- type UserScramCredentialsUser
- type Version
- type WriteErrors
- type Writer
- type WriterConfig
- type WriterStats
Examples ¶
Constants ¶
const ( SeekStart = 0 // Seek relative to the first offset available in the partition. SeekAbsolute = 1 // Seek to an absolute offset. SeekEnd = 2 // Seek relative to the last offset available in the partition. SeekCurrent = 3 // Seek relative to the current offset. // This flag may be combined to any of the SeekAbsolute and SeekCurrent // constants to skip the bound check that the connection would do otherwise. // Programs can use this flag to avoid making a metadata request to the kafka // broker to read the current first and last offsets of the partition. SeekDontCheck = 1 << 30 )
const ( LastOffset int64 = -1 // The most recent offset available for a partition. FirstOffset int64 = -2 // The least recent offset available for a partition. )
Variables ¶
var ( // DefaultClientID is the default value used as ClientID of kafka // connections. DefaultClientID string )
var DefaultDialer = &Dialer{ Timeout: 10 * time.Second, DualStack: true, }
DefaultDialer is the default dialer used when none is specified.
var ErrGenerationEnded = errors.New("consumer group generation has ended")
ErrGenerationEnded is returned by the context.Context issued by the Generation's Start function when the context has been closed.
var ErrGroupClosed = errors.New("consumer group is closed")
ErrGroupClosed is returned by ConsumerGroup.Next when the group has already been closed.
Functions ¶
func Marshal ¶ added in v0.4.0
Marshal encodes v into a binary representation of the value in the kafka data format.
If v is a struct type, or contains struct types, the kafka struct tags of their fields are interpreted and may contain one of these values:
- nullable: valid on bytes and strings, encodes as a nullable value
- compact: valid on strings, encodes as a compact string
The kafka struct tags should not contain min and max versions. If you need to encode types based on specific versions of kafka APIs, use the Version type instead.
Types ¶
type ACLDescription ¶ added in v0.4.43
type ACLDescription struct { Principal string Host string Operation ACLOperationType PermissionType ACLPermissionType }
type ACLEntry ¶ added in v0.4.29
type ACLEntry struct { ResourceType ResourceType ResourceName string ResourcePatternType PatternType Principal string Host string Operation ACLOperationType PermissionType ACLPermissionType }
type ACLFilter ¶ added in v0.4.43
type ACLFilter struct { ResourceTypeFilter ResourceType ResourceNameFilter string // ResourcePatternTypeFilter was added in v1 and is not available prior to that. ResourcePatternTypeFilter PatternType PrincipalFilter string HostFilter string Operation ACLOperationType PermissionType ACLPermissionType }
type ACLOperationType ¶ added in v0.4.29
type ACLOperationType int8
const ( ACLOperationTypeUnknown ACLOperationType = 0 ACLOperationTypeAny ACLOperationType = 1 ACLOperationTypeAll ACLOperationType = 2 ACLOperationTypeRead ACLOperationType = 3 ACLOperationTypeWrite ACLOperationType = 4 ACLOperationTypeCreate ACLOperationType = 5 ACLOperationTypeDelete ACLOperationType = 6 ACLOperationTypeAlter ACLOperationType = 7 ACLOperationTypeDescribe ACLOperationType = 8 ACLOperationTypeClusterAction ACLOperationType = 9 ACLOperationTypeDescribeConfigs ACLOperationType = 10 ACLOperationTypeAlterConfigs ACLOperationType = 11 ACLOperationTypeIdempotentWrite ACLOperationType = 12 )
func (ACLOperationType) MarshalText ¶ added in v0.4.45
func (aot ACLOperationType) MarshalText() ([]byte, error)
MarshalText transforms an ACLOperationType into its string representation.
func (ACLOperationType) String ¶ added in v0.4.45
func (aot ACLOperationType) String() string
func (*ACLOperationType) UnmarshalText ¶ added in v0.4.45
func (aot *ACLOperationType) UnmarshalText(text []byte) error
UnmarshalText takes a string representation of the operation type and converts it to an ACLOperationType.
type ACLPermissionType ¶ added in v0.4.29
type ACLPermissionType int8
const ( ACLPermissionTypeUnknown ACLPermissionType = 0 ACLPermissionTypeAny ACLPermissionType = 1 ACLPermissionTypeDeny ACLPermissionType = 2 ACLPermissionTypeAllow ACLPermissionType = 3 )
func (ACLPermissionType) MarshalText ¶ added in v0.4.45
func (apt ACLPermissionType) MarshalText() ([]byte, error)
MarshalText transforms an ACLPermissionType into its string representation.
func (ACLPermissionType) String ¶ added in v0.4.45
func (apt ACLPermissionType) String() string
func (*ACLPermissionType) UnmarshalText ¶ added in v0.4.45
func (apt *ACLPermissionType) UnmarshalText(text []byte) error
UnmarshalText takes a string representation of the permission type and converts it to an ACLPermissionType.
type ACLResource ¶ added in v0.4.43
type ACLResource struct { ResourceType ResourceType ResourceName string PatternType PatternType ACLs []ACLDescription }
type AddOffsetsToTxnRequest ¶ added in v0.4.20
type AddOffsetsToTxnRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // The transactional id key TransactionalID string // The Producer ID (PID) for the current producer session; // received from an InitProducerID request. ProducerID int // The epoch associated with the current producer session for the given PID ProducerEpoch int // The unique group identifier. GroupID string }
AddOffsetsToTxnRequest is the request structure for the AddOffsetsToTxn function.
type AddOffsetsToTxnResponse ¶ added in v0.4.20
type AddOffsetsToTxnResponse struct { // The amount of time that the broker throttled the request. Throttle time.Duration // An error that may have occurred when attempting to add the offsets // to a transaction. // // The errors contain the kafka error code. Programs may use the standard // errors.Is function to test the error against kafka error codes. Error error }
AddOffsetsToTxnResponse is the response structure for the AddOffsetsToTxn function.
type AddPartitionToTxn ¶ added in v0.4.19
type AddPartitionToTxn struct { // Partition is the ID of a partition to add to the transaction. Partition int }
AddPartitionToTxn represents a partition to be added to a transaction.
type AddPartitionToTxnPartition ¶ added in v0.4.19
type AddPartitionToTxnPartition struct { // The ID of the partition. Partition int // An error that may have occurred when attempting to add the partition // to a transaction. // // The errors contain the kafka error code. Programs may use the standard // errors.Is function to test the error against kafka error codes. Error error }
AddPartitionToTxnPartition represents the state of a single partition in response to adding to a transaction.
type AddPartitionsToTxnRequest ¶ added in v0.4.19
type AddPartitionsToTxnRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // The transactional id key TransactionalID string // The Producer ID (PID) for the current producer session; // received from an InitProducerID request. ProducerID int // The epoch associated with the current producer session for the given PID ProducerEpoch int // Mappings of topic names to lists of partitions. Topics map[string][]AddPartitionToTxn }
AddPartitionsToTxnRequest is the request structure for the AddPartitionsToTxn function.
type AddPartitionsToTxnResponse ¶ added in v0.4.21
type AddPartitionsToTxnResponse struct { // The amount of time that the broker throttled the request. Throttle time.Duration // Mappings of topic names to partitions being added to a transactions. Topics map[string][]AddPartitionToTxnPartition }
AddPartitionsToTxnResponse is the response structure for the AddPartitionsToTxn function.
type AlterClientQuotaEntity ¶ added in v0.4.41
type AlterClientQuotaEntry ¶ added in v0.4.41
type AlterClientQuotaEntry struct { // The quota entities to alter. Entities []AlterClientQuotaEntity // An individual quota configuration entry to alter. Ops []AlterClientQuotaOps }
type AlterClientQuotaOps ¶ added in v0.4.41
type AlterClientQuotaResponseQuotas ¶ added in v0.4.41
type AlterClientQuotaResponseQuotas struct { // Error is set to a non-nil value including the code and message if a top-level // error was encountered when doing the update. Error error // The altered quota entities. Entities []AlterClientQuotaEntity }
type AlterClientQuotasRequest ¶ added in v0.4.41
type AlterClientQuotasRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // List of client quotas entries to alter. Entries []AlterClientQuotaEntry // Whether the alteration should be validated, but not performed. ValidateOnly bool }
AlterClientQuotasRequest represents a request sent to a kafka broker to alter client quotas.
type AlterClientQuotasResponse ¶ added in v0.4.41
type AlterClientQuotasResponse struct { // The amount of time that the broker throttled the request. Throttle time.Duration // List of altered client quotas responses. Entries []AlterClientQuotaResponseQuotas }
AlterClientQuotasResponse represents a response from a kafka broker to an alter client quotas request.
type AlterConfigRequestConfig ¶ added in v0.4.9
type AlterConfigRequestResource ¶ added in v0.4.9
type AlterConfigRequestResource struct { // Resource Type ResourceType ResourceType // Resource Name ResourceName string // Configs is a list of configuration updates. Configs []AlterConfigRequestConfig }
type AlterConfigsRequest ¶ added in v0.4.9
type AlterConfigsRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // List of resources to update. Resources []AlterConfigRequestResource // When set to true, topics are not created but the configuration is // validated as if they were. ValidateOnly bool }
AlterConfigsRequest represents a request sent to a kafka broker to alter configs.
type AlterConfigsResponse ¶ added in v0.4.9
type AlterConfigsResponse struct { // Duration for which the request was throttled due to a quota violation. Throttle time.Duration // Mapping of topic names to errors that occurred while attempting to create // the topics. // // The errors contain the kafka error code. Programs may use the standard // errors.Is function to test the error against kafka error codes. Errors map[AlterConfigsResponseResource]error }
AlterConfigsResponse represents a response from a kafka broker to an alter config request.
type AlterConfigsResponseResource ¶ added in v0.4.9
AlterConfigsResponseResource helps map errors to specific resources in an alter config response.
type AlterPartitionReassignmentsRequest ¶ added in v0.4.9
type AlterPartitionReassignmentsRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // Topic is the name of the topic to alter partitions in. Keep this field empty and use Topic in AlterPartitionReassignmentsRequestAssignment to // reassign to multiple topics. Topic string // Assignments is the list of partition reassignments to submit to the API. Assignments []AlterPartitionReassignmentsRequestAssignment // Timeout is the amount of time to wait for the request to complete. Timeout time.Duration }
AlterPartitionReassignmentsRequest is a request to the AlterPartitionReassignments API.
type AlterPartitionReassignmentsRequestAssignment ¶ added in v0.4.9
type AlterPartitionReassignmentsRequestAssignment struct { // Topic is the name of the topic to alter partitions in. If empty, the value of Topic in AlterPartitionReassignmentsRequest is used. Topic string // PartitionID is the ID of the partition to make the reassignments in. PartitionID int // BrokerIDs is a slice of brokers to set the partition replicas to, or null to cancel a pending reassignment for this partition. BrokerIDs []int }
AlterPartitionReassignmentsRequestAssignment contains the requested reassignments for a single partition.
type AlterPartitionReassignmentsResponse ¶ added in v0.4.9
type AlterPartitionReassignmentsResponse struct { // Error is set to a non-nil value including the code and message if a top-level // error was encountered when doing the update. Error error // PartitionResults contains the specific results for each partition. PartitionResults []AlterPartitionReassignmentsResponsePartitionResult }
AlterPartitionReassignmentsResponse is a response from the AlterPartitionReassignments API.
type AlterPartitionReassignmentsResponsePartitionResult ¶ added in v0.4.9
type AlterPartitionReassignmentsResponsePartitionResult struct { // Topic is the topic name. Topic string // PartitionID is the ID of the partition that was altered. PartitionID int // Error is set to a non-nil value including the code and message if an error was encountered // during the update for this partition. Error error }
AlterPartitionReassignmentsResponsePartitionResult contains the detailed result of doing reassignments for a single partition.
type AlterUserScramCredentialsRequest ¶ added in v0.4.43
type AlterUserScramCredentialsRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // List of credentials to delete. Deletions []UserScramCredentialsDeletion // List of credentials to upsert. Upsertions []UserScramCredentialsUpsertion }
AlterUserScramCredentialsRequest represents a request sent to a kafka broker to alter user scram credentials.
type AlterUserScramCredentialsResponse ¶ added in v0.4.43
type AlterUserScramCredentialsResponse struct { // The amount of time that the broker throttled the request. Throttle time.Duration // List of altered user scram credentials. Results []AlterUserScramCredentialsResponseUser }
AlterUserScramCredentialsResponse represents a response from a kafka broker to an alter user credentials request.
type AlterUserScramCredentialsResponseUser ¶ added in v0.4.43
type ApiVersion ¶ added in v0.2.3
type ApiVersionsRequest ¶ added in v0.4.9
type ApiVersionsRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr }
ApiVersionsRequest is a request to the ApiVersions API.
type ApiVersionsResponse ¶ added in v0.4.9
type ApiVersionsResponse struct { // Error is set to a non-nil value if an error was encountered. Error error // ApiKeys contains the specific details of each supported API. ApiKeys []ApiVersionsResponseApiKey }
ApiVersionsResponse is a response from the ApiVersions API.
type ApiVersionsResponseApiKey ¶ added in v0.4.9
type ApiVersionsResponseApiKey struct { // ApiKey is the ID of the API. ApiKey int // ApiName is a human-friendly description of the API. ApiName string // MinVersion is the minimum API version supported by the broker. MinVersion int // MaxVersion is the maximum API version supported by the broker. MaxVersion int }
ApiVersionsResponseApiKey includes the details of which versions are supported for a single API.
type Balancer ¶
type Balancer interface { // Balance receives a message and a set of available partitions and // returns the partition number that the message should be routed to. // // An application should refrain from using a balancer to manage multiple // sets of partitions (from different topics for examples), use one balancer // instance for each partition set, so the balancer can detect when the // partitions change and assume that the kafka topic has been rebalanced. Balance(msg Message, partitions ...int) (partition int) }
The Balancer interface provides an abstraction of the message distribution logic used by Writer instances to route messages to the partitions available on a kafka cluster.
Balancers must be safe to use concurrently from multiple goroutines.
type BalancerFunc ¶
BalancerFunc is an implementation of the Balancer interface that makes it possible to use regular functions to distribute messages across partitions.
type Batch ¶
type Batch struct {
// contains filtered or unexported fields
}
A Batch is an iterator over a sequence of messages fetched from a kafka server.
Batches are created by calling (*Conn).ReadBatch. They hold an internal lock on the connection, which is released when the batch is closed. Failing to call a batch's Close method will likely result in a dead-lock when trying to use the connection.
Batches are safe to use concurrently from multiple goroutines.
func (*Batch) Close ¶
Close closes the batch, releasing the connection lock and returning an error if reading the batch failed for any reason.
func (*Batch) Err ¶ added in v0.2.5
Err returns a non-nil error if the batch is broken. This is the same error that would be returned by Read, ReadMessage or Close (except in the case of io.EOF which is never returned by Close).
This method is useful when building retry mechanisms for (*Conn).ReadBatch: the program can check whether the batch carried an error before attempting to read the first message.
Note that checking errors on a batch is optional, calling Read or ReadMessage is always valid and can be used to either read a message or an error in cases where that's convenient.
func (*Batch) HighWaterMark ¶
HighWaterMark returns the current highest watermark in a partition.
func (*Batch) Read ¶
Read reads the value of the next message from the batch into b, returning the number of bytes read, or an error if the next message couldn't be read.
If an error is returned the batch cannot be used anymore and calling Read again will keep returning that error. All errors except io.EOF (indicating that the program consumed all messages from the batch) are also returned by Close.
The method fails with io.ErrShortBuffer if the buffer passed as argument is too small to hold the message value.
func (*Batch) ReadMessage ¶
ReadMessage reads and returns the next message from the batch.
Because this method allocates memory buffers for the message key and value, it is less memory-efficient than Read, but has the advantage of never failing with io.ErrShortBuffer.
type BrokerResolver ¶ added in v0.4.5
type BrokerResolver interface { // Returns the IP addresses of the broker passed as argument. LookupBrokerIPAddr(ctx context.Context, broker Broker) ([]net.IPAddr, error) }
BrokerResolver is an interface implemented by types that translate host names into a network address.
This resolver is not intended to be a general purpose interface. Instead, it is tailored to the particular needs of the kafka protocol, with the goal being to provide a flexible mechanism for extending broker name resolution while retaining context that is specific to interacting with a kafka cluster.
Resolvers must be safe to use from multiple goroutines.
func NewBrokerResolver ¶ added in v0.4.5
func NewBrokerResolver(r *net.Resolver) BrokerResolver
NewBrokerResolver constructs a Resolver from r.
If r is nil, net.DefaultResolver is used instead.
type Bytes ¶ added in v0.4.0
Bytes is an interface representing a sequence of bytes. This abstraction makes it possible for programs to inject data into produce requests without having to load it into an intermediary buffer, or read record keys and values from a fetch response directly from internal buffers.
Bytes are not safe to use concurrently from multiple goroutines.
type CRC32Balancer ¶ added in v0.3.1
type CRC32Balancer struct { Consistent bool // contains filtered or unexported fields }
CRC32Balancer is a Balancer that uses the CRC32 hash function to determine which partition to route messages to. This ensures that messages with the same key are routed to the same partition. This balancer is compatible with the built-in hash partitioners in librdkafka and the language bindings that are built on top of it, including the github.com/confluentinc/confluent-kafka-go Go package.
With the Consistent field false (default), this partitioner is equivalent to the "consistent_random" setting in librdkafka. When Consistent is true, this partitioner is equivalent to the "consistent" setting. The latter will hash empty or nil keys into the same partition.
Unless you are absolutely certain that all your messages will have keys, it's best to leave the Consistent flag off. Otherwise, you run the risk of creating a very hot partition.
type Client ¶ added in v0.2.5
type Client struct { // Address of the kafka cluster (or specific broker) that the client will be // sending requests to. // // This field is optional, the address may be provided in each request // instead. The request address takes precedence if both were specified. Addr net.Addr // Time limit for requests sent by this client. // // If zero, no timeout is applied. Timeout time.Duration // A transport used to communicate with the kafka brokers. // // If nil, DefaultTransport is used. Transport RoundTripper }
Client is a high-level API to interact with kafka brokers.
All methods of the Client type accept a context as first argument, which may be used to asynchronously cancel the requests.
Clients are safe to use concurrently from multiple goroutines, as long as their configuration is not changed after first use.
func (*Client) AddOffsetsToTxn ¶ added in v0.4.20
func (c *Client) AddOffsetsToTxn( ctx context.Context, req *AddOffsetsToTxnRequest, ) (*AddOffsetsToTxnResponse, error)
AddOffsetsToTxn sends an add offsets to txn request to a kafka broker and returns the response.
func (*Client) AddPartitionsToTxn ¶ added in v0.4.19
func (c *Client) AddPartitionsToTxn( ctx context.Context, req *AddPartitionsToTxnRequest, ) (*AddPartitionsToTxnResponse, error)
AddPartitionsToTxn sends an add partitions to txn request to a kafka broker and returns the response.
func (*Client) AlterClientQuotas ¶ added in v0.4.41
func (c *Client) AlterClientQuotas(ctx context.Context, req *AlterClientQuotasRequest) (*AlterClientQuotasResponse, error)
AlterClientQuotas sends a client quotas alteration request to a kafka broker and returns the response.
func (*Client) AlterConfigs ¶ added in v0.4.9
func (c *Client) AlterConfigs(ctx context.Context, req *AlterConfigsRequest) (*AlterConfigsResponse, error)
AlterConfigs sends a config altering request to a kafka broker and returns the response.
func (*Client) AlterPartitionReassignments ¶ added in v0.4.9
func (c *Client) AlterPartitionReassignments( ctx context.Context, req *AlterPartitionReassignmentsRequest, ) (*AlterPartitionReassignmentsResponse, error)
func (*Client) AlterUserScramCredentials ¶ added in v0.4.43
func (c *Client) AlterUserScramCredentials(ctx context.Context, req *AlterUserScramCredentialsRequest) (*AlterUserScramCredentialsResponse, error)
AlterUserScramCredentials sends a user scram credentials alteration request to a kafka broker and returns the response.
func (*Client) ApiVersions ¶ added in v0.4.9
func (c *Client) ApiVersions( ctx context.Context, req *ApiVersionsRequest, ) (*ApiVersionsResponse, error)
func (*Client) ConsumerOffsets ¶ added in v0.2.5
ConsumerOffsets returns a map[int]int64 of partition to committed offset for a consumer group id and topic.
DEPRECATED: this method will be removed in version 1.0, programs should migrate to use kafka.(*Client).OffsetFetch instead.
func (*Client) CreateACLs ¶ added in v0.4.29
func (c *Client) CreateACLs(ctx context.Context, req *CreateACLsRequest) (*CreateACLsResponse, error)
CreateACLs sends an ACL creation request to a kafka broker and returns the response.
func (*Client) CreatePartitions ¶ added in v0.4.9
func (c *Client) CreatePartitions(ctx context.Context, req *CreatePartitionsRequest) (*CreatePartitionsResponse, error)
CreatePartitions sends a partitions creation request to a kafka broker and returns the response.
func (*Client) CreateTopics ¶ added in v0.4.0
func (c *Client) CreateTopics(ctx context.Context, req *CreateTopicsRequest) (*CreateTopicsResponse, error)
CreateTopics sends a topic creation request to a kafka broker and returns the response.
func (*Client) DeleteACLs ¶ added in v0.4.43
func (c *Client) DeleteACLs(ctx context.Context, req *DeleteACLsRequest) (*DeleteACLsResponse, error)
DeleteACLs sends an ACL deletion request to a kafka broker and returns the response.
func (*Client) DeleteGroups ¶ added in v0.4.40
func (c *Client) DeleteGroups( ctx context.Context, req *DeleteGroupsRequest, ) (*DeleteGroupsResponse, error)
DeleteGroups sends a delete groups request and returns the response. The request is sent to the group coordinator of the first group of the request. All deleted groups must be managed by the same group coordinator.
func (*Client) DeleteTopics ¶ added in v0.4.0
func (c *Client) DeleteTopics(ctx context.Context, req *DeleteTopicsRequest) (*DeleteTopicsResponse, error)
DeleteTopics sends a topic deletion request to a kafka broker and returns the response.
func (*Client) DescribeACLs ¶ added in v0.4.43
func (c *Client) DescribeACLs(ctx context.Context, req *DescribeACLsRequest) (*DescribeACLsResponse, error)
func (*Client) DescribeClientQuotas ¶ added in v0.4.41
func (c *Client) DescribeClientQuotas(ctx context.Context, req *DescribeClientQuotasRequest) (*DescribeClientQuotasResponse, error)
DescribeClientQuotas sends a describe client quotas request to a kafka broker and returns the response.
func (*Client) DescribeConfigs ¶ added in v0.4.9
func (c *Client) DescribeConfigs(ctx context.Context, req *DescribeConfigsRequest) (*DescribeConfigsResponse, error)
DescribeConfigs sends a describe configs request to a kafka broker and returns the response.
func (*Client) DescribeGroups ¶ added in v0.4.9
func (c *Client) DescribeGroups( ctx context.Context, req *DescribeGroupsRequest, ) (*DescribeGroupsResponse, error)
DescribeGroups calls the Kafka DescribeGroups API to get information about one or more consumer groups. See https://kafka.apache.org/protocol#The_Messages_DescribeGroups for more information.
func (*Client) DescribeUserScramCredentials ¶ added in v0.4.43
func (c *Client) DescribeUserScramCredentials(ctx context.Context, req *DescribeUserScramCredentialsRequest) (*DescribeUserScramCredentialsResponse, error)
DescribeUserScramCredentials sends a user scram credentials describe request to a kafka broker and returns the response.
func (*Client) ElectLeaders ¶ added in v0.4.9
func (c *Client) ElectLeaders( ctx context.Context, req *ElectLeadersRequest, ) (*ElectLeadersResponse, error)
func (*Client) EndTxn ¶ added in v0.4.19
func (c *Client) EndTxn(ctx context.Context, req *EndTxnRequest) (*EndTxnResponse, error)
EndTxn sends an EndTxn request to a kafka broker and returns its response.
func (*Client) Fetch ¶ added in v0.4.0
func (c *Client) Fetch(ctx context.Context, req *FetchRequest) (*FetchResponse, error)
Fetch sends a fetch request to a kafka broker and returns the response.
If the broker returned an invalid response with no topics, an error wrapping protocol.ErrNoTopic is returned.
If the broker returned an invalid response with no partitions, an error wrapping ErrNoPartitions is returned.
func (*Client) FindCoordinator ¶ added in v0.4.10
func (c *Client) FindCoordinator(ctx context.Context, req *FindCoordinatorRequest) (*FindCoordinatorResponse, error)
FindCoordinator sends a findCoordinator request to a kafka broker and returns the response.
func (*Client) Heartbeat ¶ added in v0.4.19
func (c *Client) Heartbeat(ctx context.Context, req *HeartbeatRequest) (*HeartbeatResponse, error)
Heartbeat sends a heartbeat request to a kafka broker and returns the response.
func (*Client) IncrementalAlterConfigs ¶ added in v0.4.9
func (c *Client) IncrementalAlterConfigs( ctx context.Context, req *IncrementalAlterConfigsRequest, ) (*IncrementalAlterConfigsResponse, error)
func (*Client) InitProducerID ¶ added in v0.4.11
func (c *Client) InitProducerID(ctx context.Context, req *InitProducerIDRequest) (*InitProducerIDResponse, error)
InitProducerID sends an initProducerId request to a kafka broker and returns the response.
func (*Client) JoinGroup ¶ added in v0.4.33
func (c *Client) JoinGroup(ctx context.Context, req *JoinGroupRequest) (*JoinGroupResponse, error)
JoinGroup sends a join group request to the coordinator and returns the response.
func (*Client) LeaveGroup ¶ added in v0.4.33
func (c *Client) LeaveGroup(ctx context.Context, req *LeaveGroupRequest) (*LeaveGroupResponse, error)
func (*Client) ListGroups ¶ added in v0.4.9
func (c *Client) ListGroups( ctx context.Context, req *ListGroupsRequest, ) (*ListGroupsResponse, error)
func (*Client) ListOffsets ¶ added in v0.4.0
func (c *Client) ListOffsets(ctx context.Context, req *ListOffsetsRequest) (*ListOffsetsResponse, error)
ListOffsets sends an offset request to a kafka broker and returns the response.
func (*Client) ListPartitionReassignments ¶ added in v0.4.45
func (c *Client) ListPartitionReassignments( ctx context.Context, req *ListPartitionReassignmentsRequest, ) (*ListPartitionReassignmentsResponse, error)
func (*Client) Metadata ¶ added in v0.4.0
func (c *Client) Metadata(ctx context.Context, req *MetadataRequest) (*MetadataResponse, error)
Metadata sends a metadata request to a kafka broker and returns the response.
func (*Client) OffsetCommit ¶ added in v0.4.19
func (c *Client) OffsetCommit(ctx context.Context, req *OffsetCommitRequest) (*OffsetCommitResponse, error)
OffsetCommit sends an offset commit request to a kafka broker and returns the response.
func (*Client) OffsetDelete ¶ added in v0.4.36
func (c *Client) OffsetDelete(ctx context.Context, req *OffsetDeleteRequest) (*OffsetDeleteResponse, error)
OffsetDelete sends a delete offset request to a kafka broker and returns the response.
func (*Client) OffsetFetch ¶ added in v0.4.0
func (c *Client) OffsetFetch(ctx context.Context, req *OffsetFetchRequest) (*OffsetFetchResponse, error)
OffsetFetch sends an offset fetch request to a kafka broker and returns the response.
func (*Client) Produce ¶ added in v0.4.0
func (c *Client) Produce(ctx context.Context, req *ProduceRequest) (*ProduceResponse, error)
Produce sends a produce request to a kafka broker and returns the response.
If the request contained no records, an error wrapping protocol.ErrNoRecord is returned.
When the request is configured with RequiredAcks=none, both the response and the error will be nil on success.
func (*Client) RawProduce ¶ added in v0.4.47
func (c *Client) RawProduce(ctx context.Context, req *RawProduceRequest) (*ProduceResponse, error)
RawProduce sends a raw produce request to a kafka broker and returns the response.
If the request contained no records, an error wrapping protocol.ErrNoRecord is returned.
When the request is configured with RequiredAcks=none, both the response and the error will be nil on success.
func (*Client) SyncGroup ¶ added in v0.4.33
func (c *Client) SyncGroup(ctx context.Context, req *SyncGroupRequest) (*SyncGroupResponse, error)
SyncGroup sends a sync group request to the coordinator and returns the response.
func (*Client) TxnOffsetCommit ¶ added in v0.4.20
func (c *Client) TxnOffsetCommit( ctx context.Context, req *TxnOffsetCommitRequest, ) (*TxnOffsetCommitResponse, error)
TxnOffsetCommit sends a txn offset commit request to a kafka broker and returns the response.
type Compression ¶ added in v0.4.0
type Compression = compress.Compression
const ( Gzip Compression = compress.Gzip Snappy Compression = compress.Snappy Lz4 Compression = compress.Lz4 Zstd Compression = compress.Zstd )
type CompressionCodec ¶
type ConfigEntry ¶
type ConfigOperation ¶ added in v0.4.9
type ConfigOperation int8
const ( ConfigOperationSet ConfigOperation = 0 ConfigOperationDelete ConfigOperation = 1 ConfigOperationAppend ConfigOperation = 2 ConfigOperationSubtract ConfigOperation = 3 )
type Conn ¶
type Conn struct {
// contains filtered or unexported fields
}
Conn represents a connection to a kafka broker.
Instances of Conn are safe to use concurrently from multiple goroutines.
func DialContext ¶
DialContext is a convenience wrapper for DefaultDialer.DialContext.
func DialLeader ¶
func DialLeader(ctx context.Context, network string, address string, topic string, partition int) (*Conn, error)
DialLeader is a convenience wrapper for DefaultDialer.DialLeader.
func DialPartition ¶ added in v0.2.1
func DialPartition(ctx context.Context, network string, address string, partition Partition) (*Conn, error)
DialPartition is a convenience wrapper for DefaultDialer.DialPartition.
func NewConnWith ¶
func NewConnWith(conn net.Conn, config ConnConfig) *Conn
NewConnWith returns a new kafka connection configured with config. The offset is initialized to FirstOffset.
func (*Conn) ApiVersions ¶ added in v0.2.3
func (c *Conn) ApiVersions() ([]ApiVersion, error)
func (*Conn) Broker ¶ added in v0.4.18
Broker returns a Broker value representing the kafka broker that this connection was established to.
func (*Conn) Controller ¶ added in v0.2.3
Controller requests kafka for the current controller and returns its URL.
func (*Conn) CreateTopics ¶
func (c *Conn) CreateTopics(topics ...TopicConfig) error
CreateTopics creates one topic per provided configuration with idempotent operational semantics. In other words, if CreateTopics is invoked with a configuration for an existing topic, it will have no effect.
func (*Conn) DeleteTopics ¶
DeleteTopics deletes the specified topics.
func (*Conn) Offset ¶
Offset returns the current offset of the connection as a pair of integers, where the first one is an offset value and the second one indicates how to interpret it.
See Seek for more details about the offset and whence values.
func (*Conn) Read ¶
Read reads the message at the current offset from the connection, advancing the offset on success so the next call to a read method will produce the next message. The method returns the number of bytes read, or an error if something went wrong.
While it is safe to call Read concurrently from multiple goroutines it may be hard for the program to predict the results as the connection offset will be read and written by multiple goroutines, they could read duplicates, or messages may be seen by only some of the goroutines.
The method fails with io.ErrShortBuffer if the buffer passed as argument is too small to hold the message value.
This method is provided to satisfy the net.Conn interface but is much less efficient than using the more general purpose ReadBatch method.
func (*Conn) ReadBatch ¶
ReadBatch reads a batch of messages from the kafka server. The method always returns a non-nil Batch value. If an error occurred, either sending the fetch request or reading the response, the error will be made available by the returned value of the batch's Close method.
While it is safe to call ReadBatch concurrently from multiple goroutines it may be hard for the program to predict the results as the connection offset will be read and written by multiple goroutines, they could read duplicates, or messages may be seen by only some of the goroutines.
A program doesn't specify the number of messages it wants from a batch, but gives the minimum and maximum number of bytes that it wants to receive from the kafka server.
func (*Conn) ReadBatchWith ¶ added in v0.2.3
func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch
ReadBatchWith in every way is similar to ReadBatch. ReadBatch is configured with the default values in ReadBatchConfig except for minBytes and maxBytes.
func (*Conn) ReadFirstOffset ¶
ReadFirstOffset returns the first offset available on the connection.
func (*Conn) ReadLastOffset ¶
ReadLastOffset returns the last offset available on the connection.
func (*Conn) ReadMessage ¶
ReadMessage reads the message at the current offset from the connection, advancing the offset on success so the next call to a read method will produce the next message.
Because this method allocates memory buffers for the message key and value it is less memory-efficient than Read, but has the advantage of never failing with io.ErrShortBuffer.
While it is safe to call ReadMessage concurrently from multiple goroutines it may be hard for the program to predict the results as the connection offset will be read and written by multiple goroutines, they could read duplicates, or messages may be seen by only some of the goroutines.
This method is provided for convenience purposes but is much less efficient than using the more general purpose ReadBatch method.
func (*Conn) ReadOffset ¶
ReadOffset returns the offset of the first message with a timestamp equal to or greater than t.
func (*Conn) ReadOffsets ¶
ReadOffsets returns the absolute first and last offsets of the topic used by the connection.
func (*Conn) ReadPartitions ¶
ReadPartitions returns the list of available partitions for the given list of topics.
If the method is called with no topic, it uses the topic configured on the connection. If there are none, the method fetches all partitions of the kafka cluster.
func (*Conn) RemoteAddr ¶
RemoteAddr returns the remote network address.
func (*Conn) Seek ¶
Seek sets the offset for the next read or write operation according to whence, which should be one of SeekStart, SeekAbsolute, SeekEnd, or SeekCurrent. When seeking relative to the end, the offset is subtracted from the current offset. Note that for historical reasons, these do not align with the usual whence constants as in lseek(2) or os.Seek. The method returns the new absolute offset of the connection.
func (*Conn) SetDeadline ¶
SetDeadline sets the read and write deadlines associated with the connection. It is equivalent to calling both SetReadDeadline and SetWriteDeadline.
A deadline is an absolute time after which I/O operations fail with a timeout (see type Error) instead of blocking. The deadline applies to all future and pending I/O, not just the immediately following call to Read or Write. After a deadline has been exceeded, the connection may be closed if it was found to be in an unrecoverable state.
A zero value for t means I/O operations will not time out.
func (*Conn) SetReadDeadline ¶
SetReadDeadline sets the deadline for future Read calls and any currently-blocked Read call. A zero value for t means Read will not time out.
func (*Conn) SetRequiredAcks ¶
SetRequiredAcks sets the number of acknowledges from replicas that the connection requests when producing messages.
func (*Conn) SetWriteDeadline ¶
SetWriteDeadline sets the deadline for future Write calls and any currently-blocked Write call. Even if write times out, it may return n > 0, indicating that some of the data was successfully written. A zero value for t means Write will not time out.
func (*Conn) Write ¶
Write writes a message to the kafka broker that this connection was established to. The method returns the number of bytes written, or an error if something went wrong.
The operation either succeeds or fails, it never partially writes the message.
This method is exposed to satisfy the net.Conn interface but is less efficient than the more general purpose WriteMessages method.
func (*Conn) WriteCompressedMessages ¶ added in v0.2.1
func (c *Conn) WriteCompressedMessages(codec CompressionCodec, msgs ...Message) (nbytes int, err error)
WriteCompressedMessages writes a batch of messages to the connection's topic and partition, returning the number of bytes written. The write is an atomic operation, it either fully succeeds or fails.
If the compression codec is not nil, the messages will be compressed.
func (*Conn) WriteCompressedMessagesAt ¶ added in v0.2.3
func (c *Conn) WriteCompressedMessagesAt(codec CompressionCodec, msgs ...Message) (nbytes int, partition int32, offset int64, appendTime time.Time, err error)
WriteCompressedMessagesAt writes a batch of messages to the connection's topic and partition, returning the number of bytes written, partition and offset numbers and timestamp assigned by the kafka broker to the message set. The write is an atomic operation, it either fully succeeds or fails.
If the compression codec is not nil, the messages will be compressed.
type ConnConfig ¶
type ConnConfig struct { ClientID string Topic string Partition int Broker int Rack string // The transactional id to use for transactional delivery. Idempotent // delivery should be enabled if transactional id is configured. // For more details look at transactional.id description here: http://kafka.apache.org/documentation.html#producerconfigs // Empty string means that this connection can't be transactional. TransactionalID string }
ConnConfig is a configuration object used to create new instances of Conn.
type ConsumerGroup ¶ added in v0.3.1
type ConsumerGroup struct {
// contains filtered or unexported fields
}
ConsumerGroup models a Kafka consumer group. A caller doesn't interact with the group directly. Rather, they interact with a Generation. Every time a member enters or exits the group, it results in a new Generation. The Generation is where partition assignments and offset management occur. Callers will use Next to get a handle to the Generation.
func NewConsumerGroup ¶ added in v0.3.1
func NewConsumerGroup(config ConsumerGroupConfig) (*ConsumerGroup, error)
NewConsumerGroup creates a new ConsumerGroup. It returns an error if the provided configuration is invalid. It does not attempt to connect to the Kafka cluster. That happens asynchronously, and any errors will be reported by Next.
func (*ConsumerGroup) Close ¶ added in v0.3.1
func (cg *ConsumerGroup) Close() error
Close terminates the current generation by causing this member to leave and releases all local resources used to participate in the consumer group. Close will also end the current generation if it is still active.
func (*ConsumerGroup) Next ¶ added in v0.3.1
func (cg *ConsumerGroup) Next(ctx context.Context) (*Generation, error)
Next waits for the next consumer group generation. There will never be two active generations. Next will never return a new generation until the previous one has completed.
If there are errors setting up the next generation, they will be surfaced here.
If the ConsumerGroup has been closed, then Next will return ErrGroupClosed.
type ConsumerGroupConfig ¶ added in v0.3.1
type ConsumerGroupConfig struct { // ID is the consumer group ID. It must not be empty. ID string // The list of broker addresses used to connect to the kafka cluster. It // must not be empty. Brokers []string // A dialer used to open connections to the kafka server. This field is // optional, if nil, the default dialer is used instead. Dialer *Dialer // Topics is the list of topics that will be consumed by this group. It // will usually have a single value, but it is permitted to have multiple // for more complex use cases. Topics []string // GroupBalancers is the priority-ordered list of client-side consumer group // balancing strategies that will be offered to the coordinator. The first // strategy that all group members support will be chosen by the leader. // // Default: [Range, RoundRobin] GroupBalancers []GroupBalancer // HeartbeatInterval sets the optional frequency at which the reader sends the consumer // group heartbeat update. // // Default: 3s HeartbeatInterval time.Duration // PartitionWatchInterval indicates how often a reader checks for partition changes. // If a reader sees a partition change (such as a partition add) it will rebalance the group // picking up new partitions. // // Default: 5s PartitionWatchInterval time.Duration // WatchPartitionChanges is used to inform kafka-go that a consumer group should be // polling the brokers and rebalancing if any partition changes happen to the topic. WatchPartitionChanges bool // SessionTimeout optionally sets the length of time that may pass without a heartbeat // before the coordinator considers the consumer dead and initiates a rebalance. // // Default: 30s SessionTimeout time.Duration // RebalanceTimeout optionally sets the length of time the coordinator will wait // for members to join as part of a rebalance. For kafka servers under higher // load, it may be useful to set this value higher. 
// // Default: 30s RebalanceTimeout time.Duration // JoinGroupBackoff optionally sets the length of time to wait before re-joining // the consumer group after an error. // // Default: 5s JoinGroupBackoff time.Duration // RetentionTime optionally sets the length of time the consumer group will // be saved by the broker. -1 will disable the setting and leave the // retention up to the broker's offsets.retention.minutes property. By // default, that setting is 1 day for kafka < 2.0 and 7 days for kafka >= // 2.0. // // Default: -1 RetentionTime time.Duration // StartOffset determines from whence the consumer group should begin // consuming when it finds a partition without a committed offset. If // non-zero, it must be set to one of FirstOffset or LastOffset. // // Default: FirstOffset StartOffset int64 // If not nil, specifies a logger used to report internal changes within the // reader. Logger Logger // ErrorLogger is the logger used to report errors. If nil, the reader falls // back to using Logger instead. ErrorLogger Logger // Timeout is the network timeout used when communicating with the consumer // group coordinator. This value should not be too small since errors // communicating with the broker will generally cause a consumer group // rebalance, and it's undesirable that a transient network error introduce // that overhead. Similarly, it should not be too large or the consumer // group may be slow to respond to the coordinator failing over to another // broker. // // Default: 5s Timeout time.Duration // contains filtered or unexported fields }
ConsumerGroupConfig is a configuration object used to create new instances of ConsumerGroup.
func (*ConsumerGroupConfig) Validate ¶ added in v0.3.1
func (config *ConsumerGroupConfig) Validate() error
Validate method validates ConsumerGroupConfig properties and sets relevant defaults.
type CoordinatorKeyType ¶ added in v0.4.10
type CoordinatorKeyType int8
CoordinatorKeyType is used to specify the type of coordinator to look for.
const ( // CoordinatorKeyTypeConsumer type is used when looking for a Group coordinator. CoordinatorKeyTypeConsumer CoordinatorKeyType = 0 // CoordinatorKeyTypeTransaction type is used when looking for a Transaction coordinator. CoordinatorKeyTypeTransaction CoordinatorKeyType = 1 )
type CreateACLsRequest ¶ added in v0.4.29
type CreateACLsRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // List of ACL to create. ACLs []ACLEntry }
CreateACLsRequest represents a request sent to a kafka broker to add new ACLs.
type CreateACLsResponse ¶ added in v0.4.29
type CreateACLsResponse struct { // The amount of time that the broker throttled the request. Throttle time.Duration // List of errors that occurred while attempting to create // the ACLs. // // The errors contain the kafka error code. Programs may use the standard // errors.Is function to test the error against kafka error codes. Errors []error }
CreateACLsResponse represents a response from a kafka broker to an ACL creation request.
type CreatePartitionsRequest ¶ added in v0.4.9
type CreatePartitionsRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // List of topics to create and their configuration. Topics []TopicPartitionsConfig // When set to true, topics are not created but the configuration is // validated as if they were. ValidateOnly bool }
CreatePartitionsRequest represents a request sent to a kafka broker to create and update topic partitions.
type CreatePartitionsResponse ¶ added in v0.4.9
type CreatePartitionsResponse struct { // The amount of time that the broker throttled the request. Throttle time.Duration // Mapping of topic names to errors that occurred while attempting to create // the topics. // // The errors contain the kafka error code. Programs may use the standard // errors.Is function to test the error against kafka error codes. Errors map[string]error }
CreatePartitionsResponse represents a response from a kafka broker to a partition creation request.
type CreateTopicsRequest ¶ added in v0.4.0
type CreateTopicsRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // List of topics to create and their configuration. Topics []TopicConfig // When set to true, topics are not created but the configuration is // validated as if they were. // // This field will be ignored if the kafka broker did not support the // CreateTopics API in version 1 or above. ValidateOnly bool }
CreateTopicsRequest represents a request sent to a kafka broker to create new topics.
type CreateTopicsResponse ¶ added in v0.4.0
type CreateTopicsResponse struct { // The amount of time that the broker throttled the request. // // This field will be zero if the kafka broker did not support the // CreateTopics API in version 2 or above. Throttle time.Duration // Mapping of topic names to errors that occurred while attempting to create // the topics. // // The errors contain the kafka error code. Programs may use the standard // errors.Is function to test the error against kafka error codes. Errors map[string]error }
CreateTopicsResponse represents a response from a kafka broker to a topic creation request.
type DeleteACLsFilter ¶ added in v0.4.43
type DeleteACLsFilter struct { ResourceTypeFilter ResourceType ResourceNameFilter string ResourcePatternTypeFilter PatternType PrincipalFilter string HostFilter string Operation ACLOperationType PermissionType ACLPermissionType }
type DeleteACLsMatchingACLs ¶ added in v0.4.43
type DeleteACLsMatchingACLs struct { Error error ResourceType ResourceType ResourceName string ResourcePatternType PatternType Principal string Host string Operation ACLOperationType PermissionType ACLPermissionType }
type DeleteACLsRequest ¶ added in v0.4.43
type DeleteACLsRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // List of ACL filters to use for deletion. Filters []DeleteACLsFilter }
DeleteACLsRequest represents a request sent to a kafka broker to delete ACLs.
type DeleteACLsResponse ¶ added in v0.4.43
type DeleteACLsResponse struct { // The amount of time that the broker throttled the request. Throttle time.Duration // List of the results from the deletion request. Results []DeleteACLsResult }
DeleteACLsResponse represents a response from a kafka broker to an ACL deletion request.
type DeleteACLsResult ¶ added in v0.4.43
type DeleteACLsResult struct { Error error MatchingACLs []DeleteACLsMatchingACLs }
type DeleteGroupsRequest ¶ added in v0.4.40
type DeleteGroupsRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // Identifiers of groups to delete. GroupIDs []string }
DeleteGroupsRequest represents a request sent to a kafka broker to delete consumer groups.
type DeleteGroupsResponse ¶ added in v0.4.40
type DeleteGroupsResponse struct { // The amount of time that the broker throttled the request. Throttle time.Duration // Mapping of group ids to errors that occurred while attempting to delete those groups. // // The errors contain the kafka error code. Programs may use the standard // errors.Is function to test the error against kafka error codes. Errors map[string]error }
DeleteGroupsResponse represents a response from a kafka broker to a consumer group deletion request.
type DeleteTopicsRequest ¶ added in v0.4.0
type DeleteTopicsRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // Names of topics to delete. Topics []string }
DeleteTopicsRequest represents a request sent to a kafka broker to delete topics.
type DeleteTopicsResponse ¶ added in v0.4.0
type DeleteTopicsResponse struct { // The amount of time that the broker throttled the request. // // This field will be zero if the kafka broker did not support the // DeleteTopics API in version 1 or above. Throttle time.Duration // Mapping of topic names to errors that occurred while attempting to delete // the topics. // // The errors contain the kafka error code. Programs may use the standard // errors.Is function to test the error against kafka error codes. Errors map[string]error }
DeleteTopicsResponse represents a response from a kafka broker to a topic deletion request.
type DescribeACLsRequest ¶ added in v0.4.43
type DescribeACLsRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // Filter to filter ACLs on. Filter ACLFilter }
DescribeACLsRequest represents a request sent to a kafka broker to describe existing ACLs.
type DescribeACLsResponse ¶ added in v0.4.43
type DescribeACLsResponse struct { // The amount of time that the broker throttled the request. Throttle time.Duration // Error that occurred while attempting to describe // the ACLs. Error error // ACL resources returned from the describe request. Resources []ACLResource }
DescribeACLsResponse represents a response from a kafka broker to an ACL describe request.
type DescribeClientQuotasEntity ¶ added in v0.4.41
type DescribeClientQuotasRequest ¶ added in v0.4.41
type DescribeClientQuotasRequest struct { // Address of the kafka broker to send the request to Addr net.Addr // List of quota components to describe. Components []DescribeClientQuotasRequestComponent // Whether the match is strict, i.e. should exclude entities with // unspecified entity types. Strict bool }
DescribeClientQuotasRequest represents a request sent to a kafka broker to describe client quotas.
type DescribeClientQuotasRequestComponent ¶ added in v0.4.41
type DescribeClientQuotasRequestComponent struct { // The entity type that the filter component applies to. EntityType string // How to match the entity (0 = exact name, 1 = default name, // 2 = any specified name). MatchType int8 // The string to match against, or null if unused for the match type. Match string }
type DescribeClientQuotasResponse ¶ added in v0.4.41
type DescribeClientQuotasResponse struct { // The amount of time that the broker throttled the request. Throttle time.Duration // Error is set to a non-nil value including the code and message if a top-level // error was encountered when doing the update. Error error // List of describe client quota responses. Entries []DescribeClientQuotasResponseQuotas }
DescribeClientQuotasResponse represents a response from a kafka broker to a describe client quota request.
type DescribeClientQuotasResponseQuotas ¶ added in v0.4.41
type DescribeClientQuotasResponseQuotas struct { // List of client quota entities and their descriptions. Entities []DescribeClientQuotasEntity // The client quota configuration values. Values []DescribeClientQuotasValue }
type DescribeClientQuotasValue ¶ added in v0.4.41
type DescribeConfigRequestResource ¶ added in v0.4.9
type DescribeConfigRequestResource struct { // Resource Type ResourceType ResourceType // Resource Name ResourceName string // ConfigNames is a list of configurations to update. ConfigNames []string }
type DescribeConfigResponseConfigEntry ¶ added in v0.4.9
type DescribeConfigResponseConfigEntry struct { ConfigName string ConfigValue string ReadOnly bool // Ignored if API version is greater than v0 IsDefault bool // Ignored if API version is less than v1 ConfigSource int8 IsSensitive bool // Ignored if API version is less than v1 ConfigSynonyms []DescribeConfigResponseConfigSynonym // Ignored if API version is less than v3 ConfigType int8 // Ignored if API version is less than v3 ConfigDocumentation string }
DescribeConfigResponseConfigEntry.
type DescribeConfigResponseConfigSynonym ¶ added in v0.4.9
type DescribeConfigResponseConfigSynonym struct { // Ignored if API version is less than v1 ConfigName string // Ignored if API version is less than v1 ConfigValue string // Ignored if API version is less than v1 ConfigSource int8 }
DescribeConfigResponseConfigSynonym.
type DescribeConfigResponseResource ¶ added in v0.4.9
type DescribeConfigResponseResource struct { // Resource Type ResourceType int8 // Resource Name ResourceName string // Error Error error // ConfigEntries ConfigEntries []DescribeConfigResponseConfigEntry }
DescribeConfigResponseResource.
type DescribeConfigsRequest ¶ added in v0.4.9
type DescribeConfigsRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // List of resources to get details for. Resources []DescribeConfigRequestResource // Ignored if API version is less than v1 IncludeSynonyms bool // Ignored if API version is less than v3 IncludeDocumentation bool }
DescribeConfigsRequest represents a request sent to a kafka broker to describe configs.
type DescribeConfigsResponse ¶ added in v0.4.9
type DescribeConfigsResponse struct { // The amount of time that the broker throttled the request. Throttle time.Duration // Resources Resources []DescribeConfigResponseResource }
DescribeConfigsResponse represents a response from a kafka broker to a describe config request.
type DescribeGroupsRequest ¶ added in v0.4.9
type DescribeGroupsRequest struct { // Addr is the address of the kafka broker to send the request to. Addr net.Addr // GroupIDs is a slice of groups to get details for. GroupIDs []string }
DescribeGroupsRequest is a request to the DescribeGroups API.
type DescribeGroupsResponse ¶ added in v0.4.9
type DescribeGroupsResponse struct { // Groups is a slice of details for the requested groups. Groups []DescribeGroupsResponseGroup }
DescribeGroupsResponse is a response from the DescribeGroups API.
type DescribeGroupsResponseAssignments ¶ added in v0.4.9
type DescribeGroupsResponseAssignments struct { // Version is the version of the assignments data. Version int // Topics contains the details of the partition assignments for each topic. Topics []GroupMemberTopic // UserData is the user data for the member. UserData []byte }
DescribeGroupsResponseAssignments stores the topic partition assignment data for a group member.
type DescribeGroupsResponseGroup ¶ added in v0.4.9
type DescribeGroupsResponseGroup struct { // Error is set to a non-nil value if there was an error fetching the details // for this group. Error error // GroupID is the ID of the group. GroupID string // GroupState is a description of the group state. GroupState string // Members contains details about each member of the group. Members []DescribeGroupsResponseMember }
DescribeGroupsResponseGroup contains the response details for a single group.
type DescribeGroupsResponseMember ¶ added in v0.4.9
type DescribeGroupsResponseMember struct { // MemberID is the ID of the group member. MemberID string // ClientID is the ID of the client that the group member is using. ClientID string // ClientHost is the host of the client that the group member is connecting from. ClientHost string // MemberMetadata contains metadata about this group member. MemberMetadata DescribeGroupsResponseMemberMetadata // MemberAssignments contains the topic partitions that this member is assigned to. MemberAssignments DescribeGroupsResponseAssignments }
DescribeGroupsResponseMember represents the membership information for a single group member.
type DescribeGroupsResponseMemberMetadata ¶ added in v0.4.9
type DescribeGroupsResponseMemberMetadata struct { // Version is the version of the metadata. Version int // Topics is the list of topics that the member is assigned to. Topics []string // UserData is the user data for the member. UserData []byte // OwnedPartitions contains the partitions owned by this group member; only set if // consumers are using a cooperative rebalancing assignor protocol. OwnedPartitions []DescribeGroupsResponseMemberMetadataOwnedPartition }
DescribeGroupsResponseMemberMetadata stores metadata associated with a group member.
type DescribeGroupsResponseMemberMetadataOwnedPartition ¶ added in v0.4.21
type DescribeUserScramCredentialsCredentialInfo ¶ added in v0.4.43
type DescribeUserScramCredentialsCredentialInfo struct { Mechanism ScramMechanism Iterations int }
type DescribeUserScramCredentialsRequest ¶ added in v0.4.43
type DescribeUserScramCredentialsRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // List of Scram users to describe Users []UserScramCredentialsUser }
DescribeUserScramCredentialsRequest represents a request sent to a kafka broker to describe user scram credentials.
type DescribeUserScramCredentialsResponse ¶ added in v0.4.43
type DescribeUserScramCredentialsResponse struct { // The amount of time that the broker throttled the request. Throttle time.Duration // Top level error that occurred while attempting to describe // the user scram credentials. // // The errors contain the kafka error code. Programs may use the standard // errors.Is function to test the error against kafka error codes. Error error // List of described user scram credentials. Results []DescribeUserScramCredentialsResponseResult }
DescribeUserScramCredentialsResponse represents a response from a kafka broker to a describe user credentials request.
type DescribeUserScramCredentialsResponseResult ¶ added in v0.4.43
type DescribeUserScramCredentialsResponseResult struct { User string CredentialInfos []DescribeUserScramCredentialsCredentialInfo Error error }
type Dialer ¶
type Dialer struct { // Unique identifier for client connections established by this Dialer. ClientID string // Optionally specifies the function that the dialer uses to establish // network connections. If nil, net.(*Dialer).DialContext is used instead. // // When DialFunc is set, LocalAddr, DualStack, FallbackDelay, and KeepAlive // are ignored. DialFunc func(ctx context.Context, network string, address string) (net.Conn, error) // Timeout is the maximum amount of time a dial will wait for a connect to // complete. If Deadline is also set, it may fail earlier. // // The default is no timeout. // // When dialing a name with multiple IP addresses, the timeout may be // divided between them. // // With or without a timeout, the operating system may impose its own // earlier timeout. For instance, TCP timeouts are often around 3 minutes. Timeout time.Duration // Deadline is the absolute point in time after which dials will fail. // If Timeout is set, it may fail earlier. // Zero means no deadline, or dependent on the operating system as with the // Timeout option. Deadline time.Time // LocalAddr is the local address to use when dialing an address. // The address must be of a compatible type for the network being dialed. // If nil, a local address is automatically chosen. LocalAddr net.Addr // DualStack enables RFC 6555-compliant "Happy Eyeballs" dialing when the // network is "tcp" and the destination is a host name with both IPv4 and // IPv6 addresses. This allows a client to tolerate networks where one // address family is silently broken. DualStack bool // FallbackDelay specifies the length of time to wait before spawning a // fallback connection, when DualStack is enabled. // If zero, a default delay of 300ms is used. FallbackDelay time.Duration // KeepAlive specifies the keep-alive period for an active network // connection. // If zero, keep-alives are not enabled. Network protocols that do not // support keep-alives ignore this field. 
KeepAlive time.Duration // Resolver optionally gives a hook to convert the broker address into an // alternate host or IP address which is useful for custom service discovery. // If a custom resolver returns any possible hosts, the first one will be // used and the original discarded. If a port number is included with the // resolved host, it will only be used if a port number was not previously // specified. If no port is specified or resolved, the default of 9092 will be // used. Resolver Resolver // TLS enables Dialer to open secure connections. If nil, standard net.Conn // will be used. TLS *tls.Config // SASLMechanism configures the Dialer to use SASL authentication. If nil, // no authentication will be performed. SASLMechanism sasl.Mechanism // The transactional id to use for transactional delivery. Idempotent // delivery should be enabled if transactional id is configured. // For more details look at transactional.id description here: http://kafka.apache.org/documentation.html#producerconfigs // Empty string means that the connection will be non-transactional. TransactionalID string }
The Dialer type mirrors the net.Dialer API but is designed to open kafka connections instead of raw network connections.
func (*Dialer) DialContext ¶
DialContext connects to the address on the named network using the provided context.
The provided Context must be non-nil. If the context expires before the connection is complete, an error is returned. Once successfully connected, any expiration of the context will not affect the connection.
When using TCP, and the host in the address parameter resolves to multiple network addresses, any dial timeout (from d.Timeout or ctx) is spread over each consecutive dial, such that each is given an appropriate fraction of the time to connect. For example, if a host has 4 IP addresses and the timeout is 1 minute, the connect to each single address will be given 15 seconds to complete before trying the next one.
func (*Dialer) DialLeader ¶
func (d *Dialer) DialLeader(ctx context.Context, network string, address string, topic string, partition int) (*Conn, error)
DialLeader opens a connection to the leader of the partition for a given topic.
The address given to the DialContext method may not be the one that the connection will end up being established to, because the dialer will lookup the partition leader for the topic and return a connection to that server. The original address is only used as a mechanism to discover the configuration of the kafka cluster that we're connecting to.
func (*Dialer) DialPartition ¶ added in v0.2.1
func (d *Dialer) DialPartition(ctx context.Context, network string, address string, partition Partition) (*Conn, error)
DialPartition opens a connection to the leader of the partition specified by the partition descriptor. It's strongly advised to use a partition descriptor returned by LookupPartition or LookupPartitions.
func (*Dialer) LookupLeader ¶
func (d *Dialer) LookupLeader(ctx context.Context, network string, address string, topic string, partition int) (Broker, error)
LookupLeader searches for the kafka broker that is the leader of the partition for a given topic, returning a Broker value representing it.
type DurationStats ¶
type DurationStats struct { Avg time.Duration `metric:"avg" type:"gauge"` Min time.Duration `metric:"min" type:"gauge"` Max time.Duration `metric:"max" type:"gauge"` Count int64 `metric:"count" type:"counter"` Sum time.Duration `metric:"sum" type:"counter"` }
DurationStats is a data structure that carries a summary of observed duration values.
type ElectLeadersRequest ¶ added in v0.4.9
type ElectLeadersRequest struct { // Addr is the address of the kafka broker to send the request to. Addr net.Addr // Topic is the name of the topic to do the leader elections in. Topic string // Partitions is the list of partitions to run leader elections for. Partitions []int // Timeout is the amount of time to wait for the election to run. Timeout time.Duration }
ElectLeadersRequest is a request to the ElectLeaders API.
type ElectLeadersResponse ¶ added in v0.4.9
type ElectLeadersResponse struct { // Error is set to a non-nil value if a top-level error occurred. Error error // PartitionResults contains the results for each partition leader election. PartitionResults []ElectLeadersResponsePartitionResult }
ElectLeadersResponse is a response from the ElectLeaders API.
type ElectLeadersResponsePartitionResult ¶ added in v0.4.9
type ElectLeadersResponsePartitionResult struct { // Partition is the ID of the partition. Partition int // Error is set to a non-nil value if an error occurred electing leaders // for this partition. Error error }
ElectLeadersResponsePartitionResult contains the response details for a single partition.
type EndTxnRequest ¶ added in v0.4.19
type EndTxnRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // The transactional id key. TransactionalID string // The Producer ID (PID) for the current producer session ProducerID int // The epoch associated with the current producer session for the given PID ProducerEpoch int // Committed should be set to true if the transaction was committed, false otherwise. Committed bool }
EndTxnRequest represents a request sent to a kafka broker to end a transaction.
type EndTxnResponse ¶ added in v0.4.19
type EndTxnResponse struct { // The amount of time that the broker throttled the request. Throttle time.Duration // Error is non-nil if an error occurred and contains the kafka error code. // Programs may use the standard errors.Is function to test the error // against kafka error codes. Error error }
EndTxnResponse represents a response from a kafka broker to an end transaction request.
type Error ¶
type Error int
Error represents the different error codes that may be returned by kafka. https://kafka.apache.org/protocol#protocol_error_codes
const ( Unknown Error = -1 OffsetOutOfRange Error = 1 InvalidMessage Error = 2 UnknownTopicOrPartition Error = 3 InvalidMessageSize Error = 4 LeaderNotAvailable Error = 5 NotLeaderForPartition Error = 6 RequestTimedOut Error = 7 BrokerNotAvailable Error = 8 ReplicaNotAvailable Error = 9 MessageSizeTooLarge Error = 10 StaleControllerEpoch Error = 11 OffsetMetadataTooLarge Error = 12 NetworkException Error = 13 GroupLoadInProgress Error = 14 GroupCoordinatorNotAvailable Error = 15 NotCoordinatorForGroup Error = 16 InvalidTopic Error = 17 RecordListTooLarge Error = 18 NotEnoughReplicas Error = 19 NotEnoughReplicasAfterAppend Error = 20 InvalidRequiredAcks Error = 21 IllegalGeneration Error = 22 InconsistentGroupProtocol Error = 23 InvalidGroupId Error = 24 UnknownMemberId Error = 25 InvalidSessionTimeout Error = 26 RebalanceInProgress Error = 27 InvalidCommitOffsetSize Error = 28 TopicAuthorizationFailed Error = 29 GroupAuthorizationFailed Error = 30 ClusterAuthorizationFailed Error = 31 InvalidTimestamp Error = 32 UnsupportedSASLMechanism Error = 33 IllegalSASLState Error = 34 UnsupportedVersion Error = 35 TopicAlreadyExists Error = 36 InvalidPartitionNumber Error = 37 InvalidReplicationFactor Error = 38 InvalidReplicaAssignment Error = 39 InvalidConfiguration Error = 40 NotController Error = 41 InvalidRequest Error = 42 UnsupportedForMessageFormat Error = 43 PolicyViolation Error = 44 OutOfOrderSequenceNumber Error = 45 DuplicateSequenceNumber Error = 46 InvalidProducerEpoch Error = 47 InvalidTransactionState Error = 48 InvalidProducerIDMapping Error = 49 InvalidTransactionTimeout Error = 50 ConcurrentTransactions Error = 51 TransactionCoordinatorFenced Error = 52 TransactionalIDAuthorizationFailed Error = 53 SecurityDisabled Error = 54 BrokerAuthorizationFailed Error = 55 KafkaStorageError Error = 56 LogDirNotFound Error = 57 SASLAuthenticationFailed Error = 58 UnknownProducerId Error = 59 ReassignmentInProgress Error = 60 DelegationTokenAuthDisabled Error = 61 
DelegationTokenNotFound Error = 62 DelegationTokenOwnerMismatch Error = 63 DelegationTokenRequestNotAllowed Error = 64 DelegationTokenAuthorizationFailed Error = 65 DelegationTokenExpired Error = 66 InvalidPrincipalType Error = 67 NonEmptyGroup Error = 68 GroupIdNotFound Error = 69 FetchSessionIDNotFound Error = 70 InvalidFetchSessionEpoch Error = 71 ListenerNotFound Error = 72 TopicDeletionDisabled Error = 73 FencedLeaderEpoch Error = 74 UnknownLeaderEpoch Error = 75 UnsupportedCompressionType Error = 76 StaleBrokerEpoch Error = 77 OffsetNotAvailable Error = 78 MemberIDRequired Error = 79 PreferredLeaderNotAvailable Error = 80 GroupMaxSizeReached Error = 81 FencedInstanceID Error = 82 EligibleLeadersNotAvailable Error = 83 ElectionNotNeeded Error = 84 NoReassignmentInProgress Error = 85 GroupSubscribedToTopic Error = 86 InvalidRecord Error = 87 UnstableOffsetCommit Error = 88 ThrottlingQuotaExceeded Error = 89 ProducerFenced Error = 90 ResourceNotFound Error = 91 DuplicateResource Error = 92 UnacceptableCredential Error = 93 InconsistentVoterSet Error = 94 InvalidUpdateVersion Error = 95 FeatureUpdateFailed Error = 96 PrincipalDeserializationFailure Error = 97 SnapshotNotFound Error = 98 PositionOutOfRange Error = 99 UnknownTopicID Error = 100 DuplicateBrokerRegistration Error = 101 BrokerIDNotRegistered Error = 102 InconsistentTopicID Error = 103 InconsistentClusterID Error = 104 TransactionalIDNotFound Error = 105 FetchSessionTopicIDError Error = 106 )
func (Error) Description ¶
Description returns a human readable description of cause of the error.
func (Error) Temporary ¶
Temporary returns true if the operation that generated the error may succeed if retried at a later time. Kafka error documentation specifies these as "retriable" https://kafka.apache.org/protocol#protocol_error_codes
type FetchRequest ¶ added in v0.4.0
type FetchRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // Topic, partition, and offset to retrieve records from. The offset may be // one of the special FirstOffset or LastOffset constants, in which case the // request will automatically discover the first or last offset of the // partition and submit the request for these. Topic string Partition int Offset int64 // Size and time limits of the response returned by the broker. MinBytes int64 MaxBytes int64 MaxWait time.Duration // The isolation level for the request. // // Defaults to ReadUncommitted. // // This field requires the kafka broker to support the Fetch API in version // 4 or above (otherwise the value is ignored). IsolationLevel IsolationLevel }
FetchRequest represents a request sent to a kafka broker to retrieve records from a topic partition.
type FetchResponse ¶ added in v0.4.0
type FetchResponse struct { // The amount of time that the broker throttled the request. Throttle time.Duration // The topic and partition that the response came for (will match the values // in the request). Topic string Partition int // Information about the topic partition layout returned from the broker. // // LastStableOffset requires the kafka broker to support the Fetch API in // version 4 or above (otherwise the value is zero). // /// LogStartOffset requires the kafka broker to support the Fetch API in // version 5 or above (otherwise the value is zero). HighWatermark int64 LastStableOffset int64 LogStartOffset int64 // An error that may have occurred while attempting to fetch the records. // // The error contains both the kafka error code, and an error message // returned by the kafka broker. Programs may use the standard errors.Is // function to test the error against kafka error codes. Error error // The set of records returned in the response. // // The program is expected to call the RecordSet's Close method when it // finished reading the records. // // Note that kafka may return record batches that start at an offset before // the one that was requested. It is the program's responsibility to skip // the offsets that it is not interested in. Records RecordReader }
FetchResponse represents a response from a kafka broker to a fetch request.
type FindCoordinatorRequest ¶ added in v0.4.10
type FindCoordinatorRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // The coordinator key. Key string // The coordinator key type. (Group, transaction, etc.) KeyType CoordinatorKeyType }
FindCoordinatorRequest is the request structure for the FindCoordinator function.
type FindCoordinatorResponse ¶ added in v0.4.10
type FindCoordinatorResponse struct { // The Transaction/Group Coordinator details Coordinator *FindCoordinatorResponseCoordinator // The amount of time that the broker throttled the request. Throttle time.Duration // An error that may have occurred while attempting to retrieve Coordinator // // The error contains both the kafka error code, and an error message // returned by the kafka broker. Error error }
FindCoordinatorResponse is the response structure for the FindCoordinator function.
type FindCoordinatorResponseCoordinator ¶ added in v0.4.10
type FindCoordinatorResponseCoordinator struct { // NodeID holds the broker id. NodeID int // Host of the broker Host string // Port on which broker accepts requests Port int }
FindCoordinatorResponseCoordinator contains details about the found coordinator.
type Generation ¶ added in v0.3.1
type Generation struct { // ID is the generation ID as assigned by the consumer group coordinator. ID int32 // GroupID is the name of the consumer group. GroupID string // MemberID is the ID assigned to this consumer by the consumer group // coordinator. MemberID string // Assignments is the initial state of this Generation. The partition // assignments are grouped by topic. Assignments map[string][]PartitionAssignment // contains filtered or unexported fields }
Generation represents a single consumer group generation. The generation carries the topic+partition assignments for this consumer. It also provides facilities for committing offsets and for running functions whose lifecycles are bound to the generation.
func (*Generation) CommitOffsets ¶ added in v0.3.1
func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error
CommitOffsets commits the provided topic+partition+offset combos to the consumer group coordinator. This can be used to reset the consumer to explicit offsets.
Example (OverwriteOffsets) ¶
group, err := kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{ ID: "my-group", Brokers: []string{"kafka:9092"}, Topics: []string{"my-topic"}, }) if err != nil { fmt.Printf("error creating consumer group: %+v\n", err) os.Exit(1) } defer group.Close() gen, err := group.Next(context.TODO()) if err != nil { fmt.Printf("error getting next generation: %+v\n", err) os.Exit(1) } err = gen.CommitOffsets(map[string]map[int]int64{ "my-topic": { 0: 123, 1: 456, 3: 789, }, }) if err != nil { fmt.Printf("error committing offsets next generation: %+v\n", err) os.Exit(1) }
Output:
func (*Generation) Start ¶ added in v0.3.1
func (g *Generation) Start(fn func(ctx context.Context))
Start launches the provided function in a goroutine and adds accounting such that when the function exits, it stops the current generation (if not already in the process of doing so).
The provided function MUST support cancellation via the ctx argument and exit in a timely manner once the ctx is complete. When the context is closed, the context's Err() method will return ErrGenerationEnded.
When closing out a generation, the consumer group will wait for all functions launched by Start to exit before the group can move on and join the next generation. If the function does not exit promptly, it will stop forward progress for this consumer and potentially cause consumer group membership churn.
Example (ConsumerGroupParallelReaders) ¶
group, err := kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{ ID: "my-group", Brokers: []string{"kafka:9092"}, Topics: []string{"my-topic"}, }) if err != nil { fmt.Printf("error creating consumer group: %+v\n", err) os.Exit(1) } defer group.Close() for { gen, err := group.Next(context.TODO()) if err != nil { break } assignments := gen.Assignments["my-topic"] for _, assignment := range assignments { partition, offset := assignment.ID, assignment.Offset gen.Start(func(ctx context.Context) { // create reader for this partition. reader := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"127.0.0.1:9092"}, Topic: "my-topic", Partition: partition, }) defer reader.Close() // seek to the last committed offset for this partition. reader.SetOffset(offset) for { msg, err := reader.ReadMessage(ctx) if err != nil { if errors.Is(err, kafka.ErrGenerationEnded) { // generation has ended. commit offsets. in a real app, // offsets would be committed periodically. gen.CommitOffsets(map[string]map[int]int64{"my-topic": {partition: offset + 1}}) return } fmt.Printf("error reading message: %+v\n", err) return } fmt.Printf("received message %s/%d/%d : %s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Value)) offset = msg.Offset } }) } }
Output:
type GroupBalancer ¶ added in v0.2.0
type GroupBalancer interface { // ProtocolName of the GroupBalancer ProtocolName() string // UserData provides the GroupBalancer an opportunity to embed custom // UserData into the metadata. // // Will be used by JoinGroup to begin the consumer group handshake. // // See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-JoinGroupRequest UserData() ([]byte, error) // AssignGroups returns which members will be consuming // which topic partitions AssignGroups(members []GroupMember, partitions []Partition) GroupMemberAssignments }
GroupBalancer encapsulates the client side rebalancing logic.
type GroupMember ¶ added in v0.2.0
type GroupMember struct { // ID is the unique ID for this member as taken from the JoinGroup response. ID string // Topics is a list of topics that this member is consuming. Topics []string // UserData contains any information that the GroupBalancer sent to the // consumer group coordinator. UserData []byte }
GroupMember describes a single participant in a consumer group.
type GroupMemberAssignments ¶ added in v0.2.0
GroupMemberAssignments holds MemberID => topic => partitions.
type GroupMemberTopic ¶ added in v0.4.9
type GroupMemberTopic struct { // Topic is the name of the topic. Topic string // Partitions is a slice of partition IDs that this member is assigned to in the topic. Partitions []int }
GroupMemberTopic is a mapping from a topic to a list of partitions in the topic. It is used to represent the topic partitions that have been assigned to a group member.
type GroupProtocol ¶ added in v0.4.33
type GroupProtocol struct { // The protocol name. Name string // The protocol metadata. Metadata GroupProtocolSubscription }
GroupProtocol represents a consumer group protocol.
type GroupProtocolAssignment ¶ added in v0.4.33
type GroupProtocolAssignment struct { // The topics and partitions assigned to the group member. AssignedPartitions map[string][]int // UserData for the assignment. UserData []byte }
GroupProtocolAssignment represents an assignment of topics and partitions for a group member.
type GroupProtocolSubscription ¶ added in v0.4.33
type Hash ¶
Hash is a Balancer that uses the provided hash function to determine which partition to route messages to. This ensures that messages with the same key are routed to the same partition.
The logic to calculate the partition is:
hasher.Sum32() % len(partitions) => partition
By default, Hash uses the FNV-1a algorithm. This is the same algorithm used by the Sarama Producer and ensures that messages produced by kafka-go will be delivered to the same partitions that the Sarama producer would deliver them to.
type HeartbeatRequest ¶ added in v0.4.19
type HeartbeatRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // GroupID is the ID of the group. GroupID string // GenerationID is the current generation for the group. GenerationID int32 // MemberID is the ID of the group member. MemberID string // GroupInstanceID is a unique identifier for the consumer. GroupInstanceID string }
HeartbeatRequest represents a heartbeat sent to kafka to indicate consumer liveness.
type HeartbeatResponse ¶ added in v0.4.19
type HeartbeatResponse struct { // Error is set to non-nil if an error occurred. Error error // The amount of time that the broker throttled the request. // // This field will be zero if the kafka broker did not support the // Heartbeat API in version 1 or above. Throttle time.Duration }
HeartbeatResponse represents a response from a heartbeat request.
type IncrementalAlterConfigsRequest ¶ added in v0.4.9
type IncrementalAlterConfigsRequest struct { // Addr is the address of the kafka broker to send the request to. Addr net.Addr // Resources contains the list of resources to update configs for. Resources []IncrementalAlterConfigsRequestResource // ValidateOnly indicates whether Kafka should validate the changes without actually // applying them. ValidateOnly bool }
IncrementalAlterConfigsRequest is a request to the IncrementalAlterConfigs API.
type IncrementalAlterConfigsRequestConfig ¶ added in v0.4.9
type IncrementalAlterConfigsRequestConfig struct { // Name is the name of the config. Name string // Value is the value to set for this config. Value string // ConfigOperation indicates how this config should be updated (e.g., add, delete, etc.). ConfigOperation ConfigOperation }
IncrementalAlterConfigsRequestConfig describes a single config key/value pair that should be altered.
type IncrementalAlterConfigsRequestResource ¶ added in v0.4.9
type IncrementalAlterConfigsRequestResource struct { // ResourceType is the type of resource to update. ResourceType ResourceType // ResourceName is the name of the resource to update (i.e., topic name or broker ID). ResourceName string // Configs contains the list of config key/values to update. Configs []IncrementalAlterConfigsRequestConfig }
IncrementalAlterConfigsRequestResource contains the details of a single resource type whose configs should be altered.
type IncrementalAlterConfigsResponse ¶ added in v0.4.9
type IncrementalAlterConfigsResponse struct { // Resources contains details of each resource config that was updated. Resources []IncrementalAlterConfigsResponseResource }
IncrementalAlterConfigsResponse is a response from the IncrementalAlterConfigs API.
type IncrementalAlterConfigsResponseResource ¶ added in v0.4.9
type IncrementalAlterConfigsResponseResource struct { // Error is set to a non-nil value if an error occurred while updating this specific // config. Error error // ResourceType is the type of resource that was updated. ResourceType ResourceType // ResourceName is the name of the resource that was updated. ResourceName string }
IncrementalAlterConfigsResponseResource contains the response details for a single resource whose configs were updated.
type InitProducerIDRequest ¶ added in v0.4.11
type InitProducerIDRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // The transactional id key. TransactionalID string // Time after which a transaction should time out TransactionTimeoutMs int // The Producer ID (PID). // This is used to disambiguate requests if a transactional id is reused following its expiration. // Only supported in version >=3 of the request, will be ignore otherwise. ProducerID int // The producer's current epoch. // This will be checked against the producer epoch on the broker, // and the request will return an error if they do not match. // Only supported in version >=3 of the request, will be ignore otherwise. ProducerEpoch int }
InitProducerIDRequest is the request structure for the InitProducerId function.
type InitProducerIDResponse ¶ added in v0.4.11
type InitProducerIDResponse struct { // The Transaction/Group Coordinator details Producer *ProducerSession // The amount of time that the broker throttled the request. Throttle time.Duration // An error that may have occurred while attempting to retrieve initProducerId // // The error contains both the kafka error code, and an error message // returned by the kafka broker. Error error }
InitProducerIDResponse is the response structure for the InitProducerId function.
type IsolationLevel ¶ added in v0.2.3
type IsolationLevel int8
const ( ReadUncommitted IsolationLevel = 0 ReadCommitted IsolationLevel = 1 )
type JoinGroupRequest ¶ added in v0.4.33
type JoinGroupRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // GroupID of the group to join. GroupID string // The duration after which the coordinator considers the consumer dead // if it has not received a heartbeat. SessionTimeout time.Duration // The duration the coordination will wait for each member to rejoin when rebalancing the group. RebalanceTimeout time.Duration // The ID assigned by the group coordinator. MemberID string // The unique identifier for the consumer instance. GroupInstanceID string // The name for the class of protocols implemented by the group being joined. ProtocolType string // The list of protocols the member supports. Protocols []GroupProtocol }
JoinGroupRequest is the request structure for the JoinGroup function.
type JoinGroupResponse ¶ added in v0.4.33
type JoinGroupResponse struct { // An error that may have occurred when attempting to join the group. // // The errors contain the kafka error code. Programs may use the standard // errors.Is function to test the error against kafka error codes. Error error // The amount of time that the broker throttled the request. Throttle time.Duration // The generation ID of the group. GenerationID int // The group protocol selected by the coordinatior. ProtocolName string // The group protocol name. ProtocolType string // The leader of the group. LeaderID string // The group member ID. MemberID string // The members of the group. Members []JoinGroupResponseMember }
JoinGroupResponse is the response structure for the JoinGroup function.
type JoinGroupResponseMember ¶ added in v0.4.33
type JoinGroupResponseMember struct { // The group member ID. ID string // The unique identifier of the consumer instance. GroupInstanceID string // The group member metadata. Metadata GroupProtocolSubscription }
JoinGroupResponseMember represents a group member in a response to a JoinGroup request.
type LeastBytes ¶
type LeastBytes struct {
// contains filtered or unexported fields
}
LeastBytes is a Balancer implementation that routes messages to the partition that has received the least amount of data.
Note that no coordination is done between multiple producers; good balancing relies on each producer that uses a LeastBytes balancer producing well-balanced messages.
type LeaveGroupRequest ¶ added in v0.4.33
type LeaveGroupRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // GroupID of the group to leave. GroupID string // List of leaving member identities. Members []LeaveGroupRequestMember }
LeaveGroupRequest is the request structure for the LeaveGroup function.
type LeaveGroupRequestMember ¶ added in v0.4.33
type LeaveGroupRequestMember struct { // The member ID to remove from the group. ID string // The group instance ID to remove from the group. GroupInstanceID string }
LeaveGroupRequestMember represents the identity of a member leaving a group.
type LeaveGroupResponse ¶ added in v0.4.33
type LeaveGroupResponse struct { // An error that may have occurred when attempting to leave the group. // // The errors contain the kafka error code. Programs may use the standard // errors.Is function to test the error against kafka error codes. Error error // The amount of time that the broker throttled the request. Throttle time.Duration // List of leaving member responses. Members []LeaveGroupResponseMember }
LeaveGroupResponse is the response structure for the LeaveGroup function.
type LeaveGroupResponseMember ¶ added in v0.4.33
type LeaveGroupResponseMember struct { // The member ID of the member leaving the group. ID string // The group instance ID to remove from the group. GroupInstanceID string // An error that may have occurred when attempting to remove the member from the group. // // The errors contain the kafka error code. Programs may use the standard // errors.Is function to test the error against kafka error codes. Error error }
LeaveGroupResponseMember represents a member leaving the group.
type ListGroupsRequest ¶ added in v0.4.9
type ListGroupsRequest struct { // Addr is the address of the kafka broker to send the request to. Addr net.Addr }
ListGroupsRequest is a request to the ListGroups API.
type ListGroupsResponse ¶ added in v0.4.9
type ListGroupsResponse struct { // Error is set to a non-nil value if a top-level error occurred while fetching // groups. Error error // Groups contains the list of groups. Groups []ListGroupsResponseGroup }
ListGroupsResponse is a response from the ListGroups API.
type ListGroupsResponseGroup ¶ added in v0.4.9
type ListGroupsResponseGroup struct { // GroupID is the ID of the group. GroupID string // Coordinator is the ID of the coordinator broker for the group. Coordinator int }
ListGroupsResponseGroup contains the response details for a single group.
type ListOffsetsRequest ¶ added in v0.4.0
type ListOffsetsRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // A mapping of topic names to list of partitions that the program wishes to // get the offsets for. Topics map[string][]OffsetRequest // The isolation level for the request. // // Defaults to ReadUncommitted. // // This field requires the kafka broker to support the ListOffsets API in // version 2 or above (otherwise the value is ignored). IsolationLevel IsolationLevel }
ListOffsetsRequest represents a request sent to a kafka broker to list the offsets of topic partitions.
type ListOffsetsResponse ¶ added in v0.4.0
type ListOffsetsResponse struct { // The amount of time that the broker throttled the request. Throttle time.Duration // Mappings of topics names to partition offsets, there will be one entry // for each topic in the request. Topics map[string][]PartitionOffsets }
ListOffsetsResponse represents a response from a kafka broker to an offset listing request.
type ListPartitionReassignmentsRequest ¶ added in v0.4.45
type ListPartitionReassignmentsRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // Topics we want reassignments for, mapped by their name, or nil to list everything. Topics map[string]ListPartitionReassignmentsRequestTopic // Timeout is the amount of time to wait for the request to complete. Timeout time.Duration }
ListPartitionReassignmentsRequest is a request to the ListPartitionReassignments API.
type ListPartitionReassignmentsRequestTopic ¶ added in v0.4.45
type ListPartitionReassignmentsRequestTopic struct { // The partitions to list partition reassignments for. PartitionIndexes []int }
ListPartitionReassignmentsRequestTopic contains the requested partitions for a single topic.
type ListPartitionReassignmentsResponse ¶ added in v0.4.45
type ListPartitionReassignmentsResponse struct { // Error is set to a non-nil value including the code and message if a top-level // error was encountered. Error error // Topics contains results for each topic, mapped by their name. Topics map[string]ListPartitionReassignmentsResponseTopic }
ListPartitionReassignmentsResponse is a response from the ListPartitionReassignments API.
type ListPartitionReassignmentsResponsePartition ¶ added in v0.4.45
type ListPartitionReassignmentsResponsePartition struct { // PartitionIndex contains index of the partition. PartitionIndex int // Replicas contains the current replica set. Replicas []int // AddingReplicas contains the set of replicas we are currently adding. AddingReplicas []int // RemovingReplicas contains the set of replicas we are currently removing. RemovingReplicas []int }
ListPartitionReassignmentsResponsePartition contains the detailed result of ongoing reassignments for a single partition.
type ListPartitionReassignmentsResponseTopic ¶ added in v0.4.45
type ListPartitionReassignmentsResponseTopic struct { // Partitions contains result for topic partitions. Partitions []ListPartitionReassignmentsResponsePartition }
ListPartitionReassignmentsResponseTopic contains the detailed result of ongoing reassignments for a topic.
type Logger ¶ added in v0.3.4
type Logger interface {
Printf(string, ...interface{})
}
Logger interface API for log.Logger.
type LoggerFunc ¶ added in v0.3.4
type LoggerFunc func(string, ...interface{})
LoggerFunc is a bridge between Logger and any third-party logger. Usage:
l := NewLogger() // some logger r := kafka.NewReader(kafka.ReaderConfig{ Logger: kafka.LoggerFunc(l.Infof), ErrorLogger: kafka.LoggerFunc(l.Errorf), })
func (LoggerFunc) Printf ¶ added in v0.3.4
func (f LoggerFunc) Printf(msg string, args ...interface{})
type Message ¶
type Message struct { // Topic indicates which topic this message was consumed from via Reader. // // When being used with Writer, this can be used to configure the topic if // not already specified on the writer itself. Topic string // Partition is read-only and MUST NOT be set when writing messages Partition int Offset int64 HighWaterMark int64 Key []byte Value []byte Headers []Header // This field is used to hold arbitrary data you wish to include, so it // will be available when handle it on the Writer's `Completion` method, // this support the application can do any post operation on each message. WriterData interface{} // If not set at the creation, Time will be automatically set when // writing the message. Time time.Time }
Message is a data structure representing kafka messages.
type MessageTooLargeError ¶ added in v0.2.5
func (MessageTooLargeError) Error ¶ added in v0.2.5
func (e MessageTooLargeError) Error() string
type MetadataRequest ¶ added in v0.4.0
type MetadataRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // The list of topics to retrieve metadata for. Topics []string }
MetadataRequest represents a request sent to a kafka broker to retrieve its cluster metadata.
type MetadataResponse ¶ added in v0.4.0
type MetadataResponse struct { // The amount of time that the broker throttled the request. Throttle time.Duration // Name of the kafka cluster that client retrieved metadata from. ClusterID string // The broker which is currently the controller for the cluster. Controller Broker // The list of brokers registered to the cluster. Brokers []Broker // The list of topics available on the cluster. Topics []Topic }
MetadataResponse represents a response from a kafka broker to a metadata request.
type Murmur2Balancer ¶ added in v0.3.1
type Murmur2Balancer struct { Consistent bool // contains filtered or unexported fields }
Murmur2Balancer is a Balancer that uses the Murmur2 hash function to determine which partition to route messages to. This ensures that messages with the same key are routed to the same partition. This balancer is compatible with the partitioner used by the Java library and by librdkafka's "murmur2" and "murmur2_random" partitioners.
With the Consistent field false (default), this partitioner is equivalent to the "murmur2_random" setting in librdkafka. When Consistent is true, this partitioner is equivalent to the "murmur2" setting. The latter will hash nil keys into the same partition. Empty, non-nil keys are always hashed to the same partition regardless of configuration.
Unless you are absolutely certain that all your messages will have keys, it's best to leave the Consistent flag off. Otherwise, you run the risk of creating a very hot partition.
Note that the librdkafka documentation states that the "murmur2_random" is functionally equivalent to the default Java partitioner. That's because the Java partitioner will use a round robin balancer instead of random on nil keys. We choose librdkafka's implementation because it arguably has a larger install base.
type OffsetCommit ¶ added in v0.4.19
OffsetCommit represents the commit of an offset to a partition.
The extra metadata is opaque to the kafka protocol, it is intended to hold information like an identifier for the process that committed the offset, or the time at which the commit was made.
type OffsetCommitPartition ¶ added in v0.4.19
type OffsetCommitPartition struct { // ID of the partition. Partition int // An error that may have occurred while attempting to publish consumer // group offsets for this partition. // // The error contains both the kafka error code, and an error message // returned by the kafka broker. Programs may use the standard errors.Is // function to test the error against kafka error codes. Error error }
OffsetCommitPartition represents the state of a single partition in responses to committing offsets.
type OffsetCommitRequest ¶ added in v0.4.19
type OffsetCommitRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // ID of the consumer group to publish the offsets for. GroupID string // ID of the consumer group generation. GenerationID int // ID of the group member submitting the offsets. MemberID string // ID of the group instance. InstanceID string // Set of topic partitions to publish the offsets for. // // Not that offset commits need to be submitted to the broker acting as the // group coordinator. This will be automatically resolved by the transport. Topics map[string][]OffsetCommit }
OffsetCommitRequest represents a request sent to a kafka broker to commit offsets for a partition.
type OffsetCommitResponse ¶ added in v0.4.19
type OffsetCommitResponse struct { // The amount of time that the broker throttled the request. Throttle time.Duration // Set of topic partitions that the kafka broker has accepted offset commits // for. Topics map[string][]OffsetCommitPartition }
OffsetCommitResponse represents a response from a kafka broker to an offset commit request.
type OffsetDelete ¶ added in v0.4.36
OffsetDelete deletes the offset for a consumer group on a particular topic for a particular partition.
type OffsetDeletePartition ¶ added in v0.4.36
type OffsetDeletePartition struct { // ID of the partition. Partition int // An error that may have occurred while attempting to delete an offset for // this partition. Error error }
OffsetDeletePartition represents the status of a partition in response to deleting offsets.
type OffsetDeleteRequest ¶ added in v0.4.36
type OffsetDeleteRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // ID of the consumer group to delete the offsets for. GroupID string // Set of topic partitions to delete offsets for. Topics map[string][]int }
OffsetDeleteRequest represents a request sent to a kafka broker to delete the offsets for a partition on a given topic associated with a consumer group.
type OffsetDeleteResponse ¶ added in v0.4.36
type OffsetDeleteResponse struct { // An error that may have occurred while attempting to delete an offset Error error // The amount of time that the broker throttled the request. Throttle time.Duration // Set of topic partitions that the kafka broker has additional info (error?) // for. Topics map[string][]OffsetDeletePartition }
OffsetDeleteResponse represents a response from a kafka broker to a delete offset request.
type OffsetFetchPartition ¶ added in v0.4.0
type OffsetFetchPartition struct { // ID of the partition. Partition int // Last committed offsets on the partition when the request was served by // the kafka broker. CommittedOffset int64 // Consumer group metadata for this partition. Metadata string // An error that may have occurred while attempting to retrieve consumer // group offsets for this partition. // // The error contains both the kafka error code, and an error message // returned by the kafka broker. Programs may use the standard errors.Is // function to test the error against kafka error codes. Error error }
OffsetFetchPartition represents the state of a single partition in a consumer group.
type OffsetFetchRequest ¶ added in v0.4.0
type OffsetFetchRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // ID of the consumer group to retrieve the offsets for. GroupID string // Set of topic partitions to retrieve the offsets for. Topics map[string][]int }
OffsetFetchRequest represents a request sent to a kafka broker to read the currently committed offsets of topic partitions.
type OffsetFetchResponse ¶ added in v0.4.0
type OffsetFetchResponse struct { // The amount of time that the broker throttled the request. Throttle time.Duration // Set of topic partitions that the kafka broker has returned offsets for. Topics map[string][]OffsetFetchPartition // An error that may have occurred while attempting to retrieve consumer // group offsets. // // The error contains both the kafka error code, and an error message // returned by the kafka broker. Programs may use the standard errors.Is // function to test the error against kafka error codes. Error error }
OffsetFetchResponse represents a response from a kafka broker to an offset fetch request.
type OffsetRequest ¶ added in v0.4.0
OffsetRequest represents a request to retrieve a single partition offset.
func FirstOffsetOf ¶ added in v0.4.0
func FirstOffsetOf(partition int) OffsetRequest
FirstOffsetOf constructs an OffsetRequest which asks for the first offset of the partition given as argument.
func LastOffsetOf ¶ added in v0.4.0
func LastOffsetOf(partition int) OffsetRequest
LastOffsetOf constructs an OffsetRequest which asks for the last offset of the partition given as argument.
func TimeOffsetOf ¶ added in v0.4.0
func TimeOffsetOf(partition int, at time.Time) OffsetRequest
TimeOffsetOf constructs an OffsetRequest which asks for a partition offset at a given time.
type Partition ¶
type Partition struct { // Name of the topic that the partition belongs to, and its index in the // topic. Topic string ID int // Leader, replicas, and ISR for the partition. // // When no physical host is known to be running a broker, the Host and Port // fields will be set to the zero values. The logical broker ID is always // set to the value known to the kafka cluster, even if the broker is not // currently backed by a physical host. Leader Broker Replicas []Broker Isr []Broker // Available only with metadata API level >= 6: OfflineReplicas []Broker // An error that may have occurred while attempting to read the partition // metadata. // // The error contains both the kafka error code, and an error message // returned by the kafka broker. Programs may use the standard errors.Is // function to test the error against kafka error codes. Error error }
Partition carries the metadata associated with a kafka partition.
type PartitionAssignment ¶ added in v0.3.1
type PartitionAssignment struct { // ID is the partition ID. ID int // Offset is the initial offset at which this assignment begins. It will // either be an absolute offset if one has previously been committed for // the consumer group or a relative offset such as FirstOffset when this // is the first time the partition have been assigned to a member of the // group. Offset int64 }
PartitionAssignment represents the starting state of a partition that has been assigned to a consumer.
type PartitionOffsets ¶ added in v0.4.0
type PartitionOffsets struct { Partition int FirstOffset int64 LastOffset int64 Offsets map[int64]time.Time Error error }
PartitionOffsets carries information about offsets available in a topic partition.
type PatternType ¶ added in v0.4.29
type PatternType int8
const ( // PatternTypeUnknown represents any PatternType which this client cannot // understand. PatternTypeUnknown PatternType = 0 // PatternTypeAny matches any resource pattern type. PatternTypeAny PatternType = 1 // PatternTypeMatch perform pattern matching. PatternTypeMatch PatternType = 2 // PatternTypeLiteral represents a literal name. // A literal name defines the full name of a resource, e.g. topic with name // 'foo', or group with name 'bob'. PatternTypeLiteral PatternType = 3 // PatternTypePrefixed represents a prefixed name. // A prefixed name defines a prefix for a resource, e.g. topics with names // that start with 'foo'. PatternTypePrefixed PatternType = 4 )
func (PatternType) MarshalText ¶ added in v0.4.45
func (pt PatternType) MarshalText() ([]byte, error)
func (PatternType) String ¶ added in v0.4.45
func (pt PatternType) String() string
func (*PatternType) UnmarshalText ¶ added in v0.4.45
func (pt *PatternType) UnmarshalText(text []byte) error
type ProduceRequest ¶ added in v0.4.0
type ProduceRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // The topic to produce the records to. Topic string // The partition to produce the records to. Partition int // The level of required acknowledgements to ask the kafka broker for. RequiredAcks RequiredAcks // The message format version used when encoding the records. // // By default, the client automatically determine which version should be // used based on the version of the Produce API supported by the server. MessageVersion int // An optional transaction id when producing to the kafka broker is part of // a transaction. TransactionalID string // The sequence of records to produce to the topic partition. Records RecordReader // An optional compression algorithm to apply to the batch of records sent // to the kafka broker. Compression Compression }
ProduceRequest represents a request sent to a kafka broker to produce records to a topic partition.
type ProduceResponse ¶ added in v0.4.0
type ProduceResponse struct { // The amount of time that the broker throttled the request. Throttle time.Duration // An error that may have occurred while attempting to produce the records. // // The error contains both the kafka error code, and an error message // returned by the kafka broker. Programs may use the standard errors.Is // function to test the error against kafka error codes. Error error // Offset of the first record that was written to the topic partition. // // This field will be zero if the kafka broker did not support Produce API // version 3 or above. BaseOffset int64 // Time at which the broker wrote the records to the topic partition. // // This field will be zero if the kafka broker did not support Produce API // version 2 or above. LogAppendTime time.Time // First offset in the topic partition that the records were written to. // // This field will be zero if the kafka broker did not support Produce // API version 5 or above (or if the first offset is zero). LogStartOffset int64 // If errors occurred writing specific records, they will be reported in // this map. // // This field will always be empty if the kafka broker did not support the // Produce API in version 8 or above. RecordErrors map[int]error }
ProduceResponse represents a response from a kafka broker to a produce request.
type ProducerSession ¶ added in v0.4.11
type ProducerSession struct { // The Producer ID (PID) for the current producer session ProducerID int // The epoch associated with the current producer session for the given PID ProducerEpoch int }
ProducerSession contains useful information about the producer session from the broker's response.
type RackAffinityGroupBalancer ¶ added in v0.3.6
type RackAffinityGroupBalancer struct { // Rack is the name of the rack where this consumer is running. It will be // communicated to the consumer group leader via the UserData so that // assignments can be made with affinity to the partition leader. Rack string }
RackAffinityGroupBalancer makes a best effort to pair up consumers with partitions whose leader is in the same rack. This strategy can have performance benefits by minimizing round trip latency between the consumer and the broker. In environments where network traffic across racks incurs charges (such as cross AZ data transfer in AWS), this strategy is also a cost optimization measure because it keeps network traffic within the local rack where possible.
The primary objective is to spread partitions evenly across consumers with a secondary focus on maximizing the number of partitions where the leader and the consumer are in the same rack. For best affinity, it's recommended to have a balanced spread of consumers and partition leaders across racks.
This balancer requires Kafka version 0.10.0.0 or later. Earlier versions do not return the brokers' racks in the metadata request.
func (RackAffinityGroupBalancer) AssignGroups ¶ added in v0.3.6
func (r RackAffinityGroupBalancer) AssignGroups(members []GroupMember, partitions []Partition) GroupMemberAssignments
func (RackAffinityGroupBalancer) ProtocolName ¶ added in v0.3.6
func (r RackAffinityGroupBalancer) ProtocolName() string
func (RackAffinityGroupBalancer) UserData ¶ added in v0.3.6
func (r RackAffinityGroupBalancer) UserData() ([]byte, error)
type RangeGroupBalancer ¶ added in v0.2.0
type RangeGroupBalancer struct{}
RangeGroupBalancer groups consumers by partition
Example: 5 partitions, 2 consumers
C0: [0, 1, 2] C1: [3, 4]
Example: 6 partitions, 3 consumers
C0: [0, 1] C1: [2, 3] C2: [4, 5]
func (RangeGroupBalancer) AssignGroups ¶ added in v0.2.0
func (r RangeGroupBalancer) AssignGroups(members []GroupMember, topicPartitions []Partition) GroupMemberAssignments
func (RangeGroupBalancer) ProtocolName ¶ added in v0.2.0
func (r RangeGroupBalancer) ProtocolName() string
func (RangeGroupBalancer) UserData ¶ added in v0.2.0
func (r RangeGroupBalancer) UserData() ([]byte, error)
type RawProduceRequest ¶ added in v0.4.47
type RawProduceRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // The topic to produce the records to. Topic string // The partition to produce the records to. Partition int // The level of required acknowledgements to ask the kafka broker for. RequiredAcks RequiredAcks // The message format version used when encoding the records. // // By default, the client automatically determine which version should be // used based on the version of the Produce API supported by the server. MessageVersion int // An optional transaction id when producing to the kafka broker is part of // a transaction. TransactionalID string // The sequence of records to produce to the topic partition. RawRecords protocol.RawRecordSet }
RawProduceRequest represents a request sent to a kafka broker to produce records to a topic partition. The request contains a pre-encoded/raw record set.
type ReadBatchConfig ¶ added in v0.2.3
type ReadBatchConfig struct { // MinBytes indicates to the broker the minimum batch size that the consumer // will accept. Setting a high minimum when consuming from a low-volume topic // may result in delayed delivery when the broker does not have enough data to // satisfy the defined minimum. MinBytes int // MaxBytes indicates to the broker the maximum batch size that the consumer // will accept. The broker will truncate a message to satisfy this maximum, so // choose a value that is high enough for your largest message size. MaxBytes int // IsolationLevel controls the visibility of transactional records. // ReadUncommitted makes all records visible. With ReadCommitted only // non-transactional and committed records are visible. IsolationLevel IsolationLevel // MaxWait is the amount of time the broker may wait to hit the // min/max byte targets. This setting is independent of any network-level // timeouts or deadlines. // // For backward compatibility, when this field is left zero, kafka-go will // infer the max wait from the connection's read deadline. MaxWait time.Duration }
ReadBatchConfig is a configuration object used for reading batches of messages.
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader provides a high-level API for consuming messages from kafka.
A Reader automatically manages reconnections to a kafka server, and blocking methods have context support for asynchronous cancellations.
Note that it is important to call `Close()` on a `Reader` when a process exits. The kafka server needs a graceful disconnect to stop it from continuing to attempt to send messages to the connected clients. The given example will not call `Close()` if the process is terminated with SIGINT (ctrl-c at the shell) or SIGTERM (as docker stop or a kubernetes restart does). This can result in a delay when a new reader on the same topic connects (e.g. new process started or new container running). Use a `signal.Notify` handler to close the reader on process shutdown.
func NewReader ¶
func NewReader(config ReaderConfig) *Reader
NewReader creates and returns a new Reader configured with config. The offset is initialized to FirstOffset.
Example (RackAffinity) ¶
ExampleNewReader_rackAffinity shows how the RackAffinityGroupBalancer can be used to pair up consumers with brokers in the same AWS availability zone. This code assumes that each broker's rack is configured to be the name of the AZ in which it is running.
package main import ( "context" "encoding/json" "io/ioutil" "net/http" "os" "strings" "time" ) // ExampleNewReader_rackAffinity shows how the RackAffinityGroupBalancer can be // used to pair up consumers with brokers in the same AWS availability zone. // This code assumes that each brokers' rack is configured to be the name of the // AZ in which it is running. func main() { r := NewReader(ReaderConfig{ Brokers: []string{"kafka:9092"}, GroupID: "my-group", Topic: "my-topic", GroupBalancers: []GroupBalancer{ RackAffinityGroupBalancer{Rack: findRack()}, RangeGroupBalancer{}, }, }) r.ReadMessage(context.Background()) r.Close() } // findRack is the basic rack resolver strategy for use in AWS. It supports // - ECS with the task metadata endpoint enabled (returns the container // instance's availability zone) // - Linux EC2 (returns the instance's availability zone) func findRack() string { switch whereAmI() { case "ecs": return ecsAvailabilityZone() case "ec2": return ec2AvailabilityZone() } return "" } const ecsContainerMetadataURI = "ECS_CONTAINER_METADATA_URI" // whereAmI determines which strategy the rack resolver should use. func whereAmI() string { // https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint.html if os.Getenv(ecsContainerMetadataURI) != "" { return "ecs" } // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/identify_ec2_instances.html for _, path := range [...]string{ "/sys/devices/virtual/dmi/id/product_uuid", "/sys/hypervisor/uuid", } { b, err := ioutil.ReadFile(path) if err != nil { continue } s := string(b) switch { case strings.HasPrefix(s, "EC2"), strings.HasPrefix(s, "ec2"): return "ec2" } } return "somewhere" } // ecsAvailabilityZone queries the task endpoint for the metadata URI that ECS // injects into the ECS_CONTAINER_METADATA_URI variable in order to retrieve // the availability zone where the task is running. 
func ecsAvailabilityZone() string { client := http.Client{ Timeout: time.Second, Transport: &http.Transport{ DisableCompression: true, DisableKeepAlives: true, }, } r, err := client.Get(os.Getenv(ecsContainerMetadataURI) + "/task") if err != nil { return "" } defer r.Body.Close() var md struct { AvailabilityZone string } if err := json.NewDecoder(r.Body).Decode(&md); err != nil { return "" } return md.AvailabilityZone } // ec2AvailabilityZone queries the metadata endpoint to discover the // availability zone where this code is running. we avoid calling this function // unless we know we're in EC2. Otherwise, in other environments, we would need // to wait for the request to 169.254.169.254 to timeout before proceeding. func ec2AvailabilityZone() string { client := http.Client{ Timeout: time.Second, Transport: &http.Transport{ DisableCompression: true, DisableKeepAlives: true, }, } r, err := client.Get("http://169.254.169.254/latest/meta-data/placement/availability-zone") if err != nil { return "" } defer r.Body.Close() b, err := ioutil.ReadAll(r.Body) if err != nil { return "" } return string(b) }
Output:
func (*Reader) Close ¶
Close closes the stream, preventing the program from reading any more messages from it.
func (*Reader) CommitMessages ¶
CommitMessages commits the list of messages passed as argument. The program may pass a context to asynchronously cancel the commit operation when it was configured to be blocking.
Because kafka consumer groups track a single offset per partition, the highest message offset passed to CommitMessages will cause all previous messages to be committed. Applications need to account for these Kafka limitations when committing messages, and maintain message ordering if they need strong delivery guarantees. This property makes it valid to pass only the last message seen to CommitMessages in order to move the offset of the topic/partition it belonged to forward, effectively committing all previous messages in the partition.
func (*Reader) Config ¶
func (r *Reader) Config() ReaderConfig
Config returns the reader's configuration.
func (*Reader) FetchMessage ¶
FetchMessage reads and returns the next message from r. The method call blocks until a message becomes available, or an error occurs. The program may also specify a context to asynchronously cancel the blocking operation.
The method returns io.EOF to indicate that the reader has been closed.
FetchMessage does not commit offsets automatically when using consumer groups. Use CommitMessages to commit the offset.
func (*Reader) Lag ¶
Lag returns the lag of the last message returned by ReadMessage, or -1 if r is backed by a consumer group.
func (*Reader) Offset ¶
Offset returns the current absolute offset of the reader, or -1 if r is backed by a consumer group.
func (*Reader) ReadLag ¶
ReadLag returns the current lag of the reader by fetching the last offset of the topic and partition and computing the difference between that value and the offset of the last message returned by ReadMessage.
This method is intended to be used in cases where a program may be unable to call ReadMessage to update the value returned by Lag, but still needs to get an up to date estimation of how far behind the reader is. For example when the consumer is not ready to process the next message.
The function returns a lag of zero when the reader's current offset is negative.
func (*Reader) ReadMessage ¶
ReadMessage reads and returns the next message from r. The method call blocks until a message becomes available, or an error occurs. The program may also specify a context to asynchronously cancel the blocking operation.
The method returns io.EOF to indicate that the reader has been closed.
If consumer groups are used, ReadMessage will automatically commit the offset when called. Note that this could result in an offset being committed before the message is fully processed.
If more fine-grained control of when offsets are committed is required, it is recommended to use FetchMessage with CommitMessages instead.
func (*Reader) SetOffset ¶
SetOffset changes the offset from which the next batch of messages will be read. The method fails with io.ErrClosedPipe if the reader has already been closed.
From version 0.2.0, FirstOffset and LastOffset can be used to indicate the first or last available offset in the partition. Please note while -1 and -2 were accepted to indicate the first or last offset in previous versions, the meanings of the numbers were swapped in 0.2.0 to match the meanings in other libraries and the Kafka protocol specification.
func (*Reader) SetOffsetAt ¶ added in v0.2.3
SetOffsetAt changes the offset from which the next batch of messages will be read given the timestamp t.
The method fails if it is unable to connect to the partition leader, if it is unable to read the offset for the given timestamp, or if the reader has been closed.
func (*Reader) Stats ¶
func (r *Reader) Stats() ReaderStats
Stats returns a snapshot of the reader stats since the last time the method was called, or since the reader was created if it is called for the first time.
A typical use of this method is to spawn a goroutine that will periodically call Stats on a kafka reader and report the metrics to a stats collection system.
type ReaderConfig ¶
type ReaderConfig struct { // The list of broker addresses used to connect to the kafka cluster. Brokers []string // GroupID holds the optional consumer group id. If GroupID is specified, then // Partition should NOT be specified e.g. 0 GroupID string // GroupTopics allows specifying multiple topics, but can only be used in // combination with GroupID, as it is a consumer-group feature. As such, if // GroupID is set, then either Topic or GroupTopics must be defined. GroupTopics []string // The topic to read messages from. Topic string // Partition to read messages from. Either Partition or GroupID may // be assigned, but not both Partition int // A dialer used to open connections to the kafka server. This field is // optional, if nil, the default dialer is used instead. Dialer *Dialer // The capacity of the internal message queue, defaults to 100 if none is // set. QueueCapacity int // MinBytes indicates to the broker the minimum batch size that the consumer // will accept. Setting a high minimum when consuming from a low-volume topic // may result in delayed delivery when the broker does not have enough data to // satisfy the defined minimum. // // Default: 1 MinBytes int // MaxBytes indicates to the broker the maximum batch size that the consumer // will accept. The broker will truncate a message to satisfy this maximum, so // choose a value that is high enough for your largest message size. // // Default: 1MB MaxBytes int // Maximum amount of time to wait for new data to come when fetching batches // of messages from kafka. // // Default: 10s MaxWait time.Duration // ReadBatchTimeout is the amount of time to wait to fetch a message from a kafka message batch. // // Default: 10s ReadBatchTimeout time.Duration // ReadLagInterval sets the frequency at which the reader lag is updated. // Setting this field to a negative value disables lag reporting. 
ReadLagInterval time.Duration // GroupBalancers is the priority-ordered list of client-side consumer group // balancing strategies that will be offered to the coordinator. The first // strategy that all group members support will be chosen by the leader. // // Default: [Range, RoundRobin] // // Only used when GroupID is set GroupBalancers []GroupBalancer // HeartbeatInterval sets the optional frequency at which the reader sends the consumer // group heartbeat update. // // Default: 3s // // Only used when GroupID is set HeartbeatInterval time.Duration // CommitInterval indicates the interval at which offsets are committed to // the broker. If 0, commits will be handled synchronously. // // Default: 0 // // Only used when GroupID is set CommitInterval time.Duration // PartitionWatchInterval indicates how often a reader checks for partition changes. // If a reader sees a partition change (such as a partition add) it will rebalance the group // picking up new partitions. // // Default: 5s // // Only used when GroupID is set and WatchPartitionChanges is set. PartitionWatchInterval time.Duration // WatchPartitionChanges is used to inform kafka-go that a consumer group should be // polling the brokers and rebalancing if any partition changes happen to the topic. WatchPartitionChanges bool // SessionTimeout optionally sets the length of time that may pass without a heartbeat // before the coordinator considers the consumer dead and initiates a rebalance. // // Default: 30s // // Only used when GroupID is set SessionTimeout time.Duration // RebalanceTimeout optionally sets the length of time the coordinator will wait // for members to join as part of a rebalance. For kafka servers under higher // load, it may be useful to set this value higher. // // Default: 30s // // Only used when GroupID is set RebalanceTimeout time.Duration // JoinGroupBackoff optionally sets the length of time to wait between re-joining // the consumer group after an error. 
// // Default: 5s JoinGroupBackoff time.Duration // RetentionTime optionally sets the length of time the consumer group will be saved // by the broker // // Default: 24h // // Only used when GroupID is set RetentionTime time.Duration // StartOffset determines from whence the consumer group should begin // consuming when it finds a partition without a committed offset. If // non-zero, it must be set to one of FirstOffset or LastOffset. // // Default: FirstOffset // // Only used when GroupID is set StartOffset int64 // ReadBackoffMin optionally sets the smallest amount of time the reader will wait before // polling for new messages // // Default: 100ms ReadBackoffMin time.Duration // ReadBackoffMax optionally sets the maximum amount of time the reader will wait before // polling for new messages // // Default: 1s ReadBackoffMax time.Duration // If not nil, specifies a logger used to report internal changes within the // reader. Logger Logger // ErrorLogger is the logger used to report errors. If nil, the reader falls // back to using Logger instead. ErrorLogger Logger // IsolationLevel controls the visibility of transactional records. // ReadUncommitted makes all records visible. With ReadCommitted only // non-transactional and committed records are visible. IsolationLevel IsolationLevel // Limit of how many attempts to connect will be made before returning the error. // // The default is to try 3 times. MaxAttempts int // OffsetOutOfRangeError indicates that the reader should return an error in // the event of an OffsetOutOfRange error, rather than retrying indefinitely. // This flag is being added to retain backwards-compatibility, so it will be // removed in a future version of kafka-go. OffsetOutOfRangeError bool }
ReaderConfig is a configuration object used to create new instances of Reader.
func (*ReaderConfig) Validate ¶ added in v0.2.3
func (config *ReaderConfig) Validate() error
Validate method validates ReaderConfig properties.
type ReaderStats ¶
type ReaderStats struct { Dials int64 `metric:"kafka.reader.dial.count" type:"counter"` Fetches int64 `metric:"kafka.reader.fetch.count" type:"counter"` Messages int64 `metric:"kafka.reader.message.count" type:"counter"` Bytes int64 `metric:"kafka.reader.message.bytes" type:"counter"` Rebalances int64 `metric:"kafka.reader.rebalance.count" type:"counter"` Timeouts int64 `metric:"kafka.reader.timeout.count" type:"counter"` Errors int64 `metric:"kafka.reader.error.count" type:"counter"` DialTime DurationStats `metric:"kafka.reader.dial.seconds"` ReadTime DurationStats `metric:"kafka.reader.read.seconds"` WaitTime DurationStats `metric:"kafka.reader.wait.seconds"` FetchSize SummaryStats `metric:"kafka.reader.fetch.size"` FetchBytes SummaryStats `metric:"kafka.reader.fetch.bytes"` Offset int64 `metric:"kafka.reader.offset" type:"gauge"` Lag int64 `metric:"kafka.reader.lag" type:"gauge"` MinBytes int64 `metric:"kafka.reader.fetch_bytes.min" type:"gauge"` MaxBytes int64 `metric:"kafka.reader.fetch_bytes.max" type:"gauge"` MaxWait time.Duration `metric:"kafka.reader.fetch_wait.max" type:"gauge"` QueueLength int64 `metric:"kafka.reader.queue.length" type:"gauge"` QueueCapacity int64 `metric:"kafka.reader.queue.capacity" type:"gauge"` ClientID string `tag:"client_id"` Topic string `tag:"topic"` Partition string `tag:"partition"` // The original `Fetches` field had a typo where the metric name was called // "kafak..." instead of "kafka...", in order to offer time to fix monitors // that may be relying on this mistake we are temporarily introducing this // field. DeprecatedFetchesWithTypo int64 `metric:"kafak.reader.fetch.count" type:"counter"` }
ReaderStats is a data structure returned by a call to Reader.Stats that exposes details about the behavior of the reader.
type Record ¶ added in v0.4.0
Record is an interface representing a single kafka record.
Record values are not safe to use concurrently from multiple goroutines.
type RecordReader ¶ added in v0.4.0
type RecordReader = protocol.RecordReader
RecordReader is an interface representing a sequence of records. Record sets are used in both produce and fetch requests to represent the sequence of records that are sent to or received from kafka brokers.
RecordReader values are not safe to use concurrently from multiple goroutines.
func NewRecordReader ¶ added in v0.4.0
func NewRecordReader(records ...Record) RecordReader
NewRecordReader constructs a RecordReader which exposes the sequence of records passed as arguments.
type ReferenceHash ¶ added in v0.4.32
ReferenceHash is a Balancer that uses the provided hash function to determine which partition to route messages to. This ensures that messages with the same key are routed to the same partition.
The logic to calculate the partition is:
(int32(hasher.Sum32()) & 0x7fffffff) % len(partitions) => partition
By default, ReferenceHash uses the FNV-1a algorithm. This is the same algorithm as the Sarama NewReferenceHashPartitioner and ensures that messages produced by kafka-go will be delivered to the same partitions that messages from the Sarama producer would be delivered to.
type ReplicaAssignment ¶
type ReplicaAssignment struct { Partition int // The list of brokers where the partition should be allocated. There must // be as many entries in this list as there are replicas of the partition. // The first entry represents the broker that will be the preferred leader // for the partition. // // This field changed in 0.4 from `int` to `[]int`. It was invalid to pass // a single integer as this is supposed to be a list. While this introduces // a breaking change, it probably never worked before. Replicas []int }
type Request ¶ added in v0.4.0
Request is an interface implemented by types that represent messages sent from kafka clients to brokers.
type RequiredAcks ¶ added in v0.4.1
type RequiredAcks int
const ( RequireNone RequiredAcks = 0 RequireOne RequiredAcks = 1 RequireAll RequiredAcks = -1 )
func (RequiredAcks) MarshalText ¶ added in v0.4.21
func (acks RequiredAcks) MarshalText() ([]byte, error)
func (RequiredAcks) String ¶ added in v0.4.1
func (acks RequiredAcks) String() string
func (*RequiredAcks) UnmarshalText ¶ added in v0.4.21
func (acks *RequiredAcks) UnmarshalText(b []byte) error
type Resolver ¶
type Resolver interface { // LookupHost looks up the given host using the local resolver. // It returns a slice of that host's addresses. LookupHost(ctx context.Context, host string) (addrs []string, err error) }
The Resolver interface is used as an abstraction to provide service discovery of the hosts of a kafka cluster.
type ResourceType ¶ added in v0.4.9
type ResourceType int8
const ( ResourceTypeUnknown ResourceType = 0 ResourceTypeAny ResourceType = 1 ResourceTypeTopic ResourceType = 2 ResourceTypeGroup ResourceType = 3 // See https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java#L36 ResourceTypeBroker ResourceType = 4 ResourceTypeCluster ResourceType = 4 ResourceTypeTransactionalID ResourceType = 5 ResourceTypeDelegationToken ResourceType = 6 )
func (ResourceType) MarshalText ¶ added in v0.4.45
func (rt ResourceType) MarshalText() ([]byte, error)
func (ResourceType) String ¶ added in v0.4.45
func (rt ResourceType) String() string
func (*ResourceType) UnmarshalText ¶ added in v0.4.45
func (rt *ResourceType) UnmarshalText(text []byte) error
type Response ¶ added in v0.4.0
Response is an interface implemented by types that represent messages sent from kafka brokers in response to client requests.
type RoundRobin ¶
type RoundRobin struct { ChunkSize int // contains filtered or unexported fields }
RoundRobin is a Balancer implementation that equally distributes messages across all available partitions. It can take an optional chunk size to send ChunkSize messages to the same partition before moving to the next partition. This can be used to improve batch sizes.
type RoundRobinGroupBalancer ¶ added in v0.2.0
type RoundRobinGroupBalancer struct{}
RoundRobinGroupBalancer divides partitions evenly among consumers.
Example: 5 partitions, 2 consumers
C0: [0, 2, 4] C1: [1, 3]
Example: 6 partitions, 3 consumers
C0: [0, 3] C1: [1, 4] C2: [2, 5]
func (RoundRobinGroupBalancer) AssignGroups ¶ added in v0.2.0
func (r RoundRobinGroupBalancer) AssignGroups(members []GroupMember, topicPartitions []Partition) GroupMemberAssignments
func (RoundRobinGroupBalancer) ProtocolName ¶ added in v0.2.0
func (r RoundRobinGroupBalancer) ProtocolName() string
func (RoundRobinGroupBalancer) UserData ¶ added in v0.2.0
func (r RoundRobinGroupBalancer) UserData() ([]byte, error)
type RoundTripper ¶ added in v0.4.0
type RoundTripper interface { // RoundTrip sends a request to a kafka broker and returns the response that // was received, or a non-nil error. // // The context passed as first argument can be used to asynchronously abort // the call if needed. RoundTrip(context.Context, net.Addr, Request) (Response, error) }
RoundTripper is an interface implemented by types which support interacting with kafka brokers.
type ScramMechanism ¶ added in v0.4.43
type ScramMechanism int8
const ( ScramMechanismUnknown ScramMechanism = iota // 0 ScramMechanismSha256 // 1 ScramMechanismSha512 // 2 )
type SummaryStats ¶
type SummaryStats struct { Avg int64 `metric:"avg" type:"gauge"` Min int64 `metric:"min" type:"gauge"` Max int64 `metric:"max" type:"gauge"` Count int64 `metric:"count" type:"counter"` Sum int64 `metric:"sum" type:"counter"` }
SummaryStats is a data structure that carries a summary of observed values.
type SyncGroupRequest ¶ added in v0.4.33
type SyncGroupRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // GroupID of the group to sync. GroupID string // The generation of the group. GenerationID int // The member ID assigned by the group. MemberID string // The unique identifier for the consumer instance. GroupInstanceID string // The name for the class of protocols implemented by the group being joined. ProtocolType string // The group protocol name. ProtocolName string // The group member assignments. Assignments []SyncGroupRequestAssignment }
SyncGroupRequest is the request structure for the SyncGroup function.
type SyncGroupRequestAssignment ¶ added in v0.4.33
type SyncGroupRequestAssignment struct { // The ID of the member to assign. MemberID string // The member assignment. Assignment GroupProtocolAssignment }
SyncGroupRequestAssignment represents an assignment for a group member.
type SyncGroupResponse ¶ added in v0.4.33
type SyncGroupResponse struct { // An error that may have occurred when attempting to sync the group. // // The errors contain the kafka error code. Programs may use the standard // errors.Is function to test the error against kafka error codes. Error error // The amount of time that the broker throttled the request. Throttle time.Duration // The group protocol type. ProtocolType string // The group protocol name. ProtocolName string // The member assignment. Assignment GroupProtocolAssignment }
SyncGroupResponse is the response structure for the SyncGroup function.
type Topic ¶ added in v0.4.0
type Topic struct { // Name of the topic. Name string // True if the topic is internal. Internal bool // The list of partition currently available on this topic. Partitions []Partition // An error that may have occurred while attempting to read the topic // metadata. // // The error contains both the kafka error code, and an error message // returned by the kafka broker. Programs may use the standard errors.Is // function to test the error against kafka error codes. Error error }
Topic represents a topic in a kafka cluster.
type TopicAndGroup ¶ added in v0.2.5
TopicAndGroup pairs a consumer group with a topic. As these are both strings, we define a type for clarity when passing them to the Client as a function argument.
N.B. TopicAndGroup is currently experimental! Therefore, it is subject to change, including breaking changes between MINOR and PATCH releases.
DEPRECATED: this type will be removed in version 1.0, programs should migrate to use kafka.(*Client).OffsetFetch instead.
type TopicConfig ¶
type TopicConfig struct { // Topic name Topic string // NumPartitions created. -1 indicates unset. NumPartitions int // ReplicationFactor for the topic. -1 indicates unset. ReplicationFactor int // ReplicaAssignments among kafka brokers for this topic partitions. If this // is set num_partitions and replication_factor must be unset. ReplicaAssignments []ReplicaAssignment // ConfigEntries holds topic level configuration for topic to be set. ConfigEntries []ConfigEntry }
type TopicPartitionAssignment ¶ added in v0.4.9
type TopicPartitionAssignment struct { // Broker IDs BrokerIDs []int32 }
type TopicPartitionsConfig ¶ added in v0.4.9
type TopicPartitionsConfig struct { // Topic name Name string // Topic partition's count. Count int32 // TopicPartitionAssignments among kafka brokers for this topic partitions. TopicPartitionAssignments []TopicPartitionAssignment }
type Transport ¶ added in v0.4.0
type Transport struct { // A function used to establish connections to the kafka cluster. Dial func(context.Context, string, string) (net.Conn, error) // Time limit set for establishing connections to the kafka cluster. This // limit includes all round trips done to establish the connections (TLS // handshake, SASL negotiation, etc...). // // Defaults to 5s. DialTimeout time.Duration // Maximum amount of time that connections will remain open and unused. // The transport will manage to automatically close connections that have // been idle for too long, and re-open them on demand when the transport is // used again. // // Defaults to 30s. IdleTimeout time.Duration // TTL for the metadata cached by this transport. Note that the value // configured here is an upper bound, the transport randomizes the TTLs to // avoid getting into states where multiple clients end up synchronized and // cause bursts of requests to the kafka broker. // // Defaults to 6s. MetadataTTL time.Duration // Topic names for the metadata cached by this transport. If this field is left blank, // metadata information of all topics in the cluster will be retrieved. MetadataTopics []string // Unique identifier that the transport communicates to the brokers when it // sends requests. ClientID string // An optional configuration for TLS connections established by this // transport. // // If the Server TLS *tls.Config // SASL configures the Transport to use SASL authentication. SASL sasl.Mechanism // An optional resolver used to translate broker host names into network // addresses. // // The resolver will be called for every request (not every connection), // making it possible to implement ACL policies by validating that the // program is allowed to connect to the kafka broker. This also means that // the resolver should probably provide a caching layer to avoid storming // the service discovery backend with requests. 
// // When set, the Dial function is not responsible for performing name // resolution, and is always called with a pre-resolved address. Resolver BrokerResolver // The background context used to control goroutines started internally by // the transport. // // If nil, context.Background() is used instead. Context context.Context // contains filtered or unexported fields }
Transport is an implementation of the RoundTripper interface.
Transport values manage a pool of connections and automatically discover the cluster's layout to route requests to the appropriate brokers.
Transport values are safe to use concurrently from multiple goroutines.
Note: The intent is for the Transport to become the underlying layer of the kafka.Reader and kafka.Writer types.
func (*Transport) CloseIdleConnections ¶ added in v0.4.0
func (t *Transport) CloseIdleConnections()
CloseIdleConnections closes all idle connections immediately, and marks all connections that are in use to be closed when they become idle again.
func (*Transport) RoundTrip ¶ added in v0.4.0
RoundTrip sends a request to a kafka cluster and returns the response, or an error if no responses were received.
Message types are available in sub-packages of the protocol package. Each kafka API is implemented in a different sub-package. For example, the request and response types for the Fetch API are available in the protocol/fetch package.
The type of the response message will match the type of the request. For example, if RoundTrip was called with a *fetch.Request as argument, the value returned will be of type *fetch.Response. It is safe for the program to do a type assertion after checking that no error was returned.
This example illustrates the way this method is expected to be used:
r, err := transport.RoundTrip(ctx, addr, &fetch.Request{ ... }) if err != nil { ... } else { res := r.(*fetch.Response) ... }
The transport automatically selects the highest version of the API that is supported by both the kafka-go package and the kafka broker. The negotiation happens transparently once when connections are established.
This API was introduced in version 0.4 as a way to leverage the lower-level features of the kafka protocol, but also provide a more efficient way of managing connections to kafka brokers.
type TxnOffsetCommit ¶ added in v0.4.20
TxnOffsetCommit represents the commit of an offset to a partition within a transaction.
The extra metadata is opaque to the kafka protocol, it is intended to hold information like an identifier for the process that committed the offset, or the time at which the commit was made.
type TxnOffsetCommitPartition ¶ added in v0.4.20
type TxnOffsetCommitPartition struct { // ID of the partition. Partition int // An error that may have occurred while attempting to publish consumer // group offsets for this partition. // // The error contains both the kafka error code, and an error message // returned by the kafka broker. Programs may use the standard errors.Is // function to test the error against kafka error codes. Error error }
TxnOffsetCommitPartition represents the state of a single partition in responses to committing offsets within a transaction.
type TxnOffsetCommitRequest ¶ added in v0.4.20
type TxnOffsetCommitRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // The transactional id key. TransactionalID string // ID of the consumer group to publish the offsets for. GroupID string // The Producer ID (PID) for the current producer session; // received from an InitProducerID request. ProducerID int // The epoch associated with the current producer session for the given PID ProducerEpoch int // GenerationID is the current generation for the group. GenerationID int // ID of the group member submitting the offsets. MemberID string // GroupInstanceID is a unique identifier for the consumer. GroupInstanceID string // Set of topic partitions to publish the offsets for. // // Note that offset commits need to be submitted to the broker acting as the // group coordinator. This will be automatically resolved by the transport. Topics map[string][]TxnOffsetCommit }
TxnOffsetCommitRequest represents a request sent to a kafka broker to commit offsets for a partition within a transaction.
type TxnOffsetCommitResponse ¶ added in v0.4.20
type TxnOffsetCommitResponse struct { // The amount of time that the broker throttled the request. Throttle time.Duration // Set of topic partitions that the kafka broker has accepted offset commits // for. Topics map[string][]TxnOffsetCommitPartition }
TxnOffsetCommitResponse represents a response from a kafka broker to an offset commit request within a transaction.
type UserScramCredentialsDeletion ¶ added in v0.4.43
type UserScramCredentialsDeletion struct { Name string Mechanism ScramMechanism }
type UserScramCredentialsUpsertion ¶ added in v0.4.43
type UserScramCredentialsUpsertion struct { Name string Mechanism ScramMechanism Iterations int Salt []byte SaltedPassword []byte }
type UserScramCredentialsUser ¶ added in v0.4.43
type UserScramCredentialsUser struct {
Name string
}
type Version ¶ added in v0.4.0
type Version int16
Version represents a version number for kafka APIs.
type WriteErrors ¶ added in v0.4.1
type WriteErrors []error
WriteErrors is returned by kafka.(*Writer).WriteMessages when the writer is not configured to write messages asynchronously. WriteErrors values contain a list of errors where each entry matches the position of a message in the WriteMessages call. The program can determine the status of each message by looping over the errors:
switch err := w.WriteMessages(ctx, msgs...).(type) { case nil: case kafka.WriteErrors: for i := range msgs { if err[i] != nil { // handle the error writing msgs[i] ... } } default: // handle other errors ... }
func (WriteErrors) Count ¶ added in v0.4.1
func (err WriteErrors) Count() int
Count counts the number of non-nil errors in err.
func (WriteErrors) Error ¶ added in v0.4.1
func (err WriteErrors) Error() string
type Writer ¶
type Writer struct { // Address of the kafka cluster that this writer is configured to send // messages to. // // This field is required, attempting to write messages to a writer with a // nil address will error. Addr net.Addr // Topic is the name of the topic that the writer will produce messages to. // // Setting this field or not is a mutually exclusive option. If you set Topic // here, you must not set Topic for any produced Message. Otherwise, if you do // not set Topic, every Message must have Topic specified. Topic string // The balancer used to distribute messages across partitions. // // The default is to use a round-robin distribution. Balancer Balancer // Limit on how many attempts will be made to deliver a message. // // The default is to try at most 10 times. MaxAttempts int // WriteBackoffMin optionally sets the smallest amount of time the writer waits before // it attempts to write a batch of messages // // Default: 100ms WriteBackoffMin time.Duration // WriteBackoffMax optionally sets the maximum amount of time the writer waits before // it attempts to write a batch of messages // // Default: 1s WriteBackoffMax time.Duration // Limit on how many messages will be buffered before being sent to a // partition. // // The default is to use a target batch size of 100 messages. BatchSize int // Limit the maximum size of a request in bytes before being sent to // a partition. // // The default is to use a kafka default value of 1048576. BatchBytes int64 // Time limit on how often incomplete message batches will be flushed to // kafka. // // The default is to flush at least every second. BatchTimeout time.Duration // Timeout for read operations performed by the Writer. // // Defaults to 10 seconds. ReadTimeout time.Duration // Timeout for write operation performed by the Writer. // // Defaults to 10 seconds. 
WriteTimeout time.Duration // Number of acknowledges from partition replicas required before receiving // a response to a produce request, the following values are supported: // // RequireNone (0) fire-and-forget, do not wait for acknowledgements from the // RequireOne (1) wait for the leader to acknowledge the writes // RequireAll (-1) wait for the full ISR to acknowledge the writes // // Defaults to RequireNone. RequiredAcks RequiredAcks // Setting this flag to true causes the WriteMessages method to never block. // It also means that errors are ignored since the caller will not receive // the returned value. Use this only if you don't care about guarantees of // whether the messages were written to kafka. // // Defaults to false. Async bool // An optional function called when the writer succeeds or fails the // delivery of messages to a kafka partition. When writing the messages // fails, the `err` parameter will be non-nil. // // The messages that the Completion function is called with have their // topic, partition, offset, and time set based on the Produce responses // received from kafka. All messages passed to a call to the function have // been written to the same partition. The keys and values of messages are // referencing the original byte slices carried by messages in the calls to // WriteMessages. // // The function is called from goroutines started by the writer. Calls to // Close will block on the Completion function calls. When the Writer is // not writing asynchronously, the WriteMessages call will also block on // Completion function, which is a useful guarantee if the byte slices // for the message keys and values are intended to be reused after the // WriteMessages call returned. // // If a completion function panics, the program terminates because the // panic is not recovered by the writer and bubbles up to the top of the // goroutine's call stack. 
Completion func(messages []Message, err error) // Compression set the compression codec to be used to compress messages. Compression Compression // If not nil, specifies a logger used to report internal changes within the // writer. Logger Logger // ErrorLogger is the logger used to report errors. If nil, the writer falls // back to using Logger instead. ErrorLogger Logger // A transport used to send messages to kafka clusters. // // If nil, DefaultTransport is used. Transport RoundTripper // AllowAutoTopicCreation notifies writer to create topic if missing. AllowAutoTopicCreation bool // contains filtered or unexported fields }
The Writer type provides the implementation of a producer of kafka messages that automatically distributes messages across partitions of a single topic using a configurable balancing policy.
Writers manage the dispatch of messages across partitions of the topic they are configured to write to using a Balancer, and aggregate batches to optimize the writes to kafka.
Writers may be configured to be used synchronously or asynchronously. When used synchronously, calls to WriteMessages block until the messages have been written to kafka. In this mode, the program should inspect the error returned by the function and test if it is an instance of kafka.WriteErrors in order to identify which messages have succeeded or failed, for example:
// Construct a synchronous writer (the default mode). w := &kafka.Writer{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: "topic-A", RequiredAcks: kafka.RequireAll, } ... // Passing a context can prevent the operation from blocking indefinitely. switch err := w.WriteMessages(ctx, msgs...).(type) { case nil: case kafka.WriteErrors: for i := range msgs { if err[i] != nil { // handle the error writing msgs[i] ... } } default: // handle other errors ... }
In asynchronous mode, the program may configure a completion handler on the writer to receive notifications of messages being written to kafka:
w := &kafka.Writer{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: "topic-A", RequiredAcks: kafka.RequireAll, Async: true, // make the writer asynchronous Completion: func(messages []kafka.Message, err error) { ... }, } ... // Because the writer is asynchronous, there is no need for the context to // be cancelled, the call will never block. if err := w.WriteMessages(context.Background(), msgs...); err != nil { // Only validation errors would be reported in this case. ... }
Methods of Writer are safe to use concurrently from multiple goroutines, however the writer configuration should not be modified after first use.
Example ¶
w := &kafka.Writer{ Addr: kafka.TCP("localhost:9092"), Topic: "Topic-1", } w.WriteMessages(context.Background(), kafka.Message{ Key: []byte("Key-A"), Value: []byte("Hello World!"), }, ) w.Close()
Output:
func NewWriter ¶
func NewWriter(config WriterConfig) *Writer
NewWriter creates and returns a new Writer configured with config.
DEPRECATED: Writer values can be instantiated and configured directly; this function is retained for backward compatibility and will be removed in version 1.0.
func (*Writer) Close ¶
Close flushes pending writes, and waits for all writes to complete before returning. Calling Close also prevents new writes from being submitted to the writer, further calls to WriteMessages and the like will fail with io.ErrClosedPipe.
func (*Writer) Stats ¶
func (w *Writer) Stats() WriterStats
Stats returns a snapshot of the writer stats since the last time the method was called, or since the writer was created if it is called for the first time.
A typical use of this method is to spawn a goroutine that will periodically call Stats on a kafka writer and report the metrics to a stats collection system.
func (*Writer) WriteMessages ¶
WriteMessages writes a batch of messages to the kafka topic configured on this writer.
Unless the writer was configured to write messages asynchronously, the method blocks until all messages have been written, or until the maximum number of attempts was reached.
When sending synchronously and the writer's batch size is configured to be greater than 1, this method blocks until either a full batch can be assembled or the batch timeout is reached. The batch size and timeouts are evaluated per partition, so the choice of Balancer can also influence the flushing behavior. For example, the Hash balancer will require on average N * batch size messages to trigger a flush where N is the number of partitions. The best way to achieve good batching behavior is to share one Writer amongst multiple goroutines.
When the method returns an error, it may be of type kafka.WriteErrors to allow the caller to determine the status of each message.
The context passed as first argument may also be used to asynchronously cancel the operation. Note that in this case there are no guarantees made on whether messages were written to kafka, they might also still be written after this method has already returned, therefore it is important to not modify byte slices of passed messages if WriteMessages returned early due to a canceled context. The program should assume that the whole batch failed and re-write the messages later (which could then cause duplicates).
type WriterConfig ¶
type WriterConfig struct { // The list of brokers used to discover the partitions available on the // kafka cluster. // // This field is required, attempting to create a writer with an empty list // of brokers will panic. Brokers []string // The topic that the writer will produce messages to. // // If provided, this will be used to set the topic for all produced messages. // If not provided, each Message must specify a topic for itself. This must be // mutually exclusive, otherwise the Writer will return an error. Topic string // The dialer used by the writer to establish connections to the kafka // cluster. // // If nil, the default dialer is used instead. Dialer *Dialer // The balancer used to distribute messages across partitions. // // The default is to use a round-robin distribution. Balancer Balancer // Limit on how many attempts will be made to deliver a message. // // The default is to try at most 10 times. MaxAttempts int // DEPRECATED: in versions prior to 0.4, the writer used channels internally // to dispatch messages to partitions. This has been replaced by an in-memory // aggregation of batches which uses shared state instead of message passing, // making this option unnecessary. QueueCapacity int // Limit on how many messages will be buffered before being sent to a // partition. // // The default is to use a target batch size of 100 messages. BatchSize int // Limit the maximum size of a request in bytes before being sent to // a partition. // // The default is to use a kafka default value of 1048576. BatchBytes int // Time limit on how often incomplete message batches will be flushed to // kafka. // // The default is to flush at least every second. BatchTimeout time.Duration // Timeout for read operations performed by the Writer. // // Defaults to 10 seconds. ReadTimeout time.Duration // Timeout for write operation performed by the Writer. // // Defaults to 10 seconds. 
WriteTimeout time.Duration // DEPRECATED: in versions prior to 0.4, the writer used to maintain a cache // the topic layout. With the change to use a transport to manage connections, // the responsibility of syncing the cluster layout has been delegated to the // transport. RebalanceInterval time.Duration // DEPRECATED: in versions prior to 0.4, the writer used to manage connections // to the kafka cluster directly. With the change to use a transport to manage // connections, the writer has no connections to manage directly anymore. IdleConnTimeout time.Duration // Number of acknowledges from partition replicas required before receiving // a response to a produce request. The default is -1, which means to wait for // all replicas, and a value above 0 is required to indicate how many replicas // should acknowledge a message to be considered successful. RequiredAcks int // Setting this flag to true causes the WriteMessages method to never block. // It also means that errors are ignored since the caller will not receive // the returned value. Use this only if you don't care about guarantees of // whether the messages were written to kafka. Async bool // CompressionCodec set the codec to be used to compress Kafka messages. CompressionCodec // If not nil, specifies a logger used to report internal changes within the // writer. Logger Logger // ErrorLogger is the logger used to report errors. If nil, the writer falls // back to using Logger instead. ErrorLogger Logger }
WriterConfig is a configuration type used to create new instances of Writer.
DEPRECATED: writer values should be configured directly by assigning their exported fields. This type is kept for backward compatibility, and will be removed in version 1.0.
func (*WriterConfig) Validate ¶ added in v0.2.3
func (config *WriterConfig) Validate() error
Validate method validates WriterConfig properties.
type WriterStats ¶
type WriterStats struct { Writes int64 `metric:"kafka.writer.write.count" type:"counter"` Messages int64 `metric:"kafka.writer.message.count" type:"counter"` Bytes int64 `metric:"kafka.writer.message.bytes" type:"counter"` Errors int64 `metric:"kafka.writer.error.count" type:"counter"` BatchTime DurationStats `metric:"kafka.writer.batch.seconds"` BatchQueueTime DurationStats `metric:"kafka.writer.batch.queue.seconds"` WriteTime DurationStats `metric:"kafka.writer.write.seconds"` WaitTime DurationStats `metric:"kafka.writer.wait.seconds"` Retries int64 `metric:"kafka.writer.retries.count" type:"counter"` BatchSize SummaryStats `metric:"kafka.writer.batch.size"` BatchBytes SummaryStats `metric:"kafka.writer.batch.bytes"` MaxAttempts int64 `metric:"kafka.writer.attempts.max" type:"gauge"` WriteBackoffMin time.Duration `metric:"kafka.writer.backoff.min" type:"gauge"` WriteBackoffMax time.Duration `metric:"kafka.writer.backoff.max" type:"gauge"` MaxBatchSize int64 `metric:"kafka.writer.batch.max" type:"gauge"` BatchTimeout time.Duration `metric:"kafka.writer.batch.timeout" type:"gauge"` ReadTimeout time.Duration `metric:"kafka.writer.read.timeout" type:"gauge"` WriteTimeout time.Duration `metric:"kafka.writer.write.timeout" type:"gauge"` RequiredAcks int64 `metric:"kafka.writer.acks.required" type:"gauge"` Async bool `metric:"kafka.writer.async" type:"gauge"` Topic string `tag:"topic"` // DEPRECATED: these fields will only be reported for backward compatibility // if the Writer was constructed with NewWriter. Dials int64 `metric:"kafka.writer.dial.count" type:"counter"` DialTime DurationStats `metric:"kafka.writer.dial.seconds"` // DEPRECATED: these fields were meaningful prior to kafka-go 0.4, changes // to the internal implementation and the introduction of the transport type // made them unnecessary. // // The values will be zero but are left for backward compatibility to avoid // breaking programs that used these fields. 
Rebalances int64 RebalanceInterval time.Duration QueueLength int64 QueueCapacity int64 ClientID string }
WriterStats is a data structure returned by a call to Writer.Stats that exposes details about the behavior of the writer.
Source Files ¶
- addoffsetstotxn.go
- addpartitionstotxn.go
- address.go
- alterclientquotas.go
- alterconfigs.go
- alterpartitionreassignments.go
- alteruserscramcredentials.go
- apiversions.go
- balancer.go
- batch.go
- buffer.go
- client.go
- commit.go
- compression.go
- conn.go
- consumergroup.go
- crc32.go
- createacls.go
- createpartitions.go
- createtopics.go
- deleteacls.go
- deletegroups.go
- deletetopics.go
- describeacls.go
- describeclientquotas.go
- describeconfigs.go
- describegroups.go
- describeuserscramcredentials.go
- dialer.go
- discard.go
- electleaders.go
- endtxn.go
- error.go
- fetch.go
- findcoordinator.go
- groupbalancer.go
- heartbeat.go
- incrementalalterconfigs.go
- initproducerid.go
- joingroup.go
- kafka.go
- leavegroup.go
- listgroups.go
- listoffset.go
- listpartitionreassignments.go
- logger.go
- message.go
- message_reader.go
- metadata.go
- offsetcommit.go
- offsetdelete.go
- offsetfetch.go
- produce.go
- protocol.go
- rawproduce.go
- read.go
- reader.go
- record.go
- recordbatch.go
- resolver.go
- resource.go
- saslauthenticate.go
- saslhandshake.go
- sizeof.go
- stats.go
- syncgroup.go
- time.go
- transport.go
- txnoffsetcommit.go
- write.go
- writer.go
Directories ¶
Path | Synopsis |
---|---|
zstd
Package zstd implements Zstandard compression.
|
Package zstd implements Zstandard compression. |
Package gzip does nothing, it's kept for backward compatibility to avoid breaking the majority of programs that imported it to install the compression codec, which is now always included.
|
Package gzip does nothing, it's kept for backward compatibility to avoid breaking the majority of programs that imported it to install the compression codec, which is now always included. |
Package lz4 does nothing, it's kept for backward compatibility to avoid breaking the majority of programs that imported it to install the compression codec, which is now always included.
|
Package lz4 does nothing, it's kept for backward compatibility to avoid breaking the majority of programs that imported it to install the compression codec, which is now always included. |
aws_msk_iam
Module
|
|
aws_msk_iam_v2
Module
|
|
Package snappy does nothing, it's kept for backward compatibility to avoid breaking the majority of programs that imported it to install the compression codec, which is now always included.
|
Package snappy does nothing, it's kept for backward compatibility to avoid breaking the majority of programs that imported it to install the compression codec, which is now always included. |
Package topics is an experimental package that provides additional tooling around Kafka Topics.
|
Package topics is an experimental package that provides additional tooling around Kafka Topics. |
Package zstd does nothing, it's kept for backward compatibility to avoid breaking the majority of programs that imported it to install the compression codec, which is now always included.
|
Package zstd does nothing, it's kept for backward compatibility to avoid breaking the majority of programs that imported it to install the compression codec, which is now always included. |