kafka

package
v1.3.0
Published: Jun 15, 2021 License: MIT Imports: 17 Imported by: 0

Documentation

Constants

const (
	ConsumerType = "consumer"
)

Functions

func NewKafka

func NewKafka(address string, consumer Consumer, metrics metrics.MetricsPublisher) kafka

Types

type Config

type Config struct {
	Type                  string
	Topics                []string
	ConsumerGroup         string
	Concurrency           string
	BatchSize             string
	MetricsUpdateInterval string
	BufferSize            string
	RecordType            string
	IncludeKey            string
}

type Consumer

type Consumer struct {
	Topics                []string
	Group                 string
	Endpoint              endpoint.Endpoint
	Decoder               DecodeMessageFunc
	Logger                log.Logger
	Concurrency           int
	BatchSize             int
	MetricsUpdateInterval time.Duration
	BufferSize            int
	IncludeKey            bool
}

type DecodeMessageFunc

type DecodeMessageFunc func(context.Context, *sarama.ConsumerMessage, bool) (record *models.Record, err error)

DecodeMessageFunc extracts a user-domain record (*models.Record) from a Kafka message. It is designed to be used in Kafka consumers. A straightforward DecodeMessageFunc might Avro-decode the message body into the concrete record type.

type Decoder

type Decoder struct {
	SchemaRegistry *schema_registry.SchemaRegistry
	CodecCache     sync.Map
}

func (*Decoder) AvroMessageToRecord

func (d *Decoder) AvroMessageToRecord(context context.Context, msg *sarama.ConsumerMessage, includeKey bool) (*models.Record, error)

func (*Decoder) DeserializerFor

func (d *Decoder) DeserializerFor(recordType string) DecodeMessageFunc

func (*Decoder) JsonMessageToRecord

func (d *Decoder) JsonMessageToRecord(context context.Context, msg *sarama.ConsumerMessage, includeKey bool) (*models.Record, error)
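
DeserializerFor returns a DecodeMessageFunc for a given record type, and the two message-to-record methods above match that function type. One plausible reading is a simple dispatch on the record type, sketched below with simplified stand-in types. The type names "avro" and "json", the case-insensitive match, and the JSON fallback for unknown types are all assumptions, not documented behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// decodeFunc is a simplified stand-in for DecodeMessageFunc.
type decodeFunc func(payload []byte) (string, error)

// decoder is a simplified stand-in for Decoder.
type decoder struct{}

// avro and json only tag the payload so the chosen path is visible.
func (d *decoder) avro(payload []byte) (string, error) { return "avro:" + string(payload), nil }
func (d *decoder) json(payload []byte) (string, error) { return "json:" + string(payload), nil }

// deserializerFor picks a decode method by record type.
// Assumption: unknown record types fall back to JSON.
func (d *decoder) deserializerFor(recordType string) decodeFunc {
	switch strings.ToLower(recordType) {
	case "avro":
		return d.avro
	default:
		return d.json
	}
}

func main() {
	d := &decoder{}
	decode := d.deserializerFor("avro")
	out, err := decode([]byte("payload"))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out)
}
```

Returning a method value keeps the consumer independent of the wire format: it only ever calls the function it was handed.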

type Notification

type Notification int32

const (
	Ready Notification = iota
	Inserted
)
