schemaregistry

v0.2.0
Published: May 31, 2021 License: Apache-2.0 Imports: 10 Imported by: 0

README

Go client for Schema Registry

A rewrite of Landoop/schema-registry for Schema Registry v5.1.1 and above.

Why?

The Landoop/schema-registry project was created in 2016, and a lot has changed since then. This rewrite aims to be a more up-to-date client by adding the following features:

  • It keeps method signatures similar to the Landoop client for easy migration
  • It supports contexts for all requests
  • It supports the v5.1.1 endpoints, such as schema deletion and compatibility checking
  • It keeps its code simple
  • It leaves gzip management to the Go HTTP client
  • It keeps everything tested
  • It provides a mock
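
Context support means each call can carry a deadline or be cancelled by the caller. The sketch below illustrates that mechanism using only the standard library: a stand-in server (built with httptest) answers slowly, and a request bound to a short-lived context gives up early. It does not use this package's Client; `fetchSubjects`, `timedOut`, the subject name, and the timings are assumptions made for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"time"
)

// fetchSubjects is a hypothetical helper: it issues GET /subjects bound to ctx,
// so the request is abandoned as soon as ctx is cancelled or times out.
func fetchSubjects(ctx context.Context, baseURL string) (string, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, baseURL+"/subjects", nil)
	if err != nil {
		return "", err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

// timedOut starts a deliberately slow stand-in server and reports whether a
// request with a 50ms deadline fails with context.DeadlineExceeded.
func timedOut() bool {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		select {
		case <-r.Context().Done(): // the client gave up
		case <-time.After(2 * time.Second):
			fmt.Fprint(w, `["orders-value"]`)
		}
	}))
	defer srv.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	_, err := fetchSubjects(ctx, srv.URL)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println("timed out:", timedOut())
}
```

With the real client, the same `ctx` would be passed as the first argument of each method, for example `client.Subjects(ctx)`.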

Example

import (
    "context"

    "github.com/landoop/schema-registry"
)

client, _ := schemaregistry.NewClient("http://localhost:8081")
client.Subjects(context.Background())

Or, to use with a Schema Registry endpoint listening on HTTPS:

import (
    "context"
    "crypto/tls"
    "crypto/x509"
    "net/http"
    "os"

    "github.com/landoop/schema-registry"
)

// Create a TLS config for connecting to Schema Registry. It trusts endpoints
// whose TLS certificate is signed by the CA in the given file.
caCert, err := os.ReadFile("/path/to/ca/file")
if err != nil {
    panic(err)
}

caCertPool := x509.NewCertPool()
if !caCertPool.AppendCertsFromPEM(caCert) {
    panic("no CA certificates found in file")
}

// InsecureSkipVerify is left at its default (false): setting it to true would
// disable certificate verification and make RootCAs irrelevant.
tlsConfig := &tls.Config{
    RootCAs: caCertPool,
}

httpsClientTransport := &http.Transport{
    TLSClientConfig: tlsConfig,
}

httpsClient := &http.Client{
    Transport: httpsClientTransport,
}

// Create the Schema Registry client
client, _ := schemaregistry.NewClient("https://localhost:8081", schemaregistry.UsingClient(httpsClient))
client.Subjects(context.Background())

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsSchemaNotFound

func IsSchemaNotFound(err error) bool

IsSchemaNotFound reports whether the returned error carries a schema-not-found error code.

func IsSubjectNotFound

func IsSubjectNotFound(err error) bool

IsSubjectNotFound reports whether the returned error carries a subject-not-found error code.

func IsVersionNotFound

func IsVersionNotFound(err error) bool

IsVersionNotFound reports whether the returned error carries a version-not-found error code.
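
How these helpers are implemented is not shown here. As a rough sketch of the idea, the example below classifies errors by their code using `errors.As`. It assumes a local stand-in type (`resourceError`) modelled on the documented ResourceError, plus the 40401, 40402 and 40403 codes listed in the Confluent REST API reference; the helper names are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// resourceError is a local stand-in modelled on the documented ResourceError.
type resourceError struct {
	ErrorCode int
	Message   string
}

func (e resourceError) Error() string {
	return fmt.Sprintf("schema registry error %d: %s", e.ErrorCode, e.Message)
}

// Error codes as listed in the Confluent Schema Registry REST API reference.
const (
	codeSubjectNotFound = 40401
	codeVersionNotFound = 40402
	codeSchemaNotFound  = 40403
)

// hasCode reports whether err is, or wraps, a resourceError with the given code.
func hasCode(err error, code int) bool {
	var re resourceError
	return errors.As(err, &re) && re.ErrorCode == code
}

func isSubjectNotFound(err error) bool { return hasCode(err, codeSubjectNotFound) }
func isVersionNotFound(err error) bool { return hasCode(err, codeVersionNotFound) }
func isSchemaNotFound(err error) bool  { return hasCode(err, codeSchemaNotFound) }

func main() {
	// Wrapping with %w keeps the original error reachable for errors.As.
	err := fmt.Errorf("lookup failed: %w",
		resourceError{ErrorCode: codeSubjectNotFound, Message: "Subject not found."})

	fmt.Println(isSubjectNotFound(err)) // true
	fmt.Println(isSchemaNotFound(err))  // false
}
```

In application code, the package's own `IsSubjectNotFound`, `IsVersionNotFound` and `IsSchemaNotFound` would take the place of these stand-ins.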

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is used to interact with the Schema Registry REST API.

func NewClient

func NewClient(baseURL string, options ...Option) (*Client, error)

NewClient instantiates a new Client.

func (*Client) DeleteLatestSchemaVersion

func (c *Client) DeleteLatestSchemaVersion(ctx context.Context, subject string, permanent bool) (int, error)

DeleteLatestSchemaVersion removes the latest version of a schema.

See `DeleteSchemaVersion` to delete a specific version of a subject's schema.

func (*Client) DeleteSchemaVersion

func (c *Client) DeleteSchemaVersion(ctx context.Context, subject string, version int, permanent bool) (int, error)

DeleteSchemaVersion deletes a specific version of the schema registered under this subject.

This only deletes the version; the schema ID remains intact, so it is still possible to decode data using the schema ID. This API is recommended only for development environments or for extreme circumstances in which a previously registered schema must be deleted for compatibility purposes or must be re-registered.

https://docs.confluent.io/current/schema-registry/docs/api.html#delete--subjects-(string-%20subject)-versions-(versionId-%20version)
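
For orientation, the sketch below builds the REST URL such a deletion targets, with the `permanent` flag expressed as the `permanent=true` query parameter described in the Confluent API reference. `deleteVersionURL` is a hypothetical helper; how this package actually builds its request is an assumption.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// deleteVersionURL builds the endpoint for deleting one schema version.
// With permanent=false the URL carries no query string (a soft delete).
func deleteVersionURL(baseURL, subject string, version int, permanent bool) string {
	u := baseURL + "/subjects/" + url.PathEscape(subject) + "/versions/" + strconv.Itoa(version)
	if permanent {
		u += "?permanent=true"
	}
	return u
}

func main() {
	fmt.Println(deleteVersionURL("http://localhost:8081", "orders-value", 3, false))
	// http://localhost:8081/subjects/orders-value/versions/3
	fmt.Println(deleteVersionURL("http://localhost:8081", "orders-value", 3, true))
	// http://localhost:8081/subjects/orders-value/versions/3?permanent=true
}
```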

func (*Client) DeleteSubject

func (c *Client) DeleteSubject(ctx context.Context, subject string, permanent bool) (versions []int, err error)

DeleteSubject deletes the specified subject and its associated compatibility level, if one is registered. It is recommended to use this API only when a topic needs to be recycled or in a development environment. It returns the versions of the schema deleted under this subject.

https://docs.confluent.io/current/schema-registry/docs/api.html#delete--subjects-(string-%20subject)

func (*Client) GetConfig

func (c *Client) GetConfig(ctx context.Context, subject string) (*Config, error)

GetConfig returns the configuration (Config type) for the global Schema Registry or for a specific subject. When the returned Config has an empty compatibility level, the subject is using the global settings.

https://docs.confluent.io/current/schema-registry/docs/api.html#get--config-(string-%20subject)

func (*Client) GetLatestSchema

func (c *Client) GetLatestSchema(ctx context.Context, subject string) (*Schema, error)

GetLatestSchema returns the latest version of a schema. See `GetSchemaBySubjectAndVersion` to retrieve a subject's schema at a specific version.

func (*Client) GetSchemaByID

func (c *Client) GetSchemaByID(ctx context.Context, subjectID int) (string, error)

GetSchemaByID returns the Avro schema string identified by the id.

https://docs.confluent.io/current/schema-registry/docs/api.html#get--schemas-ids-int-%20id

func (*Client) GetSchemaBySubjectAndVersion

func (c *Client) GetSchemaBySubjectAndVersion(ctx context.Context, subject string, version int) (*Schema, error)

GetSchemaBySubjectAndVersion returns the schema for a particular subject and version.

https://docs.confluent.io/current/schema-registry/docs/api.html#get--subjects-(string-%20subject)-versions-(versionId-%20version)

func (*Client) IsRegistered

func (c *Client) IsRegistered(ctx context.Context, subject string, schema string) (bool, *Schema, error)

IsRegistered tells if the given "schema" is registered for this "subject".

https://docs.confluent.io/current/schema-registry/docs/api.html#post--subjects-(string-%20subject)

func (*Client) RegisterNewSchema

func (c *Client) RegisterNewSchema(ctx context.Context, subject string, avroSchema string) (int, error)

RegisterNewSchema registers a schema. The returned identifier should be used to retrieve this schema from the schemas resource and is different from the schema’s version which is associated with that name.

https://docs.confluent.io/current/schema-registry/docs/api.html#post--subjects-(string-%20subject)-versions
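
To make the request shape concrete, here is a standard-library sketch of the underlying REST call: the schema travels as a JSON string inside a `{"schema": ...}` wrapper, and the response carries the new `id`. `registerSchema` and `demoRegister` are hypothetical helpers, the server is an httptest stand-in that always answers `{"id": 1}`, and none of this is claimed to match the package's internals.

```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
)

// registerSchema posts a schema to /subjects/{subject}/versions and returns
// the schema ID from the response body.
func registerSchema(ctx context.Context, baseURL, subject, avroSchema string) (int, error) {
	// json.Marshal escapes the schema so it is sent as a JSON string.
	payload, err := json.Marshal(map[string]string{"schema": avroSchema})
	if err != nil {
		return 0, err
	}
	endpoint := baseURL + "/subjects/" + url.PathEscape(subject) + "/versions"
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(payload))
	if err != nil {
		return 0, err
	}
	req.Header.Set("Content-Type", "application/vnd.schemaregistry.v1+json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return 0, fmt.Errorf("unexpected status: %s", resp.Status)
	}
	var out struct {
		ID int `json:"id"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return 0, err
	}
	return out.ID, nil
}

// demoRegister runs registerSchema against a stand-in server.
func demoRegister() (int, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost || r.URL.Path != "/subjects/orders-value/versions" {
			http.NotFound(w, r)
			return
		}
		w.Header().Set("Content-Type", "application/vnd.schemaregistry.v1+json")
		fmt.Fprint(w, `{"id": 1}`)
	}))
	defer srv.Close()
	return registerSchema(context.Background(), srv.URL, "orders-value", `{"type": "string"}`)
}

func main() {
	id, err := demoRegister()
	fmt.Println(id, err)
}
```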

func (*Client) SchemaCompatibleWith

func (c *Client) SchemaCompatibleWith(ctx context.Context, schema string, subject string, version int) (bool, error)

SchemaCompatibleWith tests the input schema against a particular version of a subject's schema for compatibility.

Note that the compatibility level applied for the check is the one configured for the subject (GET /config/(string: subject)). If the subject's compatibility level was never changed, the global compatibility level applies (GET /config).

https://docs.confluent.io/current/schema-registry/docs/api.html#post--compatibility-subjects-(string-%20subject)-versions-(versionId-%20version)
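
The fallback rule above, together with the `is_compatible` field the REST endpoint returns, can be sketched in a few lines. Both function names are hypothetical, and the level strings are only sample values.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// effectiveCompatibility picks the level that governs a check: the subject's
// own level when one was set, otherwise the global level.
func effectiveCompatibility(subjectLevel, globalLevel string) string {
	if subjectLevel != "" {
		return subjectLevel
	}
	return globalLevel
}

// parseCompatibilityResponse decodes a body such as {"is_compatible": true}.
func parseCompatibilityResponse(body []byte) (bool, error) {
	var out struct {
		IsCompatible bool `json:"is_compatible"`
	}
	if err := json.Unmarshal(body, &out); err != nil {
		return false, err
	}
	return out.IsCompatible, nil
}

func main() {
	fmt.Println(effectiveCompatibility("", "BACKWARD"))     // BACKWARD
	fmt.Println(effectiveCompatibility("FULL", "BACKWARD")) // FULL

	ok, err := parseCompatibilityResponse([]byte(`{"is_compatible": true}`))
	fmt.Println(ok, err) // true <nil>
}
```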

func (*Client) SetGlobalConfig

func (c *Client) SetGlobalConfig(ctx context.Context, config Config) (*Config, error)

func (*Client) Subjects

func (c *Client) Subjects(ctx context.Context) (subjects []string, err error)

Subjects returns a list of the available subjects (schemas).

https://docs.confluent.io/current/schema-registry/docs/api.html#subjects

func (*Client) Versions

func (c *Client) Versions(ctx context.Context, subject string) (versions []int, err error)

Versions returns all schema version numbers registered for this subject.

https://docs.confluent.io/current/schema-registry/docs/api.html#get--subjects-(string-%20subject)-versions

type ClientMock

type ClientMock struct {
	mock.Mock
}

ClientMock is a mock implementation of Client.
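
One way to make such a mock usable is to have application code depend on a narrow interface rather than on `*Client` directly, so that the real client, the mock, or a hand-written fake can be passed in. The sketch below shows that pattern with a fake, because it sticks to the standard library; `latestSchemaGetter`, `fakeRegistry` and `latestVersion` are hypothetical names, and `Schema` here is a local stand-in for the package's type (real code would reference the package's `Schema` so that `*Client` and `*ClientMock` satisfy the interface).

```go
package main

import (
	"context"
	"fmt"
)

// Schema is a local stand-in with the fields documented for the package's Schema.
type Schema struct {
	Schema  string
	Subject string
	Version int
	ID      int
}

// latestSchemaGetter covers only the method this code needs. Its signature
// follows the documented GetLatestSchema method.
type latestSchemaGetter interface {
	GetLatestSchema(ctx context.Context, subject string) (*Schema, error)
}

// fakeRegistry is a hand-written test double backed by a map.
type fakeRegistry struct {
	schemas map[string]*Schema
}

func (f fakeRegistry) GetLatestSchema(_ context.Context, subject string) (*Schema, error) {
	s, ok := f.schemas[subject]
	if !ok {
		return nil, fmt.Errorf("subject %q not found", subject)
	}
	return s, nil
}

// latestVersion is the code under test: it only knows about the interface.
func latestVersion(ctx context.Context, r latestSchemaGetter, subject string) (int, error) {
	s, err := r.GetLatestSchema(ctx, subject)
	if err != nil {
		return 0, err
	}
	return s.Version, nil
}

// demoLatestVersion wires the fake into latestVersion.
func demoLatestVersion(subject string) (int, error) {
	fake := fakeRegistry{schemas: map[string]*Schema{
		"orders-value": {Subject: "orders-value", Version: 3, ID: 42, Schema: `{"type":"string"}`},
	}}
	return latestVersion(context.Background(), fake, subject)
}

func main() {
	fmt.Println(demoLatestVersion("orders-value"))
}
```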

func (*ClientMock) DeleteLatestSchemaVersion

func (c *ClientMock) DeleteLatestSchemaVersion(ctx context.Context, subject string, permanent bool) (int, error)

DeleteLatestSchemaVersion method mock

func (*ClientMock) DeleteSchemaVersion

func (c *ClientMock) DeleteSchemaVersion(ctx context.Context, subject string, version int, permanent bool) (int, error)

DeleteSchemaVersion method mock

func (*ClientMock) DeleteSubject

func (c *ClientMock) DeleteSubject(ctx context.Context, subject string, permanent bool) (versions []int, err error)

DeleteSubject method mock

func (*ClientMock) GetConfig

func (c *ClientMock) GetConfig(ctx context.Context, subject string) (*Config, error)

GetConfig method mock

func (*ClientMock) GetLatestSchema

func (c *ClientMock) GetLatestSchema(ctx context.Context, subject string) (*Schema, error)

GetLatestSchema method mock

func (*ClientMock) GetSchemaByID

func (c *ClientMock) GetSchemaByID(ctx context.Context, subjectID int) (string, error)

GetSchemaByID method mock

func (*ClientMock) GetSchemaBySubjectAndVersion

func (c *ClientMock) GetSchemaBySubjectAndVersion(ctx context.Context, subject string, version int) (*Schema, error)

GetSchemaBySubjectAndVersion method mock

func (*ClientMock) IsRegistered

func (c *ClientMock) IsRegistered(ctx context.Context, subject string, schema string) (bool, *Schema, error)

IsRegistered method mock

func (*ClientMock) RegisterNewSchema

func (c *ClientMock) RegisterNewSchema(ctx context.Context, subject string, avroSchema string) (int, error)

RegisterNewSchema method mock

func (*ClientMock) SchemaCompatibleWith

func (c *ClientMock) SchemaCompatibleWith(ctx context.Context, schema string, subject string, version int) (bool, error)

SchemaCompatibleWith method mock

func (*ClientMock) SetGlobalConfig added in v0.1.1

func (c *ClientMock) SetGlobalConfig(ctx context.Context, config Config) (*Config, error)

SetGlobalConfig method mock.

func (*ClientMock) Subjects

func (c *ClientMock) Subjects(ctx context.Context) (subjects []string, err error)

Subjects method mock

func (*ClientMock) Versions

func (c *ClientMock) Versions(ctx context.Context, subject string) (versions []int, err error)

Versions method mock

type Config

type Config struct {
	// Compatibility mode of subject or global
	Compatibility string `json:"compatibility"`
}

Config describes a subject or global schema-registry configuration.

type Option

type Option func(*Client)

Option function used to apply modifications to the client.

func UsingClient

func UsingClient(httpClient *http.Client) Option

UsingClient replaces the underlying HTTP client that the schema registry client uses to contact the backend server.

func WithBasicAuth

func WithBasicAuth(user string, password string) Option
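
`Option` follows the functional options pattern: each option is a function that mutates the client during construction. The sketch below reproduces the pattern with local stand-in types to show how a basic-auth option could work; `client`, `option`, `newClient` and `newRequest` are hypothetical, and the package's real fields and behaviour are not shown in this documentation.

```go
package main

import (
	"fmt"
	"net/http"
)

// client is a local stand-in for the package's Client.
type client struct {
	httpClient *http.Client
	user, pass string
}

// option mirrors the documented shape of Option: a function applied to the client.
type option func(*client)

// usingClient swaps the HTTP client used for requests.
func usingClient(h *http.Client) option {
	return func(c *client) { c.httpClient = h }
}

// withBasicAuth stores credentials to attach to every request.
func withBasicAuth(user, pass string) option {
	return func(c *client) { c.user, c.pass = user, pass }
}

// newClient starts from defaults, then applies the options in order.
func newClient(opts ...option) *client {
	c := &client{httpClient: http.DefaultClient}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

// newRequest builds a request and adds the Authorization header when
// credentials were configured.
func (c *client) newRequest(method, url string) (*http.Request, error) {
	req, err := http.NewRequest(method, url, nil)
	if err != nil {
		return nil, err
	}
	if c.user != "" {
		req.SetBasicAuth(c.user, c.pass)
	}
	return req, nil
}

// demoAuth reports the user name found on a request built by a client
// configured with withBasicAuth.
func demoAuth() (string, bool) {
	c := newClient(withBasicAuth("alice", "s3cret"))
	req, err := c.newRequest(http.MethodGet, "http://localhost:8081/subjects")
	if err != nil {
		return "", false
	}
	user, _, ok := req.BasicAuth()
	return user, ok
}

func main() {
	fmt.Println(demoAuth()) // alice true
}
```

With the real package, the equivalent call would presumably look like `schemaregistry.NewClient(url, schemaregistry.WithBasicAuth(user, password))`.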

type ResourceError

type ResourceError struct {
	ErrorCode int    `json:"error_code"`
	Method    string `json:"method,omitempty"`
	URI       string `json:"uri,omitempty"`
	Message   string `json:"message,omitempty"`
}

ResourceError is returned by all API calls when the server responds with an error code.

func (ResourceError) Error

func (err ResourceError) Error() string

Error is used to implement the error interface.
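
As an illustration of how the JSON tags map onto a registry error body, the sketch below decodes a sample payload into a copy of the struct. The sample body, the `parseResourceError` helper, and the message format used by `Error` are assumptions; the package's actual error wording is not shown here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ResourceError is a local copy of the documented struct, including its tags.
type ResourceError struct {
	ErrorCode int    `json:"error_code"`
	Method    string `json:"method,omitempty"`
	URI       string `json:"uri,omitempty"`
	Message   string `json:"message,omitempty"`
}

// Error uses an assumed message format.
func (err ResourceError) Error() string {
	return fmt.Sprintf("%s %s: error code %d: %s", err.Method, err.URI, err.ErrorCode, err.Message)
}

// parseResourceError decodes an error body and records which call produced it.
// Fields absent from the body keep the values set beforehand.
func parseResourceError(body []byte, method, uri string) (ResourceError, error) {
	re := ResourceError{Method: method, URI: uri}
	if err := json.Unmarshal(body, &re); err != nil {
		return ResourceError{}, err
	}
	return re, nil
}

func main() {
	body := []byte(`{"error_code": 40401, "message": "Subject not found."}`)
	re, err := parseResourceError(body, "GET", "/subjects/orders-value/versions")
	if err != nil {
		panic(err)
	}
	fmt.Println(re.ErrorCode)
	fmt.Println(re.Error())
}
```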

type Schema

type Schema struct {
	// Schema is the Avro schema string.
	Schema string `json:"schema"`
	// Subject under which the schema is registered.
	Subject string `json:"subject"`
	// Version of the returned schema.
	Version int `json:"version"`
	ID      int `json:"id,omitempty"`
}

Schema describes a schema; see `GetLatestSchema` and `GetSchemaBySubjectAndVersion` for more.
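
Because the `Schema` field holds the schema as a string, a registry response embeds it as escaped JSON. The sketch below decodes a sample response into a copy of the struct and then parses the inner schema separately. The sample body and both helper names are assumptions, and `avroType` only handles schemas written as a JSON object with a string `type`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Schema is a local copy of the documented struct, including its JSON tags.
type Schema struct {
	Schema  string `json:"schema"`
	Subject string `json:"subject"`
	Version int    `json:"version"`
	ID      int    `json:"id,omitempty"`
}

// decodeSchema parses a registry response body into a Schema.
func decodeSchema(body []byte) (Schema, error) {
	var s Schema
	err := json.Unmarshal(body, &s)
	return s, err
}

// avroType extracts the top-level "type" of a schema written as a JSON object.
func avroType(avroSchema string) (string, error) {
	var def struct {
		Type string `json:"type"`
	}
	err := json.Unmarshal([]byte(avroSchema), &def)
	return def.Type, err
}

func main() {
	body := []byte(`{"subject":"orders-value","version":3,"id":42,"schema":"{\"type\":\"string\"}"}`)

	s, err := decodeSchema(body)
	if err != nil {
		panic(err)
	}
	fmt.Println(s.Subject, s.Version, s.ID) // orders-value 3 42
	fmt.Println(s.Schema)                   // {"type":"string"}

	typ, err := avroType(s.Schema)
	if err != nil {
		panic(err)
	}
	fmt.Println(typ) // string
}
```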
