srclient

package module
v0.6.0
Warning

This package is not in the latest version of its module.

Published: Mar 16, 2023 License: Apache-2.0 Imports: 17 Imported by: 62

README

Schema Registry Client for Go


srclient is a Golang client for Schema Registry, a service that provides a RESTful interface for developers to define standard schemas for their events, share them across the organization, and safely evolve them in a way that is backward compatible and future proof. Using this client, developers can build Golang programs that write and read schema-compatible records to and from Apache Kafka using Avro, Protobuf, and JSON Schema, while Schema Registry manages the schemas used. In this architecture, producer programs interact with Schema Registry to retrieve schemas and use them to serialize records. Consumer programs then retrieve the same schemas from Schema Registry to deserialize the records. You can read more about the benefits of using Schema Registry here.
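For a consumer to retrieve the same schema the producer used, the record has to carry some pointer to that schema. This page does not say how; the sketch below assumes the Confluent wire format (a zero magic byte followed by the schema ID as a 4-byte big-endian integer, then the serialized payload). The helper names frameRecord and parseFrame are hypothetical and are not part of srclient.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// frameRecord prefixes an already-serialized payload with a zero magic byte
// and the schema ID encoded as a 4-byte big-endian integer.
func frameRecord(schemaID int, payload []byte) []byte {
	out := make([]byte, 5, 5+len(payload))
	out[0] = 0 // magic byte
	binary.BigEndian.PutUint32(out[1:5], uint32(schemaID))
	return append(out, payload...)
}

// parseFrame splits a framed record back into its schema ID and payload.
func parseFrame(record []byte) (int, []byte, error) {
	if len(record) < 5 {
		return 0, nil, errors.New("record shorter than the 5-byte header")
	}
	if record[0] != 0 {
		return 0, nil, fmt.Errorf("unexpected magic byte %d", record[0])
	}
	return int(binary.BigEndian.Uint32(record[1:5])), record[5:], nil
}

func main() {
	// The payload here is a placeholder for Avro, Protobuf, or JSON bytes.
	framed := frameRecord(42, []byte("serialized-record"))

	id, payload, err := parseFrame(framed)
	if err != nil {
		panic(err)
	}
	fmt.Println(id, string(payload)) // prints "42 serialized-record"
}
```

On the consumer side, the extracted ID is the kind of value one would pass to GetSchema to fetch the matching schema.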

Features

  • Simple to Use - This client provides a very high-level abstraction over the operations developers writing programs for Apache Kafka typically need. Thus, it will feel natural for them to use this client's functions. Moreover, developers don't need to handle low-level HTTP details to communicate with Schema Registry.
  • Performance - This client provides caching capabilities. Any data retrieved from Schema Registry can be cached locally to speed up subsequent requests, which lets programs that are not co-located with Schema Registry reduce the latency incurred on each request. Caching can be disabled programmatically with CachingEnabled(false).
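The caching idea in the Performance item can be illustrated with a small memoizing wrapper. This is only a sketch of the general technique, not the library's implementation: the schemaCache type, its fields, and the fetch function are all hypothetical, and the fetch function stands in for an HTTP call to Schema Registry.

```go
package main

import (
	"fmt"
	"sync"
)

// schemaCache memoizes schema text by ID in front of a slower fetch function.
type schemaCache struct {
	mu      sync.RWMutex
	enabled bool // set before use; not changed concurrently in this sketch
	byID    map[int]string
	fetch   func(id int) (string, error) // stand-in for a remote lookup
}

func newSchemaCache(fetch func(int) (string, error)) *schemaCache {
	return &schemaCache{enabled: true, byID: map[int]string{}, fetch: fetch}
}

// get returns the cached schema when caching is enabled and the ID is known;
// otherwise it calls fetch and, if caching is enabled, stores the result.
func (c *schemaCache) get(id int) (string, error) {
	if c.enabled {
		c.mu.RLock()
		s, ok := c.byID[id]
		c.mu.RUnlock()
		if ok {
			return s, nil
		}
	}
	s, err := c.fetch(id)
	if err != nil {
		return "", err
	}
	if c.enabled {
		c.mu.Lock()
		c.byID[id] = s
		c.mu.Unlock()
	}
	return s, nil
}

func main() {
	calls := 0
	c := newSchemaCache(func(id int) (string, error) {
		calls++
		return `{"type":"string"}`, nil
	})
	c.get(1)
	c.get(1) // served from the cache, so fetch is not called again
	fmt.Println("fetches:", calls) // prints "fetches: 1"
}
```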

License: Apache License v2.0

Installation

Module install:

This client is a Go module, so you can use it simply by adding the following import to your code:

import "github.com/riferrei/srclient"

Then run a build to have this client automatically added to your go.mod file as a dependency.

Manual install:

go get -u github.com/riferrei/srclient

Testing

Unit tests can be run with the standard go test command:

go test -cover -v ./...

You can also run the integration tests on your local machine, provided you have Docker installed:

docker compose up --exit-code-from srclient-integration-test
docker compose down --rmi local

Getting Started & Examples

Acknowledgements

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CompatibilityLevel added in v0.5.0

type CompatibilityLevel string
const (
	None               CompatibilityLevel = "NONE"
	Backward           CompatibilityLevel = "BACKWARD"
	BackwardTransitive CompatibilityLevel = "BACKWARD_TRANSITIVE"
	Forward            CompatibilityLevel = "FORWARD"
	ForwardTransitive  CompatibilityLevel = "FORWARD_TRANSITIVE"
	Full               CompatibilityLevel = "FULL"
	FullTransitive     CompatibilityLevel = "FULL_TRANSITIVE"
)

func (CompatibilityLevel) String added in v0.5.0

func (s CompatibilityLevel) String() string

type Error added in v0.5.0

type Error struct {
	Code    int    `json:"error_code"`
	Message string `json:"message"`
	// contains filtered or unexported fields
}

Error implements the error interface and encodes HTTP errors returned by Schema Registry.
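To show how an error body maps onto the two exported fields, the sketch below decodes a JSON payload into a local stand-in type with the same field names and JSON tags. The registryError type, the decodeError helper, and the format of the Error() string are assumptions made for illustration; they are not the package's own code. The sample body uses Schema Registry's "subject not found" error code.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// registryError is a local stand-in that mirrors the documented exported
// fields of Error. It is not the srclient type.
type registryError struct {
	Code    int    `json:"error_code"`
	Message string `json:"message"`
}

// Error makes registryError satisfy the error interface. The message format
// is chosen for this sketch only.
func (e registryError) Error() string {
	return fmt.Sprintf("%d - %s", e.Code, e.Message)
}

// decodeError turns an error response body into a Go error value.
func decodeError(body []byte) error {
	var e registryError
	if err := json.Unmarshal(body, &e); err != nil {
		return fmt.Errorf("undecodable error body: %w", err)
	}
	return e
}

func main() {
	err := decodeError([]byte(`{"error_code":40401,"message":"Subject not found"}`))

	// errors.As lets callers branch on the structured code instead of
	// matching on the error string.
	var re registryError
	if errors.As(err, &re) {
		fmt.Println(re.Code, re.Message) // prints "40401 Subject not found"
	}
}
```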

func (Error) Error added in v0.5.0

func (e Error) Error() string

type ISchemaRegistryClient

type ISchemaRegistryClient interface {
	GetGlobalCompatibilityLevel() (*CompatibilityLevel, error)
	GetCompatibilityLevel(subject string, defaultToGlobal bool) (*CompatibilityLevel, error)
	GetSubjects() ([]string, error)
	GetSubjectsIncludingDeleted() ([]string, error)
	GetSchema(schemaID int) (*Schema, error)
	GetLatestSchema(subject string) (*Schema, error)
	GetSchemaVersions(subject string) ([]int, error)
	GetSchemaByVersion(subject string, version int) (*Schema, error)
	CreateSchema(subject string, schema string, schemaType SchemaType, references ...Reference) (*Schema, error)
	LookupSchema(subject string, schema string, schemaType SchemaType, references ...Reference) (*Schema, error)
	ChangeSubjectCompatibilityLevel(subject string, compatibility CompatibilityLevel) (*CompatibilityLevel, error)
	DeleteSubject(subject string, permanent bool) error
	DeleteSubjectByVersion(subject string, version int, permanent bool) error
	SetCredentials(username string, password string)
	SetBearerToken(token string)
	SetTimeout(timeout time.Duration)
	CachingEnabled(value bool)
	ResetCache()
	CodecCreationEnabled(value bool)
	IsSchemaCompatible(subject, schema, version string, schemaType SchemaType, references ...Reference) (bool, error)
}

ISchemaRegistryClient defines the operations that this Schema Registry client provides.

type MockSchemaRegistryClient

type MockSchemaRegistryClient struct {
	// contains filtered or unexported fields
}

MockSchemaRegistryClient represents an in-memory SchemaRegistryClient for testing purposes.

func CreateMockSchemaRegistryClient

func CreateMockSchemaRegistryClient(mockURL string) *MockSchemaRegistryClient

CreateMockSchemaRegistryClient initializes a MockSchemaRegistryClient

func (*MockSchemaRegistryClient) CachingEnabled

func (mck *MockSchemaRegistryClient) CachingEnabled(bool)

CachingEnabled is not implemented

func (*MockSchemaRegistryClient) ChangeSubjectCompatibilityLevel added in v0.5.0

func (mck *MockSchemaRegistryClient) ChangeSubjectCompatibilityLevel(string, CompatibilityLevel) (*CompatibilityLevel, error)

ChangeSubjectCompatibilityLevel is not implemented

func (*MockSchemaRegistryClient) CodecCreationEnabled

func (mck *MockSchemaRegistryClient) CodecCreationEnabled(bool)

CodecCreationEnabled is not implemented

func (*MockSchemaRegistryClient) CreateSchema

func (mck *MockSchemaRegistryClient) CreateSchema(subject string, schema string, schemaType SchemaType, _ ...Reference) (*Schema, error)

CreateSchema generates a new schema with the given details; references are unused.

func (*MockSchemaRegistryClient) DeleteSubject

func (mck *MockSchemaRegistryClient) DeleteSubject(subject string, _ bool) error

DeleteSubject removes given subject from the cache

func (*MockSchemaRegistryClient) DeleteSubjectByVersion added in v0.5.3

func (mck *MockSchemaRegistryClient) DeleteSubjectByVersion(subject string, version int, _ bool) error

DeleteSubjectByVersion removes given subject's version from cache

func (*MockSchemaRegistryClient) GetCompatibilityLevel added in v0.5.0

func (mck *MockSchemaRegistryClient) GetCompatibilityLevel(string, bool) (*CompatibilityLevel, error)

GetCompatibilityLevel is not implemented

func (*MockSchemaRegistryClient) GetGlobalCompatibilityLevel added in v0.5.0

func (mck *MockSchemaRegistryClient) GetGlobalCompatibilityLevel() (*CompatibilityLevel, error)

GetGlobalCompatibilityLevel is not implemented

func (*MockSchemaRegistryClient) GetLatestSchema

func (mck *MockSchemaRegistryClient) GetLatestSchema(subject string) (*Schema, error)

GetLatestSchema returns the highest ordinal version of a Schema for a given concrete subject.

func (*MockSchemaRegistryClient) GetSchema

func (mck *MockSchemaRegistryClient) GetSchema(schemaID int) (*Schema, error)

GetSchema returns a Schema for the given ID.

func (*MockSchemaRegistryClient) GetSchemaByVersion

func (mck *MockSchemaRegistryClient) GetSchemaByVersion(subject string, version int) (*Schema, error)

GetSchemaByVersion returns the Schema for the given subject and version number.

func (*MockSchemaRegistryClient) GetSchemaVersions

func (mck *MockSchemaRegistryClient) GetSchemaVersions(subject string) ([]int, error)

GetSchemaVersions returns the versions previously registered for this subject.

func (*MockSchemaRegistryClient) GetSubjects

func (mck *MockSchemaRegistryClient) GetSubjects() ([]string, error)

GetSubjects returns all registered subjects.

func (*MockSchemaRegistryClient) GetSubjectsIncludingDeleted added in v0.5.0

func (mck *MockSchemaRegistryClient) GetSubjectsIncludingDeleted() ([]string, error)

GetSubjectsIncludingDeleted is not implemented and returns an error

func (*MockSchemaRegistryClient) IsSchemaCompatible

func (mck *MockSchemaRegistryClient) IsSchemaCompatible(string, string, string, SchemaType, ...Reference) (bool, error)

IsSchemaCompatible is not implemented

func (*MockSchemaRegistryClient) LookupSchema added in v0.5.0

func (mck *MockSchemaRegistryClient) LookupSchema(string, string, SchemaType, ...Reference) (*Schema, error)

LookupSchema is not implemented

func (*MockSchemaRegistryClient) ResetCache added in v0.5.0

func (mck *MockSchemaRegistryClient) ResetCache()

ResetCache is not implemented

func (*MockSchemaRegistryClient) SetBearerToken added in v0.6.0

func (mck *MockSchemaRegistryClient) SetBearerToken(string)

SetBearerToken is not implemented

func (*MockSchemaRegistryClient) SetCredentials

func (mck *MockSchemaRegistryClient) SetCredentials(string, string)

SetCredentials is not implemented

func (*MockSchemaRegistryClient) SetSchema added in v0.6.0

func (mck *MockSchemaRegistryClient) SetSchema(id int, subject string, schema string, schemaType SchemaType, version int) (*Schema, error)

SetSchema overwrites a schema with the given id. Allows you to set a schema with a specific ID for testing purposes. Sets the ID counter to the given id if it is greater than the current counter. Version is used to set the version of the schema. If version is -1, the version will be set to the next available version.

func (*MockSchemaRegistryClient) SetTimeout

func (mck *MockSchemaRegistryClient) SetTimeout(time.Duration)

SetTimeout is not implemented

type Reference

type Reference struct {
	Name    string `json:"name"`
	Subject string `json:"subject"`
	Version int    `json:"version"`
}

Schema references use the import statement of Protobuf and the $ref field of JSON Schema. They are defined by the name of the import or $ref and the associated subject in the registry.
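The JSON tags on Reference determine how a reference is serialized. The sketch below uses a local copy of the struct (named reference here so it is clearly not the package type) to show the resulting JSON; the file name and subject are made-up sample values.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// reference mirrors the documented Reference fields and JSON tags.
type reference struct {
	Name    string `json:"name"`
	Subject string `json:"subject"`
	Version int    `json:"version"`
}

// encodeReferences renders a list of references as a JSON array.
func encodeReferences(refs []reference) (string, error) {
	b, err := json.Marshal(refs)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, err := encodeReferences([]reference{
		// Name is what the schema text imports; Subject and Version say
		// where that imported schema lives in the registry.
		{Name: "common/address.proto", Subject: "address-value", Version: 1},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
	// prints [{"name":"common/address.proto","subject":"address-value","version":1}]
}
```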

type Schema

type Schema struct {
	// contains filtered or unexported fields
}

Schema is a data structure that holds all the relevant information about schemas.

func NewSchema added in v0.5.0

func NewSchema(
	id int,
	schema string,
	schemaType SchemaType,
	version int,
	references []Reference,
	codec *goavro.Codec,
	jsonSchema *jsonschema.Schema,
) (*Schema, error)

NewSchema instantiates a new Schema struct.

func (*Schema) Codec

func (schema *Schema) Codec() *goavro.Codec

Codec ensures access to Codec. Will try to initialize a new one if it hasn't been initialized before. Will return nil if it can't initialize a codec from the schema.

func (*Schema) ID

func (schema *Schema) ID() int

ID ensures access to ID

func (*Schema) JsonSchema added in v0.5.0

func (schema *Schema) JsonSchema() *jsonschema.Schema

JsonSchema ensures access to JsonSchema. Will try to initialize a new one if it hasn't been initialized before. Will return nil if it can't initialize a json schema from the schema.

func (*Schema) References added in v0.3.1

func (schema *Schema) References() []Reference

References ensures access to References

func (*Schema) Schema

func (schema *Schema) Schema() string

Schema ensures access to Schema

func (*Schema) SchemaType added in v0.5.0

func (schema *Schema) SchemaType() *SchemaType

SchemaType ensures access to SchemaType

func (*Schema) Version

func (schema *Schema) Version() int

Version ensures access to Version

type SchemaRegistryClient

type SchemaRegistryClient struct {
	// contains filtered or unexported fields
}

SchemaRegistryClient allows interactions with Schema Registry over HTTP. Applications using this client can retrieve data about schemas, which in turn can be used to serialize and deserialize data.

func CreateSchemaRegistryClient

func CreateSchemaRegistryClient(schemaRegistryURL string) *SchemaRegistryClient

CreateSchemaRegistryClient creates a client that allows interactions with Schema Registry over HTTP. Applications using this client can retrieve data about schemas, which in turn can be used to serialize and deserialize records.

func CreateSchemaRegistryClientWithOptions added in v0.4.0

func CreateSchemaRegistryClientWithOptions(schemaRegistryURL string, client *http.Client, semaphoreWeight int) *SchemaRegistryClient

CreateSchemaRegistryClientWithOptions provides the ability to pass the http.Client to be used, as well as the semaphoreWeight for concurrent requests
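The semaphoreWeight parameter bounds how many requests run concurrently. This page does not describe how the semaphore is implemented, so the sketch below shows the general idea with a buffered channel used as a counting semaphore. The limiter type and the maxConcurrent helper are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// limiter bounds how many functions run at once. The buffered channel acts
// as a counting semaphore with `weight` slots.
type limiter struct{ slots chan struct{} }

func newLimiter(weight int) *limiter {
	return &limiter{slots: make(chan struct{}, weight)}
}

// do blocks until a slot is free, runs fn, then releases the slot.
func (l *limiter) do(fn func()) {
	l.slots <- struct{}{}
	defer func() { <-l.slots }()
	fn()
}

// maxConcurrent pushes n tasks through a limiter and reports the highest
// number of tasks observed running at the same time.
func maxConcurrent(weight, n int) int64 {
	l := newLimiter(weight)
	var running, peak int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			l.do(func() {
				cur := atomic.AddInt64(&running, 1)
				// Raise peak to cur if cur is the new maximum.
				for {
					p := atomic.LoadInt64(&peak)
					if cur <= p || atomic.CompareAndSwapInt64(&peak, p, cur) {
						break
					}
				}
				atomic.AddInt64(&running, -1)
			})
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	// With a weight of 2 the observed peak never exceeds 2, however many
	// goroutines are started.
	fmt.Println("peak concurrency:", maxConcurrent(2, 50))
}
```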

func (*SchemaRegistryClient) CachingEnabled

func (client *SchemaRegistryClient) CachingEnabled(value bool)

CachingEnabled allows the client to cache any values that have been returned, which may speed up performance if these values rarely change.

func (*SchemaRegistryClient) ChangeSubjectCompatibilityLevel added in v0.5.0

func (client *SchemaRegistryClient) ChangeSubjectCompatibilityLevel(subject string, compatibility CompatibilityLevel) (*CompatibilityLevel, error)

ChangeSubjectCompatibilityLevel changes the compatibility level of the subject.

func (*SchemaRegistryClient) CodecCreationEnabled

func (client *SchemaRegistryClient) CodecCreationEnabled(value bool)

CodecCreationEnabled allows the application to enable or disable the automatic creation of codecs when schemas are returned.

func (*SchemaRegistryClient) CreateSchema

func (client *SchemaRegistryClient) CreateSchema(subject string, schema string,
	schemaType SchemaType, references ...Reference) (*Schema, error)

CreateSchema creates a new schema in Schema Registry and associates it with the subject provided. It returns the newly created schema with all its associated information.

func (*SchemaRegistryClient) DeleteSubject

func (client *SchemaRegistryClient) DeleteSubject(subject string, permanent bool) error

DeleteSubject deletes the given subject. The permanent flag requests a permanent delete rather than a soft delete.

func (*SchemaRegistryClient) DeleteSubjectByVersion added in v0.5.3

func (client *SchemaRegistryClient) DeleteSubjectByVersion(subject string, version int, permanent bool) error

DeleteSubjectByVersion deletes the given version of the subject's schema.

func (*SchemaRegistryClient) GetCompatibilityLevel added in v0.5.0

func (client *SchemaRegistryClient) GetCompatibilityLevel(subject string, defaultToGlobal bool) (*CompatibilityLevel, error)

GetCompatibilityLevel returns the compatibility level of the subject. If defaultToGlobal is set to true and no compatibility level is set on the subject, the global compatibility level is returned.
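The fallback described for defaultToGlobal can be pictured as a simple lookup. The sketch below is an in-memory illustration only: compatibilityStore and levelFor are hypothetical, levels are plain strings, and the "not found" result when defaultToGlobal is false is an assumption, since this page does not say what the client returns in that case.

```go
package main

import "fmt"

// compatibilityStore is a hypothetical stand-in for the registry's
// configuration: one global level plus optional per-subject overrides.
type compatibilityStore struct {
	global     string
	perSubject map[string]string
}

// levelFor returns the subject's own level when one is set. Otherwise it
// returns the global level if defaultToGlobal is true, or reports that no
// level was found.
func (s compatibilityStore) levelFor(subject string, defaultToGlobal bool) (string, bool) {
	if lvl, ok := s.perSubject[subject]; ok {
		return lvl, true
	}
	if defaultToGlobal {
		return s.global, true
	}
	return "", false
}

func main() {
	store := compatibilityStore{
		global:     "BACKWARD",
		perSubject: map[string]string{"orders-value": "FULL"},
	}

	lvl, ok := store.levelFor("orders-value", true)
	fmt.Println(lvl, ok) // subject-level setting wins

	lvl, ok = store.levelFor("payments-value", true)
	fmt.Println(lvl, ok) // falls back to the global level

	lvl, ok = store.levelFor("payments-value", false)
	fmt.Println(lvl, ok) // no fallback requested
}
```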

func (*SchemaRegistryClient) GetGlobalCompatibilityLevel added in v0.5.0

func (client *SchemaRegistryClient) GetGlobalCompatibilityLevel() (*CompatibilityLevel, error)

GetGlobalCompatibilityLevel returns the global compatibility level of the registry.

func (*SchemaRegistryClient) GetLatestSchema

func (client *SchemaRegistryClient) GetLatestSchema(subject string) (*Schema, error)

GetLatestSchema gets the schema associated with the given subject. The schema returned contains the last version for that subject.

func (*SchemaRegistryClient) GetSchema

func (client *SchemaRegistryClient) GetSchema(schemaID int) (*Schema, error)

GetSchema gets the schema associated with the given id.

func (*SchemaRegistryClient) GetSchemaByVersion

func (client *SchemaRegistryClient) GetSchemaByVersion(subject string, version int) (*Schema, error)

GetSchemaByVersion gets the schema associated with the given subject. The schema returned contains the version specified as a parameter.

func (*SchemaRegistryClient) GetSchemaVersions

func (client *SchemaRegistryClient) GetSchemaVersions(subject string) ([]int, error)

GetSchemaVersions returns a list of versions from a given subject.

func (*SchemaRegistryClient) GetSubjects

func (client *SchemaRegistryClient) GetSubjects() ([]string, error)

GetSubjects returns a list of all subjects in the registry

func (*SchemaRegistryClient) GetSubjectsIncludingDeleted added in v0.5.0

func (client *SchemaRegistryClient) GetSubjectsIncludingDeleted() ([]string, error)

GetSubjectsIncludingDeleted returns a list of all subjects in the registry including those which have been soft deleted

func (*SchemaRegistryClient) IsSchemaCompatible

func (client *SchemaRegistryClient) IsSchemaCompatible(subject, schema, version string, schemaType SchemaType, references ...Reference) (bool, error)

IsSchemaCompatible checks if the given schema is compatible with the given subject and version. Valid versions are a version ID or "latest".

func (*SchemaRegistryClient) LookupSchema added in v0.5.0

func (client *SchemaRegistryClient) LookupSchema(subject string, schema string, schemaType SchemaType, references ...Reference) (*Schema, error)

LookupSchema looks up the schema by subject and schema string. If it finds the schema it returns it with all its associated information.

func (*SchemaRegistryClient) ResetCache added in v0.5.0

func (client *SchemaRegistryClient) ResetCache()

ResetCache resets the schema caches to be able to get updated schemas.

func (*SchemaRegistryClient) SetBearerToken added in v0.6.0

func (client *SchemaRegistryClient) SetBearerToken(token string)

SetBearerToken allows users to add a Bearer Token HTTP header to calls to Schema Registry. The bearer token will override Schema Registry credentials.

func (*SchemaRegistryClient) SetCredentials

func (client *SchemaRegistryClient) SetCredentials(username string, password string)

SetCredentials allows users to set credentials to be used with Schema Registry, for scenarios when Schema Registry has authentication enabled.
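At the HTTP level, credentials and bearer tokens both end up in the Authorization header. The sketch below shows one way the documented precedence (a bearer token overrides credentials) could look. The authConfig type and authHeader helper are hypothetical, and the URL is a placeholder that is never contacted.

```go
package main

import (
	"fmt"
	"net/http"
)

// authConfig is a hypothetical holder for the two kinds of credentials.
type authConfig struct {
	username, password string
	bearerToken        string
}

// apply sets the Authorization header on req. A bearer token takes
// precedence over username and password when both are present.
func (a authConfig) apply(req *http.Request) {
	switch {
	case a.bearerToken != "":
		req.Header.Set("Authorization", "Bearer "+a.bearerToken)
	case a.username != "" || a.password != "":
		req.SetBasicAuth(a.username, a.password)
	}
}

// authHeader builds a request, applies the config, and returns the header
// value that would be sent. No network call is made.
func authHeader(a authConfig) (string, error) {
	req, err := http.NewRequest(http.MethodGet, "http://localhost:8081/subjects", nil)
	if err != nil {
		return "", err
	}
	a.apply(req)
	return req.Header.Get("Authorization"), nil
}

func main() {
	h, _ := authHeader(authConfig{username: "user", password: "pass"})
	fmt.Println(h) // prints "Basic dXNlcjpwYXNz"

	h, _ = authHeader(authConfig{username: "user", password: "pass", bearerToken: "abc"})
	fmt.Println(h) // prints "Bearer abc"
}
```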

func (*SchemaRegistryClient) SetTimeout

func (client *SchemaRegistryClient) SetTimeout(timeout time.Duration)

SetTimeout reconfigures how long internal HTTP requests may take before they time out. The default is five seconds.
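The effect of a request timeout can be demonstrated with the standard library alone. In the sketch below, slowTransport is a hypothetical RoundTripper that simulates a slow registry without opening a network connection, and timedOut is a hypothetical helper; neither is part of srclient. The placeholder URL is never resolved.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"strings"
	"time"
)

// slowTransport simulates a server that needs `delay` to answer. It stops
// waiting as soon as the request's context is cancelled.
type slowTransport struct{ delay time.Duration }

func (t slowTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	select {
	case <-time.After(t.delay):
		return &http.Response{
			StatusCode: http.StatusOK,
			Header:     make(http.Header),
			Body:       io.NopCloser(strings.NewReader("[]")),
			Request:    req,
		}, nil
	case <-req.Context().Done():
		return nil, req.Context().Err()
	}
}

// timedOut reports whether a GET fails with a timeout error when the client
// timeout is `timeout` and the simulated server needs `delay` to respond.
func timedOut(timeout, delay time.Duration) bool {
	client := &http.Client{Timeout: timeout, Transport: slowTransport{delay: delay}}
	resp, err := client.Get("http://registry.invalid/subjects")
	if err != nil {
		var ne net.Error
		return errors.As(err, &ne) && ne.Timeout()
	}
	resp.Body.Close()
	return false
}

func main() {
	fmt.Println("slow server, short timeout:", timedOut(50*time.Millisecond, 2*time.Second))
	fmt.Println("fast server, five-second timeout:", timedOut(5*time.Second, 10*time.Millisecond))
}
```

The first call reports a timeout because the simulated response arrives after the client's deadline; the second completes well within it.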

type SchemaType

type SchemaType string
const (
	Protobuf SchemaType = "PROTOBUF"
	Avro     SchemaType = "AVRO"
	Json     SchemaType = "JSON"
)

func (SchemaType) String

func (s SchemaType) String() string
