easykafka

package module
v0.0.15
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 10, 2023 License: Apache-2.0 Imports: 14 Imported by: 0

README

Usage

package main

import (
	easyKafka "github.com/diazoxide/easy-kafka"
	"log"
	"os"
)

// MyMessageType is the example payload type used to instantiate the generic
// easyKafka client in main below.
// NOTE(review): the json tags suggest messages are JSON-encoded by the
// library — confirm against the package source.
type MyMessageType struct {
	From    string `json:"from"`    // JSON key "from"
	Content string `json:"content"` // JSON key "content"
}

// main demonstrates the basic flow: build a client for MyMessageType, publish
// one message to "my-topic", then register a handler for the same topic.
//
// NOTE(review): easyKafka.New does not appear in the v0.0.15 API index further
// down this page (which lists InitProducer, InitConsumer and InitStream) —
// confirm this example still matches the published version.
func main() {
	logger := log.New(os.Stdout, "test", log.Ltime)

	// Connection settings, named here instead of being commented inline.
	kafkaURIs := []string{"kafka:9092"}
	const (
		topicsPrefix  = "my-prefix"
		consumerGroup = "my-group"
		partitions    = 12
	)

	k := easyKafka.New[MyMessageType](kafkaURIs, topicsPrefix, consumerGroup, partitions, logger)

	k.Produce("my-topic", &MyMessageType{"test", "test"})

	onMessage := func(message MyMessageType) error {
		// Your code here
		return nil
	}
	// NOTE(review): the meaning of the trailing `true` is not shown on this page.
	k.Consume([]string{"my-topic"}, onMessage, true)
	// ...
}

Available methods

Consume

Produce

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BaseProducer added in v0.0.10

type BaseProducer struct {
	LoggerContainer
	// contains filtered or unexported fields
}

BaseProducer is a wrapper around kafka.Writer

func InitBaseProducer added in v0.0.10

func InitBaseProducer(
	brokers []string,
	opts ...BaseProducerOption,
) (producer *BaseProducer, close func() error)

InitBaseProducer creates a new producer instance

func (*BaseProducer) Produce added in v0.0.10

func (p *BaseProducer) Produce(ctx context.Context, messages ...*kafka.Message) error

Produce writes messages to kafka

type BaseProducerOption added in v0.0.10

type BaseProducerOption func(kafka *BaseProducer) error

BaseProducerOption is a function that sets some option on the producer

func BaseProducerInitialPartitionsCount added in v0.0.12

func BaseProducerInitialPartitionsCount(count uint) BaseProducerOption

BaseProducerInitialPartitionsCount sets initial partitions count

func BaseProducerWithErrorLogger added in v0.0.13

func BaseProducerWithErrorLogger(logger kafka.Logger) BaseProducerOption

BaseProducerWithErrorLogger sets error logger

func BaseProducerWithLogger added in v0.0.13

func BaseProducerWithLogger(logger kafka.Logger) BaseProducerOption

BaseProducerWithLogger sets logger

func BaseProducerWithWriterConfig added in v0.0.12

func BaseProducerWithWriterConfig(writer *kafka.Writer) BaseProducerOption

BaseProducerWithWriterConfig sets writer config

type Consumer

type Consumer[T any] struct {
	LoggerContainer
	// contains filtered or unexported fields
}

Consumer is a wrapper around kafka.Reader

func InitConsumer

func InitConsumer[T any](
	brokers []string,
	topicsList []string,
	groupId string,
	opts ...ConsumerOption[T],
) (consumer *Consumer[T], close func() error)

InitConsumer creates a new consumer instance

func (*Consumer[T]) Consume

func (k *Consumer[T]) Consume(ctx context.Context, handler ConsumerHandler[T])

Consume starts consuming messages from kafka

type ConsumerErrorHandler

type ConsumerErrorHandler[T any] func(k *Consumer[T], err error)

ConsumerErrorHandler is a function that handles an error reported by a Consumer

type ConsumerHandler

type ConsumerHandler[T any] func(message *T, kafkaMessage *kafka.Message) error

ConsumerHandler is a function that handles messages

type ConsumerOption

type ConsumerOption[T any] func(kafka *Consumer[T]) error

ConsumerOption is a function that sets some option

func ConsumerConcurrency

func ConsumerConcurrency[T any](concurrency uint) ConsumerOption[T]

ConsumerConcurrency sets parallel tasks count

func ConsumerDynamicTopicsDiscovery added in v0.0.8

func ConsumerDynamicTopicsDiscovery[T any]() ConsumerOption[T]

ConsumerDynamicTopicsDiscovery enables dynamic topics discovery

func ConsumerDynamicTopicsDiscoveryInterval added in v0.0.9

func ConsumerDynamicTopicsDiscoveryInterval[T any](interval time.Duration) ConsumerOption[T]

ConsumerDynamicTopicsDiscoveryInterval sets dynamic topics discovery interval

func ConsumerInitialPartitionsCount

func ConsumerInitialPartitionsCount[T any](count uint) ConsumerOption[T]

ConsumerInitialPartitionsCount sets initial partitions count

func ConsumerTopicNamesExactMatch added in v0.0.10

func ConsumerTopicNamesExactMatch[T any]() ConsumerOption[T]

ConsumerTopicNamesExactMatch enables exact match for topic names

func ConsumerTopicNamesRegexMatch added in v0.0.10

func ConsumerTopicNamesRegexMatch[T any]() ConsumerOption[T]

ConsumerTopicNamesRegexMatch enables regex match for topic names

func ConsumerWithErrorLogger added in v0.0.13

func ConsumerWithErrorLogger[T any](logger kafka.Logger) ConsumerOption[T]

ConsumerWithErrorLogger sets error logger

func ConsumerWithLogger added in v0.0.13

func ConsumerWithLogger[T any](logger kafka.Logger) ConsumerOption[T]

ConsumerWithLogger sets logger

func ConsumerWithMaxBlockingTasks added in v0.0.8

func ConsumerWithMaxBlockingTasks[T any](count uint) ConsumerOption[T]

ConsumerWithMaxBlockingTasks sets max blocking tasks

func ConsumerWithOnFailCommitHandler added in v0.0.8

func ConsumerWithOnFailCommitHandler[T any](handler ConsumerErrorHandler[T]) ConsumerOption[T]

ConsumerWithOnFailCommitHandler sets handler for commit error

func ConsumerWithReadMessageHandler added in v0.0.8

func ConsumerWithReadMessageHandler[T any](handler ConsumerErrorHandler[T]) ConsumerOption[T]

ConsumerWithReadMessageHandler sets the error handler for reading messages

func ConsumerWithReaderConfig added in v0.0.8

func ConsumerWithReaderConfig[T any](config kafka.ReaderConfig) ConsumerOption[T]

ConsumerWithReaderConfig sets reader config

func ConsumerWithTopicsListUpdatedHandler added in v0.0.8

func ConsumerWithTopicsListUpdatedHandler[T any](handler ConsumerTopicsListUpdatedHandler[T]) ConsumerOption[T]

ConsumerWithTopicsListUpdatedHandler sets handler for topics list update

func ConsumerWithWrongMessageHandler added in v0.0.8

func ConsumerWithWrongMessageHandler[T any](handler ConsumerErrorHandler[T]) ConsumerOption[T]

ConsumerWithWrongMessageHandler sets the error handler for wrong messages

type ConsumerTopicsListUpdatedHandler added in v0.0.8

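type ConsumerTopicsListUpdatedHandler[T any] func(k *Consumer[T], topics []string)

ConsumerTopicsListUpdatedHandler is a function that handles a topics list update, receiving the consumer and a list of topics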

type ErrorHandler

type ErrorHandler[T any] func(k *Consumer[T], err error)

ErrorHandler is a function that handles errors

type LoggerContainer added in v0.0.13

type LoggerContainer struct {
	// contains filtered or unexported fields
}

type Producer

type Producer[T any] struct {
	*BaseProducer
}

Producer is a wrapper around BaseProducer

func InitProducer

func InitProducer[T any](
	brokers []string,
	opts ...BaseProducerOption,
) (producer *Producer[T], close func() error)

InitProducer initializes a new Producer instance

func (*Producer[T]) Produce

func (p *Producer[T]) Produce(ctx context.Context, topics []string, messages ...*T) error

Produce sends messages to kafka

type Stream added in v0.0.10

type Stream[T any] struct {
	LoggerContainer
	// contains filtered or unexported fields
}

Stream is a wrapper around Consumer and Producer

func InitStream added in v0.0.10

func InitStream[T any](
	brokers []string,
	topicsList []string,
	groupId string,
	opts ...StreamOption[T],
) (*Stream[T], func() error)

InitStream initializes a new Stream instance

func (*Stream[T]) Run added in v0.0.10

func (s *Stream[T]) Run(ctx context.Context, routingRules ...StreamRoutingRule[T])

Run starts the stream

type StreamOption added in v0.0.10

type StreamOption[T any] func(stream *Stream[T]) error

StreamOption is a function that modifies a Stream instance

func StreamWithConsumerOptions added in v0.0.10

func StreamWithConsumerOptions[T any](opts ...ConsumerOption[T]) StreamOption[T]

StreamWithConsumerOptions sets consumer options

func StreamWithErrorLogger added in v0.0.13

func StreamWithErrorLogger[T any](logger kafka.Logger) StreamOption[T]

StreamWithErrorLogger sets error logger

func StreamWithLogger added in v0.0.13

func StreamWithLogger[T any](logger kafka.Logger) StreamOption[T]

StreamWithLogger sets logger

func StreamWithParallelJobs added in v0.0.15

func StreamWithParallelJobs[T any](parallelJobs uint) StreamOption[T]

StreamWithParallelJobs sets parallel jobs count for stream consumer

func StreamWithProducerOptions added in v0.0.10

func StreamWithProducerOptions[T any](opts ...BaseProducerOption) StreamOption[T]

StreamWithProducerOptions sets producer options

type StreamRoutingRule added in v0.0.10

type StreamRoutingRule[T any] func(message *T, kafkaMessage *kafka.Message) (newMessage *kafka.Message, err error)

StreamRoutingRule is a function that modifies a message

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL