kafkaavro

package module
v1.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 12, 2019 License: Apache-2.0 Imports: 13 Imported by: 0

README

go-kafka-avro

A wrapper for Confluent's libraries for Apache Kafka and Schema Registry.

Installation

First install dependencies:

go get github.com/confluentinc/confluent-kafka-go github.com/landoop/schema-registry

To install use go get:

go get github.com/mycujoo/go-kafka-avro

Usage

First, you need to create cached schema registry client:

srClient, err := kafkaavro.NewCachedSchemaRegistryClient(baseurl)

For more options look at Landoop Schema Registry Client README.

Producer

Create kafka producer:

kafkaProducer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})

Then construct you can construct one or more kafkaavro producers:

producer, err := kafkaavro.NewProducer(kafkaavro.ProducerConfig{
	TopicName:            "topic",
	KeySchema:            `"string"`,
	ValueSchema:          `{"type": "record", "name": "test", "fields" : [{"name": "val", "type": "int", "default": 0}]}`,
	Producer:             kafkaProducer,
	SchemaRegistryClient: srClient,
})

Publish message using Produce method:

err = producer.Produce("key", "value", nil)

If you provide deliverChan then call will not be blocking until delivery.

Supported go versions

We support version >=1.12

Some code for cached schema registry client was based on https://github.com/dangkaka/go-kafka-avro implementation.

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func DecodeRecordFromNative added in v0.10.0

func DecodeRecordFromNative(src interface{}, dst interface{}) error

Types

type CachedSchemaRegistryClient

type CachedSchemaRegistryClient struct {
	SchemaRegistryClient *schemaregistry.Client
	// contains filtered or unexported fields
}

CachedSchemaRegistryClient is a schema registry client that will cache some data to improve performance

func NewCachedSchemaRegistryClient

func NewCachedSchemaRegistryClient(baseURL string, options ...schemaregistry.Option) (*CachedSchemaRegistryClient, error)

func (*CachedSchemaRegistryClient) DeleteSubject

func (cached *CachedSchemaRegistryClient) DeleteSubject(subject string) (versions []int, err error)

DeleteSubject deletes the subject, should only be used in development

func (*CachedSchemaRegistryClient) GetLatestSchema

func (cached *CachedSchemaRegistryClient) GetLatestSchema(subject string) (*goavro.Codec, error)

GetLatestSchema returns the highest version schema for a subject

func (*CachedSchemaRegistryClient) GetSchemaByID

func (cached *CachedSchemaRegistryClient) GetSchemaByID(id int) (*goavro.Codec, error)

GetSchemaByID will return and cache the codec with the given id

func (*CachedSchemaRegistryClient) GetSchemaBySubject

func (cached *CachedSchemaRegistryClient) GetSchemaBySubject(subject string, version int) (*goavro.Codec, error)

GetSchemaBySubject returns the codec for a specific version of a subject

func (*CachedSchemaRegistryClient) IsSchemaRegistered

func (cached *CachedSchemaRegistryClient) IsSchemaRegistered(subject string, codec *goavro.Codec) (bool, schemaregistry.Schema, error)

IsSchemaRegistered checks if a specific codec is already registered to a subject

func (*CachedSchemaRegistryClient) RegisterNewSchema

func (cached *CachedSchemaRegistryClient) RegisterNewSchema(subject string, codec *goavro.Codec) (int, error)

RegisterNewSchema will return and cache the id with the given codec

func (*CachedSchemaRegistryClient) Subjects

func (cached *CachedSchemaRegistryClient) Subjects() ([]string, error)

Subjects returns a list of subjects

func (*CachedSchemaRegistryClient) Versions

func (cached *CachedSchemaRegistryClient) Versions(subject string) ([]int, error)

Versions returns a list of all versions of a subject

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(topics []string, consumer *kafka.Consumer, schemaRegistryClient SchemaRegistryClient) (*Consumer, error)

NewConsumer is a basic consumer to interact with schema registry, avro and kafka

Example
package main

import (
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"

	kafkaavro "github.com/mycujoo/go-kafka-avro"
)

