Documentation ¶
Overview ¶
Package avro wraps linkedin/goavro to provide serialization and deserialization of avro data to go struct with tags.
Index ¶
- Constants
- Variables
- func AddNamespace(namespace, typeName string) string
- func CamelCaseToSnakeCase(name string) string
- func DefaultTypeNameEncoder(name string) string
- func GoToAvroType(name string) string
- type Codec
- type CodecRegistry
- type ConfluentSchemaRegistry
- func (c *ConfluentSchemaRegistry) DeleteSubject(subject string) (versions []int, err error)
- func (c *ConfluentSchemaRegistry) GetLatestSchema(subject string) (s Schema, err error)
- func (c *ConfluentSchemaRegistry) GetSchemaByID(id int) (string, error)
- func (c *ConfluentSchemaRegistry) GetSchemaBySubject(subject string, ver int) (s Schema, err error)
- func (c *ConfluentSchemaRegistry) IsRegistered(subject, schema string) (bool, Schema, error)
- func (c *ConfluentSchemaRegistry) RegisterNewSchema(subject, schema string) (int, error)
- func (c *ConfluentSchemaRegistry) Subjects() (subjects []string, err error)
- func (c *ConfluentSchemaRegistry) Versions(subject string) (versions []int, err error)
- type CustomUnmarshaler
- type Header
- type Marshaler
- type Schema
- type SchemaID
- type SchemaRegistry
- type TypeNameEncoder
- type TypeNamer
- type Unmarshaler
Examples ¶
Constants ¶
const MagicByte = 0x0
MagicByte define the first byte of a binary avro payload
const UnknownVersion = -1
UnknownVersion is the default value without information from the schema registry
Variables ¶
var DefaultEndianness = binary.BigEndian
DefaultEndianness is the endianness used for marshal/unmarshalling. The value must be the same between the marshaller and unmarshaller in order to work correctly. Note: It is set to BigEndian to match the implementation of the kafka-rest Confluent's project (see: https://github.com/confluentinc/kafka-rest)
var DefaultURL = "http://localhost:8081"
DefaultURL is the address where a local schema registry listens by default.
var ErrNoEncodeSchema = fmt.Errorf("no encoding schema have been initialized")
ErrNoEncodeSchema is the error returned when an encode happens without a schema provided
Functions ¶
func AddNamespace ¶
AddNamespace will contact a namespace with the given type name in avro standard way.
func CamelCaseToSnakeCase ¶
CamelCaseToSnakeCase will transform an input string written in CamelCase into snake_case
func DefaultTypeNameEncoder ¶
DefaultTypeNameEncoder combines avro type conversion and camel case to snake case
func GoToAvroType ¶
GoToAvroType transforms a Go type name to an Avro type name Note that complex types are not handled by this function as there is no counterpart
Types ¶
type Codec ¶
type Codec struct { goavro.Codec // Namespace is the namespace which will be used to encode nested type Namespace string // TypeNameEncoder will be applied on all type during encoding to transform them from Go name to avro naming convention TypeNameEncoder TypeNameEncoder }
Codec wraps the goavro.Codec type
Example ¶
package main import ( "fmt" ) type Someone struct { Name string `avro:"name"` Age int32 `avro:"age"` } func main() { val := Someone{"MyName", 3} var decoded Someone schema := `{ "type": "record", "name": "Someone", "fields": [ { "name": "name", "type": "string" }, { "name": "age", "type": "int" } ] }` codec, err := NewCodec(schema) if err != nil { panic(fmt.Sprintf("wrong schema: %s", err)) } avro, err := codec.Marshal(&val) if err != nil { panic(fmt.Sprintf("unable to serialize to avro: %s", err)) } err = codec.Unmarshal(avro, &decoded) if err != nil { panic(fmt.Sprintf("unable to deserialize from avro: %s", err)) } }
Output:
type CodecRegistry ¶
type CodecRegistry struct { Registry SchemaRegistry SchemaID SchemaID // TypeNameEncoder is the convertion logic to translate type name from go to avro TypeNameEncoder TypeNameEncoder // contains filtered or unexported fields }
CodecRegistry is an avro serializer and unserializer which is connected to the schemaregistry to dynamically discover and decode schemas
func NewCodecRegistry ¶
func NewCodecRegistry(registryURL, subject, schema string) (*CodecRegistry, error)
NewCodecRegistry configures a codec connected to the schema registry. - registryURL (required) is the complete URL of the schema registry - subject (optional) is the name of the subject under which your schema will be registered (often the kafka topic name) - schema (optional) is the schema which will be used for encoding
If an empty schema is provided it would be impossible to encode, but the decoding will auto discover the schema type ¶
Note: the CodecRegistry will take care of registering the schema and dynamic decoding
func NewCodecRegistryAndRegister ¶
func NewCodecRegistryAndRegister(registryURL string, subject string, schema string) (*CodecRegistry, error)
NewCodecRegistryAndRegister does a NewCodecRegistry() and a Register()
func NewNOOPCodecRegistry ¶
func NewNOOPCodecRegistry(subject string) *CodecRegistry
NewNOOPCodecRegistry returns a CodecRegistry that uses the NOOP schema registry
func (*CodecRegistry) Marshal ¶
func (r *CodecRegistry) Marshal(data interface{}) ([]byte, error)
Marshal implements Marshaller
func (*CodecRegistry) Register ¶
func (r *CodecRegistry) Register(rawSchema string) error
Register registers a new schema inside the Schema Registry and sets this schema as the default encode and decode schema
func (*CodecRegistry) SetTypeNameEncoder ¶
func (r *CodecRegistry) SetTypeNameEncoder(typeNameEncoder TypeNameEncoder)
SetTypeNameEncoder will set the TypeNameEncoder of the codec registry and apply it to all previously created codec
func (*CodecRegistry) Unmarshal ¶
func (r *CodecRegistry) Unmarshal(from []byte, to interface{}) error
Unmarshal implement Unmarshaller Note: the Unmarshalling of older schema can be inefficient. nolint
type ConfluentSchemaRegistry ¶
type ConfluentSchemaRegistry struct {
// contains filtered or unexported fields
}
ConfluentSchemaRegistry defines a schema registry managed by Confluent
func (*ConfluentSchemaRegistry) DeleteSubject ¶
func (c *ConfluentSchemaRegistry) DeleteSubject(subject string) (versions []int, err error)
DeleteSubject removes a list of schema under the given subject
func (*ConfluentSchemaRegistry) GetLatestSchema ¶
func (c *ConfluentSchemaRegistry) GetLatestSchema(subject string) (s Schema, err error)
GetLatestSchema returns the latest version of the subject's schema.
func (*ConfluentSchemaRegistry) GetSchemaByID ¶
func (c *ConfluentSchemaRegistry) GetSchemaByID(id int) (string, error)
GetSchemaByID returns the schema for some id. The schema registry only provides the schema itself, not the id, subject or version.
func (*ConfluentSchemaRegistry) GetSchemaBySubject ¶
func (c *ConfluentSchemaRegistry) GetSchemaBySubject(subject string, ver int) (s Schema, err error)
GetSchemaBySubject returns the schema for a particular subject and version.
func (*ConfluentSchemaRegistry) IsRegistered ¶
func (c *ConfluentSchemaRegistry) IsRegistered(subject, schema string) (bool, Schema, error)
IsRegistered tells if the given schema is registred for this subject.
func (*ConfluentSchemaRegistry) RegisterNewSchema ¶
func (c *ConfluentSchemaRegistry) RegisterNewSchema(subject, schema string) (int, error)
RegisterNewSchema registers the given schema for this subject.
func (*ConfluentSchemaRegistry) Subjects ¶
func (c *ConfluentSchemaRegistry) Subjects() (subjects []string, err error)
Subjects returns all registered subjects.
type CustomUnmarshaler ¶
CustomUnmarshaler will allow to define custom unmarshaller.
type Marshaler ¶
Marshaler is for types marshaling go types to avro
The Marshaler will understand structure field annotations like
- "-" to completely omit the field
- "omitempty" which will not use the field value if it is a zero value and replace it by its default value if there is one. WARNING: if no default value are provided, the Marshaler will return an error
Marshaler will also handle pointer on values as the default optional value pattern in avro (union with null type and null as default value).
type Schema ¶
type Schema struct { Schema string `json:"schema"` // The actual AVRO schema Subject string `json:"subject"` // Subject where the schema is registered for Version int `json:"version"` // Version within this subject ID int `json:"id"` // Registry's unique id }
The Schema type is an object produced by the schema registry.
type SchemaID ¶
type SchemaID int32
SchemaID is the type of the schema's ID return by the registry
const UnknownID SchemaID = -1
UnknownID is the value of a SchemaRegistry ID if it hasn't be registered
type SchemaRegistry ¶
type SchemaRegistry interface { Subjects() (subjects []string, err error) Versions(subject string) (versions []int, err error) RegisterNewSchema(subject, schema string) (int, error) IsRegistered(subject, schema string) (bool, Schema, error) GetSchemaByID(id int) (string, error) GetSchemaBySubject(subject string, ver int) (s Schema, err error) GetLatestSchema(subject string) (s Schema, err error) DeleteSubject(subject string) (versions []int, err error) }
SchemaRegistry is a client for the schema registry.
func NewNOOPClient ¶
func NewNOOPClient() SchemaRegistry
NewNOOPClient is a mock schema registry which can be used for testing purposes nolint
func NewSchemaRegistry ¶
func NewSchemaRegistry(baseurl string) (SchemaRegistry, error)
NewSchemaRegistry returns a new SchemaRegistry that connects to baseurl.
type TypeNameEncoder ¶
TypeNameEncoder will transform a Go type name (ex: int, string, ..) into a avro type name
type TypeNamer ¶
type TypeNamer interface {
AvroName() string
}
TypeNamer is an interface which will be used during marshalling to rename a type into its avro name
type Unmarshaler ¶
Unmarshaler is for types unmarshaling go types to avro