kafka

package
v0.2.0
This package is not in the latest version of its module.
Published: Jun 10, 2022 | License: Apache-2.0 | Imports: 21 | Imported by: 0

Documentation

Constants

const (
	// RequestAPIKeyProduce is the Kafka request API Key for the Produce Request.
	RequestAPIKeyProduce = 0
)

Kafka request API Keys. See https://kafka.apache.org/protocol#protocol_api_keys.
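
To illustrate what an API key is, here is a minimal sketch of reading it from a raw Kafka request frame. It assumes the standard wire layout from the Kafka protocol guide (a 4-byte big-endian size, followed by a 2-byte big-endian api_key). The helper requestAPIKey and the local copy of the constant are hypothetical and are not part of this package.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// RequestAPIKeyProduce is a local copy of the package constant:
// the Produce request has API key 0.
const RequestAPIKeyProduce = 0

// requestAPIKey is a hypothetical helper (not part of this package).
// It reads the api_key field from a raw request frame, assuming the
// frame starts with the 4-byte size prefix followed by the request header.
func requestAPIKey(frame []byte) (int16, error) {
	// 4 bytes of size + 2 bytes of api_key are the minimum we need.
	if len(frame) < 6 {
		return 0, errors.New("frame too short to contain an API key")
	}
	return int16(binary.BigEndian.Uint16(frame[4:6])), nil
}

func main() {
	// Hand-built frame: size=8, api_key=0, api_version=7, correlation_id=1.
	frame := []byte{0, 0, 0, 8, 0, 0, 0, 7, 0, 0, 0, 1}

	key, err := requestAPIKey(frame)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("is produce request:", key == RequestAPIKeyProduce)
}
```

A key handler registered for RequestAPIKeyProduce would presumably be selected on a comparison like this one; how the proxy actually dispatches requests is not described here.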

Variables

This section is empty.

Functions

func NewProduceRequestHandler

func NewProduceRequestHandler(r *watermillmessage.Router, handler watermillmessage.HandlerFunc, publisher watermillmessage.Publisher, publishToTopic string) kafkaproxy.KeyHandler

NewProduceRequestHandler creates a new request key handler for the Produce Request.

func NewProxy

func NewProxy(c *ProxyConfig, r *watermillmessage.Router) (proxy.Proxy, error)

NewProxy creates a new Kafka Proxy based on a given configuration.

Types

type ProxyConfig

type ProxyConfig struct {
	// Address for this proxy. Should be reachable by your clients. Most probably a domain name.
	// If not set, 0.0.0.0 will be used.
	Address            string
	BrokersMapping     []string
	DialAddressMapping []string
	ExtraConfig        []string
	MessageHandler     watermillmessage.HandlerFunc
	MessagePublisher   watermillmessage.Publisher
	PublishToTopic     string
	MessageSubscriber  watermillmessage.Subscriber
	TLS                *TLSConfig
	Debug              bool
}

ProxyConfig holds the configuration for the Kafka Proxy.

func NewProxyConfig

func NewProxyConfig(brokersMapping []string, opts ...ProxyOption) (*ProxyConfig, error)

NewProxyConfig creates a new ProxyConfig.

func (*ProxyConfig) Validate

func (c *ProxyConfig) Validate() error

Validate validates ProxyConfig.

type ProxyOption

type ProxyOption func(*ProxyConfig) error

ProxyOption represents a functional configuration for the Proxy.
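
The functional option pattern behind ProxyOption can be sketched with local stand-in types. The lowercase names below (proxyConfig, proxyOption, withDebug, withExtra, newProxyConfig) are hypothetical mirrors of the exported API, and the rule that at least one broker mapping is required is an assumption made for illustration, not documented behaviour.

```go
package main

import (
	"errors"
	"fmt"
)

// proxyConfig is a trimmed, local stand-in for ProxyConfig.
type proxyConfig struct {
	BrokersMapping []string
	ExtraConfig    []string
	Debug          bool
}

// proxyOption mirrors the shape of ProxyOption: a function that
// mutates the config and may reject invalid input.
type proxyOption func(*proxyConfig) error

// withDebug mirrors WithDebug.
func withDebug(enabled bool) proxyOption {
	return func(c *proxyConfig) error {
		c.Debug = enabled
		return nil
	}
}

// withExtra mirrors WithExtra.
func withExtra(extra []string) proxyOption {
	return func(c *proxyConfig) error {
		c.ExtraConfig = extra
		return nil
	}
}

// newProxyConfig applies each option in order and stops at the first error.
func newProxyConfig(brokersMapping []string, opts ...proxyOption) (*proxyConfig, error) {
	c := &proxyConfig{BrokersMapping: brokersMapping}
	for _, opt := range opts {
		if err := opt(c); err != nil {
			return nil, err
		}
	}
	// Assumed validation rule, for illustration only.
	if len(c.BrokersMapping) == 0 {
		return nil, errors.New("at least one broker mapping is required")
	}
	return c, nil
}

func main() {
	// "placeholder-mapping" is a dummy value; the real mapping format
	// is not described in this documentation.
	cfg, err := newProxyConfig([]string{"placeholder-mapping"}, withDebug(true))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("debug enabled:", cfg.Debug)
}
```

Because each option returns an error, a constructor built this way can report a bad option at construction time instead of failing later when the proxy starts.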

func WithDebug

func WithDebug(enabled bool) ProxyOption

WithDebug enables or disables debug mode.

func WithDialAddressMapping

func WithDialAddressMapping(mapping []string) ProxyOption

WithDialAddressMapping configures Dial Address Mapping.

func WithExtra

func WithExtra(extra []string) ProxyOption

WithExtra configures extra parameters.

func WithMessageHandler

func WithMessageHandler(handler watermillmessage.HandlerFunc) ProxyOption

WithMessageHandler configures a handler that will handle all incoming messages.

func WithMessagePublisher

func WithMessagePublisher(publisher watermillmessage.Publisher, topic string) ProxyOption

WithMessagePublisher configures the publisher and the topic to which messages are published after being handled.

func WithMessageSubscriber

func WithMessageSubscriber(subscriber watermillmessage.Subscriber) ProxyOption

WithMessageSubscriber configures a subscriber that receives the messages published by the configured MessagePublisher.

type TLSConfig

type TLSConfig struct {
	Enable             bool
	InsecureSkipVerify bool   `split_words:"true"`
	ClientCertFile     string `split_words:"true"`
	ClientKeyFile      string `split_words:"true"`
	CAChainCertFile    string `split_words:"true"`
}

TLSConfig holds configuration for TLS.

func (*TLSConfig) Config

func (c *TLSConfig) Config() (*tls.Config, error)

Config returns a *tls.Config based on current config.
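
As a rough sketch of how settings like these are commonly turned into a *tls.Config, the example below uses only the standard library. The tlsSettings type and its config method are hypothetical stand-ins; the package's actual Config implementation may differ, and in particular how the Enable field is consulted is not specified here.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// tlsSettings is a local stand-in with the same fields as TLSConfig.
type tlsSettings struct {
	Enable             bool
	InsecureSkipVerify bool
	ClientCertFile     string
	ClientKeyFile      string
	CAChainCertFile    string
}

// config builds a *tls.Config from the settings. This is an assumed
// implementation for illustration, not the package's own code.
func (s *tlsSettings) config() (*tls.Config, error) {
	cfg := &tls.Config{InsecureSkipVerify: s.InsecureSkipVerify}

	// Load the client certificate only when both files are given.
	if s.ClientCertFile != "" && s.ClientKeyFile != "" {
		cert, err := tls.LoadX509KeyPair(s.ClientCertFile, s.ClientKeyFile)
		if err != nil {
			return nil, fmt.Errorf("loading client key pair: %w", err)
		}
		cfg.Certificates = []tls.Certificate{cert}
	}

	// Trust the given CA chain instead of the system roots.
	if s.CAChainCertFile != "" {
		pem, err := os.ReadFile(s.CAChainCertFile)
		if err != nil {
			return nil, fmt.Errorf("reading CA chain: %w", err)
		}
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM(pem) {
			return nil, errors.New("no certificates found in CA chain file")
		}
		cfg.RootCAs = pool
	}

	return cfg, nil
}

func main() {
	s := &tlsSettings{Enable: true, InsecureSkipVerify: true}
	cfg, err := s.config()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("skip verify:", cfg.InsecureSkipVerify)
}
```

Setting InsecureSkipVerify disables server certificate verification, so it is generally only suitable for local testing.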