func main() {
	kafkaConsumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":        "localhost:29092",
		"security.protocol":        "ssl",
		"socket.keepalive.enable":  true,
		"enable.auto.commit":       false,
		"ssl.key.location":         "/path/to/service.key",
		"ssl.certificate.location": "/path/to/service.cert",
		"ssl.ca.location":          "/path/to/ca.pem",
		"group.id":                 "some-group-id",
		"session.timeout.ms":       6000,
		"default.topic.config":     kafka.ConfigMap{"auto.offset.reset": "earliest"},
	})
	if err != nil {
		log.Fatal(err)
	}
	cachedSchemaRegistry, err := kafkaavro.NewCachedSchemaRegistryClient("http://localhost:8081")
	if err != nil {
		log.Fatal(err)
	}

	kafkaavro.NewConsumer([]string{"topic1"}, kafkaConsumer, cachedSchemaRegistry)
}
Output:

func (*Consumer) Close

func (ac *Consumer) Close()

func (*Consumer) CommitMessage

func (ac *Consumer) CommitMessage(msg ConsumerMessage) ([]kafka.TopicPartition, error)

func (*Consumer) EnsureTopics added in v0.11.0

func (ac *Consumer) EnsureTopics() error

EnsureTopics returns error if one of the consumed topics was not found on the server.

func (*Consumer) Messages

func (ac *Consumer) Messages(stopChan chan struct{}) (chan ConsumerMessage, chan kafka.Event)

Messages returns the ConsumerMessage channel (that contains decoded messages) and other events channel for events like kafka.PartitionEOF, kafka.Stats

func (*Consumer) SubscribeTopics added in v0.12.0

func (ac *Consumer) SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) error

type ConsumerMessage

type ConsumerMessage struct {
	*kafka.Message
	Error error

	// Message Value parsed into maps/structs
	Parsed interface{}

	// JSON representation of the message
	Textual []byte
}

type NativeDecoder added in v0.10.0

type NativeDecoder interface {
	FromNative(interface{}) error
}

type OptionalDay added in v0.10.0

type OptionalDay struct {
	Valid bool
	Time  time.Time
}

func NewOptionalDay added in v0.10.0

func NewOptionalDay(t time.Time, valid bool) OptionalDay

func (*OptionalDay) FromNative added in v0.10.0

func (od *OptionalDay) FromNative(data interface{}) error

func (OptionalDay) MarshalJSON added in v0.10.0

func (od OptionalDay) MarshalJSON() ([]byte, error)

type OptionalInt added in v0.10.0

type OptionalInt struct {
	null.Int
}

func NewOptionalInt added in v0.10.0

func NewOptionalInt(i int64, valid bool) OptionalInt

func (*OptionalInt) FromNative added in v0.10.0

func (i *OptionalInt) FromNative(data interface{}) error

type OptionalString added in v0.10.0

type OptionalString struct {
	null.String
}

func NewOptionalString added in v0.10.0

func NewOptionalString(s string, valid bool) OptionalString

func (*OptionalString) FromNative added in v0.10.0

func (s *OptionalString) FromNative(data interface{}) error

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(cfg ProducerConfig) (*Producer, error)

NewProducer is a producer that publishes messages to kafka topic using avro serialization format

func (*Producer) Close

func (ac *Producer) Close()

func (*Producer) Produce

func (ap *Producer) Produce(key interface{}, value interface{}, deliveryChan chan kafka.Event) error

type ProducerConfig added in v1.2.0

type ProducerConfig struct {
	// Name of the topic where messages will be produced
	TopicName string

	// Avro schema for message key
	KeySchema string

	// Avro schema for message value
	ValueSchema string

	// Low level kafka producer used to produce messages
	Producer kafkaProducer

	// Schema registry client used for messages validation and schema management
	SchemaRegistryClient SchemaRegistryClient

	// BackOffConfig is used for setting backoff strategy for retry logic
	BackOffConfig backoff.BackOff
}

type SchemaRegistryClient

type SchemaRegistryClient interface {
	GetSchemaByID(id int) (*goavro.Codec, error)
	RegisterNewSchema(subject string, codec *goavro.Codec) (int, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL