kafka

package module
v0.0.0-...-d29119d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 12, 2024 License: Apache-2.0 Imports: 15 Imported by: 0

README

codecov CircleCI

go-kafka-avro

A library provides consumer/producer to work with kafka, avro and schema registry

Installation

$ go get github.com/dangkaka/go-kafka-avro
Usage

Consumer/producer examples stay here

cd dangkaka/go-kafka-avro/examples
  • Setup kafka, schema-registry

    docker-compose up -d
    
  • Add test messages

    go run producer/main.go -n 10
    
  • Run consumer

    go run consumer/main.go
    
References

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewAvroConsumer

func NewAvroConsumer(kafkaServers []string, schemaRegistryServers []string,
	topic string, groupId string, callbacks ConsumerCallbacks) (*avroConsumer, error)

avroConsumer is a basic consumer to interact with schema registry, avro and kafka

Types

type AvroEncoder

type AvroEncoder struct {
	SchemaID int
	Content  []byte
}

AvroEncoder encodes schemaId and Avro message.

func (*AvroEncoder) Encode

func (a *AvroEncoder) Encode() ([]byte, error)

Notice: the Confluent schema registry has special requirements for the Avro serialization rules, not only need to serialize the specific content, but also attach the Schema ID and Magic Byte. Ref: https://docs.confluent.io/current/schema-registry/serializer-formatter.html#wire-format

func (*AvroEncoder) Length

func (a *AvroEncoder) Length() int

Length of schemaId and Content.

type AvroProducer

type AvroProducer struct {
	// contains filtered or unexported fields
}

func NewAvroProducer

func NewAvroProducer(kafkaServers []string, schemaRegistryServers []string) (*AvroProducer, error)

NewAvroProducer is a basic producer to interact with schema registry, avro and kafka

func (*AvroProducer) Add

func (ap *AvroProducer) Add(topic string, schema string, key []byte, value []byte) error

func (*AvroProducer) Close

func (ac *AvroProducer) Close()

func (*AvroProducer) GetSchemaId

func (ap *AvroProducer) GetSchemaId(topic string, avroCodec *goavro.Codec) (int, error)

GetSchemaId get schema id from schema-registry service

type CachedSchemaRegistryClient

type CachedSchemaRegistryClient struct {
	SchemaRegistryClient *SchemaRegistryClient
	// contains filtered or unexported fields
}

CachedSchemaRegistryClient is a schema registry client that will cache some data to improve performance

func NewCachedSchemaRegistryClient

func NewCachedSchemaRegistryClient(connect []string) *CachedSchemaRegistryClient

func NewCachedSchemaRegistryClientWithRetries

func NewCachedSchemaRegistryClientWithRetries(connect []string, retries int) *CachedSchemaRegistryClient

func (*CachedSchemaRegistryClient) CreateSubject

func (client *CachedSchemaRegistryClient) CreateSubject(subject string, codec *goavro.Codec) (int, error)

CreateSubject will return and cache the id with the given codec

func (*CachedSchemaRegistryClient) DeleteSubject

func (client *CachedSchemaRegistryClient) DeleteSubject(subject string) error

DeleteSubject deletes the subject, should only be used in development

func (*CachedSchemaRegistryClient) DeleteVersion

func (client *CachedSchemaRegistryClient) DeleteVersion(subject string, version int) error

DeleteVersion deletes the a specific version of a subject, should only be used in development.

func (*CachedSchemaRegistryClient) GetLatestSchema

func (client *CachedSchemaRegistryClient) GetLatestSchema(subject string) (*goavro.Codec, error)

GetLatestSchema returns the highest version schema for a subject

func (*CachedSchemaRegistryClient) GetSchema

func (client *CachedSchemaRegistryClient) GetSchema(id int) (*goavro.Codec, error)

GetSchema will return and cache the codec with the given id

func (*CachedSchemaRegistryClient) GetSchemaByVersion

func (client *CachedSchemaRegistryClient) GetSchemaByVersion(subject string, version int) (*goavro.Codec, error)

GetSchemaByVersion returns the codec for a specific version of a subject

func (*CachedSchemaRegistryClient) GetSubjects

func (client *CachedSchemaRegistryClient) GetSubjects() ([]string, error)

GetSubjects returns a list of subjects

