kadm

package module
v1.11.0
This package is not in the latest version of its module.
Published: Jan 22, 2024 License: BSD-3-Clause Imports: 20 Imported by: 70

Documentation

Overview

Package kadm provides a helper Kafka admin client around a *kgo.Client.

This package is meant to cover the common use cases for dropping into an "admin" like interface for Kafka. As with any admin client, this package must make opinionated decisions on what to provide and what to hide. The underlying Kafka protocol gives more detailed information in responses, or allows more fine tuning in requests, but most of the time, these details are unnecessary.

By virtue of making opinionated decisions, this package cannot satisfy every need for requests and responses. If you need more control than this admin client provides, you can use the kmsg package directly.

This package contains a lot of types, but the two main types to know are Client and ShardErrors. Every other type is used for inputs or outputs to methods on the client.

The Client type is a small wrapper around a *kgo.Client that exists solely to namespace methods. The ShardErrors type is a bit more complicated. When issuing requests, under the hood some of these requests actually need to be mapped to brokers and split, issuing different pieces of the input request to different brokers. The *kgo.Client handles this all internally, but (if using RequestSharded as directed) returns the response to each of these split requests individually. Each response can fail or be successful. This package goes one step further and merges these failures into one meta failure, ShardErrors. Any function that returns ShardErrors is documented as such, and if a function returns a non-nil ShardErrors, it is possible that the returned data is still valid and usable. If you care to, you can log or react to the partial failures and continue using the partially successful result. This is in contrast to other clients, which either require you to request individual brokers directly, completely hide individual failures, or completely fail on any individual failure.
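As a sketch of that pattern: the broker address and group name below are placeholders, and DescribeGroups is used only as an illustrative method that can return ShardErrors.

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Hypothetical seed broker; replace with your own.
	cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		panic(err)
	}
	defer cl.Close()
	adm := kadm.NewClient(cl)

	// "my-group" is a placeholder group name.
	groups, err := adm.DescribeGroups(context.Background(), "my-group")
	var se *kadm.ShardErrors
	switch {
	case errors.As(err, &se):
		// Partial failure: log the failed shards and keep using
		// whatever came back from the successful brokers.
		for _, e := range se.Errs {
			fmt.Printf("broker %d failed: %v\n", e.Broker.NodeID, e.Err)
		}
	case err != nil:
		panic(err) // a complete failure (e.g., an auth error)
	}
	for name := range groups {
		fmt.Println("described group:", name)
	}
}
```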

For methods that list or describe things, this package often fails the entire response on auth failures. If you use a method that accepts two topics, one that you are authorized for and one that you are not, you will not receive a partially successful response. Instead, you will receive an AuthError. Methods that do *not* fail on auth errors are explicitly documented as such.

Users may often find it easy to work with lists of topics or partitions. Rather than needing to build deeply nested maps directly, this package has a few helper types that are worth knowing:

TopicsList  - a slice of topics and their partitions
TopicsSet   - a set of topics, each containing a set of partitions
Partitions  - a slice of partitions
OffsetsList - a slice of offsets
Offsets     - a map of offsets

These types are meant to be easy to build and use, and can be used as the starting point for other types.
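These helper types can be built directly with make and their Add helpers. A small sketch follows; the topic names, partitions, and offsets are hypothetical.

```go
package main

import (
	"fmt"

	"github.com/twmb/franz-go/pkg/kadm"
)

func main() {
	// A set of topics, each containing a set of partitions.
	s := make(kadm.TopicsSet)
	s.Add("orders", 0, 1, 2) // hypothetical topic and partitions
	s.Add("payments", 0)

	// A map of offsets, usable as input for offset-oriented requests.
	os := make(kadm.Offsets)
	// topic, partition, offset, leader epoch (-1 when unknown)
	os.AddOffset("orders", 0, 42, -1)

	fmt.Println(len(s), len(os))
}
```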

Many functions in this package are variadic and return either a map or a list of responses, but you may pass only one element as input and be interested in only one element of the output. This package provides the following functions to help:

Any(map)
AnyE(map, err)
First(slice)
FirstE(slice, err)

The intended use case of these is something like `kadm.AnyE(kadm.CreateTopics(..., "my-one-topic"))`, such that you can immediately get the response for the one topic you are creating.

Index

Constants

const FetchAllGroupTopics = "|fetch-all-group-topics|"

FetchAllGroupTopics is a kadm "internal" topic name that can be used in [FetchOffsetsForTopics]. By default, [FetchOffsetsForTopics] only returns topics that are explicitly requested. Other topics that may be committed to in the group are not returned. Using FetchAllGroupTopics switches the behavior to return the union of all committed topics and all requested topics.
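For example, the sketch below opts in to the union behavior. It assumes adm is a *kadm.Client and ctx a context.Context; the group and topic names are placeholders.

```go
// Fetch every topic committed to in the group, plus "extra-topic" even if
// the group has no commits for it yet.
os, err := adm.FetchOffsetsForTopics(ctx, "my-group",
	kadm.FetchAllGroupTopics, "extra-topic")
if err != nil {
	// handle the error; a partial ShardErrors may still carry usable data
}
_ = os
```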

Variables

var ErrEmpty = errors.New("empty")

ErrEmpty is returned from FirstE or AnyE if the input is empty.

Functions

func Any added in v1.7.0

func Any[M ~map[K]V, K comparable, V any](m M) (V, bool)

Any returns the first range element of the input map and whether it exists. This is the non-error-accepting equivalent of AnyE.

Many client methods in kadm accept a variadic number of input arguments and return either a slice or a map of responses, but you often use the method with only one argument. This function can help extract the one response you are interested in.

func AnyE added in v1.7.0

func AnyE[M ~map[K]V, K comparable, V any](m M, err error) (V, error)

AnyE returns the first range element of the input map, or the input error if it is non-nil. If the error is nil but the map is empty, this returns ErrEmpty. This is the error-accepting equivalent of Any.

Many client methods in kadm accept a variadic number of input arguments and return either a slice or a map of responses, but you often use the method with only one argument. This function can help extract the one response you are interested in.

func First added in v1.7.0

func First[S ~[]T, T any](s S) (T, bool)

First returns the first element of the input slice and whether it exists. This is the non-error-accepting equivalent of FirstE.

Many client methods in kadm accept a variadic number of input arguments and return either a slice or a map of responses, but you often use the method with only one argument. This function can help extract the one response you are interested in.

func FirstE added in v1.7.0

func FirstE[S ~[]T, T any](s S, err error) (T, error)

FirstE returns the first element of the input slice, or the input error if it is non-nil. If the error is nil but the slice is empty, this returns ErrEmpty. This is the error-accepting equivalent of First.

Many client methods in kadm accept a variadic number of input arguments and return either a slice or a map of responses, but you often use the method with only one argument. This function can help extract the one response you are interested in.

func StringPtr

func StringPtr(s string) *string

StringPtr is a shortcut function to aid building configs for creating or altering topics.

Types

type ACLBuilder

type ACLBuilder struct {
	// contains filtered or unexported fields
}

ACLBuilder is a builder that is used for batch creating / listing / deleting ACLs.

An ACL consists of five components:

  • the user (principal)
  • the host the user runs on
  • what resource to access (topic name, group id, etc.)
  • the operation (read, write)
  • whether to allow or deny the above

This builder allows for adding the above five components in batches and then creating, listing, or deleting a batch of ACLs in one go. This builder merges the fifth component (allowing or denying) into allowing principals and hosts and denying principals and hosts. The builder must always have an Allow or Deny. For creating, the host is optional and defaults to the wildcard * that allows or denies all hosts. For listing / deleting, hosts must be specified as well; calling the host functions with no arguments matches all hosts, but the call must be made.

Building works on a multiplying factor: every user, every host, every resource, and every operation is combined (principals * hosts * resources * operations).

With the Kafka simple authorizer (and most reimplementations), all principals are required to have the "User:" prefix. The PrefixUserExcept function can be used to easily add the "User:" prefix if missing.

The full set of operations and which requests require what operations is described in a large doc comment on the ACLOperation type.

Lastly, resources to access / deny access to can be created / matched based on literal (exact) names, or on prefix names, or more. See the ACLPattern docs for more information.
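Putting the pieces together, a create-oriented builder might look like the following sketch. The principal, topic names, and the commented CreateACLs call are illustrative, not prescriptive.

```go
b := kadm.NewACLs().
	Allow("User:svc-producer"). // hypothetical principal, "User:" prefixed
	AllowHosts("*").            // optional for creating; * is the default
	Topics("orders", "events"). // hypothetical topics
	Operations(kadm.OpWrite, kadm.OpDescribe).
	ResourcePatternType(kadm.ACLPatternLiteral) // the default pattern

if err := b.ValidateCreate(); err != nil {
	// the builder is missing a required component (e.g., no Allow/Deny)
}
// created, err := adm.CreateACLs(ctx, b) // assuming adm is a *kadm.Client
```

Because building multiplies principals * hosts * resources * operations, the builder above describes four ACLs: two operations across two topics for one principal/host pair.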

func NewACLs

func NewACLs() *ACLBuilder

NewACLs returns a new ACL builder.

func (*ACLBuilder) Allow

func (b *ACLBuilder) Allow(principals ...string) *ACLBuilder

Allow sets the principals to add allow permissions for. For listing and deleting, you must also use AllowHosts.

This returns the input pointer.

For creating, if this is not paired with AllowHosts, the user will have access to all hosts (the wildcard *).

For listing & deleting, if the principals are empty, this matches any user.

func (*ACLBuilder) AllowHosts

func (b *ACLBuilder) AllowHosts(hosts ...string) *ACLBuilder

AllowHosts sets the hosts to add allow permissions for. If using this, you must also use Allow.

This returns the input pointer.

For creating, if this is empty, the user will have access to all hosts (the wildcard *) and this function is actually not necessary.

For listing & deleting, if the hosts are empty, this matches any host.

func (*ACLBuilder) AnyResource

func (b *ACLBuilder) AnyResource(name ...string) *ACLBuilder

AnyResource lists & deletes ACLs of any type matching the given names (pending other filters). If no names are given, this matches all names.

This returns the input pointer.

This function does nothing for creating.

func (*ACLBuilder) Clusters

func (b *ACLBuilder) Clusters() *ACLBuilder

Clusters lists/deletes/creates ACLs of resource type "cluster".

This returns the input pointer.

There is only one type of cluster in Kafka, "kafka-cluster". Opting in to listing or deleting by cluster inherently matches all ACLs of resource type cluster. For creating, this function allows for creating cluster ACLs.

func (*ACLBuilder) DelegationTokens

func (b *ACLBuilder) DelegationTokens(t ...string) *ACLBuilder

DelegationTokens lists/deletes/creates ACLs of resource type "delegation_token" for the given delegation tokens.

This returns the input pointer.

For listing or deleting, if this is provided no tokens, all "delegation_token" resource type ACLs are matched. For creating, if no tokens are provided, this function does nothing.

func (*ACLBuilder) Deny

func (b *ACLBuilder) Deny(principals ...string) *ACLBuilder

Deny sets the principals to add deny permissions for. For listing and deleting, you must also use DenyHosts.

This returns the input pointer.

For creating, if this is not paired with DenyHosts, the user will be denied access to all hosts (the wildcard *).

For listing & deleting, if the principals are empty, this matches any user.

func (*ACLBuilder) DenyHosts

func (b *ACLBuilder) DenyHosts(hosts ...string) *ACLBuilder

DenyHosts sets the hosts to add deny permissions for. If using this, you must also use Deny.

This returns the input pointer.

For creating, if this is empty, the user will be denied access to all hosts (the wildcard *) and this function is actually not necessary.

For listing & deleting, if the hosts are empty, this matches any host.

func (*ACLBuilder) Groups

func (b *ACLBuilder) Groups(g ...string) *ACLBuilder

Groups lists/deletes/creates ACLs of resource type "group" for the given groups.

This returns the input pointer.

For listing or deleting, if this is provided no groups, all "group" resource type ACLs are matched. For creating, if no groups are provided, this function does nothing.

func (*ACLBuilder) HasAnyFilter

func (b *ACLBuilder) HasAnyFilter() bool

HasAnyFilter returns whether any field in this builder is opted into "any", meaning a wide glob. This would be if you used Topics with no topics, and so on. This function can be used to detect if you accidentally opted into a non-specific ACL.

The evaluated fields are: resources, principals/hosts, a single OpAny operation, and an Any pattern.

func (*ACLBuilder) HasHosts

func (b *ACLBuilder) HasHosts() bool

HasHosts returns if any allow or deny hosts have been set, or if their "any" field is true.

func (*ACLBuilder) HasPrincipals

func (b *ACLBuilder) HasPrincipals() bool

HasPrincipals returns if any allow or deny principals have been set, or if their "any" field is true.

func (*ACLBuilder) HasResource

func (b *ACLBuilder) HasResource() bool

HasResource returns true if the builder has a non-empty resource (topic, group, ...), or if any resource has "any" set to true.

func (*ACLBuilder) MaybeAllow

func (b *ACLBuilder) MaybeAllow(principals ...string) *ACLBuilder

MaybeAllow is the same as Allow, but does not match all allowed principals if none are provided.

func (*ACLBuilder) MaybeAllowHosts

func (b *ACLBuilder) MaybeAllowHosts(hosts ...string) *ACLBuilder

MaybeAllowHosts is the same as AllowHosts, but does not match all allowed hosts if none are provided.

func (*ACLBuilder) MaybeClusters

func (b *ACLBuilder) MaybeClusters(c bool) *ACLBuilder

MaybeClusters is the same as Clusters, but only matches clusters if c is true.

func (*ACLBuilder) MaybeDelegationTokens

func (b *ACLBuilder) MaybeDelegationTokens(t ...string) *ACLBuilder

MaybeDelegationTokens is the same as DelegationTokens, but does not match all tokens if none are provided.

func (*ACLBuilder) MaybeDeny

func (b *ACLBuilder) MaybeDeny(principals ...string) *ACLBuilder

MaybeDeny is the same as Deny, but does not match all denied principals if none are provided.

func (*ACLBuilder) MaybeDenyHosts

func (b *ACLBuilder) MaybeDenyHosts(hosts ...string) *ACLBuilder

MaybeDenyHosts is the same as DenyHosts, but does not match all denied hosts if none are provided.

func (*ACLBuilder) MaybeGroups

func (b *ACLBuilder) MaybeGroups(g ...string) *ACLBuilder

MaybeGroups is the same as Groups, but does not match all groups if none are provided.

func (*ACLBuilder) MaybeOperations

func (b *ACLBuilder) MaybeOperations(operations ...ACLOperation) *ACLBuilder

MaybeOperations is the same as Operations, but does not match all operations if none are provided.

func (*ACLBuilder) MaybeTopics

func (b *ACLBuilder) MaybeTopics(t ...string) *ACLBuilder

MaybeTopics is the same as Topics, but does not match all topics if none are provided.

func (*ACLBuilder) MaybeTransactionalIDs

func (b *ACLBuilder) MaybeTransactionalIDs(x ...string) *ACLBuilder

MaybeTransactionalIDs is the same as TransactionalIDs, but does not match all transactional IDs if none are provided.

func (*ACLBuilder) Operations

func (b *ACLBuilder) Operations(operations ...ACLOperation) *ACLBuilder

Operations sets operations to allow or deny. Passing no operations defaults to OpAny.

This returns the input pointer.

For creating, OpAny returns an error, because it is strictly used for filters (listing & deleting).

func (*ACLBuilder) PrefixUser

func (b *ACLBuilder) PrefixUser()

PrefixUser prefixes all allowed and denied principals with "User:".

func (*ACLBuilder) PrefixUserExcept

func (b *ACLBuilder) PrefixUserExcept(except ...string)

PrefixUserExcept prefixes all allowed and denied principals with "User:", unless they have any of the given except prefixes.

func (*ACLBuilder) ResourcePatternType

func (b *ACLBuilder) ResourcePatternType(pattern ACLPattern) *ACLBuilder

ResourcePatternType sets the pattern type to use when creating or filtering ACL resource names, overriding the default of LITERAL.

This returns the input pointer.

For creating, only LITERAL and PREFIXED are supported.

func (*ACLBuilder) Topics

func (b *ACLBuilder) Topics(t ...string) *ACLBuilder

Topics lists/deletes/creates ACLs of resource type "topic" for the given topics.

This returns the input pointer.

For listing or deleting, if this is provided no topics, all "topic" resource type ACLs are matched. For creating, if no topics are provided, this function does nothing.

func (*ACLBuilder) TransactionalIDs

func (b *ACLBuilder) TransactionalIDs(x ...string) *ACLBuilder

TransactionalIDs lists/deletes/creates ACLs of resource type "transactional_id" for the given transactional IDs.

This returns the input pointer.

For listing or deleting, if this is provided no IDs, all "transactional_id" resource type ACLs are matched. For creating, if no IDs are provided, this function does nothing.

func (*ACLBuilder) ValidateCreate

func (b *ACLBuilder) ValidateCreate() error

ValidateCreate returns an error if the builder is invalid for creating ACLs.

func (*ACLBuilder) ValidateDelete

func (b *ACLBuilder) ValidateDelete() error

ValidateDelete is an alias for ValidateFilter.

func (*ACLBuilder) ValidateDescribe

func (b *ACLBuilder) ValidateDescribe() error

ValidateDescribe is an alias for ValidateFilter.

func (*ACLBuilder) ValidateFilter

func (b *ACLBuilder) ValidateFilter() error

ValidateFilter returns an error if the builder is invalid for deleting or describing ACLs (which both operate on a filter basis).

type ACLOperation

type ACLOperation = kmsg.ACLOperation

ACLOperation is a type alias for kmsg.ACLOperation, which is an enum containing all Kafka ACL operations and has helper functions.

Kafka requests require the following operations (broker <=> broker ACLs elided):

PRODUCING/CONSUMING
===================
Produce      WRITE on TOPIC for topics
             WRITE on TRANSACTIONAL_ID for txn id (if transactionally producing)

Fetch        READ on TOPIC for topics

ListOffsets  DESCRIBE on TOPIC for topics

Metadata     DESCRIBE on TOPIC for topics
             CREATE on CLUSTER for kafka-cluster (if automatically creating new topics)
             CREATE on TOPIC for topics (if automatically creating new topics)

OffsetForLeaderEpoch  DESCRIBE on TOPIC for topics

GROUPS
======
FindCoordinator  DESCRIBE on GROUP for group (if finding group coordinator)
                 DESCRIBE on TRANSACTIONAL_ID for id (if finding transaction coordinator)

OffsetCommit     READ on GROUP for group
                 READ on TOPIC for topics

OffsetFetch      DESCRIBE on GROUP for group
                 DESCRIBE on TOPIC for topics

OffsetDelete     DELETE on GROUP for group
                 READ on TOPIC for topics

JoinGroup        READ on GROUP for group
Heartbeat        READ on GROUP for group
LeaveGroup       READ on GROUP for group
SyncGroup        READ on GROUP for group

DescribeGroup    DESCRIBE on GROUP for groups

ListGroups       DESCRIBE on GROUP for groups
                 or, DESCRIBE on CLUSTER for kafka-cluster

DeleteGroups     DELETE on GROUP for groups

TRANSACTIONS (including FindCoordinator above)
============
InitProducerID      WRITE on TRANSACTIONAL_ID for id, if using transactions
                    or, IDEMPOTENT_WRITE on CLUSTER for kafka-cluster, if pre Kafka 3.0
                    or, WRITE on TOPIC for any topic, if Kafka 3.0+

AddPartitionsToTxn  WRITE on TRANSACTIONAL_ID for id
                    WRITE on TOPIC for topics

AddOffsetsToTxn     WRITE on TRANSACTIONAL_ID for id
                    READ on GROUP for group

EndTxn              WRITE on TRANSACTIONAL_ID for id

TxnOffsetCommit     WRITE on TRANSACTIONAL_ID for id
                    READ on GROUP for group
                    READ on TOPIC for topics

TOPIC ADMIN
===========
CreateTopics      CREATE on CLUSTER for kafka-cluster
                  CREATE on TOPIC for topics
                  DESCRIBE_CONFIGS on TOPIC for topics, for returning topic configs on create

CreatePartitions  ALTER on TOPIC for topics

DeleteTopics      DELETE on TOPIC for topics
                  DESCRIBE on TOPIC for topics, if deleting by topic id (in addition to prior ACL)

DeleteRecords     DELETE on TOPIC for topics

CONFIG ADMIN
============
DescribeConfigs          DESCRIBE_CONFIGS on CLUSTER for kafka-cluster, for broker or broker-logger describing
                         DESCRIBE_CONFIGS on TOPIC for topics, for topic describing

AlterConfigs             ALTER_CONFIGS on CLUSTER for kafka-cluster, for broker altering
                         ALTER_CONFIGS on TOPIC for topics, for topic altering

IncrementalAlterConfigs  ALTER_CONFIGS on CLUSTER for kafka-cluster, for broker or broker-logger altering
                         ALTER_CONFIGS on TOPIC for topics, for topic altering

MISC ADMIN
==========
AlterReplicaLogDirs  ALTER on CLUSTER for kafka-cluster
DescribeLogDirs      DESCRIBE on CLUSTER for kafka-cluster

AlterPartitionAssignments   ALTER on CLUSTER for kafka-cluster
ListPartitionReassignments  DESCRIBE on CLUSTER for kafka-cluster

DescribeDelegationTokens    DESCRIBE on DELEGATION_TOKEN for id

ElectLeaders          ALTER on CLUSTER for kafka-cluster

DescribeClientQuotas  DESCRIBE_CONFIGS on CLUSTER for kafka-cluster
AlterClientQuotas     ALTER_CONFIGS on CLUSTER for kafka-cluster

DescribeUserScramCredentials  DESCRIBE on CLUSTER for kafka-cluster
AlterUserScramCredentials     ALTER on CLUSTER for kafka-cluster

UpdateFeatures        ALTER on CLUSTER for kafka-cluster

DescribeCluster       DESCRIBE on CLUSTER for kafka-cluster

DescribeProducerIDs   READ on TOPIC for topics
DescribeTransactions  DESCRIBE on TRANSACTIONAL_ID for ids
                      DESCRIBE on TOPIC for topics
ListTransactions      DESCRIBE on TRANSACTIONAL_ID for ids
const (
	// OpUnknown is returned for unknown operations.
	OpUnknown ACLOperation = kmsg.ACLOperationUnknown

	// OpAny, used for listing and deleting, matches any operation.
	OpAny ACLOperation = kmsg.ACLOperationAny

	// OpAll is a shortcut for allowing / denying all operations.
	OpAll ACLOperation = kmsg.ACLOperationAll

	// OpRead is the READ operation.
	OpRead ACLOperation = kmsg.ACLOperationRead

	// OpWrite is the WRITE operation.
	OpWrite ACLOperation = kmsg.ACLOperationWrite

	// OpCreate is the CREATE operation.
	OpCreate ACLOperation = kmsg.ACLOperationCreate

	// OpDelete is the DELETE operation.
	OpDelete ACLOperation = kmsg.ACLOperationDelete

	// OpAlter is the ALTER operation.
	OpAlter ACLOperation = kmsg.ACLOperationAlter

	// OpDescribe is the DESCRIBE operation.
	OpDescribe ACLOperation = kmsg.ACLOperationDescribe

	// OpClusterAction is the CLUSTER_ACTION operation. This operation is
	// used for any broker<=>broker communication and is not needed by
	// clients.
	OpClusterAction ACLOperation = kmsg.ACLOperationClusterAction

	// OpDescribeConfigs is the DESCRIBE_CONFIGS operation.
	OpDescribeConfigs ACLOperation = kmsg.ACLOperationDescribeConfigs

	// OpAlterConfigs is the ALTER_CONFIGS operation.
	OpAlterConfigs ACLOperation = kmsg.ACLOperationAlterConfigs

	// OpIdempotentWrite is the IDEMPOTENT_WRITE operation. As of Kafka
	// 3.0+, this has been deprecated and replaced by the ability to WRITE
	// on any topic.
	OpIdempotentWrite ACLOperation = kmsg.ACLOperationIdempotentWrite
)

type ACLPattern

type ACLPattern = kmsg.ACLResourcePatternType

ACLPattern is a type alias for kmsg.ACLResourcePatternType, which is an enum containing all Kafka ACL resource pattern options.

Creating/listing/deleting ACLs works on a resource name basis: every ACL created has a name, and every ACL filtered for listing / deleting matches by name. The name by default is "literal", meaning created ACLs will have the exact name, and matched ACLs must match completely.

Prefixed names allow for creating an ACL that matches any prefix: principals foo-bar and foo-baz both have the prefix "foo-", meaning a READ on TOPIC for User:foo- with prefix pattern will allow both of those principals to read the topic.

Any and match are used for listing and deleting. Any matches any name, be it a literal, prefixed, or wildcard name. There is no need to specify topics, groups, etc. when using the any resource pattern.

Alternatively, match requires a name, but it matches any literal name (exact match), any prefix, and any wildcard.
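As a sketch, a filter matching every ACL on topics that begin with a given prefix could be built as follows; the prefix and the commented DescribeACLs call are illustrative.

```go
b := kadm.NewACLs().
	Topics("orders-").                            // hypothetical prefix
	ResourcePatternType(kadm.ACLPatternPrefixed). // match by prefix, not literal name
	Allow().AllowHosts().                         // empty: match any principal, any host
	Operations(kadm.OpAny)                        // match any operation
// described, err := adm.DescribeACLs(ctx, b) // assuming adm is a *kadm.Client
```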

const (
	// ACLPatternUnknown is returned for unknown patterns.
	ACLPatternUnknown ACLPattern = kmsg.ACLResourcePatternTypeUnknown

	// ACLPatternAny is the ANY resource pattern.
	ACLPatternAny ACLPattern = kmsg.ACLResourcePatternTypeAny

	// ACLPatternMatch is the MATCH resource pattern.
	ACLPatternMatch ACLPattern = kmsg.ACLResourcePatternTypeMatch

	// ACLPatternLiteral is the LITERAL resource pattern, the default.
	ACLPatternLiteral ACLPattern = kmsg.ACLResourcePatternTypeLiteral

	// ACLPatternPrefixed is the PREFIXED resource pattern.
	ACLPatternPrefixed ACLPattern = kmsg.ACLResourcePatternTypePrefixed
)

type AlterAllReplicaLogDirsResponses

type AlterAllReplicaLogDirsResponses map[int32]AlterReplicaLogDirsResponses

AlterAllReplicaLogDirsResponses contains per-broker responses to altered partition directories.

func (AlterAllReplicaLogDirsResponses) Each

Each calls fn for every response.

func (AlterAllReplicaLogDirsResponses) Sorted

Sorted returns the responses sorted by broker, topic, and partition.

type AlterClientQuotaEntry added in v1.5.0

type AlterClientQuotaEntry struct {
	Entity ClientQuotaEntity    // Entity is the entity to alter quotas for.
	Ops    []AlterClientQuotaOp // Ops are quotas to set or remove.
}

AlterClientQuotaEntry pairs an entity with quotas to set or remove.

type AlterClientQuotaOp added in v1.5.0

type AlterClientQuotaOp struct {
	Key    string  // Key is the quota configuration key to set or remove.
	Value  float64 // Value is the quota configuration value to set or remove.
	Remove bool    // Remove, if true, removes this quota rather than sets it.
}

AlterClientQuotaOp sets or removes a client quota.

type AlterConfig

type AlterConfig struct {
	Op    IncrementalOp // Op is the incremental alter operation to perform. This is ignored for State alter functions.
	Name  string        // Name is the name of the config to alter.
	Value *string       // Value is the value to use when altering, if any.
}

AlterConfig is an individual key/value operation to perform when altering configs.

This package includes a StringPtr function to aid in building config values.
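For example, a pair of alterations might be built as below. This sketch assumes adm is a *kadm.Client and ctx a context.Context; the topic name and config values are hypothetical, and Op is left at its zero value on the assumption that the zero IncrementalOp is the set operation.

```go
alters := []kadm.AlterConfig{
	// StringPtr avoids declaring throwaway string variables for Value.
	{Name: "retention.ms", Value: kadm.StringPtr("86400000")},
	{Name: "cleanup.policy", Value: kadm.StringPtr("delete")},
}
// responses, err := adm.AlterTopicConfigs(ctx, alters, "orders")
```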

type AlterConfigsResponse

type AlterConfigsResponse struct {
	Name string // Name is the name of this resource (topic name or broker number).
	Err  error  // Err is non-nil if the config could not be altered.
}

AlterConfigsResponse contains the response for an individual alteration.

type AlterConfigsResponses

type AlterConfigsResponses []AlterConfigsResponse

AlterConfigsResponses contains responses for many alterations.

func (AlterConfigsResponses) On

On calls fn for the response name if it exists, returning the response and the error returned from fn. If fn is nil, this simply returns the response.

The fn is given a copy of the response. This function returns the copy as well; any modifications within fn are modifications on the returned copy.

If the resource does not exist, this returns kerr.UnknownTopicOrPartition.

type AlterPartitionAssignmentsReq added in v1.1.0

type AlterPartitionAssignmentsReq map[string]map[int32][]int32

AlterPartitionAssignmentsReq is the input for a request to alter partition assignments. The keys are topics and partitions, and the final slice corresponds to the brokers that replicas will be assigned to. If the brokers for a given partition are nil, the request will *cancel* any active reassignment for that partition.

func (*AlterPartitionAssignmentsReq) Assign added in v1.1.0

func (r *AlterPartitionAssignmentsReq) Assign(t string, p int32, brokers []int32)

Assign specifies brokers that a partition should be placed on. Using nil for the brokers cancels a pending reassignment of the partition.

func (*AlterPartitionAssignmentsReq) CancelAssign added in v1.1.0

func (r *AlterPartitionAssignmentsReq) CancelAssign(t string, p int32)

CancelAssign cancels a reassignment of the given partition.
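A request combining both helpers might be sketched as below; the topic names and broker IDs are hypothetical, as is the commented call issuing the request.

```go
var req kadm.AlterPartitionAssignmentsReq
req.Assign("orders", 0, []int32{1, 2, 3}) // move partition 0's replicas to brokers 1, 2, 3
req.Assign("orders", 1, nil)              // nil brokers: cancel any active reassignment
req.CancelAssign("events", 0)             // equivalent cancellation helper
// responses, err := adm.AlterPartitionAssignments(ctx, req)
```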

type AlterPartitionAssignmentsResponse added in v1.1.0

type AlterPartitionAssignmentsResponse struct {
	Topic      string // Topic is the topic that was assigned.
	Partition  int32  // Partition is the partition that was assigned.
	Err        error  // Err is non-nil if this assignment errored.
	ErrMessage string // ErrMessage is an optional additional message on error.
}

AlterPartitionAssignmentsResponse contains a response for an individual partition that was assigned.

type AlterPartitionAssignmentsResponses added in v1.1.0

type AlterPartitionAssignmentsResponses map[string]map[int32]AlterPartitionAssignmentsResponse

AlterPartitionAssignmentsResponses contains responses to all partitions in an alter assignment request.

func (AlterPartitionAssignmentsResponses) Each added in v1.1.0

Each calls fn for every response.

func (AlterPartitionAssignmentsResponses) Error added in v1.11.0

Error returns the first error in the responses, if any.

func (AlterPartitionAssignmentsResponses) Sorted added in v1.1.0

Sorted returns the responses sorted by topic and partition.

type AlterReplicaLogDirsReq

type AlterReplicaLogDirsReq map[string]TopicsSet

AlterReplicaLogDirsReq is the input for a request to alter replica log directories. The key is the directory that all topics and partitions in the topic set will move to.

func (*AlterReplicaLogDirsReq) Add

Add merges the input topic set into the given directory.
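For example (a sketch: the directory path and topic are hypothetical, as is the commented call issuing the request):

```go
s := make(kadm.TopicsSet)
s.Add("orders", 0, 1) // partitions to move

var req kadm.AlterReplicaLogDirsReq
req.Add("/var/lib/kafka/data-2", s) // hypothetical destination log dir
// responses, err := adm.AlterAllReplicaLogDirs(ctx, req)
```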

type AlterReplicaLogDirsResponse

type AlterReplicaLogDirsResponse struct {
	Broker    int32  // Broker is the broker this response came from.
	Dir       string // Dir is the directory this partition was requested to be moved to.
	Topic     string // Topic is the topic for this partition.
	Partition int32  // Partition is the partition that was moved.
	Err       error  // Err is non-nil if this move had an error.
}

AlterReplicaLogDirsResponse contains the response for an individual altered partition directory.

func (AlterReplicaLogDirsResponse) Less

Less returns if the response is less than the other by broker, dir, topic, and partition.

type AlterReplicaLogDirsResponses

type AlterReplicaLogDirsResponses map[string]map[int32]AlterReplicaLogDirsResponse

AlterReplicaLogDirsResponses contains responses to altered partition directories for a single broker.

func (AlterReplicaLogDirsResponses) Each

Each calls fn for every response.

func (AlterReplicaLogDirsResponses) Sorted

Sorted returns the responses sorted by topic and partition.

type AlteredClientQuota added in v1.5.0

type AlteredClientQuota struct {
	Entity     ClientQuotaEntity // Entity is the entity this result is for.
	Err        error             // Err is non-nil if the alter operation on this entity failed.
	ErrMessage string            // ErrMessage is an optional additional message on error.
}

AlteredClientQuota is the result for a single entity that was altered.

type AlteredClientQuotas added in v1.5.0

type AlteredClientQuotas []AlteredClientQuota

AlteredClientQuotas contains results for all altered entities.

type AlteredUserSCRAM added in v1.5.0

type AlteredUserSCRAM struct {
	User       string // User is the username that was altered.
	Err        error  // Err is any error encountered when altering the user.
	ErrMessage string // ErrMessage is a potential extra message describing any error.
}

AlteredUserSCRAM is the result of an alter operation.

type AlteredUserSCRAMs added in v1.5.0

type AlteredUserSCRAMs map[string]AlteredUserSCRAM

AlteredUserSCRAMs contains altered user SCRAM credentials keyed by user.

func (AlteredUserSCRAMs) AllFailed added in v1.5.0

func (as AlteredUserSCRAMs) AllFailed() bool

AllFailed returns whether all altered user credentials are errored.

func (AlteredUserSCRAMs) Each added in v1.5.0

func (as AlteredUserSCRAMs) Each(fn func(AlteredUserSCRAM))

Each calls fn for every altered user.

func (AlteredUserSCRAMs) EachError added in v1.5.0

func (as AlteredUserSCRAMs) EachError(fn func(AlteredUserSCRAM))

EachError calls fn for every altered user that has a non-nil error.

func (AlteredUserSCRAMs) Error added in v1.5.0

func (as AlteredUserSCRAMs) Error() error

Error iterates over all altered users and returns the first error encountered, if any.

func (AlteredUserSCRAMs) Ok added in v1.5.0

func (as AlteredUserSCRAMs) Ok() bool

Ok returns true if there are no errors. This is a shortcut for as.Error() == nil.

func (AlteredUserSCRAMs) Sorted added in v1.5.0

func (as AlteredUserSCRAMs) Sorted() []AlteredUserSCRAM

Sorted returns the altered user credentials ordered by user.

type AuthError

type AuthError struct {
	Err error // Err is the inner *kerr.Error authorization error.
}

AuthError can be returned from requests for resources that you are not authorized for.

func (*AuthError) Error

func (a *AuthError) Error() string

func (*AuthError) Is

func (a *AuthError) Is(err error) bool

func (*AuthError) Unwrap

func (a *AuthError) Unwrap() error

type BrokerApiVersions added in v1.4.0

type BrokerApiVersions struct {
	NodeID int32 // NodeID is the node this API versions response is for.

	Err error // Err is non-nil if the API versions request failed.
	// contains filtered or unexported fields
}

BrokerApiVersions contains the API versions for a single broker.

func (*BrokerApiVersions) EachKeySorted added in v1.4.0

func (v *BrokerApiVersions) EachKeySorted(fn func(key, min, max int16))

EachKeySorted calls fn for every API key in the broker response, from the smallest API key to the largest.

func (*BrokerApiVersions) KeyMaxVersion added in v1.4.0

func (v *BrokerApiVersions) KeyMaxVersion(key int16) (max int16, exists bool)

KeyMaxVersion returns the broker's max version for an API key and whether this broker supports the request.

func (*BrokerApiVersions) KeyMinVersion added in v1.4.0

func (v *BrokerApiVersions) KeyMinVersion(key int16) (min int16, exists bool)

KeyMinVersion returns the broker's min version for an API key and whether this broker supports the request.

func (*BrokerApiVersions) KeyVersions added in v1.4.0

func (v *BrokerApiVersions) KeyVersions(key int16) (min, max int16, exists bool)

KeyVersions returns the broker's min and max version for an API key and whether this broker supports the request.

func (*BrokerApiVersions) Raw added in v1.4.0

Raw returns the raw API versions response.

func (*BrokerApiVersions) VersionGuess added in v1.4.0

func (v *BrokerApiVersions) VersionGuess(opt ...kversion.VersionGuessOpt) string

VersionGuess returns the best guess of the Kafka version this broker is running. This is a shortcut for:

kversion.FromApiVersionsResponse(v.Raw()).VersionGuess(opt...)

Check the kversion.VersionGuess API docs for more details.

type BrokerDetail

type BrokerDetail = kgo.BrokerMetadata

BrokerDetail is a type alias for kgo.BrokerMetadata.

type BrokerDetails

type BrokerDetails []BrokerDetail

BrokerDetails contains the details for many brokers.

func (BrokerDetails) NodeIDs

func (ds BrokerDetails) NodeIDs() []int32

NodeIDs returns the IDs of all nodes.

type BrokersApiVersions added in v1.4.0

type BrokersApiVersions map[int32]BrokerApiVersions

BrokersApiVersions contains API versions for all brokers that are reachable from a metadata response.

func (BrokersApiVersions) Each added in v1.4.0

func (vs BrokersApiVersions) Each(fn func(BrokerApiVersions))

Each calls fn for every broker response.

func (BrokersApiVersions) Sorted added in v1.4.0

func (vs BrokersApiVersions) Sorted() []BrokerApiVersions

Sorted returns all broker responses sorted by node ID.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is an admin client.

This is a simple wrapper around a *kgo.Client to provide helper admin methods.

func NewClient

func NewClient(cl *kgo.Client) *Client

NewClient returns an admin client.

func NewOptClient

func NewOptClient(opts ...kgo.Opt) (*Client, error)

NewOptClient returns a new client directly from kgo options. This is a wrapper around creating a new *kgo.Client and then creating an admin client.

func (*Client) AlterAllReplicaLogDirs

func (cl *Client) AlterAllReplicaLogDirs(ctx context.Context, alter AlterReplicaLogDirsReq) (AlterAllReplicaLogDirsResponses, error)

AlterAllReplicaLogDirs alters the log directories for the input topic partitions, moving each partition to the requested directory. This function moves all replicas on any broker.

This may return *ShardErrors.

func (*Client) AlterBrokerConfigs

func (cl *Client) AlterBrokerConfigs(ctx context.Context, configs []AlterConfig, brokers ...int32) (AlterConfigsResponses, error)

AlterBrokerConfigs incrementally alters broker configuration values. If brokers are specified, this updates each specific broker. If no brokers are specified, this updates whole-cluster broker configuration values.

This method requires talking to a cluster that supports IncrementalAlterConfigs (officially introduced in Kafka v2.3, but many broker reimplementations support this request even if they do not support all other requests from Kafka v2.3).

If you want to alter the entire configs state using the older AlterConfigs request, use AlterBrokerConfigsState.

This may return *ShardErrors. You may consider checking ValidateAlterBrokerConfigs before using this method.

func (*Client) AlterBrokerConfigsState added in v1.8.0

func (cl *Client) AlterBrokerConfigsState(ctx context.Context, configs []AlterConfig, brokers ...int32) (AlterConfigsResponses, error)

AlterBrokerConfigsState alters the full state of broker configurations. If brokers are specified, this updates each specific broker. If no brokers are specified, this updates whole-cluster broker configuration values. All prior configuration is lost.

This may return *ShardErrors. You may consider checking ValidateAlterBrokerConfigs before using this method.

func (*Client) AlterBrokerReplicaLogDirs

func (cl *Client) AlterBrokerReplicaLogDirs(ctx context.Context, broker int32, alter AlterReplicaLogDirsReq) (AlterReplicaLogDirsResponses, error)

AlterBrokerReplicaLogDirs alters the log directories for the input topic on the given broker, moving each partition to the requested directory.

func (*Client) AlterClientQuotas added in v1.5.0

func (cl *Client) AlterClientQuotas(ctx context.Context, entries []AlterClientQuotaEntry) (AlteredClientQuotas, error)

AlterClientQuotas alters quotas for the input entries. You may consider checking ValidateAlterClientQuotas before using this method.

func (*Client) AlterPartitionAssignments added in v1.1.0

AlterPartitionAssignments alters partition assignments for the requested partitions, returning an error if the response could not be issued or if you do not have permissions.

func (*Client) AlterTopicConfigs

func (cl *Client) AlterTopicConfigs(ctx context.Context, configs []AlterConfig, topics ...string) (AlterConfigsResponses, error)

AlterTopicConfigs incrementally alters topic configuration values.

This method requires talking to a cluster that supports IncrementalAlterConfigs (officially introduced in Kafka v2.3, but many broker reimplementations support this request even if they do not support all other requests from Kafka v2.3).

If you want to alter the entire configs state using the older AlterConfigs request, use AlterTopicConfigsState.

This may return *ShardErrors. You may consider checking ValidateAlterTopicConfigs before using this method.

func (*Client) AlterTopicConfigsState added in v1.8.0

func (cl *Client) AlterTopicConfigsState(ctx context.Context, configs []AlterConfig, topics ...string) (AlterConfigsResponses, error)

AlterTopicConfigsState alters the full state of topic configurations. All prior configuration is lost.

This may return *ShardErrors. You may consider checking ValidateAlterTopicConfigs before using this method.

func (*Client) AlterUserSCRAMs added in v1.5.0

func (cl *Client) AlterUserSCRAMs(ctx context.Context, del []DeleteSCRAM, upsert []UpsertSCRAM) (AlteredUserSCRAMs, error)

AlterUserSCRAMs deletes, updates, or creates (inserts) user SCRAM credentials. Note that a username can only appear once across both upserts and deletes. This modifies elements of the upsert slice that need to have a salted password generated.

func (*Client) ApiVersions added in v1.4.0

func (cl *Client) ApiVersions(ctx context.Context) (BrokersApiVersions, error)

ApiVersions queries every broker in a metadata response for their API versions. This returns an error only if the metadata request fails.

func (*Client) BrokerMetadata

func (cl *Client) BrokerMetadata(ctx context.Context) (Metadata, error)

BrokerMetadata issues a metadata request that asks for no topics and returns it.

This returns an error if the request fails to be issued, or an *AuthError.

func (*Client) Close

func (cl *Client) Close()

Close closes the underlying *kgo.Client.

func (*Client) CommitAllOffsets

func (cl *Client) CommitAllOffsets(ctx context.Context, group string, os Offsets) error

CommitAllOffsets is identical to CommitOffsets, but additionally returns an error if the offset commit itself was successful while some offset within the commit failed to be committed.

This is a shortcut function provided to avoid checking two errors, but you must be careful with this if partially successful commits can be a problem for you.

func (*Client) CommitOffsets

func (cl *Client) CommitOffsets(ctx context.Context, group string, os Offsets) (OffsetResponses, error)

CommitOffsets issues an offset commit request for the input offsets.

This function can be used to manually commit offsets when directly consuming partitions outside of an actual consumer group. For example, if you assign partitions manually, but still want to use Kafka to checkpoint what you have consumed, you can manually issue an offset commit request with this method.

This does not return an error on authorization failures, instead, authorization failures are included in the responses.

func (*Client) CreateACLs

func (cl *Client) CreateACLs(ctx context.Context, b *ACLBuilder) (CreateACLsResults, error)

CreateACLs creates a batch of ACLs using the ACL builder, validating the input before issuing the CreateACLs request.

If the input is invalid, or if the response fails, or if the response does not contain as many ACLs as we issued in our create request, this returns an error.

func (*Client) CreateDelegationToken added in v1.5.0

func (cl *Client) CreateDelegationToken(ctx context.Context, d CreateDelegationToken) (DelegationToken, error)

CreateDelegationToken creates a delegation token, which is a scoped SCRAM-SHA-256 username and password.

Creating delegation tokens allows for an (ideally) quicker and easier method of enabling authorization for a wide array of clients. Rather than having to manage many passwords external to Kafka, you only need to manage a few accounts and use those to create delegation tokens per client.

Note that delegation tokens inherit the same ACLs as the user creating the token. Thus, if you want to properly scope ACLs, you should not create delegation tokens with admin accounts.

This can return *AuthError.

func (*Client) CreatePartitions

func (cl *Client) CreatePartitions(ctx context.Context, add int, topics ...string) (CreatePartitionsResponses, error)

CreatePartitions issues a create partitions request for the given topics, adding "add" partitions to each topic. This request lets Kafka choose where the new partitions should be.

This does not return an error on authorization failures for the create partitions request itself, instead, authorization failures are included in the responses. Before adding partitions, this request must issue a metadata request to learn the current count of partitions. If that fails, this returns the metadata request error. If you already know the final amount of partitions you want, you can use UpdatePartitions to set the count directly (rather than adding to the current count). You may consider checking ValidateCreatePartitions before using this method.

func (*Client) CreateTopic added in v1.7.0

func (cl *Client) CreateTopic(
	ctx context.Context,
	partitions int32,
	replicationFactor int16,
	configs map[string]*string,
	topic string,
) (CreateTopicResponse, error)

CreateTopic issues a create topics request with the given partitions, replication factor, and (optional) configs for the given topic name. This is similar to CreateTopics, but returns the kerr.ErrorForCode(response.ErrorCode) if the request/response is successful.

func (*Client) CreateTopics

func (cl *Client) CreateTopics(
	ctx context.Context,
	partitions int32,
	replicationFactor int16,
	configs map[string]*string,
	topics ...string,
) (CreateTopicResponses, error)

CreateTopics issues a create topics request with the given partitions, replication factor, and (optional) configs for every topic. Under the hood, this uses the default 15s request timeout and lets Kafka choose where to place partitions.

Version 4 of the underlying create topic request was introduced in Kafka 2.4 and brought client support for creation defaults. If talking to a 2.4+ cluster, you can use -1 for partitions and replicationFactor to use broker defaults.

This package includes a StringPtr function to aid in building config values.

This does not return an error on authorization failures, instead, authorization failures are included in the responses. This only returns an error if the request fails to be issued. You may consider checking ValidateCreateTopics before using this method.

func (*Client) DeleteACLs

func (cl *Client) DeleteACLs(ctx context.Context, b *ACLBuilder) (DeleteACLsResults, error)

DeleteACLs deletes a batch of ACLs using the ACL builder, validating the input before issuing the DeleteACLs request.

If the input is invalid, or if the response fails, or if the response does not contain as many ACL results as we issued in our delete request, this returns an error.

Deleting ACLs works on a filter basis: a single filter can match many ACLs. For example, deleting with operation ANY matches any operation. For safety / verification purposes, you can DescribeACLs with the same builder first to see what would be deleted.

func (*Client) DeleteGroup added in v1.11.0

func (cl *Client) DeleteGroup(ctx context.Context, group string) (DeleteGroupResponse, error)

DeleteGroup deletes the specified group. This is similar to DeleteGroups, but returns the kerr.ErrorForCode(response.ErrorCode) if the request/response is successful.

func (*Client) DeleteGroups

func (cl *Client) DeleteGroups(ctx context.Context, groups ...string) (DeleteGroupResponses, error)

DeleteGroups deletes all groups specified.

The purpose of this request is to allow operators a way to delete groups after Kafka 1.1, which removed RetentionTimeMillis from offset commits. See KIP-229 for more details.

This may return *ShardErrors. This does not return on authorization failures, instead, authorization failures are included in the responses.

func (*Client) DeleteOffsets

func (cl *Client) DeleteOffsets(ctx context.Context, group string, s TopicsSet) (DeleteOffsetsResponses, error)

DeleteOffsets deletes offsets for the given group.

Originally, offset commits were persisted in Kafka for some retention time. This proved problematic for infrequently committing consumers, so the retention time concept was removed in Kafka v2.1 in favor of deleting offsets for a group only when the group became empty. However, if a group stops consuming from a topic, then the offsets will persist and lag monitoring for the group will notice an ever increasing amount of lag for these no-longer-consumed topics. Thus, Kafka v2.4 introduced an OffsetDelete request to allow admins to manually delete offsets for no longer consumed topics.

This method requires talking to Kafka v2.4+. This returns an *AuthError if the user is not authorized to delete offsets in the group at all. This does not return on per-topic authorization failures, instead, per-topic authorization failures are included in the responses.

func (*Client) DeleteRecords

func (cl *Client) DeleteRecords(ctx context.Context, os Offsets) (DeleteRecordsResponses, error)

DeleteRecords issues a delete records request for the given offsets. Per offset, only the Offset field needs to be set.

To delete records, Kafka sets the LogStartOffset for partitions to the requested offset. All segments whose max offset is before the requested offset are deleted, and any records within the segment before the requested offset can no longer be read.

This does not return an error on authorization failures, instead, authorization failures are included in the responses.

This may return *ShardErrors.

func (*Client) DeleteTopic added in v1.11.0

func (cl *Client) DeleteTopic(ctx context.Context, topic string) (DeleteTopicResponse, error)

DeleteTopic issues a delete topic request for the given topic name with a (by default) 15s timeout. This is similar to DeleteTopics, but returns the kerr.ErrorForCode(response.ErrorCode) if the request/response is successful.

func (*Client) DeleteTopics

func (cl *Client) DeleteTopics(ctx context.Context, topics ...string) (DeleteTopicResponses, error)

DeleteTopics issues a delete topics request for the given topic names with a (by default) 15s timeout.

This does not return an error on authorization failures, instead, authorization failures are included in the responses. This only returns an error if the request fails to be issued.

func (*Client) DescribeACLs

func (cl *Client) DescribeACLs(ctx context.Context, b *ACLBuilder) (DescribeACLsResults, error)

DescribeACLs describes a batch of ACLs using the ACL builder, validating the input before issuing DescribeACLs requests.

If the input is invalid, or if any response fails, this returns an error.

Listing ACLs works on a filter basis: a single filter can match many ACLs. For example, describing with operation ANY matches any operation. Under the hood, this method issues one describe request per filter, because describing ACLs does not work on a batch basis (unlike creating & deleting). The return of this function can be used to see what would be deleted given the same builder input.

func (*Client) DescribeAllLogDirs

func (cl *Client) DescribeAllLogDirs(ctx context.Context, s TopicsSet) (DescribedAllLogDirs, error)

DescribeAllLogDirs describes the log directories for every input topic partition on every broker. If the input set is nil, this describes all log directories.

This may return *ShardErrors.

func (*Client) DescribeBrokerConfigs

func (cl *Client) DescribeBrokerConfigs(
	ctx context.Context,
	brokers ...int32,
) (ResourceConfigs, error)

DescribeBrokerConfigs returns configuration for the requested brokers. If no brokers are requested, a single request is issued and any broker in the cluster replies with the cluster-level dynamic config values.

This may return *ShardErrors.

func (*Client) DescribeBrokerLogDirs

func (cl *Client) DescribeBrokerLogDirs(ctx context.Context, broker int32, s TopicsSet) (DescribedLogDirs, error)

DescribeBrokerLogDirs describes the log directories for the input topic partitions on the given broker. If the input set is nil, this describes all log directories.

func (*Client) DescribeClientQuotas added in v1.5.0

func (cl *Client) DescribeClientQuotas(ctx context.Context, strict bool, entityComponents []DescribeClientQuotaComponent) (DescribedClientQuotas, error)

DescribeClientQuotas describes client quotas. If strict is true, the response includes only the requested components.

func (*Client) DescribeDelegationTokens added in v1.5.0

func (cl *Client) DescribeDelegationTokens(ctx context.Context, owners ...Principal) (DelegationTokens, error)

DescribeDelegationTokens describes delegation tokens. This returns either all delegation tokens, or only the tokens whose owners are in the requested owners list.

This can return *AuthError.

func (*Client) DescribeGroups

func (cl *Client) DescribeGroups(ctx context.Context, groups ...string) (DescribedGroups, error)

DescribeGroups describes either all groups specified, or all groups in the cluster if none are specified.

This may return *ShardErrors or *AuthError.

If no groups are specified, this method first lists groups; if that listing returns a *ShardErrors, this function describes all successfully listed groups and appends the list shard errors to any describe shard errors.

If only one group is described, there will be at most one request issued, and there is no need to deeply inspect the error.

func (*Client) DescribeProducers added in v1.5.0

func (cl *Client) DescribeProducers(ctx context.Context, s TopicsSet) (DescribedProducersTopics, error)

DescribeProducers describes all producers that are transactionally producing to the requested topic set. This request can be used to detect hanging transactions or other transaction related problems. If the input set is empty, this requests data for all partitions.

This may return *ShardErrors or *AuthError.

func (*Client) DescribeTopicConfigs

func (cl *Client) DescribeTopicConfigs(
	ctx context.Context,
	topics ...string,
) (ResourceConfigs, error)

DescribeTopicConfigs returns the configuration for the requested topics.

This may return *ShardErrors.

func (*Client) DescribeTransactions added in v1.5.0

func (cl *Client) DescribeTransactions(ctx context.Context, txnIDs ...string) (DescribedTransactions, error)

DescribeTransactions describes either all transactional IDs specified, or all transactional IDs in the cluster if none are specified.

This may return *ShardErrors or *AuthError.

If no transactional IDs are specified, this method first lists transactional IDs; if that listing returns a *ShardErrors, this function describes all successfully listed IDs and appends the list shard errors to any describe shard errors.

If only one ID is described, there will be at most one request issued and there is no need to deeply inspect the error.

func (*Client) DescribeUserSCRAMs added in v1.5.0

func (cl *Client) DescribeUserSCRAMs(ctx context.Context, users ...string) (DescribedUserSCRAMs, error)

DescribeUserSCRAMs returns a small bit of information about all users in the input request that have SCRAM passwords configured. Requesting no users returns all users.

func (*Client) ElectLeaders added in v1.5.0

func (cl *Client) ElectLeaders(ctx context.Context, how ElectLeadersHow, s TopicsSet) (ElectLeadersResults, error)

ElectLeaders elects leaders for partitions. This request was added in Kafka 2.2 to replace the previously-ZooKeeper-only option of triggering leader elections. See KIP-183 for more details.

Kafka 2.4 introduced the ability to use unclean leader election. If you use unclean leader election on a Kafka 2.2 or 2.3 cluster, the client will instead fall back to preferred replica (clean) leader election. You can check the result's How function (or field) to see which election type was actually used.

If s is nil, this will elect leaders for all partitions.

This will return *AuthError if you do not have ALTER on CLUSTER for kafka-cluster.

func (*Client) ExpireDelegationToken added in v1.5.0

func (cl *Client) ExpireDelegationToken(ctx context.Context, hmac []byte, expiry time.Duration) (expiryTimestamp time.Time, err error)

ExpireDelegationToken changes a delegation token's expiry timestamp and returns the new expiry timestamp, which is min(now+expiry, maxTimestamp). This request can be used to force tokens to expire quickly, or to give tokens a grace period before expiry. Using an expiry of -1 expires the token immediately.

This can return *AuthError.

func (*Client) FetchManyOffsets

func (cl *Client) FetchManyOffsets(ctx context.Context, groups ...string) FetchOffsetsResponses

FetchManyOffsets issues a fetch offsets request for each group specified.

This function is a batch version of FetchOffsets. FetchOffsets and CommitOffsets provide simple APIs for users that manage group offsets outside of an actual consumer group. Each individual group may have an auth error.

func (*Client) FetchOffsets

func (cl *Client) FetchOffsets(ctx context.Context, group string) (OffsetResponses, error)

FetchOffsets issues an offset fetch request for all topics and partitions in the group. Because Kafka returns only partitions you are authorized to fetch, this only returns an auth error if you are not authorized to describe the group at all.

This method requires talking to Kafka v0.11+.

func (*Client) FetchOffsetsForTopics

func (cl *Client) FetchOffsetsForTopics(ctx context.Context, group string, topics ...string) (OffsetResponses, error)

FetchOffsetsForTopics is a helper function that returns the currently committed offsets for the given group, as well as default -1 offsets for any topic/partition that does not yet have a commit.

If any partition fetched or listed has an error, this function returns an error. The returned offset responses are ready to be used or converted directly to pure offsets with `Into`, and again into kgo offsets with another `Into`.

By default, this function returns offsets for only the requested topics. You can use the special "topic" FetchAllGroupTopics to return all committed-to topics in addition to all requested topics.

func (*Client) FindGroupCoordinators added in v1.4.0

func (cl *Client) FindGroupCoordinators(ctx context.Context, groups ...string) FindCoordinatorResponses

FindGroupCoordinators returns the coordinator for all requested group names.

This may return *ShardErrors or *AuthError.

func (*Client) FindTxnCoordinators added in v1.4.0

func (cl *Client) FindTxnCoordinators(ctx context.Context, txnIDs ...string) FindCoordinatorResponses

FindTxnCoordinators returns the coordinator for all requested transactional IDs.

This may return *ShardErrors or *AuthError.

func (*Client) Lag added in v1.9.0

func (cl *Client) Lag(ctx context.Context, groups ...string) (DescribedGroupLags, error)

Lag returns the lag for all input groups. This function is a shortcut for the steps required to use CalculateGroupLag properly, with some opinionated choices for error handling since calculating lag is a multi-request process. If a group cannot be described or the offsets cannot be fetched, an error is returned for the group. If any topic cannot have its end offsets listed, the lag for the partition has a corresponding error. If any request fails with an auth error, this returns *AuthError.

func (*Client) LeaveGroup added in v1.3.0

func (cl *Client) LeaveGroup(ctx context.Context, b *LeaveGroupBuilder) (LeaveGroupResponses, error)

LeaveGroup causes instance IDs to leave a group.

This function allows manually removing members using instance IDs from a group, which allows for fast scale down / host replacement (see KIP-345 for more detail). This returns an *AuthError if the user is not authorized to remove members from groups.

func (*Client) ListBrokers

func (cl *Client) ListBrokers(ctx context.Context) (BrokerDetails, error)

ListBrokers issues a metadata request and returns BrokerDetails. This returns an error if the request fails to be issued, or an *AuthError.

func (*Client) ListCommittedOffsets

func (cl *Client) ListCommittedOffsets(ctx context.Context, topics ...string) (ListedOffsets, error)

ListCommittedOffsets returns newest committed offsets for each partition in each requested topic. A committed offset may be slightly less than the latest offset. In Kafka terms, committed means the last stable offset, and newest means the high watermark. Record offsets in active, uncommitted transactions will not be returned. If no topics are specified, all topics are listed. If a requested topic does not exist, no offsets for it are listed and it is not present in the response.

If any topics being listed do not exist, a special -1 partition is added to the response with the expected error code kerr.UnknownTopicOrPartition.

This may return *ShardErrors.

func (*Client) ListEndOffsets

func (cl *Client) ListEndOffsets(ctx context.Context, topics ...string) (ListedOffsets, error)

ListEndOffsets returns the end (newest) offsets for each partition in each requested topic. In Kafka terms, this returns high watermarks. If no topics are specified, all topics are listed. If a requested topic does not exist, no offsets for it are listed and it is not present in the response.

If any topics being listed do not exist, a special -1 partition is added to the response with the expected error code kerr.UnknownTopicOrPartition.

This may return *ShardErrors.

func (*Client) ListGroups

func (cl *Client) ListGroups(ctx context.Context, filterStates ...string) (ListedGroups, error)

ListGroups returns all groups in the cluster. If you are talking to Kafka 2.6+, filter states can be used to return groups only in the requested states. By default, this returns all groups. In almost all cases, DescribeGroups is more useful.

This may return *ShardErrors or *AuthError.

func (*Client) ListOffsetsAfterMilli

func (cl *Client) ListOffsetsAfterMilli(ctx context.Context, millisecond int64, topics ...string) (ListedOffsets, error)

ListOffsetsAfterMilli returns the first offsets after the requested millisecond timestamp. Unlike listing start/end/committed offsets, offsets returned from this function also include the timestamp of the offset. If no topics are specified, all topics are listed. If a partition has no offsets after the requested millisecond, the offset will be the current end offset. If a requested topic does not exist, no offsets for it are listed and it is not present in the response.

If any topics being listed do not exist, a special -1 partition is added to the response with the expected error code kerr.UnknownTopicOrPartition.

This may return *ShardErrors.

func (*Client) ListPartitionReassignments added in v1.1.0

func (cl *Client) ListPartitionReassignments(ctx context.Context, s TopicsSet) (ListPartitionReassignmentsResponses, error)

ListPartitionReassignments lists the state of any active reassignments for all requested partitions, returning an error if the response could not be issued or if you do not have permissions.

func (*Client) ListStartOffsets

func (cl *Client) ListStartOffsets(ctx context.Context, topics ...string) (ListedOffsets, error)

ListStartOffsets returns the start (oldest) offsets for each partition in each requested topic. In Kafka terms, this returns the log start offset. If no topics are specified, all topics are listed. If a requested topic does not exist, no offsets for it are listed and it is not present in the response.

If any topics being listed do not exist, a special -1 partition is added to the response with the expected error code kerr.UnknownTopicOrPartition.

This may return *ShardErrors.

func (*Client) ListTopics

func (cl *Client) ListTopics(
	ctx context.Context,
	topics ...string,
) (TopicDetails, error)

ListTopics issues a metadata request and returns TopicDetails. Specific topics to describe can be passed as additional arguments. If no topics are specified, all topics are requested. Internal topics are not returned unless specifically requested. To see all topics including internal topics, use ListTopicsWithInternal.

This returns an error if the request fails to be issued, or an *AuthError.

func (*Client) ListTopicsWithInternal

func (cl *Client) ListTopicsWithInternal(
	ctx context.Context,
	topics ...string,
) (TopicDetails, error)

ListTopicsWithInternal is the same as ListTopics, but does not filter internal topics before returning.

func (*Client) ListTransactions added in v1.5.0

func (cl *Client) ListTransactions(ctx context.Context, producerIDs []int64, filterStates []string) (ListedTransactions, error)

ListTransactions returns all transactions and their states in the cluster. Filter states can be used to return transactions only in the requested states. By default, this returns all transactions you have DESCRIBE access to. Producer IDs can be specified to filter for transactions from the given producer.

This may return *ShardErrors or *AuthError.

func (*Client) Metadata

func (cl *Client) Metadata(
	ctx context.Context,
	topics ...string,
) (Metadata, error)

Metadata issues a metadata request and returns it. Specific topics to describe can be passed as additional arguments. If no topics are specified, all topics are requested.

This returns an error if the request fails to be issued, or an *AuthError.

func (*Client) OffetForLeaderEpoch added in v1.5.0

OffsetForLeaderEpoch requests end offsets for the requested leader epoch in partitions in the request. This is a relatively advanced, client-internal request; for more details, see the doc comments on the OffsetForLeaderEpoch type.

This may return *ShardErrors or *AuthError.

func (*Client) RenewDelegationToken added in v1.5.0

func (cl *Client) RenewDelegationToken(ctx context.Context, hmac []byte, renewTime time.Duration) (expiryTimestamp time.Time, err error)

RenewDelegationToken renews a delegation token that has not yet hit its max timestamp and returns the new expiry timestamp.

This can return *AuthError.

func (*Client) SetTimeoutMillis

func (cl *Client) SetTimeoutMillis(millis int32)

SetTimeoutMillis sets the timeout to use for requests that have a timeout, overriding the default of 15,000 (15s).

Not all requests have timeouts. Most requests are expected to return immediately or are expected to deliberately hang. The following requests have timeout fields:

Produce
CreateTopics
DeleteTopics
DeleteRecords
CreatePartitions
ElectLeaders
AlterPartitionAssignments
ListPartitionReassignments
UpdateFeatures

Not all requests above are supported in the admin API.

func (*Client) UpdatePartitions

func (cl *Client) UpdatePartitions(ctx context.Context, set int, topics ...string) (CreatePartitionsResponses, error)

UpdatePartitions issues a create partitions request for the given topics, setting the final partition count to "set" for each topic. This request lets Kafka choose where the new partitions should be.

This does not return an error on authorization failures for the create partitions request itself; instead, authorization failures are included in the responses. Unlike CreatePartitions, this request uses your "set" value as the new final count of partitions. "set" must be equal to or larger than the current count of partitions in the topic. All topics will have the same final count of partitions (unlike CreatePartitions, which allows you to add a specific count of partitions to topics that have a different amount of current partitions). You may consider checking ValidateUpdatePartitions before using this method.

func (*Client) ValidateAlterBrokerConfigs

func (cl *Client) ValidateAlterBrokerConfigs(ctx context.Context, configs []AlterConfig, brokers ...int32) (AlterConfigsResponses, error)

ValidateAlterBrokerConfigs validates an incremental alter config for the given brokers.

This returns exactly what AlterBrokerConfigs returns, but does not actually alter configurations.

func (*Client) ValidateAlterBrokerConfigsState added in v1.8.0

func (cl *Client) ValidateAlterBrokerConfigsState(ctx context.Context, configs []AlterConfig, brokers ...int32) (AlterConfigsResponses, error)

ValidateAlterBrokerConfigsState validates an AlterBrokerConfigsState for the given brokers.

This returns exactly what AlterBrokerConfigsState returns, but does not actually alter configurations.

func (*Client) ValidateAlterClientQuotas added in v1.5.0

func (cl *Client) ValidateAlterClientQuotas(ctx context.Context, entries []AlterClientQuotaEntry) (AlteredClientQuotas, error)

ValidateAlterClientQuotas validates an alter client quota request. This returns exactly what AlterClientQuotas returns, but does not actually alter quotas.

func (*Client) ValidateAlterTopicConfigs

func (cl *Client) ValidateAlterTopicConfigs(ctx context.Context, configs []AlterConfig, topics ...string) (AlterConfigsResponses, error)

ValidateAlterTopicConfigs validates an incremental alter config for the given topics.

This returns exactly what AlterTopicConfigs returns, but does not actually alter configurations.

func (*Client) ValidateAlterTopicConfigsState added in v1.8.0

func (cl *Client) ValidateAlterTopicConfigsState(ctx context.Context, configs []AlterConfig, topics ...string) (AlterConfigsResponses, error)

ValidateAlterTopicConfigsState validates an AlterTopicConfigsState for the given topics.

This returns exactly what AlterTopicConfigsState returns, but does not actually alter configurations.

func (*Client) ValidateCreatePartitions

func (cl *Client) ValidateCreatePartitions(ctx context.Context, add int, topics ...string) (CreatePartitionsResponses, error)

ValidateCreatePartitions validates a create partitions request for adding "add" partitions to the given topics.

This uses the same logic as CreatePartitions, but with the request's ValidateOnly field set to true. The response is the same response you would receive from CreatePartitions, but no partitions are actually added.

func (*Client) ValidateCreateTopics

func (cl *Client) ValidateCreateTopics(
	ctx context.Context,
	partitions int32,
	replicationFactor int16,
	configs map[string]*string,
	topics ...string,
) (CreateTopicResponses, error)

ValidateCreateTopics validates a create topics request with the given partitions, replication factor, and (optional) configs for every topic.

This package includes a StringPtr function to aid in building config values.

This uses the same logic as CreateTopics, but with the request's ValidateOnly field set to true. The response is the same response you would receive from CreateTopics, but no topics are actually created.

func (*Client) ValidateUpdatePartitions

func (cl *Client) ValidateUpdatePartitions(ctx context.Context, set int, topics ...string) (CreatePartitionsResponses, error)

ValidateUpdatePartitions validates a create partitions request for setting the partition count on the given topics to "set".

This uses the same logic as UpdatePartitions, but with the request's ValidateOnly field set to true. The response is the same response you would receive from UpdatePartitions, but no partitions are actually added.

func (*Client) WriteTxnMarkers added in v1.6.0

func (cl *Client) WriteTxnMarkers(ctx context.Context, markers ...TxnMarkers) (TxnMarkersResponses, error)

WriteTxnMarkers writes transaction markers to brokers. This is an advanced admin way to close out open transactions. See KIP-664 for more details.

This may return *ShardErrors or *AuthError.

type ClientQuotaEntity added in v1.5.0

type ClientQuotaEntity []ClientQuotaEntityComponent

ClientQuotaEntity contains the components that make up a single entity.

func (ClientQuotaEntity) String added in v1.5.0

func (ds ClientQuotaEntity) String() string

String returns {key=value, key=value}, joining all components with a ", " and wrapping in braces.

type ClientQuotaEntityComponent added in v1.5.0

type ClientQuotaEntityComponent struct {
	Type string  // Type is the entity type ("user", "client-id", "ip").
	Name *string // Name is the entity name, or null if the default.
}

ClientQuotaEntityComponent is a quota entity component.

func (ClientQuotaEntityComponent) String added in v1.5.0

String returns key=value, or key=<default> if value is nil.

type ClientQuotaValue added in v1.5.0

type ClientQuotaValue struct {
	Key   string  // Key is the quota configuration key.
	Value float64 // Value is the quota configuration value.
}

ClientQuotaValue is a quota name and value.

func (ClientQuotaValue) String added in v1.5.0

func (d ClientQuotaValue) String() string

String returns key=value.

type ClientQuotaValues added in v1.5.0

type ClientQuotaValues []ClientQuotaValue

ClientQuotaValues contains all client quota values.

type Config

type Config struct {
	Key       string            // Key is the config name.
	Value     *string           // Value is the config value, if any.
	Sensitive bool              // Sensitive is if this config is sensitive (if so, Value is nil).
	Source    kmsg.ConfigSource // Source is where this config is defined from.

	// Synonyms contains fallback key/value pairs for this same
	// configuration key in order or preference. That is, if a config entry
	// is both dynamically defined and has a default value as well, the top
	// level config will be the dynamic value, while the synonym will be
	// the default.
	Synonyms []ConfigSynonym
}

Config is a configuration for a resource (topic, broker).

func (*Config) MaybeValue

func (c *Config) MaybeValue() string

MaybeValue returns the config's value if it is non-nil, otherwise an empty string.

type ConfigSynonym

type ConfigSynonym struct {
	Key    string            // Key is the fallback config name.
	Value  *string           // Value is the fallback config value, if any (sensitive is elided).
	Source kmsg.ConfigSource // Source is where this config synonym is defined from.
}

ConfigSynonym is a fallback value for a config.

type CreateACLsResult

type CreateACLsResult struct {
	Principal string
	Host      string

	Type       kmsg.ACLResourceType   // Type is the type of resource this is.
	Name       string                 // Name is the name of the resource allowed / denied.
	Pattern    ACLPattern             // Pattern is the name pattern.
	Operation  ACLOperation           // Operation is the operation allowed / denied.
	Permission kmsg.ACLPermissionType // Permission is whether this is allowed / denied.

	Err error // Err is the error for this ACL creation.
}

CreateACLsResult is a result for an individual ACL creation.

type CreateACLsResults

type CreateACLsResults []CreateACLsResult

CreateACLsResults contains all results to created ACLs.

type CreateDelegationToken added in v1.5.0

type CreateDelegationToken struct {
	// Owner overrides the owner of the token from the principal issuing
	// the request to the principal in this field. This allows a superuser
	// to create tokens without requiring individual user credentials, and
	// for a superuser to run clients on behalf of another user. These
	// fields require Kafka 3.3+; see KIP-373 for more details.
	Owner *Principal
	// Renewers is a list of principals that can renew the delegation
	// token in addition to the owner of the token. This list does not
	// include the owner.
	Renewers []Principal
	// MaxLifetime is how long the delegation token is valid for.
	// If -1, the default is the server's delegation.token.max.lifetime.ms,
	// which is by default 7d.
	MaxLifetime time.Duration
}

CreateDelegationToken is a create delegation token request, allowing you to create scoped tokens with the same ACLs as the creator. This allows you to more easily manage authorization for a wide array of clients. All delegation tokens use SCRAM-SHA-256 SASL for authorization.

type CreatePartitionsResponse

type CreatePartitionsResponse struct {
	Topic      string // Topic is the topic this response is for.
	Err        error  // Err is non-nil if partitions were unable to be added to this topic.
	ErrMessage string // ErrMessage is a potential extra message describing any error.
}

CreatePartitionsResponse contains the response for an individual topic from a create partitions request.

type CreatePartitionsResponses

type CreatePartitionsResponses map[string]CreatePartitionsResponse

CreatePartitionsResponses contains per-topic responses for a create partitions request.

func (CreatePartitionsResponses) Error added in v1.11.0

func (rs CreatePartitionsResponses) Error() error

Error iterates over all responses and returns the first error encountered, if any.

func (CreatePartitionsResponses) On

On calls fn for the response topic if it exists, returning the response and the error returned from fn. If fn is nil, this simply returns the response.

The fn is given a copy of the response. This function returns the copy as well; any modifications within fn are modifications on the returned copy.

If the topic does not exist, this returns kerr.UnknownTopicOrPartition.

func (CreatePartitionsResponses) Sorted

Sorted returns all create partitions responses sorted by topic.

type CreateTopicResponse

type CreateTopicResponse struct {
	Topic             string            // Topic is the topic that was created.
	ID                TopicID           // ID is the topic ID for this topic, if talking to Kafka v2.8+.
	Err               error             // Err is any error preventing this topic from being created.
	NumPartitions     int32             // NumPartitions is the number of partitions in the response, if talking to Kafka v2.4+.
	ReplicationFactor int16             // ReplicationFactor is how many replicas every partition has for this topic, if talking to Kafka 2.4+.
	Configs           map[string]Config // Configs contains the topic configuration (minus config synonyms), if talking to Kafka 2.4+.
}

CreateTopicResponse contains the response for an individual created topic.

type CreateTopicResponses

type CreateTopicResponses map[string]CreateTopicResponse

CreateTopicResponses contains per-topic responses for created topics.

func (CreateTopicResponses) Error added in v1.11.0

func (rs CreateTopicResponses) Error() error

Error iterates over all responses and returns the first error encountered, if any.

func (CreateTopicResponses) On

On calls fn for the response topic if it exists, returning the response and the error returned from fn. If fn is nil, this simply returns the response.

The fn is given a copy of the response. This function returns the copy as well; any modifications within fn are modifications on the returned copy.

If the topic does not exist, this returns kerr.UnknownTopicOrPartition.

func (CreateTopicResponses) Sorted

Sorted returns all create topic responses sorted first by topic ID, then by topic name.

type CredInfo added in v1.5.0

type CredInfo struct {
	// Mechanism is the SCRAM mechanism a password exists for. This is 0
	// for UNKNOWN, 1 for SCRAM-SHA-256, and 2 for SCRAM-SHA-512.
	Mechanism ScramMechanism
	// Iterations is the number of SCRAM iterations for this password.
	Iterations int32
}

CredInfo contains the SCRAM mechanism and iterations for a password.

func (CredInfo) String added in v1.5.0

func (c CredInfo) String() string

String returns MECHANISM=iterations={c.Iterations}.

type DelegationToken added in v1.5.0

type DelegationToken struct {
	// Owner is the owner of the delegation token.
	Owner Principal
	// TokenRequesterPrincipal is the principal of the creator of the
	// token. This exists for v3+, where you can override the owner.
	// For versions prior to v3, this is just the Owner.
	TokenRequesterPrincipal Principal
	// IssueTimestamp is the timestamp at which the delegation token
	// creation request was received by the broker.
	IssueTimestamp time.Time
	// ExpiryTimestamp is the timestamp the delegation token will expire.
	// This field is:
	//     min(MaxTimestamp, IssueTimestamp+delegation.token.expiry.time.ms)
	// where the default expiry is 24hr.
	ExpiryTimestamp time.Time
	// MaxTimestamp is the timestamp past which the delegation token cannot
	// be renewed. This is either the requested MaxLifetime, or the
	// broker's delegation.token.max.lifetime.ms which is 7d by default.
	MaxTimestamp time.Time
	// TokenID is the username of this token for use in authorization.
	TokenID string
	// HMAC is the password of this token for use in authorization.
	HMAC []byte
	// Renewers is the list of principals that can renew this token in
	// addition to the owner (which always can).
	Renewers []Principal
}

DelegationToken contains information about a delegation token.

type DelegationTokens added in v1.5.0

type DelegationTokens []DelegationToken

DelegationTokens contains a list of delegation tokens.

type DeleteACLsResult

type DeleteACLsResult struct {
	Principal *string // Principal is the optional user that was used in this filter.
	Host      *string // Host is the optional host that was used in this filter.

	Type       kmsg.ACLResourceType   // Type is the type of resource used for this filter.
	Name       *string                // Name is the name of the resource used for this filter.
	Pattern    ACLPattern             // Pattern is the name pattern used for this filter.
	Operation  ACLOperation           // Operation is the operation used for this filter.
	Permission kmsg.ACLPermissionType // Permission is the permission used for this filter.

	Deleted DeletedACLs // Deleted contains all ACLs this delete filter matched.

	Err error // Err is non-nil if this filter has an error.
}

DeleteACLsResult contains the input used for a delete ACL filter, and the deletes that the filter matched or the error for this filter.

All fields but Deleted and Err are set from the request input. The response sets either Deleted (potentially to nothing if the filter matched nothing) or Err.

type DeleteACLsResults

type DeleteACLsResults []DeleteACLsResult

DeleteACLsResults contains all results to deleted ACLs.

type DeleteGroupResponse

type DeleteGroupResponse struct {
	Group string // Group is the group this response is for.
	Err   error  // Err is non-nil if the group failed to be deleted.
}

DeleteGroupResponse contains the response for an individual deleted group.

type DeleteGroupResponses

type DeleteGroupResponses map[string]DeleteGroupResponse

DeleteGroupResponses contains per-group responses to deleted groups.

func (DeleteGroupResponses) Error added in v1.11.0

func (rs DeleteGroupResponses) Error() error

Error iterates over all groups and returns the first error encountered, if any.

func (DeleteGroupResponses) On

On calls fn for the response group if it exists, returning the response and the error returned from fn. If fn is nil, this simply returns the group.

The fn is given a copy of the response. This function returns the copy as well; any modifications within fn are modifications on the returned copy.

If the group does not exist, this returns kerr.GroupIDNotFound.

func (DeleteGroupResponses) Sorted

Sorted returns all deleted group responses sorted by group name.

type DeleteOffsetsResponses

type DeleteOffsetsResponses map[string]map[int32]error

DeleteOffsetsResponses contains the per topic, per partition errors. If an offset deletion for a partition was successful, the error will be nil.

func (DeleteOffsetsResponses) EachError

func (ds DeleteOffsetsResponses) EachError(fn func(string, int32, error))

EachError calls fn for every partition that has a non-nil deletion error.

func (DeleteOffsetsResponses) Error added in v1.11.0

func (ds DeleteOffsetsResponses) Error() error

Error iterates over all responses and returns the first error encountered, if any.

func (DeleteOffsetsResponses) Lookup

func (ds DeleteOffsetsResponses) Lookup(t string, p int32) (error, bool)

Lookup returns the response at t and p and whether it exists.

type DeleteRecordsResponse

type DeleteRecordsResponse struct {
	Topic        string // Topic is the topic this response is for.
	Partition    int32  // Partition is the partition this response is for.
	LowWatermark int64  // LowWatermark is the new earliest / start offset for this partition if the request was successful.
	Err          error  // Err is any error preventing the delete records request from being successful for this partition.
}

DeleteRecordsResponse contains the response for an individual partition from a delete records request.

type DeleteRecordsResponses

type DeleteRecordsResponses map[string]map[int32]DeleteRecordsResponse

DeleteRecordsResponses contains per-partition responses to a delete records request.

func (DeleteRecordsResponses) Each

Each calls fn for every delete records response.

func (DeleteRecordsResponses) Error added in v1.11.0

func (rs DeleteRecordsResponses) Error() error

Error iterates over all responses and returns the first error encountered, if any.

func (DeleteRecordsResponses) Lookup

Lookup returns the response at t and p and whether it exists.

func (DeleteRecordsResponses) On

On calls fn for the response topic/partition if it exists, returning the response and the error returned from fn. If fn is nil, this simply returns the response.

The fn is given a copy of the response. This function returns the copy as well; any modifications within fn are modifications on the returned copy.

If the topic or partition does not exist, this returns kerr.UnknownTopicOrPartition.

func (DeleteRecordsResponses) Sorted

Sorted returns all delete records responses sorted first by topic, then by partition.

type DeleteSCRAM added in v1.5.0

type DeleteSCRAM struct {
	User      string         // User is the username to match for deletion.
	Mechanism ScramMechanism // Mechanism is the mechanism to match to delete a password for.
}

DeleteSCRAM deletes a password with the given mechanism for the user.

type DeleteTopicResponse

type DeleteTopicResponse struct {
	Topic      string  // Topic is the topic that was deleted, if not using topic IDs.
	ID         TopicID // ID is the topic ID for this topic, if talking to Kafka v2.8+ and using topic IDs.
	Err        error   // Err is any error preventing this topic from being deleted.
	ErrMessage string  // ErrMessage is a potential extra message describing any error.
}

DeleteTopicResponse contains the response for an individual deleted topic.

type DeleteTopicResponses

type DeleteTopicResponses map[string]DeleteTopicResponse

DeleteTopicResponses contains per-topic responses for deleted topics.

func (DeleteTopicResponses) Error added in v1.11.0

func (rs DeleteTopicResponses) Error() error

Error iterates over all responses and returns the first error encountered, if any.

func (DeleteTopicResponses) On

On calls fn for the response topic if it exists, returning the response and the error returned from fn. If fn is nil, this simply returns the response.

The fn is given a copy of the response. This function returns the copy as well; any modifications within fn are modifications on the returned copy.

If the topic does not exist, this returns kerr.UnknownTopicOrPartition.

func (DeleteTopicResponses) Sorted

Sorted returns all delete topic responses sorted first by topic ID, then by topic name.

type DeletedACL

type DeletedACL struct {
	Principal string // Principal is this deleted ACL's principal.
	Host      string // Host is this deleted ACL's host.

	Type       kmsg.ACLResourceType   // Type is this deleted ACL's resource type.
	Name       string                 // Name is this deleted ACL's resource name.
	Pattern    ACLPattern             // Pattern is this deleted ACL's resource name pattern.
	Operation  ACLOperation           // Operation is this deleted ACL's operation.
	Permission kmsg.ACLPermissionType // Permission is this deleted ACL's permission.

	Err error // Err is non-nil if this match has an error.
}

DeletedACL is an ACL that was deleted.

type DeletedACLs

type DeletedACLs []DeletedACL

DeletedACLs contains ACLs that were deleted from a single delete filter.

type DescribeACLsResult

type DescribeACLsResult struct {
	Principal *string // Principal is the optional user that was used in this filter.
	Host      *string // Host is the optional host that was used in this filter.

	Type       kmsg.ACLResourceType   // Type is the type of resource used for this filter.
	Name       *string                // Name is the name of the resource used for this filter.
	Pattern    ACLPattern             // Pattern is the name pattern used for this filter.
	Operation  ACLOperation           // Operation is the operation used for this filter.
	Permission kmsg.ACLPermissionType // Permission is the permission used for this filter.

	Described DescribedACLs // Described contains all ACLs this describe filter matched.

	Err error // Err is non-nil if this filter has an error.
}

DescribeACLsResult contains the input used for a describe ACL filter, and the describes that the filter matched or the error for this filter.

All fields but Described and Err are set from the request input. The response sets either Described (potentially to nothing if the filter matched nothing) or Err.

type DescribeACLsResults

type DescribeACLsResults []DescribeACLsResult

DescribeACLsResults contains all results to described ACLs.

type DescribeClientQuotaComponent added in v1.5.0

type DescribeClientQuotaComponent struct {
	Type      string          // Type is the type of entity component to describe ("user", "client-id", "ip").
	MatchName *string         // MatchName is the name to match against; this is only needed when MatchType is 0 (exact).
	MatchType QuotasMatchType // MatchType is how to match an entity.
}

DescribeClientQuotaComponent is an input entity component to describing client quotas: we define the type of quota ("client-id", "user"), how to match, and the match name if needed.

type DescribedACL

type DescribedACL struct {
	Principal string // Principal is this described ACL's principal.
	Host      string // Host is this described ACL's host.

	Type       kmsg.ACLResourceType   // Type is this described ACL's resource type.
	Name       string                 // Name is this described ACL's resource name.
	Pattern    ACLPattern             // Pattern is this described ACL's resource name pattern.
	Operation  ACLOperation           // Operation is this described ACL's operation.
	Permission kmsg.ACLPermissionType // Permission is this described ACL's permission.
}

DescribedACL is an ACL that was described.

type DescribedACLs

type DescribedACLs []DescribedACL

DescribedACLs contains ACLs that were described from a single describe filter.

type DescribedAllLogDirs

type DescribedAllLogDirs map[int32]DescribedLogDirs

DescribedAllLogDirs contains per-broker responses to described log directories.

func (DescribedAllLogDirs) Each

func (ds DescribedAllLogDirs) Each(fn func(DescribedLogDir))

Each calls fn for every described log dir in all responses.

func (DescribedAllLogDirs) Sorted

func (ds DescribedAllLogDirs) Sorted() []DescribedLogDir

Sorted returns each log directory sorted by broker, then by directory.

type DescribedClientQuota added in v1.5.0

type DescribedClientQuota struct {
	Entity ClientQuotaEntity // Entity is the entity of this described client quota.
	Values ClientQuotaValues // Values contains the quota values for this entity.
}

DescribedClientQuota contains a described quota. A single quota is made up of multiple entities and multiple values, for example, "user=foo" is one component of the entity, and "client-id=bar" is another.

type DescribedClientQuotas added in v1.5.0

type DescribedClientQuotas []DescribedClientQuota

DescribedClientQuotas contains client quotas that were described.

type DescribedGroup

type DescribedGroup struct {
	Group string // Group is the name of the described group.

	Coordinator  BrokerDetail           // Coordinator is the coordinator broker for this group.
	State        string                 // State is the state this group is in (Empty, Dead, Stable, etc.).
	ProtocolType string                 // ProtocolType is the type of protocol the group is using, "consumer" for normal consumers, "connect" for Kafka connect.
	Protocol     string                 // Protocol is the partition assignor strategy this group is using.
	Members      []DescribedGroupMember // Members contains the members of this group sorted first by InstanceID, or if nil, by MemberID.

	Err error // Err is non-nil if the group could not be described.
}

DescribedGroup contains data from a describe groups response for a single group.

func (*DescribedGroup) AssignedPartitions

func (d *DescribedGroup) AssignedPartitions() TopicsSet

AssignedPartitions returns the set of unique topics and partitions that are assigned across all members in this group.

This function is only relevant if the group is of type "consumer".

type DescribedGroupLag added in v1.9.0

type DescribedGroupLag struct {
	Group string // Group is the group name.

	Coordinator  BrokerDetail           // Coordinator is the coordinator broker for this group.
	State        string                 // State is the state this group is in (Empty, Dead, Stable, etc.).
	ProtocolType string                 // ProtocolType is the type of protocol the group is using, "consumer" for normal consumers, "connect" for Kafka connect.
	Protocol     string                 // Protocol is the partition assignor strategy this group is using.
	Members      []DescribedGroupMember // Members contains the members of this group sorted first by InstanceID, or if nil, by MemberID.
	Lag          GroupLag               // Lag is the lag for the group.

	DescribeErr error // DescribeErr is the error returned from describing the group, if any.
	FetchErr    error // FetchErr is the error returned from fetching offsets, if any.
}

DescribedGroupLag contains a described group and its lag, or the errors that prevent the lag from being calculated.

func (*DescribedGroupLag) Error added in v1.9.0

func (l *DescribedGroupLag) Error() error

Error returns the first of DescribeErr or FetchErr that is non-nil.

type DescribedGroupLags added in v1.9.0

type DescribedGroupLags map[string]DescribedGroupLag

DescribedGroupLags is a map of group names to the described group with its lag, or error for those groups.

func (DescribedGroupLags) Each added in v1.9.0

func (ls DescribedGroupLags) Each(fn func(l DescribedGroupLag))

Each calls fn for every group.

func (DescribedGroupLags) EachError added in v1.9.0

func (ls DescribedGroupLags) EachError(fn func(l DescribedGroupLag))

EachError calls fn for every group that has a non-nil error.

func (DescribedGroupLags) Error added in v1.9.0

func (ls DescribedGroupLags) Error() error

Error iterates over all groups and returns the first error encountered, if any.

func (DescribedGroupLags) Ok added in v1.9.0

func (ls DescribedGroupLags) Ok() bool

Ok returns true if there are no errors. This is a shortcut for ls.Error() == nil.

func (DescribedGroupLags) Sorted added in v1.9.0

func (ls DescribedGroupLags) Sorted() []DescribedGroupLag

Sorted returns all lags sorted by group name.

type DescribedGroupMember

type DescribedGroupMember struct {
	MemberID   string  // MemberID is the Kafka assigned member ID of this group member.
	InstanceID *string // InstanceID is a potential user assigned instance ID of this group member (KIP-345).
	ClientID   string  // ClientID is the Kafka client given ClientID of this group member.
	ClientHost string  // ClientHost is the host this member is running on.

	Join     GroupMemberMetadata   // Join is what this member sent in its join group request; what it wants to consume.
	Assigned GroupMemberAssignment // Assigned is what this member was assigned to consume by the leader.
}

DescribedGroupMember is the detail of an individual group member as returned by a describe groups response.

type DescribedGroups

type DescribedGroups map[string]DescribedGroup

DescribedGroups contains data for multiple groups from a describe groups response.

func (DescribedGroups) AssignedPartitions

func (ds DescribedGroups) AssignedPartitions() TopicsSet

AssignedPartitions returns the set of unique topics and partitions that are assigned across all members in all groups. This is the all-group analogue to DescribedGroup.AssignedPartitions.

This function is only relevant for groups of type "consumer".

func (DescribedGroups) Error added in v1.11.0

func (ds DescribedGroups) Error() error

Error iterates over all groups and returns the first error encountered, if any.

func (DescribedGroups) Names

func (ds DescribedGroups) Names() []string

Names returns a sorted list of all group names.

func (DescribedGroups) On

func (rs DescribedGroups) On(group string, fn func(*DescribedGroup) error) (DescribedGroup, error)

On calls fn for the group if it exists, returning the group and the error returned from fn. If fn is nil, this simply returns the group.

The fn is given a shallow copy of the group. This function returns the copy as well; any modifications within fn are modifications on the returned copy. Modifications on a described group's inner fields are persisted to the original map (because slices are pointers).

If the group does not exist, this returns kerr.GroupIDNotFound.

func (DescribedGroups) Sorted

func (ds DescribedGroups) Sorted() []DescribedGroup

Sorted returns all groups sorted by group name.

type DescribedLogDir

type DescribedLogDir struct {
	Broker int32                 // Broker is the broker being described.
	Dir    string                // Dir is the described directory.
	Topics DescribedLogDirTopics // Topics contains the topics and partitions in this directory.
	Err    error                 // Err is non-nil if this directory could not be described.
}

DescribedLogDir is a described log directory.

func (DescribedLogDir) Size

func (ds DescribedLogDir) Size() int64

Size returns the total size of all partitions in this directory. This is a shortcut for .Topics.Size().

type DescribedLogDirPartition

type DescribedLogDirPartition struct {
	Broker    int32  // Broker is the broker this partition is on.
	Dir       string // Dir is the directory this partition lives in.
	Topic     string // Topic is the topic for this partition.
	Partition int32  // Partition is this partition.
	Size      int64  // Size is the total size of the log segments of this partition, in bytes.

	// OffsetLag is how far behind the log end offset this partition is.
	// The math is:
	//
	//     if IsFuture {
	//         logEndOffset - futureLogEndOffset
	//     } else {
	//         max(highWaterMark - logEndOffset, 0)
	//     }
	//
	OffsetLag int64
	// IsFuture is true if this replica was created by an
	// AlterReplicaLogDirsRequest and will replace the current log of the
	// replica in the future.
	IsFuture bool
}

DescribedLogDirPartition is the information for a single partition's described log directory.
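
The OffsetLag math documented on the struct can be written out directly. A small sketch of that formula (stand-in function, not the broker's code):

```go
package main

// offsetLag mirrors the documented lag math for a described log dir partition:
// a future replica lags behind the current log end offset, while a current
// replica lags behind the high watermark, never going negative.
func offsetLag(highWaterMark, logEndOffset, futureLogEndOffset int64, isFuture bool) int64 {
	if isFuture {
		return logEndOffset - futureLogEndOffset
	}
	if lag := highWaterMark - logEndOffset; lag > 0 {
		return lag
	}
	return 0
}

func main() {}
```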

func (DescribedLogDirPartition) Less

Less returns if one dir partition is less than the other, by dir, topic, partition, and size.

func (DescribedLogDirPartition) LessBySize

LessBySize returns if one dir partition is less than the other by size, otherwise by normal Less semantics.

type DescribedLogDirTopics

type DescribedLogDirTopics map[string]map[int32]DescribedLogDirPartition

DescribedLogDirTopics contains per-partition described log directories.

func (DescribedLogDirTopics) Each

func (ds DescribedLogDirTopics) Each(fn func(p DescribedLogDirPartition))

Each calls fn for every partition.

func (DescribedLogDirTopics) Lookup

Lookup returns the described partition if it exists.

func (DescribedLogDirTopics) Size

func (ds DescribedLogDirTopics) Size() int64

Size returns the total size of all partitions in this directory.
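
A sketch of how a total like this falls out of the nested topic-to-partition map shape (using plain sizes rather than the real DescribedLogDirPartition type):

```go
package main

// sizeOf sums partition sizes across a nested topic => partition => size map,
// mirroring what DescribedLogDirTopics.Size does over described partitions.
func sizeOf(topics map[string]map[int32]int64) int64 {
	var total int64
	for _, partitions := range topics {
		for _, size := range partitions {
			total += size
		}
	}
	return total
}

func main() {}
```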

func (DescribedLogDirTopics) Sorted

Sorted returns all partitions sorted by topic then partition.

func (DescribedLogDirTopics) SortedBySize

func (ds DescribedLogDirTopics) SortedBySize() []DescribedLogDirPartition

SortedBySize returns all partitions sorted by smallest size to largest. If partitions are of equal size, the sorting is topic then partition.

type DescribedLogDirs

type DescribedLogDirs map[string]DescribedLogDir

DescribedLogDirs contains per-directory responses to described log directories for a single broker.

func (DescribedLogDirs) Each

func (ds DescribedLogDirs) Each(fn func(DescribedLogDir))

Each calls fn for each log directory.

func (DescribedLogDirs) EachError

func (ds DescribedLogDirs) EachError(fn func(DescribedLogDir))

EachError calls fn for every directory that has a non-nil error.

func (DescribedLogDirs) EachPartition

func (ds DescribedLogDirs) EachPartition(fn func(d DescribedLogDirPartition))

EachPartition calls fn for each partition in any directory.

func (DescribedLogDirs) Error

func (ds DescribedLogDirs) Error() error

Error iterates over all directories and returns the first error encountered, if any. This can be used to check if describing was entirely successful or not.

func (DescribedLogDirs) LargestPartitionBySize

func (ds DescribedLogDirs) LargestPartitionBySize() (DescribedLogDirPartition, bool)

LargestPartitionBySize returns the largest partition by directory size, or no partition if there are no partitions.

func (DescribedLogDirs) Lookup

Lookup returns the described partition if it exists.

func (DescribedLogDirs) LookupPartition

func (ds DescribedLogDirs) LookupPartition(t string, p int32) (DescribedLogDirPartition, bool)

LookupPartition returns the described partition if it exists in any directory. Brokers should only have one replica of a partition, so this should always find at most one partition.

func (DescribedLogDirs) Ok

func (ds DescribedLogDirs) Ok() bool

Ok returns true if there are no errors. This is a shortcut for ds.Error() == nil.

func (DescribedLogDirs) Size

func (ds DescribedLogDirs) Size() int64

Size returns the total size of all directories.

func (DescribedLogDirs) SmallestPartitionBySize

func (ds DescribedLogDirs) SmallestPartitionBySize() (DescribedLogDirPartition, bool)

SmallestPartitionBySize returns the smallest partition by directory size, or no partition if there are no partitions.

func (DescribedLogDirs) Sorted

func (ds DescribedLogDirs) Sorted() []DescribedLogDir

Sorted returns all directories sorted by dir.

func (DescribedLogDirs) SortedBySize

func (ds DescribedLogDirs) SortedBySize() []DescribedLogDir

SortedBySize returns all directories sorted from smallest total directory size to largest.

func (DescribedLogDirs) SortedPartitions

func (ds DescribedLogDirs) SortedPartitions() []DescribedLogDirPartition

SortedPartitions returns all partitions sorted by dir, then topic, then partition.

func (DescribedLogDirs) SortedPartitionsBySize

func (ds DescribedLogDirs) SortedPartitionsBySize() []DescribedLogDirPartition

SortedPartitionsBySize returns all partitions across all directories sorted by smallest to largest, falling back to by broker, dir, topic, and partition.
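
The size-first ordering with the documented fallback chain is a straightforward multi-key comparator. A self-contained sketch with a stand-in partition type:

```go
package main

import "sort"

// dirPartition is a stand-in for a described log dir partition.
type dirPartition struct {
	Broker    int32
	Dir       string
	Topic     string
	Partition int32
	Size      int64
}

// sortBySize orders partitions smallest to largest, falling back to broker,
// dir, topic, then partition, the same tie-break order documented above.
func sortBySize(ps []dirPartition) {
	sort.Slice(ps, func(i, j int) bool {
		l, r := ps[i], ps[j]
		if l.Size != r.Size {
			return l.Size < r.Size
		}
		if l.Broker != r.Broker {
			return l.Broker < r.Broker
		}
		if l.Dir != r.Dir {
			return l.Dir < r.Dir
		}
		if l.Topic != r.Topic {
			return l.Topic < r.Topic
		}
		return l.Partition < r.Partition
	})
}

func main() {}
```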

type DescribedProducer added in v1.5.0

type DescribedProducer struct {
	Leader                int32  // Leader is the leader broker for this topic / partition.
	Topic                 string // Topic is the topic being produced to.
	Partition             int32  // Partition is the partition being produced to.
	ProducerID            int64  // ProducerID is the producer ID that produced.
	ProducerEpoch         int16  // ProducerEpoch is the epoch that produced.
	LastSequence          int32  // LastSequence is the last sequence number the producer produced.
	LastTimestamp         int64  // LastTimestamp is the last time this producer produced.
	CoordinatorEpoch      int32  // CoordinatorEpoch is the epoch of the transactional coordinator for the last produce.
	CurrentTxnStartOffset int64  // CurrentTxnStartOffset is the first offset in the transaction.
}

DescribedProducer contains the state of a transactional producer's last produce.

func (*DescribedProducer) Less added in v1.6.0

Less returns whether the left described producer is less than the right, in order of:

  • Topic
  • Partition
  • ProducerID
  • ProducerEpoch
  • LastTimestamp
  • LastSequence

type DescribedProducers added in v1.5.0

type DescribedProducers map[int64]DescribedProducer

DescribedProducers maps producer IDs to the full described producer.

func (DescribedProducers) Each added in v1.5.0

func (ds DescribedProducers) Each(fn func(DescribedProducer))

Each calls fn for each described producer.

func (DescribedProducers) Sorted added in v1.5.0

func (ds DescribedProducers) Sorted() []DescribedProducer

Sorted returns the described producers sorted by topic, partition, and producer ID.

type DescribedProducersPartition added in v1.5.0

type DescribedProducersPartition struct {
	Leader          int32              // Leader is the leader broker for this topic / partition.
	Topic           string             // Topic is the topic whose producers were described.
	Partition       int32              // Partition is the partition whose producers were described.
	ActiveProducers DescribedProducers // ActiveProducers are producers actively transactionally producing to this partition.
	Err             error              // Err is non-nil if describing this partition failed.
}

DescribedProducersPartition is a partition whose producers were described.

type DescribedProducersPartitions added in v1.5.0

type DescribedProducersPartitions map[int32]DescribedProducersPartition

DescribedProducersPartitions contains partitions whose producers were described.

func (DescribedProducersPartitions) Each added in v1.5.0

Each calls fn for each partition.

func (DescribedProducersPartitions) EachProducer added in v1.5.0

func (ds DescribedProducersPartitions) EachProducer(fn func(DescribedProducer))

EachProducer calls fn for each producer in all partitions.

func (DescribedProducersPartitions) Sorted added in v1.5.0

Sorted returns the described partitions sorted by topic and partition.

func (DescribedProducersPartitions) SortedProducers added in v1.5.0

func (ds DescribedProducersPartitions) SortedProducers() []DescribedProducer

SortedProducers returns all producers sorted first by partition, then by producer ID.

type DescribedProducersTopic added in v1.5.0

type DescribedProducersTopic struct {
	Topic      string                       // Topic is the topic whose producers were described.
	Partitions DescribedProducersPartitions // Partitions are partitions whose producers were described.
}

DescribedProducersTopic contains topic partitions whose producers were described.

type DescribedProducersTopics added in v1.5.0

type DescribedProducersTopics map[string]DescribedProducersTopic

DescribedProducersTopics contains topics whose producers were described.

func (DescribedProducersTopics) Each added in v1.5.0

Each calls fn for every topic.

func (DescribedProducersTopics) EachPartition added in v1.5.0

func (ds DescribedProducersTopics) EachPartition(fn func(DescribedProducersPartition))

EachPartition calls fn for all topic partitions.

func (DescribedProducersTopics) EachProducer added in v1.5.0

func (ds DescribedProducersTopics) EachProducer(fn func(DescribedProducer))

EachProducer calls fn for each producer in all topics and partitions.

func (DescribedProducersTopics) Sorted added in v1.5.0

Sorted returns the described topics sorted by topic.

func (DescribedProducersTopics) SortedPartitions added in v1.5.0

func (ds DescribedProducersTopics) SortedPartitions() []DescribedProducersPartition

SortedPartitions returns the described partitions sorted by topic and partition.

func (DescribedProducersTopics) SortedProducers added in v1.5.0

func (ds DescribedProducersTopics) SortedProducers() []DescribedProducer

SortedProducers returns all producers sorted first by partition, then by producer ID.

type DescribedTransaction added in v1.5.0

type DescribedTransaction struct {
	Coordinator    int32  // Coordinator is the coordinator broker for this transactional ID.
	TxnID          string // TxnID is the name of this transactional ID.
	State          string // State is the state this transaction is in (Empty, Ongoing, PrepareCommit, PrepareAbort, CompleteCommit, CompleteAbort, Dead, PrepareEpochFence).
	TimeoutMillis  int32  // TimeoutMillis is the timeout of this transaction in milliseconds.
	StartTimestamp int64  // StartTimestamp is the millisecond timestamp of when this transaction started.
	ProducerID     int64  // ProducerID is the ID in use by the transactional ID.
	ProducerEpoch  int16  // ProducerEpoch is the epoch associated with the producer ID.

	// Topics is the set of partitions in the transaction, if active. When
	// preparing to commit or abort, this includes only partitions which do
	// not have markers. This does not include topics the user is not
	// authorized to describe.
	Topics TopicsSet

	Err error // Err is non-nil if the transaction could not be described.
}

DescribedTransaction contains data from a describe transactions response for a single transactional ID.

type DescribedTransactions added in v1.5.0

type DescribedTransactions map[string]DescribedTransaction

DescribedTransactions contains information from a describe transactions response.

func (DescribedTransactions) Each added in v1.5.0

func (ds DescribedTransactions) Each(fn func(DescribedTransaction))

Each calls fn for each described transaction.

func (DescribedTransactions) On added in v1.5.0

On calls fn for the transactional ID if it exists, returning the transaction and the error returned from fn. If fn is nil, this simply returns the transaction.

The fn is given a shallow copy of the transaction. This function returns the copy as well; any modifications within fn are modifications on the returned copy. Modifications on a described transaction's inner fields are persisted to the original map (because slices are pointers).

If the transaction does not exist, this returns kerr.TransactionalIDNotFound.

func (DescribedTransactions) Sorted added in v1.5.0

Sorted returns all described transactions sorted by transactional ID.

func (DescribedTransactions) TransactionalIDs added in v1.5.0

func (ds DescribedTransactions) TransactionalIDs() []string

TransactionalIDs returns a sorted list of all transactional IDs.

type DescribedUserSCRAM added in v1.5.0

type DescribedUserSCRAM struct {
	User       string     // User is the user this described user credential is for.
	CredInfos  []CredInfo // CredInfos contains SCRAM mechanisms the user has passwords for.
	Err        error      // Err is any error encountered when describing the user.
	ErrMessage string     // ErrMessage is a potential extra message describing any error.
}

DescribedUserSCRAM contains a user, the SCRAM mechanisms that the user has passwords for, and if describing the user SCRAM credentials errored.

type DescribedUserSCRAMs added in v1.5.0

type DescribedUserSCRAMs map[string]DescribedUserSCRAM

DescribedUserSCRAMs contains described user SCRAM credentials keyed by user.

func (DescribedUserSCRAMs) AllFailed added in v1.5.0

func (ds DescribedUserSCRAMs) AllFailed() bool

AllFailed returns whether all described user credentials are errored.

func (DescribedUserSCRAMs) Each added in v1.5.0

func (ds DescribedUserSCRAMs) Each(fn func(DescribedUserSCRAM))

Each calls fn for every described user.

func (DescribedUserSCRAMs) EachError added in v1.5.0

func (ds DescribedUserSCRAMs) EachError(fn func(DescribedUserSCRAM))

EachError calls fn for every described user that has a non-nil error.

func (DescribedUserSCRAMs) Error added in v1.5.0

func (ds DescribedUserSCRAMs) Error() error

Error iterates over all described users and returns the first error encountered, if any.

func (DescribedUserSCRAMs) Ok added in v1.5.0

func (ds DescribedUserSCRAMs) Ok() bool

Ok returns true if there are no errors. This is a shortcut for ds.Error() == nil.

func (DescribedUserSCRAMs) Sorted added in v1.5.0

Sorted returns the described user credentials ordered by user.

type ElectLeadersHow added in v1.5.0

type ElectLeadersHow int8

ElectLeadersHow is how partition leaders should be elected.

const (
	// ElectPreferredReplica elects the preferred replica for a partition.
	ElectPreferredReplica ElectLeadersHow = 0
	// ElectLiveReplica elects the first live replica if there are no
	// in-sync replicas (i.e., this is unclean leader election).
	ElectLiveReplica ElectLeadersHow = 1
)

type ElectLeadersResult added in v1.5.0

type ElectLeadersResult struct {
	Topic      string          // Topic is the topic this result is for.
	Partition  int32           // Partition is the partition this result is for.
	How        ElectLeadersHow // How is the type of election that was performed.
	Err        error           // Err is non-nil if electing this partition's leader failed, such as the partition not existing or the preferred leader is not available and you used ElectPreferredReplica.
	ErrMessage string          // ErrMessage is a potential extra message describing any error.
}

ElectLeadersResult is the result for a single partition in an elect leaders request.

type ElectLeadersResults added in v1.5.0

type ElectLeadersResults map[string]map[int32]ElectLeadersResult

ElectLeadersResults contains per-topic, per-partition results for an elect leaders request.

type FetchOffsetsResponse

type FetchOffsetsResponse struct {
	Group   string          // Group is the offsets these fetches correspond to.
	Fetched OffsetResponses // Fetched contains offsets fetched for this group, if any.
	Err     error           // Err contains any error preventing offsets from being fetched.
}

FetchOffsetsResponse contains a fetch offsets response for a single group.

func (FetchOffsetsResponse) CommittedPartitions added in v1.2.0

func (r FetchOffsetsResponse) CommittedPartitions() TopicsSet

CommittedPartitions returns the set of unique topics and partitions that have been committed to in this group.

type FetchOffsetsResponses

type FetchOffsetsResponses map[string]FetchOffsetsResponse

FetchOffsetsResponses contains responses for many fetch offsets requests.

func (FetchOffsetsResponses) AllFailed

func (rs FetchOffsetsResponses) AllFailed() bool

AllFailed returns whether all fetch offsets requests failed.

func (FetchOffsetsResponses) CommittedPartitions added in v1.2.0

func (rs FetchOffsetsResponses) CommittedPartitions() TopicsSet

CommittedPartitions returns the set of unique topics and partitions that have been committed to across all members in all responses. This is the all-group analogue to FetchOffsetsResponse.CommittedPartitions.

func (FetchOffsetsResponses) EachError

func (rs FetchOffsetsResponses) EachError(fn func(FetchOffsetsResponse))

EachError calls fn for every response that has a non-nil error.

func (FetchOffsetsResponses) Error added in v1.11.0

func (rs FetchOffsetsResponses) Error() error

Error iterates over all responses and returns the first error encountered, if any.

func (FetchOffsetsResponses) On

On calls fn for the response group if it exists, returning the response and the error returned from fn. If fn is nil, this simply returns the response.

The fn is given a copy of the response. This function returns the copy as well; any modifications within fn are modifications on the returned copy.

If the group does not exist, this returns kerr.GroupIDNotFound.

type FindCoordinatorResponse added in v1.4.0

type FindCoordinatorResponse struct {
	Name       string // Name is the coordinator key this response is for.
	NodeID     int32  // NodeID is the node ID of the coordinator for this key.
	Host       string // Host is the host of the coordinator for this key.
	Port       int32  // Port is the port of the coordinator for this key.
	Err        error  // Err is any error encountered when requesting the coordinator.
	ErrMessage string // ErrMessage is a potential extra message describing any error.
}

FindCoordinatorResponse contains information for the coordinator for a group or transactional ID.

type FindCoordinatorResponses added in v1.4.0

type FindCoordinatorResponses map[string]FindCoordinatorResponse

FindCoordinatorResponses contains responses to finding coordinators for groups or transactions.

func (FindCoordinatorResponses) AllFailed added in v1.4.0

func (rs FindCoordinatorResponses) AllFailed() bool

AllFailed returns whether all responses are errored.

func (FindCoordinatorResponses) Each added in v1.4.0

Each calls fn for every response.

func (FindCoordinatorResponses) EachError added in v1.4.0

func (rs FindCoordinatorResponses) EachError(fn func(FindCoordinatorResponse))

EachError calls fn for every response that has a non-nil error.

func (FindCoordinatorResponses) Error added in v1.4.0

func (rs FindCoordinatorResponses) Error() error

Error iterates over all responses and returns the first error encountered, if any.

func (FindCoordinatorResponses) Ok added in v1.4.0

Ok returns true if there are no errors. This is a shortcut for rs.Error() == nil.

func (FindCoordinatorResponses) Sorted added in v1.4.0

Sorted returns all coordinator responses sorted by name.

type GroupLag

type GroupLag map[string]map[int32]GroupMemberLag

GroupLag is the per-topic, per-partition lag of members in a group.

func CalculateGroupLag

func CalculateGroupLag(
	group DescribedGroup,
	commit OffsetResponses,
	endOffsets ListedOffsets,
) GroupLag

CalculateGroupLag returns the per-partition lag of all members in a group. The inputs to this function are the results of the following methods (make sure to check shard errors):

// Note that FetchOffsets exists to fetch only one group's offsets,
// but some of the code below slightly changes.
groups := DescribeGroups(ctx, group)
commits := FetchManyOffsets(ctx, group)
var endOffsets ListedOffsets
listPartitions := groups.AssignedPartitions()
listPartitions.Merge(commits.CommittedPartitions())
if topics := listPartitions.Topics(); len(topics) > 0 {
	endOffsets = ListEndOffsets(ctx, listPartitions.Topics())
}
for _, group := range groups {
	lag := CalculateGroupLag(group, commits[group.Group].Fetched, endOffsets)
}

If assigned partitions are missing in the listed end offsets, the partition will have an error indicating it is missing. A missing topic or partition in the commits is assumed to be nothing committing yet.
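
The "missing commit means nothing committed yet" convention can be sketched in isolation. A simplified per-partition lag computation over plain offset maps (ignoring log start offsets, member attribution, and error handling):

```go
package main

// lagFor computes per-partition lag from committed offsets and end offsets,
// treating a missing commit as "nothing committed yet" (lag = full end
// offset), matching the convention CalculateGroupLag documents.
func lagFor(commits, ends map[int32]int64) map[int32]int64 {
	lag := make(map[int32]int64, len(ends))
	for p, end := range ends {
		if committed, ok := commits[p]; ok {
			lag[p] = end - committed
		} else {
			lag[p] = end // nothing committed yet: the whole partition is lag
		}
	}
	return lag
}

func main() {}
```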

func (GroupLag) IsEmpty

func (l GroupLag) IsEmpty() bool

IsEmpty returns if the group is empty.

func (GroupLag) Lookup

func (l GroupLag) Lookup(t string, p int32) (GroupMemberLag, bool)

Lookup returns the lag at t and p and whether it exists.

func (GroupLag) Sorted

func (l GroupLag) Sorted() []GroupMemberLag

Sorted returns the per-topic, per-partition lag by member sorted in order by topic then partition.

func (GroupLag) Total

func (l GroupLag) Total() int64

Total returns the total lag across all topics.

func (GroupLag) TotalByTopic

func (l GroupLag) TotalByTopic() GroupTopicsLag

TotalByTopic returns the total lag for each topic.

type GroupMemberAssignment

type GroupMemberAssignment struct {
	// contains filtered or unexported fields
}

GroupMemberAssignment is the assignment that a leader sent / a member received in a SyncGroup request. This can have one of three types:

  • *kmsg.ConsumerMemberAssignment, if the group's ProtocolType is "consumer"
  • *kmsg.ConnectMemberAssignment, if the group's ProtocolType is "connect"
  • []byte, if the group's ProtocolType is unknown

func (GroupMemberAssignment) AsConnect

AsConnect returns the assignment as ConnectMemberAssignment if possible.

func (GroupMemberAssignment) AsConsumer

AsConsumer returns the assignment as a ConsumerMemberAssignment if possible.

func (GroupMemberAssignment) Raw

func (m GroupMemberAssignment) Raw() ([]byte, bool)

Raw returns the assignment as a raw byte slice, if it is neither of consumer type nor connect type.
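
The three-way union behind the unexported field is a checked-type-assertion pattern. A self-contained sketch with stand-in types (not the real kmsg types):

```go
package main

// assignment is a stand-in union: the real GroupMemberAssignment wraps one of
// three concrete types behind an unexported field.
type assignment struct{ i interface{} }

type consumerAssignment struct{ Topics []string }
type connectAssignment struct{ Raw []byte }

// asConsumer mirrors the AsConsumer accessor: a checked type assertion that
// reports whether the underlying value was of the consumer type.
func (a assignment) asConsumer() (*consumerAssignment, bool) {
	c, ok := a.i.(*consumerAssignment)
	return c, ok
}

// raw mirrors Raw: bytes are only returned when the protocol type was unknown.
func (a assignment) raw() ([]byte, bool) {
	b, ok := a.i.([]byte)
	return b, ok
}

func main() {}
```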

type GroupMemberLag

type GroupMemberLag struct {
	// Member is a reference to the group member consuming this partition.
	// If the group is in state Empty, the member will be nil.
	Member    *DescribedGroupMember
	Topic     string // Topic is the topic this lag is for.
	Partition int32  // Partition is the partition this lag is for.

	Commit Offset       // Commit is this member's current offset commit.
	End    ListedOffset // End is a reference to the end offset of this partition.
	Lag    int64        // Lag is how far behind this member is, or -1 if there is a commit error or list offset error.

	Err error // Err is either the commit error, or the list end offsets error, or nil.
}

GroupMemberLag is the lag between a group member's current offset commit and the current end offset.

If either the offset commits have load errors, or the listed end offsets have load errors, the Lag field will be -1 and the Err field will be set (to the first of either the commit error, or else the list error).

If the group is in the Empty state, lag is calculated for all partitions in a topic, but the member is nil. The calculate function assumes that any assigned topic is meant to be entirely consumed. If the group is Empty and topics could not be listed, some partitions may be missing.

func (*GroupMemberLag) IsEmpty

func (g *GroupMemberLag) IsEmpty() bool

IsEmpty returns whether this lag is for a group in the Empty state.

type GroupMemberMetadata

type GroupMemberMetadata struct {
	// contains filtered or unexported fields
}

GroupMemberMetadata is the metadata that a client sent in a JoinGroup request. This can have one of three types:

  • *kmsg.ConsumerMemberMetadata, if the group's ProtocolType is "consumer"
  • *kmsg.ConnectMemberMetadata, if the group's ProtocolType is "connect"
  • []byte, if the group's ProtocolType is unknown

func (GroupMemberMetadata) AsConnect

AsConnect returns the metadata as ConnectMemberMetadata if possible.

func (GroupMemberMetadata) AsConsumer

AsConsumer returns the metadata as a ConsumerMemberMetadata if possible.

func (GroupMemberMetadata) Raw

func (m GroupMemberMetadata) Raw() ([]byte, bool)

Raw returns the metadata as a raw byte slice, if it is neither of consumer type nor connect type.

type GroupTopicsLag

type GroupTopicsLag map[string]TopicLag

GroupTopicsLag is the total lag per topic within a group.

func (GroupTopicsLag) Sorted

func (l GroupTopicsLag) Sorted() []TopicLag

Sorted returns the per-topic lag, sorted by topic.

type IncrementalOp

type IncrementalOp int8

IncrementalOp is a typed int8 that is used for incrementally updating configuration keys for topics and brokers.

const (
	// SetConfig is an incremental operation to set an individual config
	// key.
	SetConfig IncrementalOp = iota

	// DeleteConfig is an incremental operation to delete an individual
	// config key.
	DeleteConfig

	// AppendConfig is an incremental operation to append a value to a
	// config key that is a list type.
	AppendConfig

	// SubtractConfig is an incremental operation to remove a value from a
	// config key that is a list type.
	SubtractConfig
)
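
The four operations are easiest to see applied to a config map with comma-separated list values. A sketch of the semantics (an illustration, not the broker's implementation):

```go
package main

import "strings"

type incrementalOp int8

const (
	setConfig incrementalOp = iota
	deleteConfig
	appendConfig
	subtractConfig
)

// apply sketches the semantics of the four incremental ops against a config
// map whose list-type values are comma separated.
func apply(cfg map[string]string, op incrementalOp, key, value string) {
	switch op {
	case setConfig:
		cfg[key] = value
	case deleteConfig:
		delete(cfg, key)
	case appendConfig:
		if cur, ok := cfg[key]; ok && cur != "" {
			cfg[key] = cur + "," + value
		} else {
			cfg[key] = value
		}
	case subtractConfig:
		var kept []string
		for _, v := range strings.Split(cfg[key], ",") {
			if v != value && v != "" {
				kept = append(kept, v)
			}
		}
		cfg[key] = strings.Join(kept, ",")
	}
}

func main() {}
```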

type LeaveGroupBuilder added in v1.3.0

type LeaveGroupBuilder struct {
	// contains filtered or unexported fields
}

LeaveGroupBuilder helps build a leave group request, rather than having a function signature (string, string, ...string).

All functions on this type accept and return the same pointer, allowing for easy build-and-use usage.

func LeaveGroup added in v1.3.0

func LeaveGroup(group string) *LeaveGroupBuilder

LeaveGroup returns a LeaveGroupBuilder for the input group.

func (*LeaveGroupBuilder) InstanceIDs added in v1.3.0

func (b *LeaveGroupBuilder) InstanceIDs(ids ...string) *LeaveGroupBuilder

InstanceIDs are members to remove from a group.

func (*LeaveGroupBuilder) Reason added in v1.3.0

func (b *LeaveGroupBuilder) Reason(reason string) *LeaveGroupBuilder

Reason attaches a reason to all members in the leave group request. This requires Kafka 3.2+.
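
The "accept and return the same pointer" convention is what makes the chained build-and-use style work. A self-contained sketch of that builder shape (stand-in types, unexported to distinguish them from the real ones):

```go
package main

// leaveGroupBuilder sketches the build-and-use pattern described above: every
// method mutates the receiver and returns the same pointer, so calls chain.
type leaveGroupBuilder struct {
	group       string
	instanceIDs []string
	reason      string
}

// leaveGroup starts a builder for the given group.
func leaveGroup(group string) *leaveGroupBuilder {
	return &leaveGroupBuilder{group: group}
}

// InstanceIDs adds members to remove from the group.
func (b *leaveGroupBuilder) InstanceIDs(ids ...string) *leaveGroupBuilder {
	b.instanceIDs = append(b.instanceIDs, ids...)
	return b
}

// Reason attaches a reason to the request.
func (b *leaveGroupBuilder) Reason(reason string) *leaveGroupBuilder {
	b.reason = reason
	return b
}

func main() {}
```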

type LeaveGroupResponse added in v1.3.0

type LeaveGroupResponse struct {
	Group      string // Group is the group that was left.
	InstanceID string // InstanceID is the instance ID that left the group.
	MemberID   string // MemberID is the member ID that left the group.
	Err        error  // Err is non-nil if this member did not exist.
}

LeaveGroupResponse contains the response for an individual instance ID that left a group.

type LeaveGroupResponses added in v1.3.0

type LeaveGroupResponses map[string]LeaveGroupResponse

LeaveGroupResponses contains responses for each member of a leave group request. The map key is the instance ID that was removed from the group.

func (LeaveGroupResponses) Each added in v1.3.0

func (ls LeaveGroupResponses) Each(fn func(l LeaveGroupResponse))

Each calls fn for every removed member.

func (LeaveGroupResponses) EachError added in v1.3.0

func (ls LeaveGroupResponses) EachError(fn func(l LeaveGroupResponse))

EachError calls fn for every removed member that has a non-nil error.

func (LeaveGroupResponses) Error added in v1.3.0

func (ls LeaveGroupResponses) Error() error

Error iterates over all removed members and returns the first error encountered, if any.

func (LeaveGroupResponses) Ok added in v1.3.0

func (ls LeaveGroupResponses) Ok() bool

Ok returns true if there are no errors. This is a shortcut for ls.Error() == nil.

func (LeaveGroupResponses) Sorted added in v1.3.0

Sorted returns all removed group members sorted by instance ID.

type ListPartitionReassignmentsResponse added in v1.1.0

type ListPartitionReassignmentsResponse struct {
	Topic            string  // Topic is the topic that was listed.
	Partition        int32   // Partition is the partition that was listed.
	Replicas         []int32 // Replicas are the partition's current replicas.
	AddingReplicas   []int32 // AddingReplicas are replicas currently being added to the partition.
	RemovingReplicas []int32 // RemovingReplicas are replicas currently being removed from the partition.
}

ListPartitionReassignmentsResponse contains a response for an individual partition that was listed.

type ListPartitionReassignmentsResponses added in v1.1.0

type ListPartitionReassignmentsResponses map[string]map[int32]ListPartitionReassignmentsResponse

ListPartitionReassignmentsResponses contains responses to all partitions in a list reassignment request.

func (ListPartitionReassignmentsResponses) Each added in v1.1.0

Each calls fn for every response.

func (ListPartitionReassignmentsResponses) Sorted added in v1.1.0

Sorted returns the responses sorted by topic and partition.

type ListedGroup

type ListedGroup struct {
	Coordinator  int32  // Coordinator is the node ID of the coordinator for this group.
	Group        string // Group is the name of this group.
	ProtocolType string // ProtocolType is the type of protocol the group is using, "consumer" for normal consumers, "connect" for Kafka connect.
	State        string // State is the state this group is in (Empty, Dead, Stable, etc.; only if talking to Kafka 2.6+).
}

ListedGroup contains data from a list groups response for a single group.

type ListedGroups

type ListedGroups map[string]ListedGroup

ListedGroups contains information from a list groups response.

func (ListedGroups) Groups

func (ls ListedGroups) Groups() []string

Groups returns a sorted list of all group names.

func (ListedGroups) Sorted

func (ls ListedGroups) Sorted() []ListedGroup

Sorted returns all groups sorted by group name.

type ListedOffset

type ListedOffset struct {
	Topic     string // Topic is the topic this offset is for.
	Partition int32  // Partition is the partition this offset is for.

	Timestamp   int64 // Timestamp is the millisecond of the offset if listing after a time, otherwise -1.
	Offset      int64 // Offset is the record offset, or -1 if one could not be found.
	LeaderEpoch int32 // LeaderEpoch is the leader epoch at this offset, if any, otherwise -1.

	Err error // Err is non-nil if the partition has a load error.
}

ListedOffset contains record offset information.

type ListedOffsets

type ListedOffsets map[string]map[int32]ListedOffset

ListedOffsets contains per-partition record offset information that is returned from any of the List.*Offsets functions.

func (ListedOffsets) Each

func (l ListedOffsets) Each(fn func(ListedOffset))

Each calls fn for each listed offset.

func (ListedOffsets) Error

func (l ListedOffsets) Error() error

Error iterates over all offsets and returns the first error encountered, if any. This can be used to check if a listing was entirely successful or not.

Note that offset listing can be partially successful. For example, some offsets could succeed to be listed, while others could fail (maybe one partition is offline). If this is something you need to worry about, you may need to check all offsets manually.

func (ListedOffsets) KOffsets

func (l ListedOffsets) KOffsets() map[string]map[int32]kgo.Offset

KOffsets returns these listed offsets as a kgo offset map.

func (ListedOffsets) Lookup

func (l ListedOffsets) Lookup(t string, p int32) (ListedOffset, bool)

Lookup returns the offset at t and p and whether it exists.

func (ListedOffsets) Offsets

func (l ListedOffsets) Offsets() Offsets

Offsets returns these listed offsets as offsets.

type ListedTransaction added in v1.5.0

type ListedTransaction struct {
	Coordinator int32  // Coordinator is the coordinator broker for this transactional ID.
	TxnID       string // TxnID is the name of this transactional ID.
	ProducerID  int64  // ProducerID is the producer ID for this transaction.
	State       string // State is the state this transaction is in (Empty, Ongoing, PrepareCommit, PrepareAbort, CompleteCommit, CompleteAbort, Dead, PrepareEpochFence).
}

ListedTransaction contains data from a list transactions response for a single transactional ID.

type ListedTransactions added in v1.5.0

type ListedTransactions map[string]ListedTransaction

ListedTransactions contains information from a list transactions response.

func (ListedTransactions) Each added in v1.5.0

func (ls ListedTransactions) Each(fn func(ListedTransaction))

Each calls fn for each listed transaction.

func (ListedTransactions) Sorted added in v1.5.0

func (ls ListedTransactions) Sorted() []ListedTransaction

Sorted returns all transactions sorted by transactional ID.

func (ListedTransactions) TransactionalIDs added in v1.5.0

func (ls ListedTransactions) TransactionalIDs() []string

TransactionalIDs returns a sorted list of all transactional IDs.

type Metadata

type Metadata struct {
	Cluster    string        // Cluster is the cluster name, if any.
	Controller int32         // Controller is the node ID of the controller broker, if available, otherwise -1.
	Brokers    BrokerDetails // Brokers contains broker details, sorted by default.
	Topics     TopicDetails  // Topics contains topic details.
}

Metadata is the data from a metadata response.

type Offset

type Offset struct {
	Topic       string
	Partition   int32
	At          int64  // At is the offset to set.
	LeaderEpoch int32  // LeaderEpoch is the broker leader epoch of the record at this offset.
	Metadata    string // Metadata, if non-empty, is used for offset commits.
}

Offset is an offset for a topic.

type OffsetForLeaderEpoch added in v1.5.0

type OffsetForLeaderEpoch struct {
	NodeID    int32  // NodeID is the node that is the leader of this topic / partition.
	Topic     string // Topic is the topic this leader epoch response is for.
	Partition int32  // Partition is the partition this leader epoch response is for.

	// LeaderEpoch is either
	//
	// 1) -1, if the requested LeaderEpoch is unknown.
	//
	// 2) Less than the requested LeaderEpoch, if the requested LeaderEpoch
	// exists but has no records in it. For example, epoch 1 had end offset
	// 37, then epoch 2 and 3 had no records: if you request LeaderEpoch 3,
	// this will return LeaderEpoch 1 with EndOffset 37.
	//
	// 3) Equal to the requested LeaderEpoch, if the requested LeaderEpoch
	// is equal to or less than the current epoch for the partition.
	LeaderEpoch int32

	// EndOffset is either
	//
	// 1) The LogEndOffset, if the broker has the same LeaderEpoch as the
	// request.
	//
	// 2) The beginning offset of the next LeaderEpoch, if the broker has a
	// higher LeaderEpoch.
	//
	// The second option allows the user to detect data loss: if the
	// consumer consumed past the returned EndOffset, the consumer should
	// reset to the returned offset, knowing that everything from the
	// returned offset to the requested offset was lost.
	EndOffset int64

	// Err is non-nil if this partition had a response error.
	Err error
}

OffsetForLeaderEpoch contains a response for a single partition in an OffsetForLeaderEpoch request.
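The data-loss check described above can be sketched without a client. This is an illustrative stand-in, not the kadm type itself: epochEnd mirrors only the LeaderEpoch and EndOffset fields of a leader-epoch response.

```go
package main

import "fmt"

// epochEnd is an illustrative stand-in for the LeaderEpoch / EndOffset
// fields of an OffsetForLeaderEpoch response.
type epochEnd struct {
	LeaderEpoch int32
	EndOffset   int64
}

// resetOffset reports whether a consumer that consumed up to consumed
// (exclusive) has consumed past the broker's returned EndOffset, i.e.
// data was lost, and the offset it should reset to.
func resetOffset(consumed int64, resp epochEnd) (int64, bool) {
	if consumed > resp.EndOffset {
		return resp.EndOffset, true
	}
	return consumed, false
}

func main() {
	// The broker says our old epoch ended at offset 37, but we had
	// consumed to 40: offsets 37..40 were lost, reset to 37.
	reset, lost := resetOffset(40, epochEnd{LeaderEpoch: 1, EndOffset: 37})
	fmt.Println(reset, lost)
}
```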

type OffsetForLeaderEpochRequest added in v1.5.0

type OffsetForLeaderEpochRequest map[string]map[int32]int32

OffsetForLeaderEpochRequest contains topics, partitions, and leader epochs to request offsets for in an OffsetForLeaderEpoch.

func (*OffsetForLeaderEpochRequest) Add added in v1.5.0

func (l *OffsetForLeaderEpochRequest) Add(topic string, partition, leaderEpoch int32)

Add adds a topic, partition, and leader epoch to the request.

type OffsetResponse

type OffsetResponse struct {
	Offset
	Err error // Err is non-nil if the offset operation failed.
}

OffsetResponse contains the response for an individual offset for offset methods.

type OffsetResponses

type OffsetResponses map[string]map[int32]OffsetResponse

OffsetResponses contains per-partition responses to offset methods.

func (*OffsetResponses) Add

func (os *OffsetResponses) Add(o OffsetResponse)

Add adds an offset for a given topic/partition to this OffsetResponses map, overwriting any existing entry.

func (OffsetResponses) DeleteFunc

func (os OffsetResponses) DeleteFunc(fn func(OffsetResponse) bool)

DeleteFunc deletes any offset for which fn returns true.

func (OffsetResponses) Each

func (os OffsetResponses) Each(fn func(OffsetResponse))

Each calls fn for every offset.

func (OffsetResponses) EachError

func (os OffsetResponses) EachError(fn func(o OffsetResponse))

EachError calls fn for every offset that has a non-nil error.

func (OffsetResponses) Error

func (os OffsetResponses) Error() error

Error iterates over all offsets and returns the first error encountered, if any. This can be used to check if an operation was entirely successful or not.

Note that offset operations can be partially successful. For example, some offsets could succeed in an offset commit while others fail (maybe one topic does not exist for some reason, or you are not authorized for one topic). If this is something you need to worry about, you may need to check all offsets manually.

func (OffsetResponses) KOffsets

func (os OffsetResponses) KOffsets() map[string]map[int32]kgo.Offset

KOffsets returns these offset responses as a kgo offset map.

func (OffsetResponses) Keep

func (os OffsetResponses) Keep(o Offsets)

Keep filters the responses to only keep the input offsets.

func (OffsetResponses) KeepFunc

func (os OffsetResponses) KeepFunc(fn func(OffsetResponse) bool)

KeepFunc keeps only the offsets for which fn returns true.

func (OffsetResponses) Lookup

func (os OffsetResponses) Lookup(t string, p int32) (OffsetResponse, bool)

Lookup returns the offset at t and p and whether it exists.

func (OffsetResponses) Offsets

func (os OffsetResponses) Offsets() Offsets

Offsets returns these offset responses as offsets.

func (OffsetResponses) Ok

func (os OffsetResponses) Ok() bool

Ok returns true if there are no errors. This is a shortcut for os.Error() == nil.

func (OffsetResponses) Partitions added in v1.2.0

func (os OffsetResponses) Partitions() TopicsSet

Partitions returns the set of unique topics and partitions in these offsets.

func (OffsetResponses) Sorted

func (os OffsetResponses) Sorted() []OffsetResponse

Sorted returns the responses sorted by topic and partition.

type Offsets

type Offsets map[string]map[int32]Offset

Offsets wraps many offsets and is the type used for offset functions.

func OffsetsFromFetches

func OffsetsFromFetches(fs kgo.Fetches) Offsets

OffsetsFromFetches returns Offsets for the final record in any partition in the fetches. This is a helper to enable committing an entire returned batch.

This function looks at only the last record per partition, assuming that the last record is the highest offset (which is the behavior returned by kgo's Poll functions). The returned offsets are one past the offset contained in the records.
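The "one past the contained offset" rule can be sketched with plain maps. This is a sketch: rec stands in for the Topic/Partition/Offset fields of a kgo.Record, and nextOffsets mimics what OffsetsFromRecords computes.

```go
package main

import "fmt"

// rec is an illustrative stand-in for the Topic/Partition/Offset
// fields of a kgo.Record.
type rec struct {
	Topic     string
	Partition int32
	Offset    int64
}

// nextOffsets mimics OffsetsFromRecords: for each partition, take the
// highest record offset seen and return one past it, which is the
// offset a consumer would commit to resume after these records.
func nextOffsets(rs ...rec) map[string]map[int32]int64 {
	out := make(map[string]map[int32]int64)
	for _, r := range rs {
		ps := out[r.Topic]
		if ps == nil {
			ps = make(map[int32]int64)
			out[r.Topic] = ps
		}
		if next := r.Offset + 1; next > ps[r.Partition] {
			ps[r.Partition] = next
		}
	}
	return out
}

func main() {
	next := nextOffsets(
		rec{"foo", 0, 5},
		rec{"foo", 0, 7}, // highest for foo/0
		rec{"foo", 1, 2},
	)
	// Commits are one past the last consumed record.
	fmt.Println(next["foo"][0], next["foo"][1])
}
```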

func OffsetsFromRecords

func OffsetsFromRecords(rs ...kgo.Record) Offsets

OffsetsFromRecords returns offsets for all given records, using the highest offset per partition. The returned offsets are one past the offset contained in the records.

func (*Offsets) Add

func (os *Offsets) Add(o Offset)

Add adds an offset for a given topic/partition to this Offsets map.

If the partition already exists, the offset is only added if:

  • the new leader epoch is higher than the old, or
  • the leader epochs equal, and the new offset is higher than the old

If you would like to add offsets forcefully no matter what, use the Delete method before this.
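The epoch-then-offset rule above can be shown directly. In this sketch, off mirrors only the At and LeaderEpoch fields of an Offset, and add mimics the comparison for a single partition slot; it is not the kadm implementation itself.

```go
package main

import "fmt"

// off is an illustrative stand-in for the At / LeaderEpoch fields
// of an Offset.
type off struct {
	At          int64
	LeaderEpoch int32
}

// add mimics Offsets.Add for one partition slot: the new offset wins
// only on a higher leader epoch, or an equal epoch with a higher offset.
func add(cur *off, n off) {
	if n.LeaderEpoch > cur.LeaderEpoch ||
		(n.LeaderEpoch == cur.LeaderEpoch && n.At > cur.At) {
		*cur = n
	}
}

func main() {
	cur := off{At: 10, LeaderEpoch: 3}
	add(&cur, off{At: 5, LeaderEpoch: 4})   // higher epoch wins despite the lower offset
	add(&cur, off{At: 100, LeaderEpoch: 2}) // stale epoch is ignored
	fmt.Println(cur.At, cur.LeaderEpoch)
}
```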

func (*Offsets) AddOffset

func (os *Offsets) AddOffset(t string, p int32, o int64, leaderEpoch int32)

AddOffset is a helper to add an offset for a given topic and partition. The leader epoch field must be -1 if you do not know the leader epoch or if you do not have an offset yet.

func (Offsets) Delete

func (os Offsets) Delete(t string, p int32)

Delete removes any offset at topic t and partition p.

func (Offsets) DeleteFunc

func (os Offsets) DeleteFunc(fn func(o Offset) bool)

DeleteFunc calls fn for every offset, deleting the offset if fn returns true.

func (Offsets) Each

func (os Offsets) Each(fn func(Offset))

Each calls fn for each offset in these offsets.

func (Offsets) KOffsets

func (os Offsets) KOffsets() map[string]map[int32]kgo.Offset

KOffsets returns these offsets as a kgo offset map.

func (Offsets) KeepFunc

func (os Offsets) KeepFunc(fn func(o Offset) bool)

KeepFunc calls fn for every offset, keeping the offset if fn returns true.

func (Offsets) Lookup

func (os Offsets) Lookup(t string, p int32) (Offset, bool)

Lookup returns the offset at t and p and whether it exists.

func (Offsets) Sorted

func (os Offsets) Sorted() []Offset

Sorted returns the offsets sorted by topic and partition.

func (Offsets) TopicsSet

func (os Offsets) TopicsSet() TopicsSet

TopicsSet returns the set of topics and partitions currently used in these offsets.

type OffsetsForLeaderEpochs added in v1.5.0

type OffsetsForLeaderEpochs map[string]map[int32]OffsetForLeaderEpoch

OffsetsForLeaderEpochs contains responses for partitions in a OffsetForLeaderEpochRequest.

type OffsetsList

type OffsetsList []Offset

OffsetsList wraps many offsets and is a helper for building Offsets.

func (OffsetsList) KOffsets

func (l OffsetsList) KOffsets() map[string]map[int32]kgo.Offset

KOffsets returns this list as a kgo offset map.

func (OffsetsList) Offsets

func (l OffsetsList) Offsets() Offsets

Offsets returns this list as the non-list Offsets. All fields in each Offset must be set properly.

type Partition

type Partition struct {
	Topic     string // Topic is the topic for this partition.
	Partition int32  // Partition is this partition's number.
}

Partition is a partition for a topic.

type PartitionDetail

type PartitionDetail struct {
	Topic     string // Topic is the topic this partition belongs to.
	Partition int32  // Partition is the partition number these details are for.

	Leader          int32   // Leader is the broker leader, if there is one, otherwise -1.
	LeaderEpoch     int32   // LeaderEpoch is the leader's current epoch.
	Replicas        []int32 // Replicas is the list of replicas.
	ISR             []int32 // ISR is the list of in sync replicas.
	OfflineReplicas []int32 // OfflineReplicas is the list of offline replicas.

	Err error // Err is non-nil if the partition currently has a load error.
}

PartitionDetail is the detail of a partition as returned by a metadata response. If the partition fails to load / has an error, then only the partition number itself and the Err fields will be set.

type PartitionDetails

type PartitionDetails map[int32]PartitionDetail

PartitionDetails contains details for partitions as returned by a metadata response.

func (PartitionDetails) NumReplicas

func (ds PartitionDetails) NumReplicas() int

NumReplicas returns the number of replicas for these partitions.

It is assumed that all partitions have the same number of replicas, so this simply returns the number of replicas in the first encountered partition.

func (PartitionDetails) Numbers

func (ds PartitionDetails) Numbers() []int32

Numbers returns a sorted list of all partition numbers.

func (PartitionDetails) Sorted

func (ds PartitionDetails) Sorted() []PartitionDetail

Sorted returns the partitions in sorted order.

type Partitions

type Partitions []Partition

Partitions wraps many partitions.

func (Partitions) TopicsList

func (ps Partitions) TopicsList() TopicsList

TopicsList returns these partitions as a sorted TopicsList.

func (Partitions) TopicsSet

func (ps Partitions) TopicsSet() TopicsSet

TopicsSet returns these partitions as a TopicsSet.

type Principal added in v1.5.0

type Principal struct {
	Type string // Type is the type of a principal owner or renewer. If empty, this defaults to "User".
	Name string // Name is the name of a principal owner or renewer.
}

Principal is a principal that owns or renews a delegation token. This is the same as an ACL's principal, but rather than being a single string, the type and name are split into two fields.

type QuotasMatchType added in v1.5.0

type QuotasMatchType = kmsg.QuotasMatchType

QuotasMatchType specifies how to match a described client quota entity.

0 means to match the name exactly: user=foo will only match components of entity type "user" and entity name "foo".

1 means to match the default of the name: entity type "user" with a default match will return the default quotas for user entities.

2 means to match any name: entity type "user" with any matching will return both names and defaults.

type ResourceConfig

type ResourceConfig struct {
	Name    string   // Name is the name of this resource.
	Configs []Config // Configs are the configs for this resource.
	Err     error    // Err is any error preventing configs from loading (likely, an unknown topic).
}

ResourceConfig contains the configuration values for a resource (topic, broker, broker logger).

type ResourceConfigs

type ResourceConfigs []ResourceConfig

ResourceConfigs contains the configuration values for many resources.

func (ResourceConfigs) On

func (rs ResourceConfigs) On(name string, fn func(*ResourceConfig) error) (ResourceConfig, error)

On calls fn for the response config if it exists, returning the config and the error returned from fn. If fn is nil, this simply returns the config.

The fn is given a copy of the config. This function returns the copy as well; any modifications within fn are modifications on the returned copy.

If the resource does not exist, this returns kerr.UnknownTopicOrPartition.

type ScramMechanism added in v1.5.0

type ScramMechanism int8

ScramMechanism is a SCRAM mechanism.

const (
	// ScramSha256 represents the SCRAM-SHA-256 mechanism.
	ScramSha256 ScramMechanism = 1
	// ScramSha512 represents the SCRAM-SHA-512 mechanism.
	ScramSha512 ScramMechanism = 2
)

func (ScramMechanism) String added in v1.5.0

func (s ScramMechanism) String() string

String returns either SCRAM-SHA-256, SCRAM-SHA-512, or UNKNOWN.

type ShardError

type ShardError struct {
	Req kmsg.Request // Req is a piece of the original request.
	Err error        // Err is the error that resulted in this request failing.

	// Broker, if non-nil, is the broker this request was meant to be
	// issued to. If the NodeID is -1, then this piece of the request
	// failed before being mapped to a broker.
	Broker BrokerDetail
}

ShardError is a piece of a request that failed. See ShardErrors for more detail.

type ShardErrors

type ShardErrors struct {
	Name      string       // Name is the name of the request these shard errors are for.
	AllFailed bool         // AllFailed indicates if the original request was entirely unsuccessful.
	Errs      []ShardError // Errs contains all individual shard errors.
}

ShardErrors contains each individual error shard of a request.

Under the hood, some requests to Kafka need to be mapped to brokers, split, and sent to many brokers. The kgo.Client handles this all internally, but returns the individual pieces that were requested as "shards". Internally, each of these pieces can also fail, and they can all fail uniquely.

The kadm package takes one further step and merges the failing pieces into one meta error, ShardErrors. Methods in this package that can return this meta error are documented; if desired, you can use errors.As to check for and unwrap any returned ShardErrors.

If a request returns ShardErrors, it is possible that some aspects of the request were still successful. You can check ShardErrors.AllFailed as a shortcut for whether any of the response is usable or not.

func (*ShardErrors) Error

func (e *ShardErrors) Error() string

Error returns an error indicating the name of the request that failed, the number of separate errors, and the first error.

type TopicDetail

type TopicDetail struct {
	Topic string // Topic is the topic these details are for.

	ID         TopicID          // TopicID is the topic's ID, or all 0 if the broker does not support IDs.
	IsInternal bool             // IsInternal is whether the topic is an internal topic.
	Partitions PartitionDetails // Partitions contains details about the topic's partitions.

	Err error // Err is non-nil if the topic could not be loaded.
}

TopicDetail is the detail of a topic as returned by a metadata response. If the topic fails to load / has an error, then there will be no partitions.

type TopicDetails

type TopicDetails map[string]TopicDetail

TopicDetails contains details for topics as returned by a metadata response.

func (TopicDetails) EachError

func (ds TopicDetails) EachError(fn func(TopicDetail))

EachError calls fn for each topic that could not be loaded.

func (TopicDetails) EachPartition

func (ds TopicDetails) EachPartition(fn func(PartitionDetail))

EachPartition calls fn for every partition in all topics.

func (TopicDetails) Error added in v1.9.0

func (ds TopicDetails) Error() error

Error iterates over all topic details and returns the first error encountered, if any.

func (TopicDetails) FilterInternal

func (ds TopicDetails) FilterInternal()

FilterInternal deletes any internal topics from this set of topic details.

func (TopicDetails) Has

func (ds TopicDetails) Has(topic string) bool

Has returns whether the topic details contain the given topic and, if so, whether the topic's load error is not an unknown topic error.

func (TopicDetails) Names

func (ds TopicDetails) Names() []string

Names returns a sorted list of all topic names.

func (TopicDetails) Sorted

func (ds TopicDetails) Sorted() []TopicDetail

Sorted returns all topics in sorted order.

func (TopicDetails) TopicsList

func (ds TopicDetails) TopicsList() TopicsList

TopicsList returns the topics and partitions as a list.

func (TopicDetails) TopicsSet

func (ds TopicDetails) TopicsSet() TopicsSet

TopicsSet returns the topics and partitions as a set.

type TopicID

type TopicID [16]byte

TopicID is the 16 byte underlying topic ID.

func (TopicID) Less

func (t TopicID) Less(other TopicID) bool

Less returns whether this ID is less than the other, comparing byte by byte.

func (TopicID) MarshalJSON

func (t TopicID) MarshalJSON() ([]byte, error)

MarshalJSON returns the topic ID encoded as quoted base64.

func (TopicID) String

func (t TopicID) String() string

String returns the topic ID encoded as base64.

type TopicLag

type TopicLag struct {
	Topic string
	Lag   int64
}

TopicLag is the lag for an individual topic within a group.

type TopicPartitions

type TopicPartitions struct {
	Topic      string
	Partitions []int32
}

TopicPartitions is a topic and partitions.

type TopicsList

type TopicsList []TopicPartitions

TopicsList is a list of topics and partitions.

func (TopicsList) Each

func (l TopicsList) Each(fn func(t string, p int32))

Each calls fn for each topic / partition in the topics list.

func (TopicsList) EachPartitions added in v1.6.0

func (l TopicsList) EachPartitions(fn func(t string, ps []int32))

EachPartitions calls fn for each topic and its partitions in the topics list.

func (TopicsList) EmptyTopics added in v1.6.0

func (l TopicsList) EmptyTopics() []string

EmptyTopics returns all topics with no partitions.

func (TopicsList) IntoSet

func (l TopicsList) IntoSet() TopicsSet

IntoSet returns this list as a set.

func (TopicsList) Topics added in v1.6.0

func (l TopicsList) Topics() []string

Topics returns all topics in this list in sorted order.

type TopicsSet

type TopicsSet map[string]map[int32]struct{}

TopicsSet is a set of topics and, per topic, a set of partitions.

All methods provided for TopicsSet are safe to use on a nil (default) set.

func (*TopicsSet) Add

func (s *TopicsSet) Add(t string, ps ...int32)

Add adds partitions for a topic to the topics set. If no partitions are added, this still creates the topic.

func (TopicsSet) Delete

func (s TopicsSet) Delete(t string, ps ...int32)

Delete removes partitions from a topic from the topics set. If the topic ends up with no partitions, the topic is removed from the set.

func (TopicsSet) Each

func (s TopicsSet) Each(fn func(t string, p int32))

Each calls fn for each topic / partition in the topics set.

func (TopicsSet) EachPartitions added in v1.6.0

func (s TopicsSet) EachPartitions(fn func(t string, ps []int32))

EachPartitions calls fn for each topic and its partitions in the topics set.

func (TopicsSet) EmptyTopics added in v1.6.0

func (s TopicsSet) EmptyTopics() []string

EmptyTopics returns all topics with no partitions.

func (TopicsSet) IntoList added in v1.5.0

func (s TopicsSet) IntoList() TopicsList

IntoList returns this set as a list.

func (TopicsSet) Lookup

func (s TopicsSet) Lookup(t string, p int32) bool

Lookup returns whether the topic and partition exists.

func (TopicsSet) Merge

func (s TopicsSet) Merge(other TopicsSet)

Merge merges another topic set into this one.

func (TopicsSet) Sorted

func (s TopicsSet) Sorted() TopicsList

Sorted returns this set as a list in topic-sorted order, with each topic having sorted partitions.

func (TopicsSet) Topics

func (s TopicsSet) Topics() []string

Topics returns all topics in this set in sorted order.

type TxnMarkers added in v1.6.0

type TxnMarkers struct {
	ProducerID       int64     // ProducerID is the ID to write markers for.
	ProducerEpoch    int16     // ProducerEpoch is the epoch to write markers for.
	Commit           bool      // Commit is true if we are committing, false if we are aborting.
	CoordinatorEpoch int32     // CoordinatorEpoch is the epoch of the transactional coordinator we are writing to; this is used for fencing.
	Topics           TopicsSet // Topics are topics and partitions to write markers for.
}

TxnMarkers marks the end of a partition: the producer ID / epoch doing the writing, whether this is a commit, the coordinator epoch of the broker we are writing to (for fencing), and the topics and partitions that we are writing this abort or commit for.

This is a very low level admin request and should likely be built from data in a DescribeProducers response. See KIP-664 if you are trying to use this.

type TxnMarkersPartitionResponse added in v1.6.0

type TxnMarkersPartitionResponse struct {
	NodeID     int32  // NodeID is the node that this marker was written to.
	ProducerID int64  // ProducerID corresponds to the PID in the write marker request.
	Topic      string // Topic is the topic being responded to.
	Partition  int32  // Partition is the partition being responded to.
	Err        error  // Err is non-nil if the WriteTxnMarkers request for this pid/topic/partition failed.
}

TxnMarkersPartitionResponse is a response to a topic's partition within a single marker written.

type TxnMarkersPartitionResponses added in v1.6.0

type TxnMarkersPartitionResponses map[int32]TxnMarkersPartitionResponse

TxnMarkersPartitionResponses contains per-partition responses to a WriteTxnMarkers request.

func (TxnMarkersPartitionResponses) Each added in v1.6.0

func (ps TxnMarkersPartitionResponses) Each(fn func(TxnMarkersPartitionResponse))

Each calls fn for each partition.

func (TxnMarkersPartitionResponses) Sorted added in v1.6.0

func (ps TxnMarkersPartitionResponses) Sorted() []TxnMarkersPartitionResponse

Sorted returns all partitions sorted by partition.

type TxnMarkersResponse added in v1.6.0

type TxnMarkersResponse struct {
	ProducerID int64                    // ProducerID corresponds to the PID in the write marker request.
	Topics     TxnMarkersTopicResponses // Topics contains the topics that markers were written for, for this ProducerID.
}

TxnMarkersResponse is a response for a single marker written.

type TxnMarkersResponses added in v1.6.0

type TxnMarkersResponses map[int64]TxnMarkersResponse

TxnMarkersResponses contains per-producer-ID responses to a WriteTxnMarkers request.

func (TxnMarkersResponses) Each added in v1.6.0

func (ms TxnMarkersResponses) Each(fn func(TxnMarkersResponse))

Each calls fn for each marker response.

func (TxnMarkersResponses) EachPartition added in v1.6.0

func (ms TxnMarkersResponses) EachPartition(fn func(TxnMarkersPartitionResponse))

EachPartition calls fn for every partition in all topics in all marker responses.

func (TxnMarkersResponses) EachTopic added in v1.6.0

func (ms TxnMarkersResponses) EachTopic(fn func(TxnMarkersTopicResponse))

EachTopic calls fn for every topic in all marker responses.

func (TxnMarkersResponses) Sorted added in v1.6.0

func (ms TxnMarkersResponses) Sorted() []TxnMarkersResponse

Sorted returns all markers sorted by producer ID.

func (TxnMarkersResponses) SortedPartitions added in v1.6.0

func (ms TxnMarkersResponses) SortedPartitions() []TxnMarkersPartitionResponse

SortedPartitions returns all marker topic partitions sorted by producer ID then topic then partition.

func (TxnMarkersResponses) SortedTopics added in v1.6.0

func (ms TxnMarkersResponses) SortedTopics() []TxnMarkersTopicResponse

SortedTopics returns all marker topics sorted by producer ID then topic.

type TxnMarkersTopicResponse added in v1.6.0

type TxnMarkersTopicResponse struct {
	ProducerID int64                        // ProducerID corresponds to the PID in the write marker request.
	Topic      string                       // Topic is the topic being responded to.
	Partitions TxnMarkersPartitionResponses // Partitions are the responses for partitions in this marker.
}

TxnMarkersTopicResponse is a response to a topic within a single marker written.

type TxnMarkersTopicResponses added in v1.6.0

type TxnMarkersTopicResponses map[string]TxnMarkersTopicResponse

TxnMarkersTopicResponses contains per-topic responses to a WriteTxnMarkers request.

func (TxnMarkersTopicResponses) Each added in v1.6.0

func (ts TxnMarkersTopicResponses) Each(fn func(TxnMarkersTopicResponse))

Each calls fn for each topic.

func (TxnMarkersTopicResponses) EachPartition added in v1.6.0

func (ts TxnMarkersTopicResponses) EachPartition(fn func(TxnMarkersPartitionResponse))

EachPartition calls fn for every partition in all topics.

func (TxnMarkersTopicResponses) Sorted added in v1.6.0

func (ts TxnMarkersTopicResponses) Sorted() []TxnMarkersTopicResponse

Sorted returns all topics sorted by topic.

func (TxnMarkersTopicResponses) SortedPartitions added in v1.6.0

func (ts TxnMarkersTopicResponses) SortedPartitions() []TxnMarkersPartitionResponse

SortedPartitions returns all partitions sorted by topic then partition.

type UpsertSCRAM added in v1.5.0

type UpsertSCRAM struct {
	User           string         // User is the username to use.
	Mechanism      ScramMechanism // Mechanism is the mechanism to use.
	Iterations     int32          // Iterations is the SCRAM iterations to use; must be between 4096 and 16384.
	Password       string         // Password is the password to salt and convert to a salted password. Requires Salt and SaltedPassword to be empty.
	Salt           []byte         // Salt must be paired with SaltedPassword and requires Password to be empty.
	SaltedPassword []byte         // SaltedPassword must be paired with Salt and requires Password to be empty.
}

UpsertSCRAM either updates or creates (inserts) a new password for a user. There are two ways to specify a password: either with the Password field directly, or by specifying both Salt and SaltedPassword. If you specify just a password, this package generates a 24 byte salt and uses pbkdf2 to create the salted password.
