schemaregistry

Version: v0.0.0-...-7ac71ab
Published: Feb 18, 2017 License: MIT Imports: 7 Imported by: 0

README

go-schema-registry

Go bindings for the Confluent Schema Registry.

This package provides bindings for the operations described in http://docs.confluent.io/3.1.2/schema-registry/docs/api.html

This is a work in progress: only the operations marked "Yes" below are implemented. The others return ErrNotImplemented.

| API operation | Binding func | Implemented |
|---|---|---|
| GET /schemas/ids/(int: id) | Schema(id int) (string, error) | No |
| GET /subjects | Subjects() ([]string, error) | No |
| GET /subjects/(string: subject)/versions | SubjectVersions(subject string) ([]int, error) | No |
| GET /subjects/(string: subject)/versions/(versionId: version) | SubjectVersion(subject string, version int) (string, error) | No |
| POST /subjects/(string: subject)/versions | RegisterSubjectSchema(subject string, schema string) (int, error) | Yes |
| POST /subjects/(string: subject) | CheckSubjectSchema(subject string, schema string) (*SubjectSchema, error) | Yes |
| POST /compatibility/subjects/(string: subject)/versions/(versionId: version) | TestCompatibility(subject string, version int, schema string) (bool, error) | No |
| PUT /config | SetConfig(config *Config) (*Config, error) | No |
| GET /config | Config() (*Config, error) | No |
| PUT /config/(string: subject) | SetSubjectConfig(subject string, config *Config) (*Config, error) | No |
| GET /config/(string: subject) | SubjectConfig(subject string) (*Config, error) | No |

Usage:

schema := `{
  "type": "record",
  "name": "Frame",
  "fields": [
    {
      "name": "data",
      "type": "bytes"
    }
  ]
}`
registry, err := schemaregistry.New("http://localhost:8081")
if err != nil {
        // handle err
}
ss, err := registry.CheckSubjectSchema("frames-value", schema)
if err != nil {
        // handle err
}
log.Printf("subject: %s", ss.Subject)
log.Printf("schema ID: %d", ss.ID)                       // ID is an int, so use %d
log.Printf("schema version (in subject): %d", ss.Version) // Version is an int as well
log.Printf("returned schema: %s", ss.Schema)

API errors are returned as an *APIError instance, giving access to the error code and message:

_, err := registry.CheckSubjectSchema("inexistent-subject", schema)
if err != nil {
        if apiErr, ok := err.(*schemaregistry.APIError); ok {
                // the registry answered with an API error
                switch apiErr.Code {
                case schemaregistry.SubjectNotFound:
                        // subject not found
                case schemaregistry.SchemaNotFound:
                        // schema not found
                default:
                        // other API error, like an Internal server error
                }
        } else {
                // a non-API error, like a connectivity error, JSON encoding error, etc.
        }
}
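
If calling code wraps errors before passing them on, a plain type assertion no longer matches. The sketch below shows the same classification with errors.As (Go 1.13 or later), which also sees through wrapped errors. ErrorCode and APIError here are local stand-ins that mirror the documented types so the example compiles on its own; classify, errConn and the Error() message format are hypothetical, and whether the package itself ever wraps errors is not documented.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrorCode and APIError are local stand-ins mirroring the documented types.
type ErrorCode int

const (
	SubjectNotFound ErrorCode = 40401
	SchemaNotFound  ErrorCode = 40403
)

type APIError struct {
	Code    ErrorCode
	Message string
}

// Error uses an assumed message format; the package's real format is not documented.
func (e *APIError) Error() string {
	return fmt.Sprintf("%d: %s", e.Code, e.Message)
}

// errConn is a hypothetical non-API error used for illustration.
var errConn = errors.New("connection refused")

// classify is a hypothetical helper that sorts an error into the same
// buckets as the switch statement in the README.
func classify(err error) string {
	if err == nil {
		return "ok"
	}
	var apiErr *APIError
	if errors.As(err, &apiErr) { // matches wrapped *APIError values too
		switch apiErr.Code {
		case SubjectNotFound:
			return "subject not found"
		case SchemaNotFound:
			return "schema not found"
		default:
			return "other API error"
		}
	}
	return "non-API error"
}

func main() {
	wrapped := fmt.Errorf("check failed: %w", &APIError{Code: SubjectNotFound, Message: "Subject not found."})
	fmt.Println(classify(wrapped)) // subject not found
	fmt.Println(classify(errConn)) // non-API error
}
```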

Also, there is a Testify mock (MockRegistry) available for testing:

testSchema := `{
  "type": "record",
  "name": "Frame",
  "fields": [
    {
      "name": "data",
      "type": "bytes"
    }
  ]
}`
registry := &schemaregistry.MockRegistry{}
registry.On("CheckSubjectSchema", "test-frames-value", testSchema).Return(&schemaregistry.SubjectSchema{
    Subject: "test-frames-value",
    ID:      1,
    Version: 3,
    Schema:  testSchema,
}, nil)

// use the mock registry
ss, err := registry.CheckSubjectSchema("test-frames-value", testSchema)
assert.Nil(t, err)
assert.Equal(t, "test-frames-value", ss.Subject)
assert.Equal(t, 1, ss.ID)
assert.Equal(t, 3, ss.Version)
assert.Equal(t, testSchema, ss.Schema)

Documentation

Constants

const Latest = 0

Latest represents the version "latest" in the operations SubjectVersion(subject, version) and TestCompatibility(subject, version, schema) of Registry
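
As an illustration of how a sentinel value of 0 can stand for "latest", the sketch below maps a version argument to the path of the versions endpoint listed in the README table. versionPath is a hypothetical helper, not part of the package, and since SubjectVersion and TestCompatibility are not implemented yet, how the package will actually build its URLs is an assumption.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// Latest mirrors the documented constant.
const Latest = 0

// versionPath is a hypothetical helper: it builds the request path for
// GET /subjects/(subject)/versions/(version), using the literal segment
// "latest" when the caller passes Latest.
func versionPath(subject string, version int) string {
	v := "latest"
	if version != Latest {
		v = strconv.Itoa(version)
	}
	return "/subjects/" + url.PathEscape(subject) + "/versions/" + v
}

func main() {
	fmt.Println(versionPath("frames-value", Latest)) // /subjects/frames-value/versions/latest
	fmt.Println(versionPath("frames-value", 3))      // /subjects/frames-value/versions/3
}
```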

Variables

var ErrNotImplemented = errors.New("Not implemented yet :(")

ErrNotImplemented is returned by Registry operations not yet implemented

Types

type APIError

type APIError struct {
	// Code is the error code
	Code ErrorCode `json:"error_code"`

	// Message is the error message
	Message string `json:"message"`
}

APIError is an error returned by the Schema Registry API

func (*APIError) Error

func (e *APIError) Error() string
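
The JSON tags on APIError match the error body the Schema Registry sends, for example {"error_code": 40401, "message": "Subject not found."}. The following is a minimal sketch of how an HTTP response could be turned into an error under that assumption. errorFromResponse and the Error() message format are hypothetical; the package's own decoding logic is not shown in this documentation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ErrorCode and APIError are local stand-ins mirroring the documented types.
type ErrorCode int

type APIError struct {
	Code    ErrorCode `json:"error_code"`
	Message string    `json:"message"`
}

// Error uses an assumed message format.
func (e *APIError) Error() string {
	return fmt.Sprintf("schema registry error %d: %s", e.Code, e.Message)
}

// errorFromResponse is a hypothetical helper: it returns nil for 2xx
// statuses, an *APIError when the body decodes as one, and a generic
// error when the body is not valid JSON.
func errorFromResponse(status int, body []byte) error {
	if status >= 200 && status < 300 {
		return nil
	}
	var apiErr APIError
	if err := json.Unmarshal(body, &apiErr); err != nil {
		return fmt.Errorf("unexpected status %d with undecodable body: %w", status, err)
	}
	return &apiErr
}

func main() {
	body := []byte(`{"error_code": 40401, "message": "Subject not found."}`)
	fmt.Println(errorFromResponse(404, body)) // schema registry error 40401: Subject not found.
	fmt.Println(errorFromResponse(200, nil))  // <nil>
}
```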

type Compatibility

type Compatibility int

Compatibility is the type of compatibility supported by the registry. The schema registry server can enforce certain compatibility rules when new schemas are registered in a subject.

const (
	// None means no compatibility: A new schema can be any schema as long as it’s a valid Avro schema.
	None Compatibility = iota

	// Full means full compatibility: A new schema is fully compatible if it’s both backward and forward compatible
	// with the latest registered schema
	Full

	// Forward means forward compatibility: A new schema is forward compatible if the latest registered schema can
	// read data written in this schema.
	Forward

	// Backward means backward compatibility (default): A new schema is backwards compatible if it can be used to
	// read the data written in the latest registered schema.
	Backward
)

func (Compatibility) String

func (i Compatibility) String() string
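
Because the constants are declared with iota starting at None, the zero value of Compatibility (and therefore of an empty Config) is None, not the registry's default of Backward. The REST API itself names the levels NONE, FULL, FORWARD and BACKWARD. The sketch below converts between the two representations; wireNames, wireName and parseCompatibility are hypothetical helpers, and whether String() returns these exact upper-case names is not documented.

```go
package main

import (
	"fmt"
	"strings"
)

// Compatibility and its constants mirror the documented declarations.
type Compatibility int

const (
	None Compatibility = iota
	Full
	Forward
	Backward
)

// wireNames maps each level to the name used by the REST API.
var wireNames = map[Compatibility]string{
	None:     "NONE",
	Full:     "FULL",
	Forward:  "FORWARD",
	Backward: "BACKWARD",
}

// wireName is a hypothetical helper returning the REST API name of a level.
func wireName(c Compatibility) (string, error) {
	name, ok := wireNames[c]
	if !ok {
		return "", fmt.Errorf("unknown compatibility level %d", int(c))
	}
	return name, nil
}

// parseCompatibility is a hypothetical helper doing the reverse lookup,
// ignoring case.
func parseCompatibility(s string) (Compatibility, error) {
	for c, name := range wireNames {
		if strings.EqualFold(s, name) {
			return c, nil
		}
	}
	return None, fmt.Errorf("unknown compatibility level %q", s)
}

func main() {
	name, _ := wireName(Backward)
	fmt.Println(name) // BACKWARD

	var zero Compatibility
	zeroName, _ := wireName(zero)
	fmt.Println(zeroName) // NONE
}
```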

type Config

type Config struct {
	// Compatibility is the compatibility level in use.
	Compatibility Compatibility
}

Config holds the configuration (global or of a subject)

type ErrorCode

type ErrorCode int
const (
	// SubjectNotFound status code (Subject not found)
	SubjectNotFound ErrorCode = 40401

	// VersionNotFound status code (Version not found)
	VersionNotFound ErrorCode = 40402

	// SchemaNotFound status code (Schema not found)
	SchemaNotFound ErrorCode = 40403

	// InvalidAvroSchema status code (Invalid Avro schema)
	InvalidAvroSchema ErrorCode = 42201

	// InvalidVersion status code (Invalid version)
	InvalidVersion ErrorCode = 42202

	// InvalidCompatibilityLevel status code (Invalid compatibility level)
	InvalidCompatibilityLevel ErrorCode = 42203

	// BackendStoreErr status code (Error in the backend data store)
	BackendStoreErr ErrorCode = 50001

	// OperationTimedOut status code (Operation timed out)
	OperationTimedOut ErrorCode = 50002

	// FwdRequestToMasterErr status code (Error while forwarding the request to the master)
	FwdRequestToMasterErr ErrorCode = 50003
)

type MockRegistry

type MockRegistry struct {
	mock.Mock
}

MockRegistry is a test double for Registry. Generated with github.com/xeger/mongoose; do not edit by hand.

func (*MockRegistry) CheckSubjectSchema

func (_m *MockRegistry) CheckSubjectSchema(subject string, schema string) (*SubjectSchema, error)

func (*MockRegistry) Config

func (_m *MockRegistry) Config() (*Config, error)

func (*MockRegistry) RegisterSubjectSchema

func (_m *MockRegistry) RegisterSubjectSchema(subject string, schema string) (int, error)

func (*MockRegistry) Schema

func (_m *MockRegistry) Schema(id int) (string, error)

func (*MockRegistry) SetConfig

func (_m *MockRegistry) SetConfig(config *Config) (*Config, error)

func (*MockRegistry) SetSubjectConfig

func (_m *MockRegistry) SetSubjectConfig(subject string, config *Config) (*Config, error)

func (*MockRegistry) SubjectConfig

func (_m *MockRegistry) SubjectConfig(subject string) (*Config, error)

func (*MockRegistry) SubjectVersion

func (_m *MockRegistry) SubjectVersion(subject string, version int) (string, error)

func (*MockRegistry) SubjectVersions

func (_m *MockRegistry) SubjectVersions(subject string) ([]int, error)

func (*MockRegistry) Subjects

func (_m *MockRegistry) Subjects() ([]string, error)

func (*MockRegistry) TestCompatibility

func (_m *MockRegistry) TestCompatibility(subject string, version int, schema string) (bool, error)

type Registry

type Registry interface {
	// Schema gets the schema string identified by the input id.
	Schema(id int) (string, error)

	// Subjects gets a list of registered subjects.
	Subjects() ([]string, error)

	// SubjectVersions gets a list of versions registered under the specified subject.
	SubjectVersions(subject string) ([]int, error)

	// SubjectVersion gets a specific version of the schema registered under this subject.
	SubjectVersion(subject string, version int) (string, error)

	// RegisterSubjectSchema registers a new schema under the specified subject. If successfully registered, this
	// returns the unique identifier of this schema in the registry. The returned identifier should be used to
	// retrieve this schema from the schemas resource and is different from the schema’s version which is associated
	// with the subject. If the same schema is registered under a different subject, the same identifier will be
	// returned. However, the version of the schema may be different under different subjects.
	//
	// A schema should be compatible with the previously registered schema or schemas (if there are any) as per the
	// configured compatibility level. The configured compatibility level can be obtained by issuing a
	// SubjectConfig(subject). If that returns null, then the global compatibility level returned by Config() applies.
	//
	// When there are multiple instances of schema registry running in the same cluster, the schema registration
	// request will be forwarded to one of the instances designated as the master. If the master is not available,
	// the client will get an error code indicating that the forwarding has failed.
	RegisterSubjectSchema(subject string, schema string) (int, error)

	// CheckSubjectSchema checks if a schema has already been registered under the specified subject. If so, this
	// returns the schema string along with its globally unique identifier, its version under this subject and the
	// subject name.
	CheckSubjectSchema(subject string, schema string) (*SubjectSchema, error)

	// TestCompatibility tests an input schema against a particular version of a subject’s schema for compatibility.
	// Note that the compatibility level applied for the check is the configured compatibility level for the subject
	// (SubjectConfig(subject)). If this subject’s compatibility level was never changed, then the global
	// compatibility level applies (Config()).
	TestCompatibility(subject string, version int, schema string) (bool, error)

	// SetConfig updates the global compatibility level.
	//
	// When there are multiple instances of schema registry running in the same cluster, the update request will be
	// forwarded to one of the instances designated as the master. If the master is not available, the client will
	// get an error code indicating that the forwarding has failed.
	SetConfig(config *Config) (*Config, error)

	// Config gets the global compatibility level.
	Config() (*Config, error)

	// SetSubjectConfig updates the compatibility level for the specified subject.
	SetSubjectConfig(subject string, config *Config) (*Config, error)

	// SubjectConfig gets the compatibility level for a subject.
	SubjectConfig(subject string) (*Config, error)
}

Registry exposes the API operations of Schema Registry (https://github.com/confluentinc/schema-registry)

Check http://docs.confluent.io/3.1.2/schema-registry/docs/api.html for details.
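
The two implemented operations can be combined into a "register only if missing" helper. The sketch below is written against a hypothetical two-method subset of Registry (registrar) and tested with an in-memory fake, so it runs without a registry server. All types are local stand-ins mirroring the documented ones; ensureRegistered, registrar, fakeRegistry and the Error() format are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented types.
type ErrorCode int

const (
	SubjectNotFound ErrorCode = 40401
	SchemaNotFound  ErrorCode = 40403
)

type APIError struct {
	Code    ErrorCode
	Message string
}

func (e *APIError) Error() string { return fmt.Sprintf("%d: %s", e.Code, e.Message) }

type SubjectSchema struct {
	Subject string
	ID      int
	Version int
	Schema  string
}

// registrar is a hypothetical subset of the documented Registry interface.
type registrar interface {
	RegisterSubjectSchema(subject string, schema string) (int, error)
	CheckSubjectSchema(subject string, schema string) (*SubjectSchema, error)
}

// ensureRegistered returns the ID of schema under subject, registering it
// only when the registry reports the subject or schema as not found.
func ensureRegistered(r registrar, subject, schema string) (int, error) {
	ss, err := r.CheckSubjectSchema(subject, schema)
	if err == nil {
		return ss.ID, nil
	}
	var apiErr *APIError
	if errors.As(err, &apiErr) && (apiErr.Code == SubjectNotFound || apiErr.Code == SchemaNotFound) {
		return r.RegisterSubjectSchema(subject, schema)
	}
	return 0, err // connectivity or other API errors are passed through
}

// fakeRegistry is an in-memory test double: IDs are global per schema,
// versions are per subject, as the RegisterSubjectSchema comment describes.
type fakeRegistry struct {
	ids      map[string]int      // schema -> global ID
	subjects map[string][]string // subject -> schemas in version order
}

func newFakeRegistry() *fakeRegistry {
	return &fakeRegistry{ids: map[string]int{}, subjects: map[string][]string{}}
}

func (f *fakeRegistry) CheckSubjectSchema(subject, schema string) (*SubjectSchema, error) {
	versions, ok := f.subjects[subject]
	if !ok {
		return nil, &APIError{Code: SubjectNotFound, Message: "Subject not found."}
	}
	for i, s := range versions {
		if s == schema {
			return &SubjectSchema{Subject: subject, ID: f.ids[schema], Version: i + 1, Schema: schema}, nil
		}
	}
	return nil, &APIError{Code: SchemaNotFound, Message: "Schema not found."}
}

func (f *fakeRegistry) RegisterSubjectSchema(subject, schema string) (int, error) {
	id, ok := f.ids[schema]
	if !ok {
		id = len(f.ids) + 1
		f.ids[schema] = id
	}
	for _, s := range f.subjects[subject] {
		if s == schema {
			return id, nil // already registered under this subject
		}
	}
	f.subjects[subject] = append(f.subjects[subject], schema)
	return id, nil
}

func main() {
	r := newFakeRegistry()
	id1, _ := ensureRegistered(r, "frames-value", `"bytes"`)
	id2, _ := ensureRegistered(r, "frames-value", `"bytes"`)
	id3, _ := ensureRegistered(r, "other-value", `"bytes"`)
	fmt.Println(id1, id2, id3) // 1 1 1
}
```

The check followed by a registration is not atomic, so two clients may both attempt the registration; the sketch relies on the documented behaviour that registering the same schema again returns the same identifier.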

func New

func New(endpoint string) (Registry, error)

New returns the default Registry implementation.

type SubjectSchema

type SubjectSchema struct {
	// Subject is the subject name. A subject refers to the name under which the schema is registered. If you are
	// using the schema registry for Kafka, then a subject refers to either a "<topic>-key" or "<topic>-value"
	// depending on whether you are registering the key schema for that topic or the value schema.
	Subject string `json:"subject"`

	// ID is the unique id of the schema in the registry.
	ID int `json:"id"`

	// Version is the version of the schema in the subject.
	Version int `json:"version"`

	// Schema is the Avro schema string
	Schema string `json:"schema"`
}

SubjectSchema holds an Avro schema string along with its globally unique identifier and its version under a specific subject.
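
Note that Schema is a string, not a nested object: on the wire the Avro schema is itself JSON-encoded inside the JSON payload. The sketch below illustrates that double encoding for a request body of the form {"schema": "..."} and for a response decoded into SubjectSchema. schemaRequest, encodeRequest and decodeSubjectSchema are hypothetical names; the package's internal request types are not documented.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SubjectSchema mirrors the documented struct and its JSON tags.
type SubjectSchema struct {
	Subject string `json:"subject"`
	ID      int    `json:"id"`
	Version int    `json:"version"`
	Schema  string `json:"schema"`
}

// schemaRequest is a hypothetical type for the request payload, which
// carries the Avro schema as an escaped string.
type schemaRequest struct {
	Schema string `json:"schema"`
}

// encodeRequest wraps an Avro schema string in the request payload.
func encodeRequest(schema string) ([]byte, error) {
	return json.Marshal(schemaRequest{Schema: schema})
}

// decodeSubjectSchema parses a response body into a SubjectSchema.
func decodeSubjectSchema(body []byte) (*SubjectSchema, error) {
	var ss SubjectSchema
	if err := json.Unmarshal(body, &ss); err != nil {
		return nil, err
	}
	return &ss, nil
}

func main() {
	body, err := encodeRequest(`{"type":"string"}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body)) // {"schema":"{\"type\":\"string\"}"}

	resp := []byte(`{"subject":"test","id":1,"version":3,"schema":"{\"type\":\"string\"}"}`)
	ss, err := decodeSubjectSchema(resp)
	if err != nil {
		panic(err)
	}
	fmt.Println(ss.Subject, ss.ID, ss.Version, ss.Schema) // test 1 3 {"type":"string"}
}
```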
