Documentation ¶
Overview ¶
Package sr provides a schema registry client and a helper type to encode values and decode data according to the schema registry wire format.
As mentioned on the Serde type, this package does not provide schema auto-discovery or type auto-decoding. To aid in strong typing and validated encoding/decoding, you must register each schema ID and value along with how to encode or decode it.
The client does not automatically cache schemas. Instead, the Serde type does the actual caching, mapping each ID to how to encode and decode it. The Client type itself simply speaks HTTP to your schema registry and returns the results.
To read more about the schema registry, see the following:
https://docs.confluent.io/platform/current/schema-registry/develop/api.html
Index ¶
- Constants
- Variables
- type Client
- func (cl *Client) CheckCompatibility(ctx context.Context, subject string, version int, s Schema) (bool, error)
- func (cl *Client) CompatibilityLevel(ctx context.Context, subjects ...string) []CompatibilityResult
- func (cl *Client) CreateSchema(ctx context.Context, subject string, s Schema) (SubjectSchema, error)
- func (cl *Client) DeleteSchema(ctx context.Context, subject string, version int, how DeleteHow) error
- func (cl *Client) DeleteSubject(ctx context.Context, subject string, how DeleteHow) ([]int, error)
- func (cl *Client) LookupSchema(ctx context.Context, subject string, s Schema) (SubjectSchema, error)
- func (cl *Client) Mode(ctx context.Context, subjects ...string) []ModeResult
- func (cl *Client) ResetCompatibilityLevel(ctx context.Context, subjects ...string) []CompatibilityResult
- func (cl *Client) ResetMode(ctx context.Context, subjects ...string) []ModeResult
- func (cl *Client) SchemaByVersion(ctx context.Context, subject string, version int, deleted HideShowDeleted) (SubjectSchema, error)
- func (cl *Client) SchemaReferences(ctx context.Context, subject string, version int, deleted HideShowDeleted) ([]SubjectSchema, error)
- func (cl *Client) SchemaTextByID(ctx context.Context, id int) (string, error)
- func (cl *Client) SchemaUsagesByID(ctx context.Context, id int, deleted HideShowDeleted) ([]SubjectSchema, error)
- func (cl *Client) Schemas(ctx context.Context, subject string, deleted HideShowDeleted) ([]SubjectSchema, error)
- func (cl *Client) SetCompatibilityLevel(ctx context.Context, level CompatibilityLevel, subjects ...string) []CompatibilityResult
- func (cl *Client) SetMode(ctx context.Context, mode Mode, force bool, subjects ...string) []ModeResult
- func (cl *Client) Subjects(ctx context.Context, deleted HideShowDeleted) ([]string, error)
- func (cl *Client) SupportedTypes(ctx context.Context) ([]SchemaType, error)
- type CompatibilityLevel
- type CompatibilityResult
- type DeleteHow
- type HideShowDeleted
- type Mode
- type ModeResult
- type Opt
- type ResponseError
- type Schema
- type SchemaReference
- type SchemaType
- type Serde
- func (s *Serde) AppendEncode(b []byte, v any) ([]byte, error)
- func (s *Serde) Decode(b []byte, v any) error
- func (s *Serde) DecodeNew(b []byte) (any, error)
- func (s *Serde) Encode(v any) ([]byte, error)
- func (s *Serde) MustAppendEncode(b []byte, v any) []byte
- func (s *Serde) MustEncode(v any) []byte
- func (s *Serde) Register(id int, v any, opts ...SerdeOpt)
- func (s *Serde) SetDefaults(opts ...SerdeOpt)
- type SerdeOpt
- type SubjectSchema
Constants ¶
const (
	// HideDeleted hides soft deleted schemas or subjects.
	HideDeleted = false
	// ShowDeleted shows soft deleted schemas or subjects.
	ShowDeleted = true
)
const (
	// SoftDelete performs a soft deletion.
	SoftDelete = false
	// HardDelete performs a hard deletion. Values must be soft deleted
	// before they can be hard deleted.
	HardDelete = true
)
const GlobalSubject = ""
GlobalSubject is a constant to make API usage of requesting global subjects clearer.
Variables ¶
var (
	// ErrNotRegistered is returned from Serde when attempting to encode a
	// value or decode an ID that has not been registered, or when using
	// Decode with a missing new value function.
	ErrNotRegistered = errors.New("registration is missing for encode/decode")

	// ErrBadHeader is returned from Decode when the input slice is shorter
	// than five bytes, or if the first byte is not the magic 0 byte.
	ErrBadHeader = errors.New("5 byte header for value is missing or does no have 0 magic byte")
)
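The five-byte header that ErrBadHeader refers to can be illustrated with a minimal sketch. It assumes the standard schema registry wire format: one magic 0 byte followed by a four-byte big-endian schema ID. The names splitHeader and errBadHeader are hypothetical stand-ins and are not part of this package.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// errBadHeader is a local stand-in for the package's ErrBadHeader.
var errBadHeader = errors.New("bad header")

// splitHeader is a hypothetical helper. It validates the five-byte header
// and returns the schema ID together with the remaining payload.
func splitHeader(b []byte) (id int, payload []byte, err error) {
	if len(b) < 5 || b[0] != 0 {
		return 0, nil, errBadHeader
	}
	return int(binary.BigEndian.Uint32(b[1:5])), b[5:], nil
}

func main() {
	// Magic byte 0, schema ID 42, then the payload "hi".
	id, payload, err := splitHeader([]byte{0, 0, 0, 0, 42, 'h', 'i'})
	fmt.Println(id, string(payload), err)

	// A non-zero first byte is rejected.
	_, _, err = splitHeader([]byte{1, 0, 0, 0, 42})
	fmt.Println(err)
}
```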
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client talks to a schema registry and contains helper functions to serialize and deserialize objects according to schemas.
func (*Client) CheckCompatibility ¶
func (cl *Client) CheckCompatibility(ctx context.Context, subject string, version int, s Schema) (bool, error)
CheckCompatibility checks if a schema is compatible with the given version that exists. You can use -1 to check compatibility with the latest version, and -2 to check compatibility against all versions.
func (*Client) CompatibilityLevel ¶
func (cl *Client) CompatibilityLevel(ctx context.Context, subjects ...string) []CompatibilityResult
CompatibilityLevel returns the subject level and global level compatibility of each requested subject. The global level can be requested by using either an empty subject or by specifying no subjects.
func (*Client) CreateSchema ¶
func (cl *Client) CreateSchema(ctx context.Context, subject string, s Schema) (SubjectSchema, error)
CreateSchema attempts to create a schema in the given subject.
func (*Client) DeleteSchema ¶
func (cl *Client) DeleteSchema(ctx context.Context, subject string, version int, how DeleteHow) error
DeleteSchema deletes the schema at the given version. You must soft delete a schema before it can be hard deleted. You can use -1 to delete the latest version.
func (*Client) DeleteSubject ¶
func (cl *Client) DeleteSubject(ctx context.Context, subject string, how DeleteHow) ([]int, error)
DeleteSubject deletes the subject. You must soft delete a subject before it can be hard deleted. This returns all versions that were deleted.
func (*Client) LookupSchema ¶
func (cl *Client) LookupSchema(ctx context.Context, subject string, s Schema) (SubjectSchema, error)
LookupSchema checks to see if a schema is already registered and if so, returns its ID and version in the SubjectSchema.
func (*Client) Mode ¶
func (cl *Client) Mode(ctx context.Context, subjects ...string) []ModeResult
Mode returns the subject and global mode of each requested subject. The global mode can be requested by using either an empty subject or by specifying no subjects.
func (*Client) ResetCompatibilityLevel ¶
func (cl *Client) ResetCompatibilityLevel(ctx context.Context, subjects ...string) []CompatibilityResult
ResetCompatibilityLevel deletes any subject-level compatibility level and reverts to the global default.
func (*Client) ResetMode ¶
func (cl *Client) ResetMode(ctx context.Context, subjects ...string) []ModeResult
ResetMode deletes any subject modes and reverts to the global default.
func (*Client) SchemaByVersion ¶
func (cl *Client) SchemaByVersion(ctx context.Context, subject string, version int, deleted HideShowDeleted) (SubjectSchema, error)
SchemaByVersion returns the schema for a given subject and version. You can use -1 as the version to return the latest schema.
func (*Client) SchemaReferences ¶
func (cl *Client) SchemaReferences(ctx context.Context, subject string, version int, deleted HideShowDeleted) ([]SubjectSchema, error)
SchemaReferences returns all schemas that reference the input subject-version. You can use -1 to check the latest version.
func (*Client) SchemaTextByID ¶
func (cl *Client) SchemaTextByID(ctx context.Context, id int) (string, error)
SchemaTextByID returns the actual text of a schema.
For example, if the schema for an ID is
"{\"type\":\"boolean\"}"
this will return
{"type":"boolean"}
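The relationship between the escaped and unescaped forms above can be reproduced with the standard library. This is only a sketch of the unescaping step, not a description of how the client fetches or processes the text; unescape is a hypothetical helper name.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// unescape is a hypothetical helper that decodes a JSON string literal
// into the text it represents.
func unescape(quoted string) (string, error) {
	var text string
	err := json.Unmarshal([]byte(quoted), &text)
	return text, err
}

func main() {
	text, err := unescape(`"{\"type\":\"boolean\"}"`)
	fmt.Println(text, err)
}
```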
func (*Client) SchemaUsagesByID ¶
func (cl *Client) SchemaUsagesByID(ctx context.Context, id int, deleted HideShowDeleted) ([]SubjectSchema, error)
SchemaUsagesByID returns all usages of a given schema ID. A single schema can be reused in many subject-versions; this function can be used to map a schema to all subject-versions that use it.
func (*Client) Schemas ¶
func (cl *Client) Schemas(ctx context.Context, subject string, deleted HideShowDeleted) ([]SubjectSchema, error)
Schemas returns all schemas for the given subject.
func (*Client) SetCompatibilityLevel ¶
func (cl *Client) SetCompatibilityLevel(ctx context.Context, level CompatibilityLevel, subjects ...string) []CompatibilityResult
SetCompatibilityLevel sets the compatibility level for each requested subject. The global level can be set by either using an empty subject or by specifying no subjects. If specifying no subjects, this returns one element.
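The subject convention shared by the compatibility and mode functions (no subjects, or an empty subject, means the global setting) can be sketched as a small normalization step. The helper name targets is hypothetical, and how the client implements this internally is not shown here.

```go
package main

import "fmt"

// targets is a hypothetical helper: with no subjects it yields a single
// empty subject, which stands for the global setting. Otherwise it returns
// the subjects unchanged, one per expected result.
func targets(subjects ...string) []string {
	if len(subjects) == 0 {
		return []string{""}
	}
	return subjects
}

func main() {
	fmt.Println(len(targets()))                       // one element for the global setting
	fmt.Println(len(targets("foo-key", "foo-value"))) // one element per subject
}
```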
func (*Client) SetMode ¶
func (cl *Client) SetMode(ctx context.Context, mode Mode, force bool, subjects ...string) []ModeResult
SetMode sets the mode for each requested subject. The global mode can be set by either using an empty subject or by specifying no subjects. If specifying no subjects, this returns one element. Force can be used to force setting the mode even if the registry has existing schemas.
func (*Client) SupportedTypes ¶
func (cl *Client) SupportedTypes(ctx context.Context) ([]SchemaType, error)
SupportedTypes returns the schema types that are supported in the schema registry.
type CompatibilityLevel ¶
type CompatibilityLevel int
CompatibilityLevel is an enum representing config compatibility levels.
const (
	CompatNone CompatibilityLevel = 1 + iota
	CompatBackward
	CompatBackwardTransitive
	CompatForward
	CompatForwardTransitive
	CompatFull
	CompatFullTransitive
)
func (CompatibilityLevel) MarshalText ¶
func (l CompatibilityLevel) MarshalText() ([]byte, error)
func (CompatibilityLevel) String ¶
func (l CompatibilityLevel) String() string
func (*CompatibilityLevel) UnmarshalText ¶
func (l *CompatibilityLevel) UnmarshalText(text []byte) error
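An enum with text marshaling along these lines can be sketched as follows. The local type level and its constants are stand-ins for illustration. The text names are the compatibility level names used by the schema registry API, and the handling of unknown values here is an assumption rather than the package's documented behavior.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// level is a local stand-in for CompatibilityLevel.
type level int

const (
	levelNone level = 1 + iota
	levelBackward
	levelBackwardTransitive
	levelForward
	levelForwardTransitive
	levelFull
	levelFullTransitive
)

// levelNames is indexed by level; index 0 is unused because the enum starts at 1.
var levelNames = [...]string{
	"", "NONE", "BACKWARD", "BACKWARD_TRANSITIVE",
	"FORWARD", "FORWARD_TRANSITIVE", "FULL", "FULL_TRANSITIVE",
}

func (l level) String() string {
	if l < levelNone || int(l) >= len(levelNames) {
		return "" // assumption: unknown values render as empty text
	}
	return levelNames[l]
}

func (l level) MarshalText() ([]byte, error) { return []byte(l.String()), nil }

func (l *level) UnmarshalText(text []byte) error {
	for i := 1; i < len(levelNames); i++ {
		if levelNames[i] == string(text) {
			*l = level(i)
			return nil
		}
	}
	return fmt.Errorf("unknown compatibility level %q", text)
}

func main() {
	// encoding/json uses MarshalText, so the value is written as a JSON string.
	out, err := json.Marshal(map[string]level{"compatibility": levelFull})
	fmt.Println(string(out), err)
}
```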
type CompatibilityResult ¶
type CompatibilityResult struct {
	Subject string             // The subject this compatibility result is for, or empty for the global level.
	Level   CompatibilityLevel // The subject (or global) compatibility level.
	Err     error              // The error received for getting this compatibility level.
}
CompatibilityResult is the compatibility level for a subject.
type DeleteHow ¶
type DeleteHow bool
DeleteHow is a typed bool indicating how subjects or schemas should be deleted.
type HideShowDeleted ¶
type HideShowDeleted bool
HideShowDeleted is a typed bool indicating whether queries should show or hide soft deleted schemas / subjects.
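The typed bool pattern keeps call sites readable, since a caller writes a named constant rather than a bare true or false. A minimal sketch, using hypothetical local names (hideShow, entry, list) rather than the package's own types:

```go
package main

import "fmt"

// hideShow is a local stand-in for a typed bool such as HideShowDeleted.
type hideShow bool

const (
	hide hideShow = false
	show hideShow = true
)

// entry is a hypothetical record with a soft-deletion flag.
type entry struct {
	name        string
	softDeleted bool
}

// list returns the names of entries, skipping soft deleted ones unless
// the caller asks to show them.
func list(entries []entry, deleted hideShow) []string {
	var out []string
	for _, e := range entries {
		if e.softDeleted && deleted == hide {
			continue
		}
		out = append(out, e.name)
	}
	return out
}

func main() {
	entries := []entry{{"foo-value", false}, {"bar-value", true}}
	fmt.Println(list(entries, hide))
	fmt.Println(list(entries, show))
}
```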
type Mode ¶
type Mode int
Mode is an enum representing the "mode" of the registry or a subject.
func (Mode) MarshalText ¶
func (*Mode) UnmarshalText ¶
type ModeResult ¶
type ModeResult struct {
	Subject string // The subject this mode result is for, or empty for the global mode.
	Mode    Mode   // The subject (or global) mode.
	Err     error  // The error received for getting this mode.
}
ModeResult is the mode for a subject.
type Opt ¶
type Opt interface {
// contains filtered or unexported methods
}
Opt is an option to configure a client.
func DialTLSConfig ¶
DialTLSConfig sets a tls.Config to use in the default HTTP client.
func HTTPClient ¶
HTTPClient sets the http client that the schema registry client uses, overriding the default client that speaks plaintext with a timeout of 5s.
func Normalize ¶
func Normalize() Opt
Normalize sets the client to add the "?normalize=true" query parameter when getting or creating schemas. This can help collapse duplicate schemas into one, but can also be done with a configuration parameter on the schema registry itself.
func URLs ¶
URLs sets the URLs that the client speaks to, overriding the default http://localhost:8081. This option automatically prefixes any URL that is missing an http:// or https:// prefix with http://.
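The prefixing rule described for URLs can be sketched as a small function. The name withScheme is hypothetical, and the option's actual implementation may differ in details such as case handling.

```go
package main

import (
	"fmt"
	"strings"
)

// withScheme is a hypothetical helper that prefixes a URL with http://
// when it has neither an http:// nor an https:// prefix.
func withScheme(u string) string {
	if strings.HasPrefix(u, "http://") || strings.HasPrefix(u, "https://") {
		return u
	}
	return "http://" + u
}

func main() {
	fmt.Println(withScheme("localhost:8081"))
	fmt.Println(withScheme("https://registry.example.com"))
}
```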
type ResponseError ¶
type ResponseError struct {
	// Method is the requested http method.
	Method string `json:"-"`
	// URL is the full path that was requested that resulted in this error.
	URL string `json:"-"`

	ErrorCode int    `json:"error_code"`
	Message   string `json:"message"`
}
ResponseError is the type returned from the schema registry for errors.
func (*ResponseError) Error ¶
func (e *ResponseError) Error() string
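The struct tags suggest how an error body from the registry maps onto this type. The sketch below mirrors the fields in a local type and decodes a sample body. The helper parseError, the Error string format, and the sample request URL are assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// responseError mirrors the fields of ResponseError for illustration.
type responseError struct {
	Method    string `json:"-"`
	URL       string `json:"-"`
	ErrorCode int    `json:"error_code"`
	Message   string `json:"message"`
}

// Error uses a hypothetical format; the package's own format may differ.
func (e *responseError) Error() string {
	return fmt.Sprintf("%s %s: %d %s", e.Method, e.URL, e.ErrorCode, e.Message)
}

// parseError is a hypothetical helper that decodes an error body and
// records which request produced it.
func parseError(method, url string, body []byte) error {
	e := &responseError{Method: method, URL: url}
	if err := json.Unmarshal(body, e); err != nil {
		return err
	}
	return e
}

func main() {
	body := []byte(`{"error_code":40401,"message":"Subject not found."}`)
	err := parseError("GET", "http://localhost:8081/subjects/foo-value/versions", body)

	// Callers can inspect the structured error with errors.As.
	var re *responseError
	if errors.As(err, &re) {
		fmt.Println(re.ErrorCode, re.Message)
	}
}
```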
type Schema ¶
type Schema struct {
	// Schema is the actual unescaped text of a schema.
	Schema string `json:"schema"`

	// Type is the type of a schema. The default type is avro.
	Type SchemaType `json:"schemaType,omitempty"`

	// References declares other schemas this schema references. See the
	// docs on SchemaReference for more details.
	References []SchemaReference `json:"references,omitempty"`
}
Schema is the object form of a schema for the HTTP API.
type SchemaReference ¶
type SchemaReference struct {
	Name    string `json:"name"`
	Subject string `json:"subject"`
	Version int    `json:"version"`
}
SchemaReference is a way for one schema to reference another. The details for how referencing is done are type specific; for example, JSON objects that use the key "$ref" can refer to another schema via URL. For more details on references, see the following links:
https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#schema-references
https://docs.confluent.io/platform/current/schema-registry/develop/api.html
type SchemaType ¶
type SchemaType int
SchemaType is an enum representing schema types. The default schema type is avro.
const (
	TypeAvro SchemaType = iota
	TypeProtobuf
	TypeJSON
)
func (SchemaType) MarshalText ¶
func (t SchemaType) MarshalText() ([]byte, error)
func (SchemaType) String ¶
func (t SchemaType) String() string
func (*SchemaType) UnmarshalText ¶
func (t *SchemaType) UnmarshalText(text []byte) error
type Serde ¶
type Serde struct {
// contains filtered or unexported fields
}
Serde encodes and decodes values according to the schema registry wire format. A Serde itself does not perform schema auto-discovery or type auto-decoding. To aid in strong typing and validated encoding/decoding, you must register each schema ID and value along with how to encode or decode it.
To use a Serde for encoding, you must pre-register schema ids and values you will encode, and then you can use the encode functions.
To use a Serde for decoding, you can either pre-register schema ids and values you will consume, or you can discover the schema every time you receive an ErrNotRegistered error from decode.
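The register-then-encode flow can be illustrated with a toy registry built only on the standard library. This is a sketch of the idea and not the package's implementation: toySerde, its methods, and the choice of JSON as the payload encoding are assumptions, and the header layout assumes the standard wire format (magic 0 byte plus a four-byte big-endian ID).

```go
package main

import (
	"encoding/binary"
	"encoding/json"
	"errors"
	"fmt"
	"reflect"
)

var errNotRegistered = errors.New("not registered")

// toySerde maps Go types to schema IDs and remembers which IDs are known.
type toySerde struct {
	idByType map[reflect.Type]int
	known    map[int]bool
}

func (s *toySerde) register(id int, v any) {
	if s.idByType == nil {
		s.idByType = map[reflect.Type]int{}
		s.known = map[int]bool{}
	}
	s.idByType[reflect.TypeOf(v)] = id
	s.known[id] = true
}

// encode writes the five-byte header followed by the JSON payload.
func (s *toySerde) encode(v any) ([]byte, error) {
	id, ok := s.idByType[reflect.TypeOf(v)]
	if !ok {
		return nil, errNotRegistered
	}
	body, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	hdr := []byte{0, 0, 0, 0, 0}
	binary.BigEndian.PutUint32(hdr[1:], uint32(id))
	return append(hdr, body...), nil
}

// decode checks the header, looks up the ID, and decodes the payload into v.
func (s *toySerde) decode(b []byte, v any) error {
	if len(b) < 5 || b[0] != 0 {
		return errors.New("bad header")
	}
	if !s.known[int(binary.BigEndian.Uint32(b[1:5]))] {
		return errNotRegistered
	}
	return json.Unmarshal(b[5:], v)
}

type user struct {
	Name string `json:"name"`
}

func main() {
	var s toySerde
	s.register(3, user{})

	b, err := s.encode(user{Name: "ann"})
	fmt.Println(b[:5], string(b[5:]), err)

	var u user
	fmt.Println(s.decode(b, &u), u.Name)

	// Encoding an unregistered type fails.
	_, err = s.encode(42)
	fmt.Println(err)
}
```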
func (*Serde) AppendEncode ¶
func (s *Serde) AppendEncode(b []byte, v any) ([]byte, error)
AppendEncode appends an encoded value to b according to the schema registry wire format and returns it. If EncodeFn was not used, this returns ErrNotRegistered.
func (*Serde) Decode ¶
func (s *Serde) Decode(b []byte, v any) error
Decode decodes b into v. If the DecodeFn option was not used, this returns ErrNotRegistered.
Serde does not handle references in schemas; it is up to you to register the full decode function for any top-level ID, regardless of how many other schemas are referenced in the top-level ID.
func (*Serde) DecodeNew ¶
func (s *Serde) DecodeNew(b []byte) (any, error)
DecodeNew is the same as Decode, but decodes into a new value rather than the input value. If DecodeFn was not used, this returns ErrNotRegistered. GenerateFn can be used to control the instantiation of a new value; otherwise, this uses reflect.New(reflect.TypeOf(v)).Interface().
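What the reflect expression above produces can be seen in isolation. The sketch below uses a hypothetical helper newLike and a sample type point; it only demonstrates the behavior of the reflect call, not the rest of DecodeNew.

```go
package main

import (
	"fmt"
	"reflect"
)

type point struct{ X, Y int }

// newLike is a hypothetical helper returning a pointer to a new zero
// value of v's type, using the same reflect expression quoted above.
func newLike(v any) any {
	return reflect.New(reflect.TypeOf(v)).Interface()
}

func main() {
	p := newLike(point{})
	fmt.Printf("%T\n", p) // a pointer to point

	// For a pointer input, the result is a pointer to a pointer.
	pp := newLike(&point{})
	fmt.Printf("%T\n", pp)
}
```

Since reflect.New always adds one level of indirection, a value type yields a pointer to a zero value of that type, while a pointer type yields a pointer to a nil pointer.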
func (*Serde) Encode ¶
func (s *Serde) Encode(v any) ([]byte, error)
Encode encodes a value according to the schema registry wire format and returns it. If EncodeFn was not used, this returns ErrNotRegistered.
func (*Serde) MustAppendEncode ¶
func (s *Serde) MustAppendEncode(b []byte, v any) []byte
MustAppendEncode returns the value of AppendEncode, panicking on error. This is a shortcut for when your encode function cannot return an error.
func (*Serde) MustEncode ¶
func (s *Serde) MustEncode(v any) []byte
MustEncode returns the value of Encode, panicking on error. This is a shortcut for when your encode function cannot return an error.
func (*Serde) Register ¶
func (s *Serde) Register(id int, v any, opts ...SerdeOpt)
Register registers a schema ID and the value it corresponds to, as well as the encoding or decoding functions. You need to register functions depending on whether you are only encoding, only decoding, or both.
func (*Serde) SetDefaults ¶
func (s *Serde) SetDefaults(opts ...SerdeOpt)
SetDefaults sets default options to apply to every registered type. These options are always applied first, so you can override them as necessary when registering.
This can be useful if you always want to use the same encoding or decoding functions.
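The "defaults first, overrides after" ordering is a common property of functional options and can be sketched generically. All names below (config, option, withFormat, withStrict, build) are hypothetical and unrelated to the package's real options.

```go
package main

import "fmt"

// config is a hypothetical settings struct configured by options.
type config struct {
	format string
	strict bool
}

type option func(*config)

func withFormat(f string) option { return func(c *config) { c.format = f } }
func withStrict() option         { return func(c *config) { c.strict = true } }

// build applies defaults first and per-registration options second, so a
// later option overrides an earlier default that sets the same field.
func build(defaults, opts []option) config {
	var c config
	for _, o := range defaults {
		o(&c)
	}
	for _, o := range opts {
		o(&c)
	}
	return c
}

func main() {
	defaults := []option{withFormat("json"), withStrict()}
	fmt.Println(build(defaults, nil))
	fmt.Println(build(defaults, []option{withFormat("avro")}))
}
```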
type SerdeOpt ¶
type SerdeOpt interface {
// contains filtered or unexported methods
}
SerdeOpt is an option to configure a Serde.
func AppendEncodeFn ¶
AppendEncodeFn allows Serde to encode a value to an existing slice. This can be more efficient than EncodeFn; this function is used if it exists.
func GenerateFn ¶
GenerateFn returns a new(Value) that can be decoded into. This function can be used to control the instantiation of a new type for DecodeNew.
func Index ¶
Index attaches a message index to a value. A single schema ID can be registered multiple times with different indices.
This option supports schemas that encode many different values from the same schema (namely, protobuf). The index into the schema to encode a particular message is specified with `index`.
NOTE: this option must be used for protobuf schemas.
For more information, see where `message-indexes` are described in:
https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format
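As a rough illustration of message indexes, the sketch below assumes the layout described at the link above: a zig-zag varint count followed by zig-zag varint indexes, with the single index [0] shortened to one zero byte. The helper appendIndex is hypothetical, and whether this package encodes indexes in exactly this way is not stated here.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// appendIndex is a hypothetical helper that appends a message index to b.
// binary.PutVarint uses zig-zag encoding for signed integers.
func appendIndex(b []byte, index []int) []byte {
	// Assumed shortcut: the first message, [0], is written as a single 0 byte.
	if len(index) == 1 && index[0] == 0 {
		return append(b, 0)
	}
	var buf [binary.MaxVarintLen64]byte
	n := binary.PutVarint(buf[:], int64(len(index)))
	b = append(b, buf[:n]...)
	for _, i := range index {
		n = binary.PutVarint(buf[:], int64(i))
		b = append(b, buf[:n]...)
	}
	return b
}

func main() {
	fmt.Println(appendIndex(nil, []int{0}))
	fmt.Println(appendIndex(nil, []int{1, 0}))
}
```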
type SubjectSchema ¶
type SubjectSchema struct {
	// Subject is the subject for this schema. This usually corresponds to
	// a Kafka topic, and whether this is for a key or value. For example,
	// "foo-key" would be the subject for the foo topic for serializing the
	// key field of a record.
	Subject string `json:"subject"`

	// Version is the version of this subject.
	Version int `json:"version"`

	// ID is the globally unique ID of the schema.
	ID int `json:"id"`

	Schema
}
SubjectSchema pairs the subject, global identifier, and version of a schema with the schema itself.
func CommSubjectSchemas ¶
func CommSubjectSchemas(l, r []SubjectSchema) (luniq, runiq, common []SubjectSchema)
CommSubjectSchemas splits l and r into three sets: what is unique in l, what is unique in r, and what is common in both. Duplicates in either slice are eliminated.
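The three-way split can be sketched on plain strings. The function comm below is a hypothetical analogue. How the package compares two SubjectSchema values, and the order of its results, are not stated here, so this sketch simply sorts its output.

```go
package main

import (
	"fmt"
	"sort"
)

// comm splits l and r into values only in l, values only in r, and values
// in both. Duplicates are dropped because membership is tracked in sets.
func comm(l, r []string) (luniq, runiq, common []string) {
	inL := make(map[string]bool, len(l))
	for _, s := range l {
		inL[s] = true
	}
	inR := make(map[string]bool, len(r))
	for _, s := range r {
		inR[s] = true
	}
	for s := range inL {
		if inR[s] {
			common = append(common, s)
		} else {
			luniq = append(luniq, s)
		}
	}
	for s := range inR {
		if !inL[s] {
			runiq = append(runiq, s)
		}
	}
	sort.Strings(luniq)
	sort.Strings(runiq)
	sort.Strings(common)
	return luniq, runiq, common
}

func main() {
	luniq, runiq, common := comm([]string{"a", "b", "b"}, []string{"b", "c"})
	fmt.Println(luniq, runiq, common)
}
```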