schemaregistry

package module
v1.1.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 16, 2021 License: MIT Imports: 14 Imported by: 0

README

Schema Registry Client

GoDoc

This repository contains wrapper function to communicate Confluent's Kafka schema registry via REST API and schema dynamic sync directly from kafka topic.

Client

Download library using go get -u github.com/tryfix/schema-registry

Following code slice create a schema registry client

import schemaregistry "github.com/tryfix/schemaregistry"

registry, _ := schemaregistry.NewRegistry(`localhost:8089/`)

Following code line register an event com.pickme.events.test with version 1

import schemaregistry "github.com/tryfix/schemaregistry"

registry.Register(`com.pickme.events.test`, 1, func(data []byte) (v interface{}, err error)

Dynamically sync schema's from kafka schema data topic

import schemaregistry "github.com/tryfix/schemaregistry"

registry, _ := schemaregistry.NewRegistry(`localhost:8089/`,schemaregistry.WithBackgroundSync([]string{`localhost:9092`}, `__schemas`))
registry.Sync()

Message Structure

Avro encoded events are published with magic byte and schema Id added to it. Following structure shows the message format used in the library to encode the message.

+====================+====================+======================+
| Magic byte(1 byte) | Schema ID(4 bytes) | AVRO encoded message |
+====================+====================+======================+

ToDo

  • Write unit test
  • Write mock functions
  • write benchmarks
  • setup travis for automated testing

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Encoder

type Encoder struct {
	// contains filtered or unexported fields
}

Encoder holds the reference to Registry and Subject which can be used to encode and decode messages

func NewEncoder

func NewEncoder(reg *Registry, subject *Subject) (*Encoder, error)

NewEncoder return the pointer to a Encoder for given Subject from the Registry

func NewGenericEncoder

func NewGenericEncoder(reg *Registry) (*Encoder, error)

NewEncoder return the pointer to a Encoder for given Subject from the Registry

func (*Encoder) Decode

func (s *Encoder) Decode(data []byte) (interface{}, error)

Decode returns the decoded go interface of avro encoded message and error if its unable to decode

func (*Encoder) Encode

func (s *Encoder) Encode(data interface{}) ([]byte, error)

Encode return a byte slice with a avro encoded message. magic byte and schema id will be appended to its beginning

╔════════════════════╤════════════════════╤══════════════════════╗
║ magic byte(1 byte) │ schema id(4 bytes) │ AVRO encoded message ║
╚════════════════════╧════════════════════╧══════════════════════╝

func (*Encoder) Schema

func (s *Encoder) Schema() string

Schema return the subject asociated with the Encoder

type GenericEncoder

type GenericEncoder struct {
	// contains filtered or unexported fields
}

Encoder holds the reference to Registry and Subject which can be used to encode and decode messages

func (*GenericEncoder) Decode

func (s *GenericEncoder) Decode(data []byte) (interface{}, error)

Decode returns the decoded go interface of avro encoded message and error if its unable to decode

func (*GenericEncoder) Encode

func (s *GenericEncoder) Encode(data interface{}) ([]byte, error)

func (*GenericEncoder) Schema

func (s *GenericEncoder) Schema() string

Schema return the subject asociated with the Encoder

type Option

type Option func(*options)

Option is a type to host NewRegistry configurations

func WithBackgroundSync

func WithBackgroundSync(bootstrapServers []string, storageTopic string) Option

WithBackgroundSync returns a Configurations to create a NewRegistry with kafka dynamic schema sync. function required slice of kafka bootstrapServers and schema storageTopic as inputs

func WithLogger

func WithLogger(logger log.Logger) Option

WithLogger returns a Configurations to create a NewRegistry with given PrefixedLogger

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry type holds schema registry details

func NewRegistry

func NewRegistry(url string, opts ...Option) (*Registry, error)

NewRegistry returns pointer to connected registry with given options or error if it's unable to connect

func (*Registry) GenericEncoder

func (r *Registry) GenericEncoder() *GenericEncoder

func (*Registry) Print added in v1.1.0

func (r *Registry) Print()

func (*Registry) Register

func (r *Registry) Register(subject string, version int, decoder jsonDecoder) error

Register registers the given subject, version and JSON value decoder to the Registry

func (*Registry) Sync

func (r *Registry) Sync() error

Sync function start the background schema sync from kafka topic

Newly Created Schemas will register in background and application does not require any restart

func (*Registry) WithLatestSchema

func (r *Registry) WithLatestSchema(subject string) *Encoder

WithLatestSchema returns the latest event version encoder registered under given subject

func (*Registry) WithSchema

func (r *Registry) WithSchema(subject string, version int) *Encoder

WithSchema return the specific encoder which registered at the initialization under the subject and version

type Subject

type Subject struct {
	Schema      string      `json:"subject"` // The actual AVRO subject
	Subject     string      `json:"subject"` // Subject where the subject is registered for
	Version     int         `json:"version"` // Version within this subject
	Id          int         `json:"id"`      // Registry's unique id
	JsonDecoder jsonDecoder `json:"json_decoder"`
}

Subject holds the Schema information of the registered subject

type Version

type Version int

Version is the type to hold default register vrsion options

const (
	//VersionLatest constant hold the flag to register the latest version of the subject
	VersionLatest Version = -1
	//VersionAll constant hold the flag to register all the versions of the subject
	VersionAll Version = -2
)

func (Version) String

func (v Version) String() string

String returns the registed version type

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL