avro

package module
v1.2.5
Warning

This package is not in the latest version of its module.

Published: Sep 15, 2020 License: MIT Imports: 15 Imported by: 1

README


Avrocado

Avrocado is a convenience library for handling Avro in Go, built on top of linkedin/goavro. It is split into three parts:

  • Avro marshalling/unmarshalling using struct field annotations, inspired by the encoding/json standard library package.
  • A client for Confluent's schema registry (confluentinc/schema-registry).
  • A codec registry which handles marshalling/unmarshalling with schemas fetched from the schema registry.

Getting Started

After installing the library, import it in your Go code. Annotate the types you want to marshal with the avro tag, then instantiate a codec with the corresponding Avro schema:

package main

import (
        "fmt"

        // The import path is avrocado, but the package name is avro.
        avro "github.com/leboncoin/avrocado"
)

// Someone is annotated with avro tags matching the schema field names.
type Someone struct {
        Name string `avro:"name"`
        Age  int32  `avro:"age"`
}

func main() {
        val := Someone{"MyName", 3}
        var decoded Someone

        schema := `{
          "type": "record",
          "name": "Someone",
          "fields": [
            {
              "name": "name",
              "type": "string"
            }, {
              "name": "age",
              "type": "int"
            }
          ]
        }`

        // Build a codec from the Avro schema.
        codec, err := avro.NewCodec(schema)
        if err != nil {
                panic(fmt.Sprintf("wrong schema: %s", err))
        }

        // Encode the struct to Avro binary...
        payload, err := codec.Marshal(&val)
        if err != nil {
                panic(fmt.Sprintf("unable to serialize to avro: %s", err))
        }

        // ...and decode it back into a fresh value.
        err = codec.Unmarshal(payload, &decoded)
        if err != nil {
                panic(fmt.Sprintf("unable to deserialize from avro: %s", err))
        }

        fmt.Printf("%+v\n", decoded)
}

The same example also appears in the package documentation below, under the Codec type.

Installing

Just run go get github.com/leboncoin/avrocado.

Examples

See the test files for examples on how to use the library.

Running tests

Just run go test at the root directory of this repository.

Documentation

Overview

Package avro wraps linkedin/goavro to provide serialization and deserialization of Avro data to and from Go structs annotated with tags.


Constants

const MagicByte = 0x0

MagicByte defines the first byte of a binary Avro payload.

const UnknownVersion = -1

UnknownVersion is the default version value when no information is available from the schema registry.

Variables

var DefaultEndianness = binary.BigEndian

DefaultEndianness is the endianness used for marshalling/unmarshalling. The marshaller and the unmarshaller must use the same value to work correctly. Note: it is set to BigEndian to match the implementation of Confluent's kafka-rest project (see: https://github.com/confluentinc/kafka-rest).

var DefaultURL = "http://localhost:8081"

DefaultURL is the address where a local schema registry listens by default.

var ErrNoEncodeSchema = fmt.Errorf("no encoding schema have been initialized")

ErrNoEncodeSchema is the error returned when encoding is attempted without a schema.

Functions

func AddNamespace

func AddNamespace(namespace, typeName string) string

AddNamespace concatenates a namespace with the given type name in the standard Avro way.
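The following minimal sketch shows what the concatenation produces. It assumes the Avro full-name form (the namespace, a dot, then the name) and assumes that an empty namespace leaves the name unqualified. `addNamespace` is a hypothetical stand-in, not the package's code.

```go
package main

import "fmt"

// addNamespace is a hypothetical re-implementation for illustration only.
// It builds an Avro full name by joining namespace and type name with a dot;
// an empty namespace is assumed to leave the name unqualified.
func addNamespace(namespace, typeName string) string {
	if namespace == "" {
		return typeName
	}
	return namespace + "." + typeName
}

func main() {
	fmt.Println(addNamespace("com.example", "someone")) // com.example.someone
	fmt.Println(addNamespace("", "someone"))            // someone
}
```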

func CamelCaseToSnakeCase

func CamelCaseToSnakeCase(name string) string

CamelCaseToSnakeCase will transform an input string written in CamelCase into snake_case
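Here is one way to sketch the conversion. `camelToSnake` is a hypothetical re-implementation written for illustration. Its acronym handling is an assumption: "ID" becomes "i_d". The package may treat acronyms differently.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// camelToSnake is a hypothetical sketch: it lower-cases every rune and inserts
// an underscore before each upper-case rune except the first one.
func camelToSnake(name string) string {
	var b strings.Builder
	for i, r := range name {
		if unicode.IsUpper(r) {
			if i > 0 {
				b.WriteByte('_')
			}
			b.WriteRune(unicode.ToLower(r))
			continue
		}
		b.WriteRune(r)
	}
	return b.String()
}

func main() {
	fmt.Println(camelToSnake("CamelCase"))   // camel_case
	fmt.Println(camelToSnake("someoneName")) // someone_name
}
```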

func DefaultTypeNameEncoder

func DefaultTypeNameEncoder(name string) string

DefaultTypeNameEncoder combines Avro type conversion with CamelCase to snake_case conversion.

func GoToAvroType

func GoToAvroType(name string) string

GoToAvroType transforms a Go type name into an Avro type name. Note that complex types are not handled by this function, as they have no direct counterpart.
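Below is a sketch of such a mapping, restricted to Avro's primitive types. The table is an assumption drawn from the Avro specification, not the package's own list, and `goToAvroType` is a hypothetical name. The sketch also shows that any `func(string) string` satisfies the documented TypeNameEncoder type.

```go
package main

import "fmt"

// TypeNameEncoder mirrors the function type documented by the package.
type TypeNameEncoder func(name string) string

// goToAvro is an assumed mapping from Go primitive type names to Avro
// primitive type names. Go's platform-sized int and uint are left out
// deliberately, since their Avro width is ambiguous.
var goToAvro = map[string]string{
	"bool":    "boolean",
	"int32":   "int",
	"int64":   "long",
	"float32": "float",
	"float64": "double",
	"string":  "string",
}

// goToAvroType returns the Avro name of a Go primitive, or the input
// unchanged when there is no primitive counterpart (structs, maps, ...).
func goToAvroType(name string) string {
	if avroName, ok := goToAvro[name]; ok {
		return avroName
	}
	return name
}

// Any func(string) string can be used where a TypeNameEncoder is expected.
var encoder TypeNameEncoder = goToAvroType

func main() {
	for _, name := range []string{"int64", "float32", "Someone"} {
		fmt.Println(name, "->", encoder(name))
	}
}
```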

Types

type Codec

type Codec struct {
	goavro.Codec
	// Namespace is the namespace which will be used to encode nested types
	Namespace string
	// TypeNameEncoder is applied to all types during encoding to transform them from Go names to the Avro naming convention
	TypeNameEncoder TypeNameEncoder
}

Codec wraps the goavro.Codec type

Example
package main

import (
	"fmt"

	avro "github.com/leboncoin/avrocado"
)

type Someone struct {
	Name string `avro:"name"`
	Age  int32  `avro:"age"`
}

func main() {
	val := Someone{"MyName", 3}
	var decoded Someone

	schema := `{
	  "type": "record",
	  "name": "Someone",
	  "fields": [
	    {
	      "name": "name",
	      "type": "string"
	    }, {
	      "name": "age",
	      "type": "int"
	    }
	  ]
	}`

	codec, err := avro.NewCodec(schema)
	if err != nil {
		panic(fmt.Sprintf("wrong schema: %s", err))
	}

	payload, err := codec.Marshal(&val)
	if err != nil {
		panic(fmt.Sprintf("unable to serialize to avro: %s", err))
	}

	err = codec.Unmarshal(payload, &decoded)
	if err != nil {
		panic(fmt.Sprintf("unable to deserialize from avro: %s", err))
	}
}

func NewCodec

func NewCodec(schemaSpecification string) (*Codec, error)

NewCodec creates a codec from a schema

func (*Codec) Marshal

func (c *Codec) Marshal(st interface{}) ([]byte, error)

Marshal marshals any Go type to Avro.

func (*Codec) Unmarshal

func (c *Codec) Unmarshal(avro []byte, output interface{}) error

Unmarshal unmarshals Avro data into any Go type.

type CodecRegistry

type CodecRegistry struct {
	Registry SchemaRegistry
	SchemaID SchemaID
	// TypeNameEncoder is the conversion logic that translates type names from Go to Avro
	TypeNameEncoder TypeNameEncoder
	// contains filtered or unexported fields
}

CodecRegistry is an Avro serializer and deserializer connected to the schema registry, which it uses to dynamically discover and decode schemas.

func NewCodecRegistry

func NewCodecRegistry(registryURL, subject, schema string) (*CodecRegistry, error)

NewCodecRegistry configures a codec connected to the schema registry:

  • registryURL (required) is the complete URL of the schema registry.
  • subject (optional) is the name of the subject under which your schema will be registered (often the Kafka topic name).
  • schema (optional) is the schema used for encoding.

If an empty schema is provided, encoding is impossible, but decoding will still discover the schema automatically.

Note: the CodecRegistry takes care of registering the schema and of dynamic decoding.

func NewCodecRegistryAndRegister

func NewCodecRegistryAndRegister(registryURL string, subject string, schema string) (*CodecRegistry, error)

NewCodecRegistryAndRegister does a NewCodecRegistry() and a Register()

func NewNOOPCodecRegistry

func NewNOOPCodecRegistry(subject string) *CodecRegistry

NewNOOPCodecRegistry returns a CodecRegistry that uses the NOOP schema registry

func (*CodecRegistry) Marshal

func (r *CodecRegistry) Marshal(data interface{}) ([]byte, error)

Marshal implements Marshaler.

func (*CodecRegistry) Register

func (r *CodecRegistry) Register(rawSchema string) error

Register registers a new schema inside the Schema Registry and sets this schema as the default encode and decode schema

func (*CodecRegistry) SetTypeNameEncoder

func (r *CodecRegistry) SetTypeNameEncoder(typeNameEncoder TypeNameEncoder)

SetTypeNameEncoder sets the TypeNameEncoder of the codec registry and applies it to all previously created codecs.

func (*CodecRegistry) Unmarshal

func (r *CodecRegistry) Unmarshal(from []byte, to interface{}) error

Unmarshal implements Unmarshaler. Note: unmarshalling data written with an older schema can be inefficient.

type ConfluentSchemaRegistry

type ConfluentSchemaRegistry struct {
	// contains filtered or unexported fields
}

ConfluentSchemaRegistry defines a schema registry managed by Confluent

func (*ConfluentSchemaRegistry) DeleteSubject

func (c *ConfluentSchemaRegistry) DeleteSubject(subject string) (versions []int, err error)

DeleteSubject removes the schemas registered under the given subject and returns the deleted version numbers.

func (*ConfluentSchemaRegistry) GetLatestSchema

func (c *ConfluentSchemaRegistry) GetLatestSchema(subject string) (s Schema, err error)

GetLatestSchema returns the latest version of the subject's schema.

func (*ConfluentSchemaRegistry) GetSchemaByID

func (c *ConfluentSchemaRegistry) GetSchemaByID(id int) (string, error)

GetSchemaByID returns the schema for some id. The schema registry only provides the schema itself, not the id, subject or version.

func (*ConfluentSchemaRegistry) GetSchemaBySubject

func (c *ConfluentSchemaRegistry) GetSchemaBySubject(subject string, ver int) (s Schema, err error)

GetSchemaBySubject returns the schema for a particular subject and version.

func (*ConfluentSchemaRegistry) IsRegistered

func (c *ConfluentSchemaRegistry) IsRegistered(subject, schema string) (bool, Schema, error)

IsRegistered tells whether the given schema is registered for this subject.

func (*ConfluentSchemaRegistry) RegisterNewSchema

func (c *ConfluentSchemaRegistry) RegisterNewSchema(subject, schema string) (int, error)

RegisterNewSchema registers the given schema for this subject.

func (*ConfluentSchemaRegistry) Subjects

func (c *ConfluentSchemaRegistry) Subjects() (subjects []string, err error)

Subjects returns all registered subjects.

func (*ConfluentSchemaRegistry) Versions

func (c *ConfluentSchemaRegistry) Versions(subject string) (versions []int, err error)

Versions returns all schema version numbers registered for this subject.

type CustomUnmarshaler

type CustomUnmarshaler interface {
	UnmarshalAvro(b []byte) error
}

CustomUnmarshaler allows a type to define its own Avro unmarshalling logic.
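The interface resembles encoding/json's Unmarshaler: a decoder can check whether its target implements the method and delegate to it. The sketch below illustrates that pattern under stated assumptions. The documentation does not say which bytes UnmarshalAvro receives, so the hypothetical `Code` type treats them as an opaque value. `decodeInto` is a hypothetical dispatcher, not the package's decoder.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// CustomUnmarshaler mirrors the interface documented by the package.
type CustomUnmarshaler interface {
	UnmarshalAvro(b []byte) error
}

// Code is a hypothetical type that normalises its raw bytes to upper case.
// The pointer receiver lets UnmarshalAvro modify the value in place, so only
// *Code (not Code) implements CustomUnmarshaler.
type Code string

func (c *Code) UnmarshalAvro(b []byte) error {
	if len(b) == 0 {
		return errors.New("empty code")
	}
	*c = Code(strings.ToUpper(string(b)))
	return nil
}

// decodeInto delegates to UnmarshalAvro when the target implements it;
// a real decoder would otherwise fall back to schema-driven decoding.
func decodeInto(b []byte, target interface{}) error {
	if cu, ok := target.(CustomUnmarshaler); ok {
		return cu.UnmarshalAvro(b)
	}
	return fmt.Errorf("%T has no UnmarshalAvro method", target)
}

func main() {
	var c Code
	if err := decodeInto([]byte("fr-75"), &c); err != nil {
		panic(err)
	}
	fmt.Println(c) // FR-75
}
```

Because the method has a pointer receiver, callers must pass a pointer (`&c`). Passing the value `c` does not match the interface.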

type Header

type Header struct {
	MagicByte byte
	ID        SchemaID
}

Header is the data at the start of an Avro message: the magic byte followed by the schema ID.

type Marshaler

type Marshaler interface {
	Marshal(interface{}) ([]byte, error)
}

Marshaler is implemented by types that marshal Go values to Avro.

The Marshaler understands struct field annotations such as:

  • "-" to completely omit the field
  • "omitempty", which ignores the field value when it is the zero value and uses the field's default value instead, if there is one. WARNING: if no default value is provided, the Marshaler will return an error.

Marshaler also handles pointer fields with Avro's usual optional-value pattern: a union with the null type, with null as the default value.

type Schema

type Schema struct {
	Schema  string `json:"schema"`  // The actual AVRO schema
	Subject string `json:"subject"` // Subject where the schema is registered for
	Version int    `json:"version"` // Version within this subject
	ID      int    `json:"id"`      // Registry's unique id
}

The Schema type is an object produced by the schema registry.

type SchemaID

type SchemaID int32

SchemaID is the type of the schema IDs returned by the registry.

const UnknownID SchemaID = -1

UnknownID is the value of a SchemaRegistry ID if it hasn't been registered.

type SchemaRegistry

type SchemaRegistry interface {
	Subjects() (subjects []string, err error)
	Versions(subject string) (versions []int, err error)
	RegisterNewSchema(subject, schema string) (int, error)
	IsRegistered(subject, schema string) (bool, Schema, error)
	GetSchemaByID(id int) (string, error)
	GetSchemaBySubject(subject string, ver int) (s Schema, err error)
	GetLatestSchema(subject string) (s Schema, err error)
	DeleteSubject(subject string) (versions []int, err error)
}

SchemaRegistry is a client for the schema registry.

func NewNOOPClient

func NewNOOPClient() SchemaRegistry

NewNOOPClient returns a mock schema registry which can be used for testing purposes.
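NewNOOPClient is the ready-made option. When a test needs to control what the registry returns, a common Go idiom is to embed the interface in a struct and override only the methods the code under test calls. The following sketch of that idiom uses a hypothetical `fakeRegistry`, which is not the package's NOOP client. Any method it does not override panics, because the embedded interface value is nil. The Schema and SchemaRegistry declarations are copied from the documentation so the example compiles on its own.

```go
package main

import "fmt"

// Schema and SchemaRegistry mirror the declarations documented above.
type Schema struct {
	Schema  string `json:"schema"`
	Subject string `json:"subject"`
	Version int    `json:"version"`
	ID      int    `json:"id"`
}

type SchemaRegistry interface {
	Subjects() (subjects []string, err error)
	Versions(subject string) (versions []int, err error)
	RegisterNewSchema(subject, schema string) (int, error)
	IsRegistered(subject, schema string) (bool, Schema, error)
	GetSchemaByID(id int) (string, error)
	GetSchemaBySubject(subject string, ver int) (s Schema, err error)
	GetLatestSchema(subject string) (s Schema, err error)
	DeleteSubject(subject string) (versions []int, err error)
}

// fakeRegistry satisfies SchemaRegistry through the embedded interface but
// only implements three methods; the others panic on the nil embedded value.
type fakeRegistry struct {
	SchemaRegistry
	schemas []Schema // schemas[i] has ID i+1
}

func (f *fakeRegistry) RegisterNewSchema(subject, schema string) (int, error) {
	for _, s := range f.schemas {
		if s.Subject == subject && s.Schema == schema {
			return s.ID, nil // re-registering returns the existing ID
		}
	}
	version := 1
	if latest, err := f.GetLatestSchema(subject); err == nil {
		version = latest.Version + 1
	}
	s := Schema{Schema: schema, Subject: subject, Version: version, ID: len(f.schemas) + 1}
	f.schemas = append(f.schemas, s)
	return s.ID, nil
}

func (f *fakeRegistry) GetSchemaByID(id int) (string, error) {
	if id < 1 || id > len(f.schemas) {
		return "", fmt.Errorf("schema id %d not found", id)
	}
	return f.schemas[id-1].Schema, nil
}

func (f *fakeRegistry) GetLatestSchema(subject string) (Schema, error) {
	for i := len(f.schemas) - 1; i >= 0; i-- {
		if f.schemas[i].Subject == subject {
			return f.schemas[i], nil
		}
	}
	return Schema{}, fmt.Errorf("subject %q not found", subject)
}

func main() {
	var reg SchemaRegistry = &fakeRegistry{}
	id, _ := reg.RegisterNewSchema("people-value", `"string"`)
	latest, _ := reg.GetLatestSchema("people-value")
	fmt.Println(id, latest.Version) // 1 1
}
```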

func NewSchemaRegistry

func NewSchemaRegistry(baseurl string) (SchemaRegistry, error)

NewSchemaRegistry returns a new SchemaRegistry that connects to baseurl.

type TypeNameEncoder

type TypeNameEncoder func(name string) string

TypeNameEncoder transforms a Go type name (e.g. int, string) into an Avro type name.

type TypeNamer

type TypeNamer interface {
	AvroName() string
}

TypeNamer is an interface used during marshalling to rename a type to its Avro name.
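The sketch below shows how a marshaller can honour such an interface: it uses AvroName() when the value implements TypeNamer and otherwise falls back to the Go type name. `avroTypeName`, `Someone`'s AvroName method, and the fallback rule are hypothetical. The package may, for instance, also apply its TypeNameEncoder to the fallback.

```go
package main

import (
	"fmt"
	"reflect"
)

// TypeNamer mirrors the interface documented by the package.
type TypeNamer interface {
	AvroName() string
}

// Someone opts in to a custom Avro name.
type Someone struct {
	Name string `avro:"name"`
}

func (Someone) AvroName() string { return "person" }

// Plain does not implement TypeNamer.
type Plain struct{}

// avroTypeName prefers AvroName() when available and otherwise returns the
// Go type name unchanged.
func avroTypeName(v interface{}) string {
	if n, ok := v.(TypeNamer); ok {
		return n.AvroName()
	}
	return reflect.TypeOf(v).Name()
}

func main() {
	fmt.Println(avroTypeName(Someone{})) // person
	fmt.Println(avroTypeName(Plain{}))   // Plain
}
```

Because AvroName has a value receiver, both `Someone` and `*Someone` implement TypeNamer.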

type Unmarshaler

type Unmarshaler interface {
	Unmarshal([]byte, interface{}) error
}

Unmarshaler is implemented by types that unmarshal Avro data into Go values.
