srclient

package module
v1.0.2
Published: May 5, 2021 License: Apache-2.0 Imports: 16 Imported by: 0

README

= Schema Registry Client for Go
:toc:

:imagesdir: images/
image::Gopher_Dropping_Mic.png[Gopher, 150, 150, float="left"]

*srclient* is a Golang client for https://www.confluent.io/confluent-schema-registry/[Schema Registry], a service that provides a RESTful interface for developers to define standard schemas for their events, share them across the organization, and safely evolve them in a way that is backward compatible and future proof.
Using this client, developers can build Golang programs that write and read schema-compatible records to and from https://kafka.apache.org/[Apache Kafka] using https://avro.apache.org/[Avro], https://developers.google.com/protocol-buffers[Protobuf], and https://json-schema.org[JSON Schemas], while Schema Registry manages the schemas.
In this architecture, producer programs interact with Schema Registry to retrieve schemas and use them to serialize records; consumer programs then retrieve the same schemas from Schema Registry to deserialize the records.
You can read more about the benefits of using Schema Registry https://www.confluent.io/blog/schemas-contracts-compatibility[here].

== Features

* *Simple to Use* - This client provides a high-level abstraction over the operations that developers writing programs for Apache Kafka typically need, so its functions should feel natural to use.
Moreover, developers don't need to handle low-level HTTP details to communicate with Schema Registry.
* *Performance* - This client provides caching capabilities.
This means that any data retrieved from Schema Registry can be cached locally to improve the performance of subsequent requests.
This allows programs that are not co-located with Schema Registry to reduce the latency of each request.
This functionality can be disabled programmatically.
* *Confluent Cloud* - Go developers using https://www.confluent.io/confluent-cloud/[Confluent Cloud] can use this client to interact with the fully managed Schema Registry, which provides important features like schema enforcement that enable teams to reduce deployment issues by governing the schema changes as they evolve.

*License*: http://www.apache.org/licenses/LICENSE-2.0[Apache License v2.0]

== Installation

Module install:

This client is a Go module, so you can use it simply by adding the following import to your code:

[source,golang]
----
import "github.com/riferrei/srclient"
----

Then run a build to have this client automatically added to your go.mod file as a dependency.

Manual install:

[source,bash]
----
go get -u github.com/riferrei/srclient
----

== Examples

.Producer
[source,golang]
----
package main

import (
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io/ioutil"

	"github.com/google/uuid"
	"github.com/riferrei/srclient"
	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

type ComplexType struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

func main() {

	topic := "myTopic"

	// 1) Create the producer as you would normally do using Confluent's Go client
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	go func() {
		for event := range p.Events() {
			switch ev := event.(type) {
			case *kafka.Message:
				message := ev
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Error delivering the message '%s'\n", message.Key)
				} else {
					fmt.Printf("Message '%s' delivered successfully!\n", message.Key)
				}
			}
		}
	}()

	// 2) Fetch the latest version of the schema, or create a new one if it is the first
	schemaRegistryClient := srclient.CreateSchemaRegistryClient("http://localhost:8081")
	schema, err := schemaRegistryClient.GetLatestSchema(topic, false)
	if schema == nil {
		// No schema could be retrieved, so register one from a local file
		schemaBytes, readErr := ioutil.ReadFile("complexType.avsc")
		if readErr != nil {
			panic(fmt.Sprintf("Error reading the schema file %s", readErr))
		}
		schema, err = schemaRegistryClient.CreateSchema(topic, string(schemaBytes), srclient.Avro, false)
		if err != nil {
			panic(fmt.Sprintf("Error creating the schema %s", err))
		}
	}
	schemaIDBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.ID()))

	// 3) Serialize the record using the schema provided by the client,
	// making sure to include the schema id as part of the record.
	newComplexType := ComplexType{ID: 1, Name: "Gopher"}
	value, _ := json.Marshal(newComplexType)
	native, _, _ := schema.Codec().NativeFromTextual(value)
	valueBytes, _ := schema.Codec().BinaryFromNative(nil, native)

	var recordValue []byte
	recordValue = append(recordValue, byte(0))
	recordValue = append(recordValue, schemaIDBytes...)
	recordValue = append(recordValue, valueBytes...)

	key, _ := uuid.NewUUID()
	p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{
			Topic: &topic, Partition: kafka.PartitionAny},
		Key: []byte(key.String()), Value: recordValue}, nil)

	p.Flush(15 * 1000)

}
----
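
The framing done in step 3 of the producer (a zero byte, the schema id as a 4-byte big-endian integer, then the serialized payload) can be isolated in a small helper. The following is a minimal sketch that uses only the standard library; `frameRecord` is a hypothetical name and is not part of srclient.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// frameRecord is a hypothetical helper (not part of srclient) that prefixes an
// already-serialized payload with the header used in the producer example:
// one zero byte followed by the schema id as a 4-byte big-endian integer.
func frameRecord(schemaID int, payload []byte) []byte {
	record := make([]byte, 5, 5+len(payload))
	record[0] = 0 // leading zero byte, as in the producer example
	binary.BigEndian.PutUint32(record[1:5], uint32(schemaID))
	return append(record, payload...)
}

func main() {
	record := frameRecord(42, []byte("payload"))
	fmt.Printf("% x\n", record[:5]) // header bytes: 00 00 00 00 2a
}
```

Allocating the header and payload capacity up front avoids the repeated `append` growth seen in the longer example, but the resulting bytes are the same.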

.Consumer
[source,golang]
----
package main

import (
	"encoding/binary"
	"fmt"

	"github.com/riferrei/srclient"
	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {

	// 1) Create the consumer as you would
	// normally do using Confluent's Go client
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost",
		"group.id":          "myGroup",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		panic(err)
	}
	// Close the consumer if main ever unwinds (for example, on a panic)
	defer c.Close()
	c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)

	// 2) Create an instance of the client to retrieve the schemas for each message
	schemaRegistryClient := srclient.CreateSchemaRegistryClient("http://localhost:8081")

	for {
		msg, err := c.ReadMessage(-1)
		if err == nil {
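			// 3) Recover the schema id from the message and use the
			// client to retrieve the schema from Schema Registry.
			// Then use it to deserialize the record accordingly.
			if len(msg.Value) < 5 {
				// Too short to hold the leading byte and the 4-byte schema id
				fmt.Printf("Skipping a message without a schema id header\n")
				continue
			}
			schemaID := binary.BigEndian.Uint32(msg.Value[1:5])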
			schema, err := schemaRegistryClient.GetSchema(int(schemaID))
			if err != nil {
				panic(fmt.Sprintf("Error getting the schema with id '%d' %s", schemaID, err))
			}
			native, _, _ := schema.Codec().NativeFromBinary(msg.Value[5:])
			value, _ := schema.Codec().TextualFromNative(nil, native)
			fmt.Printf("Here is the message %s\n", string(value))
		} else {
			fmt.Printf("Error consuming the message: %v (%v)\n", err, msg)
		}
	}

}
----
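
The header handling in step 3 of the consumer can also be written as a helper that validates the record before slicing it. This is a sketch under stated assumptions: `splitRecord` is a hypothetical name, and rejecting a non-zero first byte is an extra check that the consumer example itself does not perform.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// splitRecord is a hypothetical helper (not part of srclient) that undoes the
// framing used in the examples: it returns the schema id stored in bytes 1..4
// and the remaining payload, or an error if the header is missing or invalid.
func splitRecord(value []byte) (int, []byte, error) {
	if len(value) < 5 {
		return 0, nil, errors.New("record is shorter than the 5-byte header")
	}
	if value[0] != 0 {
		return 0, nil, fmt.Errorf("unexpected leading byte %d", value[0])
	}
	schemaID := binary.BigEndian.Uint32(value[1:5])
	return int(schemaID), value[5:], nil
}

func main() {
	id, payload, err := splitRecord([]byte{0, 0, 0, 0, 7, 'h', 'i'})
	fmt.Println(id, string(payload), err) // 7 hi <nil>
}
```

The returned id can be passed to `GetSchema`, and the payload to the codec, in the same way the consumer loop does.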

Both examples were created using https://github.com/confluentinc/confluent-kafka-go[Confluent's Golang Client for Apache Kafka^TM^].

== Confluent Cloud

To use this client with https://www.confluent.io/confluent-cloud/[Confluent Cloud] you will need the endpoint of your managed Schema Registry and an API Key/Secret.
Both can be easily retrieved from the Confluent Cloud UI once you select an environment:

image::Getting_Endpoint_and_APIKeys.png[]

Finally, your Go program needs to provide this information to the client:

[source,golang]
----
schemaRegistryClient := srclient.CreateSchemaRegistryClient("https://prefix.us-east-2.aws.confluent.cloud")
schemaRegistryClient.SetCredentials("apiKey", "apiSecret")
----

== Acknowledgements

* Apache, Apache Kafka, Kafka, and associated open source project names are trademarks of the https://www.apache.org/[Apache Software Foundation].
* The https://blog.golang.org/gopher[Go Gopher] is an artistic creation of http://reneefrench.blogspot.com/[Renee French].

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ISchemaRegistryClient

type ISchemaRegistryClient interface {
	GetSubjects() ([]string, error)
	GetSchema(schemaID int) (*Schema, error)
	GetLatestSchema(subject string, isKey bool) (*Schema, error)
	GetLatestSchemaWithArbitrarySubject(subject string) (*Schema, error)
	GetSchemaVersions(subject string, isKey bool) ([]int, error)
	GetSchemaVersionsWithArbitrarySubject(subject string) ([]int, error)
	GetSchemaByVersion(subject string, version int, isKey bool) (*Schema, error)
	GetSchemaByVersionWithArbitrarySubject(subject string, version int) (*Schema, error)
	CreateSchema(subject string, schema string, schemaType SchemaType, isKey bool, references ...Reference) (*Schema, error)
	CreateSchemaWithArbitrarySubject(subject string, schema string, schemaType SchemaType, references ...Reference) (*Schema, error)
	DeleteSubject(subject string, permanent bool) error
	SetCredentials(username string, password string)
	SetTimeout(timeout time.Duration)
	CachingEnabled(value bool)
	CodecCreationEnabled(value bool)
	IsSchemaCompatible(subject, schema, version string, schemaType SchemaType, isKey bool) (bool, error)
}

ISchemaRegistryClient provides the definition of the operations that this Schema Registry client provides.
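
Code that depends on an interface rather than on a concrete client can be exercised without a running registry. The sketch below illustrates the idea with a narrow, hypothetical interface (`subjectLister`) that contains only `GetSubjects`, whose signature is taken from the listing above; `staticClient` and `valueSubjects` are also made-up names for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectLister is a hypothetical narrow interface holding the single
// ISchemaRegistryClient method this sketch needs.
type subjectLister interface {
	GetSubjects() ([]string, error)
}

// staticClient is a stand-in used here so the sketch runs without a registry.
type staticClient struct{ subjects []string }

func (s staticClient) GetSubjects() ([]string, error) { return s.subjects, nil }

// valueSubjects returns the subjects whose names end in "-value".
func valueSubjects(client subjectLister) ([]string, error) {
	all, err := client.GetSubjects()
	if err != nil {
		return nil, err
	}
	var out []string
	for _, s := range all {
		if strings.HasSuffix(s, "-value") {
			out = append(out, s)
		}
	}
	return out, nil
}

func main() {
	subjects, err := valueSubjects(staticClient{subjects: []string{"a-key", "a-value"}})
	fmt.Println(subjects, err) // [a-value] <nil>
}
```

Any type with a matching `GetSubjects` method satisfies `subjectLister`, so a function written this way does not need to know which client it was given.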

type Ids

type Ids struct {
	// contains filtered or unexported fields
}

type MockSchemaRegistryClient

type MockSchemaRegistryClient struct {
	// contains filtered or unexported fields
}

func CreateMockSchemaRegistryClient

func CreateMockSchemaRegistryClient(mockURL string) MockSchemaRegistryClient

Constructor

func (MockSchemaRegistryClient) CachingEnabled

func (mck MockSchemaRegistryClient) CachingEnabled(value bool)

func (MockSchemaRegistryClient) CodecCreationEnabled

func (mck MockSchemaRegistryClient) CodecCreationEnabled(value bool)

func (MockSchemaRegistryClient) CreateSchema

func (mck MockSchemaRegistryClient) CreateSchema(subject string, schema string, schemaType SchemaType, isKey bool, references ...Reference) (*Schema, error)

Mock Schema creation and registration. CreateSchema behaves in one of two ways, depending on the scenario:

 1. The schema being registered is for an already existing `concrete subject`. In that case, we increase our schemaID counter and register the schema under that subject in memory.
 2. The schema being registered is for a previously unknown `concrete subject`. In that case, we set this schema as the first version of the subject and store it in memory.

Note that there is no enforcement of schema compatibility; any schema is accepted for any subject.
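
The two scenarios can be sketched with a small in-memory registry. This is an illustration only: `memoryRegistry` and `registeredSchema` are hypothetical types, not the ones used by MockSchemaRegistryClient, and starting the id counter at 1 is an assumption.

```go
package main

import "fmt"

// registeredSchema and memoryRegistry are hypothetical types for illustration;
// they are not the types used by MockSchemaRegistryClient.
type registeredSchema struct {
	ID      int
	Version int
	Schema  string
}

type memoryRegistry struct {
	nextID   int // assumed to start at 1
	subjects map[string][]registeredSchema
}

func newMemoryRegistry() *memoryRegistry {
	return &memoryRegistry{nextID: 1, subjects: map[string][]registeredSchema{}}
}

// register stores the schema under the subject. A previously unknown subject
// starts at version 1; an existing subject gets the next version. No
// compatibility check is made.
func (r *memoryRegistry) register(subject, schema string) registeredSchema {
	entry := registeredSchema{
		ID:      r.nextID,
		Version: len(r.subjects[subject]) + 1,
		Schema:  schema,
	}
	r.nextID++
	r.subjects[subject] = append(r.subjects[subject], entry)
	return entry
}

func main() {
	reg := newMemoryRegistry()
	first := reg.register("orders-value", `{"type":"string"}`)
	second := reg.register("orders-value", `{"type":"int"}`)
	fmt.Println(first.ID, first.Version, second.ID, second.Version) // 1 1 2 2
}
```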

func (MockSchemaRegistryClient) CreateSchemaWithArbitrarySubject

func (mck MockSchemaRegistryClient) CreateSchemaWithArbitrarySubject(subject string, schema string, schemaType SchemaType, references ...Reference) (*Schema, error)

Mock Schema creation and registration. CreateSchemaWithArbitrarySubject behaves in one of two ways, depending on the scenario:

 1. The schema being registered is for an already existing `concrete subject`. In that case, we increase our schemaID counter and register the schema under that subject in memory.
 2. The schema being registered is for a previously unknown `concrete subject`. In that case, we set this schema as the first version of the subject and store it in memory.

Note that there is no enforcement of schema compatibility; any schema is accepted for any subject.

func (MockSchemaRegistryClient) DeleteSubject

func (mck MockSchemaRegistryClient) DeleteSubject(subject string, _ bool) error

DeleteSubject removes the given subject from the cache.

func (MockSchemaRegistryClient) GetLatestSchema

func (mck MockSchemaRegistryClient) GetLatestSchema(subject string, isKey bool) (*Schema, error)

Returns the highest ordinal version of a Schema for a given `concrete subject`

func (MockSchemaRegistryClient) GetLatestSchemaWithArbitrarySubject

func (mck MockSchemaRegistryClient) GetLatestSchemaWithArbitrarySubject(subject string) (*Schema, error)

Returns the highest ordinal version of a Schema for a given `concrete subject`

func (MockSchemaRegistryClient) GetSchema

func (mck MockSchemaRegistryClient) GetSchema(schemaID int) (*Schema, error)

Returns a Schema for the given ID

func (MockSchemaRegistryClient) GetSchemaByVersion

func (mck MockSchemaRegistryClient) GetSchemaByVersion(subject string, version int, isKey bool) (*Schema, error)

Returns the given Schema according to the passed in subject and version number

func (MockSchemaRegistryClient) GetSchemaByVersionWithArbitrarySubject

func (mck MockSchemaRegistryClient) GetSchemaByVersionWithArbitrarySubject(subject string, version int) (*Schema, error)

Returns the given Schema according to the passed in subject and version number

func (MockSchemaRegistryClient) GetSchemaVersions

func (mck MockSchemaRegistryClient) GetSchemaVersions(subject string, isKey bool) ([]int, error)

Returns the array of versions this subject has previously registered

func (MockSchemaRegistryClient) GetSchemaVersionsWithArbitrarySubject

func (mck MockSchemaRegistryClient) GetSchemaVersionsWithArbitrarySubject(subject string) ([]int, error)

Returns the array of versions this subject has previously registered

func (MockSchemaRegistryClient) GetSubjects

func (mck MockSchemaRegistryClient) GetSubjects() ([]string, error)

Returns all registered subjects

func (MockSchemaRegistryClient) IsSchemaCompatible

func (mck MockSchemaRegistryClient) IsSchemaCompatible(subject, schema, version string, schemaType SchemaType, isKey bool) (bool, error)

func (MockSchemaRegistryClient) SetCredentials

func (mck MockSchemaRegistryClient) SetCredentials(username string, password string)

The methods below are implemented to satisfy ISchemaRegistryClient; however, they do nothing.

func (MockSchemaRegistryClient) SetTimeout

func (mck MockSchemaRegistryClient) SetTimeout(timeout time.Duration)

type Reference

type Reference struct {
	Name    string `json:"name"`
	Subject string `json:"subject"`
	Version int    `json:"version"`
}

Schema references use the import statement of Protobuf and the $ref field of JSON Schema. They are defined by the name of the import or $ref and the associated subject in the registry.
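
To make the shape of a reference concrete, the sketch below marshals a `Reference` to JSON using the struct tags shown above. The struct is copied here so the example compiles on its own; `encodeReferences` is a hypothetical helper, and the name, subject, and version values are made-up samples.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Reference is copied from the type definition above so the sketch is self-contained.
type Reference struct {
	Name    string `json:"name"`
	Subject string `json:"subject"`
	Version int    `json:"version"`
}

// encodeReferences is a hypothetical helper that returns the JSON array for
// the given references.
func encodeReferences(refs ...Reference) (string, error) {
	out, err := json.Marshal(refs)
	return string(out), err
}

func main() {
	// The name, subject, and version below are made-up sample values.
	s, err := encodeReferences(Reference{Name: "common.proto", Subject: "common", Version: 1})
	fmt.Println(s, err)
}
```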

type Schema

type Schema struct {
	// contains filtered or unexported fields
}

Schema is a data structure that holds all the relevant information about schemas.

func (*Schema) Codec

func (schema *Schema) Codec() *goavro.Codec

Codec ensures access to Codec. It will try to initialize a new one if it hasn't been initialized before, and will return nil if it can't initialize a codec from the schema.

func (*Schema) ID

func (schema *Schema) ID() int

ID ensures access to ID

func (*Schema) Schema

func (schema *Schema) Schema() string

Schema ensures access to Schema

func (*Schema) Version

func (schema *Schema) Version() int

Version ensures access to Version

type SchemaRegistryClient

type SchemaRegistryClient struct {
	// contains filtered or unexported fields
}

SchemaRegistryClient allows interactions with Schema Registry over HTTP. Applications using this client can retrieve data about schemas, which in turn can be used to serialize and deserialize data.

func CreateSchemaRegistryClient

func CreateSchemaRegistryClient(schemaRegistryURL string) *SchemaRegistryClient

CreateSchemaRegistryClient creates a client that allows interactions with Schema Registry over HTTP. Applications using this client can retrieve data about schemas, which in turn can be used to serialize and deserialize records.

func (*SchemaRegistryClient) CachingEnabled

func (client *SchemaRegistryClient) CachingEnabled(value bool)

CachingEnabled allows the client to cache any values that have been returned, which may speed up performance if these values rarely change.
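
The effect of such a switch can be illustrated with a small id-keyed cache in front of a fetch function. This is a sketch of the general technique, not the cache used inside SchemaRegistryClient; `schemaCache`, its fields, and the `fetch` callback (standing in for the HTTP request) are all hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// schemaCache is a hypothetical illustration of id-based caching; it is not
// the cache used inside SchemaRegistryClient.
type schemaCache struct {
	mu      sync.RWMutex
	enabled bool
	byID    map[int]string
	fetch   func(id int) (string, error) // stands in for the HTTP request
}

func newSchemaCache(fetch func(int) (string, error)) *schemaCache {
	return &schemaCache{enabled: true, byID: map[int]string{}, fetch: fetch}
}

// get returns the cached schema when caching is enabled and the id is known;
// otherwise it calls fetch and remembers the result if caching is enabled.
func (c *schemaCache) get(id int) (string, error) {
	c.mu.RLock()
	cached, ok := c.byID[id]
	enabled := c.enabled
	c.mu.RUnlock()
	if enabled && ok {
		return cached, nil
	}
	schema, err := c.fetch(id)
	if err != nil {
		return "", err
	}
	if enabled {
		c.mu.Lock()
		c.byID[id] = schema
		c.mu.Unlock()
	}
	return schema, nil
}

func main() {
	calls := 0
	cache := newSchemaCache(func(id int) (string, error) {
		calls++
		return fmt.Sprintf("schema-%d", id), nil
	})
	cache.get(1)
	cache.get(1)
	fmt.Println(calls) // 1: the second lookup is served from the cache
}
```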

func (*SchemaRegistryClient) CodecCreationEnabled

func (client *SchemaRegistryClient) CodecCreationEnabled(value bool)

CodecCreationEnabled allows the application to enable or disable the automatic creation of codecs when schemas are returned.

func (*SchemaRegistryClient) CreateSchema

func (client *SchemaRegistryClient) CreateSchema(subject string, schema string,
	schemaType SchemaType, isKey bool, references ...Reference) (*Schema, error)

CreateSchema creates a new schema in Schema Registry and associates it with the subject provided. It returns the newly created schema with all its associated information.

func (*SchemaRegistryClient) CreateSchemaWithArbitrarySubject

func (client *SchemaRegistryClient) CreateSchemaWithArbitrarySubject(subject string, schema string,
	schemaType SchemaType, references ...Reference) (*Schema, error)

CreateSchemaWithArbitrarySubject creates a new schema in Schema Registry and associates it with the subject provided (without appending '-value' or '-key'). It returns the newly created schema with all its associated information.
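
The difference between the two families of methods comes down to how the subject name is built. The sketch below shows the naming rule implied by the descriptions: methods that take `isKey` append '-key' or '-value', while the `WithArbitrarySubject` variants use the subject unchanged. `concreteSubject` is a hypothetical helper, not a function exported by this package.

```go
package main

import "fmt"

// concreteSubject is a hypothetical helper showing the naming rule implied by
// the documentation: "-key" is appended for key schemas and "-value" otherwise.
func concreteSubject(subject string, isKey bool) string {
	if isKey {
		return subject + "-key"
	}
	return subject + "-value"
}

func main() {
	fmt.Println(concreteSubject("myTopic", false)) // myTopic-value
	fmt.Println(concreteSubject("myTopic", true))  // myTopic-key
}
```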

func (*SchemaRegistryClient) DeleteSubject

func (client *SchemaRegistryClient) DeleteSubject(subject string, permanent bool) error

DeleteSubject deletes the given subject from Schema Registry.

func (*SchemaRegistryClient) GetLatestSchema

func (client *SchemaRegistryClient) GetLatestSchema(subject string, isKey bool) (*Schema, error)

GetLatestSchema gets the schema associated with the given subject. The schema returned contains the last version for that subject.

func (*SchemaRegistryClient) GetLatestSchemaWithArbitrarySubject

func (client *SchemaRegistryClient) GetLatestSchemaWithArbitrarySubject(subject string) (*Schema, error)

GetLatestSchemaWithArbitrarySubject gets the schema associated with the given subject; '-value' or '-key' is not appended to the subject. The schema returned contains the last version for that subject.

func (*SchemaRegistryClient) GetSchema

func (client *SchemaRegistryClient) GetSchema(schemaID int) (*Schema, error)

GetSchema gets the schema associated with the given id.

func (*SchemaRegistryClient) GetSchemaByVersion

func (client *SchemaRegistryClient) GetSchemaByVersion(subject string, version int, isKey bool) (*Schema, error)

GetSchemaByVersion gets the schema associated with the given subject. The schema returned contains the version specified as a parameter.

func (*SchemaRegistryClient) GetSchemaByVersionWithArbitrarySubject

func (client *SchemaRegistryClient) GetSchemaByVersionWithArbitrarySubject(subject string, version int) (*Schema, error)

GetSchemaByVersionWithArbitrarySubject gets the schema associated with the given subject (without appending '-value' or '-key'). The schema returned contains the version specified as a parameter.

func (*SchemaRegistryClient) GetSchemaVersions

func (client *SchemaRegistryClient) GetSchemaVersions(subject string, isKey bool) ([]int, error)

GetSchemaVersions returns a list of versions from a given subject.

func (*SchemaRegistryClient) GetSchemaVersionsWithArbitrarySubject

func (client *SchemaRegistryClient) GetSchemaVersionsWithArbitrarySubject(subject string) ([]int, error)

GetSchemaVersionsWithArbitrarySubject returns a list of versions from a given subject (without appending '-key' or '-value').

func (*SchemaRegistryClient) GetSubjects

func (client *SchemaRegistryClient) GetSubjects() ([]string, error)

GetSubjects returns a list of all subjects in the registry.

func (*SchemaRegistryClient) IsSchemaCompatible

func (client *SchemaRegistryClient) IsSchemaCompatible(subject, schema, version string, schemaType SchemaType, isKey bool) (bool, error)

IsSchemaCompatible checks if the given schema is compatible with the given subject and version. Valid versions are a version ID and "latest".
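
Because the version is passed as a string, callers may want to validate it before making the request. The following is a minimal sketch; `isValidVersion` is a hypothetical check that is not part of this package, and it assumes version ids are positive integers.

```go
package main

import (
	"fmt"
	"strconv"
)

// isValidVersion is a hypothetical check (not part of srclient) for the
// version argument: it accepts "latest" or a positive integer version id.
// Treating only positive integers as valid ids is an assumption.
func isValidVersion(version string) bool {
	if version == "latest" {
		return true
	}
	n, err := strconv.Atoi(version)
	return err == nil && n > 0
}

func main() {
	fmt.Println(isValidVersion("latest"), isValidVersion("3"), isValidVersion("v3")) // true true false
}
```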

func (*SchemaRegistryClient) IsSchemaWithArbitrarySubjectCompatible

func (client *SchemaRegistryClient) IsSchemaWithArbitrarySubjectCompatible(subject, schema, version string, schemaType SchemaType) (bool, error)

IsSchemaWithArbitrarySubjectCompatible checks if the given schema is compatible with the given subject and version (without appending '-value' or '-key'). Valid versions are a version ID and "latest".

func (*SchemaRegistryClient) SetCredentials

func (client *SchemaRegistryClient) SetCredentials(username string, password string)

SetCredentials allows users to set credentials to be used with Schema Registry, for scenarios when Schema Registry has authentication enabled.
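
A username and password pair is typically sent to an HTTP service as Basic authentication. The sketch below shows that mechanism with the standard library only; whether this client attaches credentials in exactly this way is an assumption, and `newAuthenticatedRequest` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"net/http"
)

// newAuthenticatedRequest is a hypothetical helper that builds a GET request
// carrying the credentials as HTTP Basic authentication. Whether srclient does
// exactly this internally is an assumption of this sketch.
func newAuthenticatedRequest(url, username, password string) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	req.SetBasicAuth(username, password)
	return req, nil
}

func main() {
	req, err := newAuthenticatedRequest("http://localhost:8081/subjects", "apiKey", "apiSecret")
	if err != nil {
		panic(err)
	}
	user, pass, ok := req.BasicAuth()
	fmt.Println(user, pass, ok) // apiKey apiSecret true
}
```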

func (*SchemaRegistryClient) SetTimeout

func (client *SchemaRegistryClient) SetTimeout(timeout time.Duration)

SetTimeout reconfigures how long internal HTTP requests may take before they time out. The default is five seconds.
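
In Go, a request timeout of this kind is usually expressed through the `Timeout` field of `http.Client`. The sketch below assumes that approach; `newHTTPClient` and `defaultTimeout` are hypothetical names, and only the five-second default comes from the description above.

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// defaultTimeout matches the five-second default mentioned in the description.
const defaultTimeout = 5 * time.Second

// newHTTPClient is a hypothetical constructor: a non-positive timeout falls
// back to the default.
func newHTTPClient(timeout time.Duration) *http.Client {
	if timeout <= 0 {
		timeout = defaultTimeout
	}
	return &http.Client{Timeout: timeout}
}

func main() {
	fmt.Println(newHTTPClient(0).Timeout)                // 5s
	fmt.Println(newHTTPClient(30 * time.Second).Timeout) // 30s
}
```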

type SchemaType

type SchemaType string
const (
	Protobuf SchemaType = "PROTOBUF"
	Avro     SchemaType = "AVRO"
	Json     SchemaType = "JSON"
)

func (SchemaType) String

func (s SchemaType) String() string
