kafka

package
v0.6.27 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 16, 2024 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BuildDLQTopicForTopic added in v0.6.16

func BuildDLQTopicForTopic(topic string) string

func ConsumeFromTopic added in v0.5.79

func ConsumeFromTopic(
	ctx context.Context,
	cancel context.CancelFunc,
	groupID string,
	topic string,
	workersNum int,
	workerFunc func(ctx context.Context, cancel context.CancelFunc, r *kafka.Reader, w *kafka.Writer, wg *sync.WaitGroup, workerID int),
	readerConfig ConsumerReaderConfig,
	writerConfig ConsumerWriterConfig,
	logger logger.Logger,
)

func MarshalKafkaDataToStruct added in v0.5.62

func MarshalKafkaDataToStruct(data []byte, structToMarshal interface{}) error

func NewReader

func NewReader(cfg kafka.ReaderConfig, log logger.Logger) *kafka.Reader

func NewWriter

func NewWriter(writer kafka.Writer, log logger.Logger) *kafka.Writer

func PublishToDLQ added in v0.6.8

func PublishToDLQ(ctx context.Context, writer *kafka.Writer, dlqTopic string, message models.KafkaMessage, error string) error

func UnmarshalToKafkaMessage added in v0.5.62

func UnmarshalToKafkaMessage(data []byte) (models.KafkaMessage, error)

func WriteToTopicInBackground added in v0.5.5

func WriteToTopicInBackground(
	ctx context.Context,
	writer *kafka.Writer,
	topic string,
	key string,
	value models.KafkaMessage,
	log logger.Logger,
) error

Types

type ConsumerReaderConfig added in v0.5.79

type ConsumerReaderConfig struct {
	Brokers                []string
	MinBytes               int
	MaxBytes               int
	QueueCapacity          int
	HeartbeatInterval      time.Duration
	CommitInterval         time.Duration
	MaxAttempts            int
	DialTimeout            time.Duration
	PartitionWatchInterval time.Duration
}

type ConsumerWriterConfig added in v0.5.79

type ConsumerWriterConfig struct {
	Brokers      []string
	RequiredAcks int
	MaxAttempts  int
	ReadTimeout  time.Duration
	WriteTimeout time.Duration
}

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

type WriterConfig

type WriterConfig struct {
	Logger logger.Logger
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL