aukafka

package
v0.6.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 11, 2024 License: MIT Imports: 12 Imported by: 1

Documentation

Index

Constants

View Source
const (
	DefaultKeyKafkaTopicsConfig = "KAFKA_TOPICS_CONFIG"
)

Variables

This section is empty.

Functions

func NewSha256ScramClient added in v0.4.0

func NewSha256ScramClient() sarama.SCRAMClient

func NewSha512ScramClient added in v0.4.0

func NewSha512ScramClient() sarama.SCRAMClient

func ParseTopicConfigs added in v0.5.1

func ParseTopicConfigs(jsonString string) (map[string]TopicConfig, error)

Types

type Config

type Config struct {
	// contains filtered or unexported fields
}

func NewConfig

func NewConfig() *Config

func (*Config) ConfigItems

func (c *Config) ConfigItems() []auconfigapi.ConfigItem

func (*Config) Obtain

func (c *Config) Obtain(getter func(key string) string)

func (*Config) TopicConfigs

func (c *Config) TopicConfigs() map[string]TopicConfig

type Consumer

type Consumer[E any] struct {
	// contains filtered or unexported fields
}

func CreateConsumer

func CreateConsumer[E any](
	ctx context.Context,
	topicConfig TopicConfig,
	receiveCallback func(context.Context, *string, *E, time.Time) error,
	configPreset *sarama.Config,
) (*Consumer[E], error)

func (*Consumer[E]) Cleanup

func (c *Consumer[E]) Cleanup(sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*Consumer[E]) Close

func (c *Consumer[E]) Close(ctx context.Context)

func (*Consumer[E]) ConsumeClaim

func (c *Consumer[E]) ConsumeClaim(
	session sarama.ConsumerGroupSession,
	claim sarama.ConsumerGroupClaim,
) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). Once the Messages() channel is closed, the Handler must finish its processing loop and exit.

func (*Consumer[E]) Setup

Setup is run at the beginning of a new session, before ConsumeClaim

type DefaultConfigImpl added in v0.5.1

type DefaultConfigImpl struct {
	// contains filtered or unexported fields
}

func NewDefaultConfig added in v0.6.0

func NewDefaultConfig() *DefaultConfigImpl

func (*DefaultConfigImpl) ConfigItems added in v0.5.1

func (c *DefaultConfigImpl) ConfigItems() []auconfigapi.ConfigItem

func (*DefaultConfigImpl) ObtainValues added in v0.6.0

func (c *DefaultConfigImpl) ObtainValues(getter func(string) string) error

func (*DefaultConfigImpl) TopicConfigs added in v0.5.1

func (c *DefaultConfigImpl) TopicConfigs(key string) (TopicConfig, bool)

type SyncProducer

type SyncProducer[V any] struct {
	// contains filtered or unexported fields
}

func CreateSyncProducer

func CreateSyncProducer[V any](
	_ context.Context,
	topicConfig TopicConfig,
	configPreset *sarama.Config,
) (*SyncProducer[V], error)

func (*SyncProducer[E]) Close

func (p *SyncProducer[E]) Close(ctx context.Context)

func (*SyncProducer[V]) Produce

func (p *SyncProducer[V]) Produce(
	_ context.Context,
	key *string,
	value *V,
) error

type TopicConfig

type TopicConfig struct {
	Topic         string
	Brokers       []string
	Username      string
	Password      string
	ConsumerGroup *string
	AuthType      sarama.SASLMechanism
}

type XDGSCRAMClient added in v0.4.0

type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

func (*XDGSCRAMClient) Begin added in v0.4.0

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)

func (*XDGSCRAMClient) Done added in v0.4.0

func (x *XDGSCRAMClient) Done() bool

func (*XDGSCRAMClient) Step added in v0.4.0

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL