kafka

package module
v0.147.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 6, 2026 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SCRAMSHA512          = "SCRAM-SHA-512"
	SCRAMSHA256          = "SCRAM-SHA-256"
	PLAIN                = "PLAIN"
	AWSMSKIAMOAUTHBEARER = "AWS_MSK_IAM_OAUTHBEARER" //nolint:gosec // These aren't credentials.
)

Variables

This section is empty.

Functions

func NewFranzClient

func NewFranzClient(
	ctx context.Context,
	host component.Host,
	clientCfg configkafka.ClientConfig,
	logger *zap.Logger,
	opts ...kgo.Opt,
) (*kgo.Client, error)

NewFranzClient creates a franz-go client using the same commonOpts used for producer/consumer.

func NewFranzClusterAdminClient

func NewFranzClusterAdminClient(
	ctx context.Context,
	host component.Host,
	clientCfg configkafka.ClientConfig,
	logger *zap.Logger,
	opts ...kgo.Opt,
) (*kadm.Client, *kgo.Client, error)

NewFranzClusterAdminClient creates a kadm admin client from a freshly created franz client.

func NewFranzConsumerGroup

func NewFranzConsumerGroup(
	ctx context.Context,
	host component.Host,
	clientCfg configkafka.ClientConfig,
	consumerCfg configkafka.ConsumerConfig,
	topics []string,
	excludeTopics []string,
	logger *zap.Logger,
	opts ...kgo.Opt,
) (*kgo.Client, error)

NewFranzConsumerGroup creates a new Kafka consumer client using the franz-go library.

func NewFranzSyncProducer

func NewFranzSyncProducer(
	ctx context.Context,
	host component.Host,
	clientCfg configkafka.ClientConfig,
	cfg configkafka.ProducerConfig,
	timeout time.Duration,
	logger *zap.Logger,
	opts ...kgo.Opt,
) (*kgo.Client, error)

NewFranzSyncProducer creates a new Kafka client using the franz-go library.

func NewSaramaClient

func NewSaramaClient(ctx context.Context, config configkafka.ClientConfig) (sarama.Client, error)

NewSaramaClient returns a new Kafka client with the given configuration.

func NewSaramaClusterAdminClient

func NewSaramaClusterAdminClient(ctx context.Context, config configkafka.ClientConfig) (sarama.ClusterAdmin, error)

NewSaramaClusterAdminClient returns a new Kafka cluster admin client with the given configuration.

Types

type XDGSCRAMClient

type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

XDGSCRAMClient uses xdg-go scram to authentication conversation

func (*XDGSCRAMClient) Begin

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)

Begin starts the XDGSCRAMClient conversation.

func (*XDGSCRAMClient) Done

func (x *XDGSCRAMClient) Done() bool

Done returns true if the conversation is completed or has errored.

func (*XDGSCRAMClient) Step

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error)

Step takes a string provided from a server (or just an empty string for the very first conversation step) and attempts to move the authentication conversation forward. It returns a string to be sent to the server or an error if the server message is invalid. Calling Step after a conversation completes is also an error.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL