msk

package module
v1.0.3
Published: Oct 27, 2022 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package msk provides an MskCluster which is compatible with the github.com/aws/go-kafka-event-source/streams.Cluster interface. GKES is a non-proprietary library and using MSK is not required. This package is provided as a convenience for those who are using MSK.

Disclaimer: github.com/aws/go-kafka-event-source/msk is not maintained or endorsed by the MSK development team. It is maintained by the developers of GKES. If you have issues with GKES->MSK connectivity, or would like new GKES->MSK features, https://github.com/aws/go-kafka-event-source is the place to ask first.

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultClientConfig

func DefaultClientConfig(region string) aws.Config

Returns the default AWS client config, with `region` as its default region. DefaultClientConfig panics on errors.

Types

type AuthType

type AuthType int
const (
	None AuthType = iota
	MutualTLS
	SaslScram
	SaslIam
	PublicMutualTLS
	PublicSaslScram
	PublicSaslIam
)
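
Because the constants are declared with iota, their integer values run from 0 (None) to 6 (PublicSaslIam) in declaration order. The sketch below mirrors the declaration locally so that it compiles on its own. The `authTypeName` helper and its lookup table are hypothetical and are not part of the msk package; they only illustrate how the values line up.

```go
package main

import "fmt"

// AuthType mirrors the declaration above so this sketch is self-contained.
type AuthType int

const (
	None AuthType = iota
	MutualTLS
	SaslScram
	SaslIam
	PublicMutualTLS
	PublicSaslScram
	PublicSaslIam
)

// authTypeNames is a hypothetical lookup table, indexed by AuthType value.
var authTypeNames = [...]string{
	"None", "MutualTLS", "SaslScram", "SaslIam",
	"PublicMutualTLS", "PublicSaslScram", "PublicSaslIam",
}

// authTypeName is a hypothetical helper that returns a readable name,
// falling back to the numeric form for values outside the declared range.
func authTypeName(a AuthType) string {
	if a < None || a > PublicSaslIam {
		return fmt.Sprintf("AuthType(%d)", int(a))
	}
	return authTypeNames[a]
}

func main() {
	// Print every declared value next to its name.
	for a := None; a <= PublicSaslIam; a++ {
		fmt.Printf("%d %s\n", int(a), authTypeName(a))
	}
}
```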

type MskClient

type MskClient interface {
	ListClusters(context.Context, *kafka.ListClustersInput, ...func(*kafka.Options)) (*kafka.ListClustersOutput, error)
	GetBootstrapBrokers(context.Context, *kafka.GetBootstrapBrokersInput, ...func(*kafka.Options)) (*kafka.GetBootstrapBrokersOutput, error)
}

type MskCluster

type MskCluster struct {
	// contains filtered or unexported fields
}

An implementation of github.com/aws/go-kafka-event-source/streams.Cluster.

func NewMskCluster

func NewMskCluster(clusterName string, authType AuthType, region string, optFns ...func(*kafka.Options)) *MskCluster

Creates a new MskCluster using DefaultClientConfig. If your application is running in EC2/ECS Task or Lambda, this is likely the initializer you need. See Sasl IAM support if using the SaslIam/PublicSaslIam AuthType. Look here to see how to customize client SDK options, such as a custom Retry mechanism. Note: your application's IAM role will need access to the 'ListClusters' and 'GetBootstrapBrokers' calls for your MSK Cluster.

func NewMskClusterWithClientConfig

func NewMskClusterWithClientConfig(clusterName string, authType AuthType, awsConfig aws.Config, optFns ...func(*kafka.Options)) *MskCluster

Creates a new MskCluster using the specified awsConfig. If you are using STS for authentication, you will likely need to create your own AWS config. If you are running on some sort of managed container like EC2/ECS Task or Lambda, you can likely use NewMskCluster instead. Note: your application's IAM role will need access to the 'ListClusters' and 'GetBootstrapBrokers' calls for your MSK Cluster.

func (*MskCluster) Config

func (c *MskCluster) Config() (opts []kgo.Opt, err error)

Called by GKES when initializing Kafka clients. The MskCluster will call ListClusters with a ClusterNameFilter (using cluster.clusterName) to retrieve the ARN for your specific cluster. Once the ARN is retrieved, GetBootstrapBrokers will be called and the appropriate broker addresses for the specified authType will be used to seed the underlying kgo.Client.
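
The last step of that flow, picking the broker addresses that match the authType, can be sketched roughly as follows. This is not the package's implementation. The `bootstrapBrokers` struct is a local stand-in whose field names follow the string fields of the AWS SDK's kafka.GetBootstrapBrokersOutput (plain strings here rather than pointers), and the mapping from AuthType to field is an assumption based on those names. `seedBrokers` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strings"
)

// AuthType mirrors the msk.AuthType declaration so the sketch compiles alone.
type AuthType int

const (
	None AuthType = iota
	MutualTLS
	SaslScram
	SaslIam
	PublicMutualTLS
	PublicSaslScram
	PublicSaslIam
)

// bootstrapBrokers is a local stand-in for the broker string fields of
// kafka.GetBootstrapBrokersOutput. Each field is a comma-separated list.
type bootstrapBrokers struct {
	BootstrapBrokerString                string
	BootstrapBrokerStringTls             string
	BootstrapBrokerStringSaslScram       string
	BootstrapBrokerStringSaslIam         string
	BootstrapBrokerStringPublicTls       string
	BootstrapBrokerStringPublicSaslScram string
	BootstrapBrokerStringPublicSaslIam   string
}

// seedBrokers (hypothetical) selects the broker list for authType and splits
// it into individual host:port addresses. The mapping is an assumption.
func seedBrokers(out bootstrapBrokers, authType AuthType) ([]string, error) {
	var s string
	switch authType {
	case None:
		s = out.BootstrapBrokerString
	case MutualTLS:
		s = out.BootstrapBrokerStringTls
	case SaslScram:
		s = out.BootstrapBrokerStringSaslScram
	case SaslIam:
		s = out.BootstrapBrokerStringSaslIam
	case PublicMutualTLS:
		s = out.BootstrapBrokerStringPublicTls
	case PublicSaslScram:
		s = out.BootstrapBrokerStringPublicSaslScram
	case PublicSaslIam:
		s = out.BootstrapBrokerStringPublicSaslIam
	default:
		return nil, fmt.Errorf("unknown auth type %d", int(authType))
	}
	if s == "" {
		return nil, fmt.Errorf("no bootstrap brokers returned for auth type %d", int(authType))
	}
	return strings.Split(s, ","), nil
}

func main() {
	// The addresses below are placeholders, not real MSK endpoints.
	out := bootstrapBrokers{
		BootstrapBrokerStringSaslIam: "b-1.example.com:9098,b-2.example.com:9098",
	}
	brokers, err := seedBrokers(out, SaslIam)
	fmt.Println(brokers, err)
}
```

Returning an error when the selected field is empty reflects the case where the cluster does not expose brokers for the requested authentication type.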

func (*MskCluster) WithClientOptions added in v1.0.2

func (c *MskCluster) WithClientOptions(opts ...kgo.Opt) *MskCluster

Used to supply additional kgo client options. Caution: Options supplied here will override any set by MskCluster. This call replaces any client options previously set. Usage:

cluster := msk.NewMskCluster("MyCluster", msk.MutualTLS, "us-east-1").WithClientOptions(
	kgo.Dialer((&tls.Dialer{Config: tlsConfig, NetDialer: &net.Dialer{KeepAlive: 20 * time.Minute}}).DialContext))

func (*MskCluster) WithScramUserPass

func (c *MskCluster) WithScramUserPass(user, pass string) *MskCluster

WithScramUserPass is used to set user/password info for SaslScram/PublicSaslScram auth types. This package does not provide for Scram credential rotation. Usage:

cluster := msk.NewMskCluster("MyCluster", msk.SaslScram, "us-east-1").WithScramUserPass("super", "secret")

func (*MskCluster) WithTlsConfig

func (c *MskCluster) WithTlsConfig(tlsConfig *tls.Config) *MskCluster

Used primarily for MutualTLS authentication. If you need any configuration beyond supplying the certificate or simply switching on TLS, you'll need to use WithClientOptions instead. See WithClientOptions for an example. Usage:

cluster := msk.NewMskCluster("MyCluster", msk.MutualTLS, "us-east-1").WithTlsConfig(myMutualTlsConfig)
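
A value such as myMutualTlsConfig is an ordinary *tls.Config carrying the client certificate. One way to build it with the standard library is sketched below. `newMutualTLSConfig` is a hypothetical helper, and the optional CA bundle and the TLS 1.2 minimum are assumptions rather than requirements stated by this package.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
)

// newMutualTLSConfig (hypothetical) builds a client-side TLS config from a
// PEM-encoded certificate and private key. If caPEM is non-empty, it is used
// as the trusted root set instead of the system roots.
func newMutualTLSConfig(certPEM, keyPEM, caPEM []byte) (*tls.Config, error) {
	cert, err := tls.X509KeyPair(certPEM, keyPEM)
	if err != nil {
		return nil, fmt.Errorf("load client key pair: %w", err)
	}
	cfg := &tls.Config{
		Certificates: []tls.Certificate{cert},
		MinVersion:   tls.VersionTLS12,
	}
	if len(caPEM) > 0 {
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM(caPEM) {
			return nil, fmt.Errorf("no CA certificates found in caPEM")
		}
		cfg.RootCAs = pool
	}
	return cfg, nil
}

func main() {
	// Invalid PEM input is rejected before any config is returned.
	_, err := newMutualTLSConfig([]byte("not a cert"), []byte("not a key"), nil)
	fmt.Println("error for invalid input:", err != nil)
}
```

With real PEM data, the resulting config would be passed as `WithTlsConfig(cfg)`.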