func (*CachedSchemaRegistryClient) GetVersions

func (client *CachedSchemaRegistryClient) GetVersions(subject string) ([]int, error)

GetVersions returns a list of all versions of a subject

func (*CachedSchemaRegistryClient) IsSchemaRegistered

func (client *CachedSchemaRegistryClient) IsSchemaRegistered(subject string, codec *goavro.Codec) (int, error)

IsSchemaRegistered checks if a specific codec is already registered to a subject

type ConsumerCallbacks

type ConsumerCallbacks struct {
	OnDataReceived func(msg Message)
	OnError        func(err error)
	OnNotification func(notification *cluster.Notification)
}

type Error

type Error struct {
	ErrorCode int    `json:"error_code"`
	Message   string `json:"message"`
}

Error holds more detailed information about errors coming back from schema registry

func (*Error) Error

func (e *Error) Error() string

type Message

type Message struct {
	SchemaId  int
	Topic     string
	Partition int32
	Offset    int64
	Key       string
	Value     string
}

type SchemaRegistryClient

type SchemaRegistryClient struct {
	SchemaRegistryConnect []string
	// contains filtered or unexported fields
}

SchemaRegistryClient is a basic http client to interact with schema registry

func NewSchemaRegistryClient

func NewSchemaRegistryClient(connect []string) *SchemaRegistryClient

NewSchemaRegistryClient creates a client to talk with the schema registry at the connect string By default it will retry failed requests (5XX responses and http errors) len(connect) number of times

func NewSchemaRegistryClientWithRetries

func NewSchemaRegistryClientWithRetries(connect []string, retries int) *SchemaRegistryClient

NewSchemaRegistryClientWithRetries creates an http client with a configurable amount of retries on 5XX responses

func (*SchemaRegistryClient) CreateSubject

func (client *SchemaRegistryClient) CreateSubject(subject string, codec *goavro.Codec) (int, error)

CreateSubject adds a schema to the subject

func (*SchemaRegistryClient) DeleteSubject

func (client *SchemaRegistryClient) DeleteSubject(subject string) error

DeleteSubject deletes a subject. It should only be used in development

func (*SchemaRegistryClient) DeleteVersion

func (client *SchemaRegistryClient) DeleteVersion(subject string, version int) error

DeleteVersion deletes a subject. It should only be used in development

func (*SchemaRegistryClient) GetLatestSchema

func (client *SchemaRegistryClient) GetLatestSchema(subject string) (*goavro.Codec, error)

GetLatestSchema returns a goavro.Codec for the latest version of the subject

func (*SchemaRegistryClient) GetSchema

func (client *SchemaRegistryClient) GetSchema(id int) (*goavro.Codec, error)

GetSchema returns a goavro.Codec by unique id

func (*SchemaRegistryClient) GetSchemaByVersion

func (client *SchemaRegistryClient) GetSchemaByVersion(subject string, version int) (*goavro.Codec, error)

GetSchemaByVersion returns a goavro.Codec for the version of the subject

func (*SchemaRegistryClient) GetSubjects

func (client *SchemaRegistryClient) GetSubjects() ([]string, error)

GetSubjects returns a list of all subjects in the schema registry

func (*SchemaRegistryClient) GetVersions

func (client *SchemaRegistryClient) GetVersions(subject string) ([]int, error)

GetVersions returns a list of the versions of a subject

func (*SchemaRegistryClient) IsSchemaRegistered

func (client *SchemaRegistryClient) IsSchemaRegistered(subject string, codec *goavro.Codec) (int, error)

IsSchemaRegistered tests if the schema is registered, if so it returns the unique id of that schema

type SchemaRegistryClientInterface

type SchemaRegistryClientInterface interface {
	GetSchema(int) (*goavro.Codec, error)
	GetSubjects() ([]string, error)
	GetVersions(string) ([]int, error)
	GetSchemaByVersion(string, int) (*goavro.Codec, error)
	GetLatestSchema(string) (*goavro.Codec, error)
	CreateSubject(string, *goavro.Codec) (int, error)
	IsSchemaRegistered(string, *goavro.Codec) (int, error)
	DeleteSubject(string) error
	DeleteVersion(string, int) error
}

SchemaRegistryClientInterface defines the api for all clients interfacing with schema registry

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL