avro

package module
v0.0.0-...-5c7fdf0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 3, 2022 License: MIT Imports: 3 Imported by: 0

README

avro

Build Status codecov Go Report Card

The purpose of this package is to facilitate the use of AVRO with Go's strong typing.

Features

github.com/khezen/avro

GoDoc

github.com/khezen/avro/sqlavro

GoDoc

github.com/khezen/avro/redshiftavro

GoDoc

What is AVRO

Apache AVRO is a data serialization system which relies on JSON schemas.

It provides:

  • Rich data structures
  • A compact, fast, binary data format
  • A container file, to store persistent data
  • Remote procedure call (RPC)

AVRO binary encoded data comes together with its schema and therefore is fully self-describing.

When AVRO data is read, the schema used when writing it is always present. This permits each datum to be written with no per-value overheads, making serialization both fast and small.

When AVRO data is stored in a file, its schema is stored with it, so that files may be processed later by any program. If the program reading the data expects a different schema this can be easily resolved, since both schemas are present.

Examples

Schema Marshal/Unmarshal
package main

import (
  "encoding/json"
  "fmt"

  "github.com/khezen/avro"
)

// main round-trips an AVRO record schema: the JSON document is decoded into
// an avro.AnySchema, the concrete Schema is extracted, and that Schema is
// encoded back to JSON and printed.
func main() {
  rawSchema := []byte(
    `{
      "type": "record",
      "namespace": "test",
      "name": "LongList",
      "aliases": [
        "LinkedLongs"
      ],
      "doc": "linked list of 64 bits integers",
      "fields": [
        {
          "name": "value",
          "type": "long"
        },
        {
          "name": "next",
          "type": [
            "null",
            "LongList"
          ]
        }
      ]
    }`,
  )

  // JSON bytes -> Schema interface.
  var decoded avro.AnySchema
  if err := json.Unmarshal(rawSchema, &decoded); err != nil {
    panic(err)
  }

  // Schema interface -> JSON bytes.
  encoded, err := json.Marshal(decoded.Schema())
  if err != nil {
    panic(err)
  }
  fmt.Println(string(encoded))
}
{
    "type": "record",
    "namespace": "test",
    "name": "LongList",
    "aliases": [
        "LinkedLongs"
    ],
    "doc": "linked list of 64 bits integers",
    "fields": [
        {
            "name": "value",
            "type": "long"
        },
        {
            "name": "next",
            "type": [
                "null",
                "LongList"
            ]
        }
    ]
}
Convert SQL Table to AVRO Schema
package main
import (
  "database/sql"
  "encoding/json"
  "fmt"

  "github.com/khezen/avro/sqlavro"
)

// main creates the example "posts" table in the "blog" database, converts
// the database's tables to AVRO schemas with sqlavro.SQLDatabase2AVRO, and
// prints the schemas as JSON. Any failure aborts the program with a panic.
func main() {
  db, err := sql.Open("mysql", "root@/blog")
  if err != nil {
    panic(err)
  }
  defer db.Close()

  // Table definition used throughout the README examples.
  const createPosts = `CREATE TABLE posts(
      ID INT NOT NULL,
      title VARCHAR(128) NOT NULL,
      body LONGBLOB NOT NULL,
      content_type VARCHAR(128) DEFAULT 'text/markdown; charset=UTF-8',
      post_date DATETIME NOT NULL,
      update_date DATETIME,
      reading_time_minutes DECIMAL(3,1),
      PRIMARY KEY(ID)
    )`
  if _, err := db.Exec(createPosts); err != nil {
    panic(err)
  }

  schemas, err := sqlavro.SQLDatabase2AVRO(db, "blog")
  if err != nil {
    panic(err)
  }

  encoded, err := json.Marshal(schemas)
  if err != nil {
    panic(err)
  }
  fmt.Println(string(encoded))
}
[
    {
        "type": "record",
        "namespace": "blog",
        "name": "posts",
        "fields": [
            {
                "name": "ID",
                "type": "int"
            },
            {
                "name": "title",
                "type": "string"
            },
            {
                "name": "body",
                "type": "bytes"
            },
            {
                "name": "content_type",
                "type": [
                    "string",
                    "null"
                ],
                "default": "text/markdown; charset=UTF-8"
            },
            {
                "name": "post_date",
                "type": {
                    "type": "int",
                    "doc":"datetime",
                    "logicalType": "timestamp"
                }
            },
            {
                "name": "update_date",
                "type": [
                    "null",
                    {
                        "type": "int",
                        "doc":"datetime",
                        "logicalType": "timestamp"
                    }
                ]
            },
            {
                "name": "reading_time_minutes",
                "type": [
                    "null",
                    {
                        "type": "bytes",
                        "logicalType": "decimal",
                        "precision": 3,
                        "scale": 1
                    }
                ]
            }
        ]
    }
]
Query records from SQL into AVRO or CSV binary
package main

import (
	"database/sql"
	"fmt"
	"io/ioutil"
	"time"

	"github.com/khezen/avro"
	"github.com/khezen/avro/sqlavro"
)

// main creates and populates the example "posts" table, derives its AVRO
// schema with sqlavro.SQLTable2AVRO, queries the rows whose post_date matches
// the criterion, writes the AVRO-encoded result to /tmp/blog_posts.avro and
// prints the criteria returned by the query. Any failure aborts the program
// with a panic.
func main() {
	db, err := sql.Open("mysql", "root@/blog")
	if err != nil {
		panic(err)
	}
	defer db.Close()
	_, err = db.Exec(
		`CREATE TABLE posts(
			ID INT NOT NULL,
			title VARCHAR(128) NOT NULL,
			body LONGBLOB NOT NULL,
			content_type VARCHAR(128) DEFAULT 'text/markdown; charset=UTF-8',
			post_date DATETIME NOT NULL,
			update_date DATETIME,
			reading_time_minutes DECIMAL(3,1),
			PRIMARY KEY(ID)
		)`,
	)
	if err != nil {
		panic(err)
	}
	_, err = db.Exec(
		// statement
		`INSERT INTO posts(ID,title,body,content_type,post_date,update_date,reading_time_minutes)
		 VALUES (?,?,?,?,?,?,?)`,
		// values
		42,
		"lorem ispum",
		[]byte(`Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.`),
		"text/markdown; charset=UTF-8",
		"2009-04-10 00:00:00",
		"2009-04-10 00:00:00",
		"4.2",
	)
	if err != nil {
		panic(err)
	}
	schema, err := sqlavro.SQLTable2AVRO(db, "blog", "posts")
	if err != nil {
		panic(err)
	}
	limit := 1000
	order := avro.Ascending
	// Go's reference layout is year-month-day ("2006-01-02"). The previous
	// layout "2006-02-01" swapped month and day, so "2009-04-10" was parsed as
	// 4 October 2009 instead of 10 April 2009 and no longer matched the
	// post_date of the row inserted above.
	from, err := time.Parse("2006-01-02 15:04:05", "2009-04-10 00:00:00")
	if err != nil {
		panic(err)
	}
	avroBytes, updatedCriteria, err := sqlavro.Query(sqlavro.QueryConfig{
		DB:     db,
		DBName: "blog",
		Schema: schema,
		Limit:  limit,
		Criteria: []sqlavro.Criterion{
			*sqlavro.NewCriterionDateTime("post_date", &from, order),
		},
		Output: "avro",
	})
	if err != nil {
		panic(err)
	}
	err = ioutil.WriteFile("/tmp/blog_posts.avro", avroBytes, 0644)
	if err != nil {
		panic(err)
	}
	fmt.Println(updatedCriteria)
}
Notes
  • When a record field has aliases, the first alias is used in the query instead of the field name.

Types

Avro Go     SQL
null nil NULL
bytes []byte BLOB,MEDIUMBLOB,LONGBLOB
fixed []byte       CHAR,NCHAR
string,enum string VARCHAR, NVARCHAR,TEXT,TINYTEXT,MEDIUMTEXT,LONGTEXT,ENUM,SET
float float32 FLOAT
double float64 DOUBLE
long int64 BIGINT
int int32   TINYINT,SMALLINT,INT,YEAR
decimal *big.Rat DECIMAL
time int32 TIME
timestamp int32 TIMESTAMP,DATETIME
date time.Time DATE
array []interface{} N/A
map,record map[string]interface{} N/A
union see below    any type nullable

Because of the encoding rules for Avro unions, when a union's value is null, a simple Go nil is returned. However, when a union's value is non-nil, a Go map[string]interface{} with a single key is returned for the union. The map's single key is the Avro type name and its value is the datum's value.

Produce Redshift create statement from AVRO schema
package main

import (
	"encoding/json"
	"fmt"

	"github.com/khezen/avro"
	"github.com/khezen/avro/redshiftavro"
)

// main decodes the "posts" record schema from JSON, asserts that it is an
// *avro.RecordSchema, and prints the Redshift CREATE TABLE statement that
// redshiftavro.CreateTableStatement produces for it (sorted by post_date and
// title, with IF NOT EXISTS). Any failure aborts the program with a panic.
func main() {
	schemaBytes := []byte(`
	{
        "type": "record",
        "namespace": "blog",
        "name": "posts",
        "fields": [
            {
                "name": "ID",
                "type": "int"
            },
            {
                "name": "title",
                "type": "string"
            },
            {
                "name": "body",
                "type": "bytes"
            },
            {
                "name": "content_type",
                "type": [
                    "string",
                    "null"
                ],
                "default": "text/markdown; charset=UTF-8"
            },
            {
                "name": "post_date",
                "type": {
                    "type": "int",
                    "doc":"datetime",
                    "logicalType": "timestamp"
                }
            },
            {
                "name": "update_date",
                "type": [
                    "null",
                    {
                        "type": "int",
                        "doc":"datetime",
                        "logicalType": "timestamp"
                    }
                ]
            },
            {
                "name": "reading_time_minutes",
                "type": [
                    "null",
                    {
                        "type": "bytes",
                        "logicalType": "decimal",
                        "precision": 3,
                        "scale": 1
                    }
                ]
            }
        ]
	}`)

	var decoded avro.AnySchema
	if err := json.Unmarshal(schemaBytes, &decoded); err != nil {
		panic(err)
	}

	// The statement builder takes a record schema by value, so narrow the
	// generic Schema to its concrete record type first.
	record := decoded.Schema().(*avro.RecordSchema)
	statement, err := redshiftavro.CreateTableStatement(redshiftavro.CreateConfig{
		Schema:      *record,
		SortKeys:    []string{"post_date", "title"},
		IfNotExists: true,
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(statement)
}
CREATE TABLE IF NOT EXISTS posts(
	ID INTEGER ENCODE LZO NOT NULL,
	title VARCHAR(65535) ENCODE RAW NOT NULL,
	body VARCHAR(65535) ENCODE ZSTD NOT NULL,
	content_type VARCHAR(65535) ENCODE ZSTD NULL,
	post_date TIMESTAMP WITHOUT TIME ZONE ENCODE RAW NOT NULL,
	update_date TIMESTAMP WITHOUT TIME ZONE ENCODE LZO NULL,
	reading_time_minutes DECIMAL(3,1) ENCODE RAW NULL
)
SORTKEY(
	post_date,
	title
)

Issues

If you have any problems or questions, please ask for help through a GitHub issue.

Contributions

Help is always welcome! For example, documentation (like the text you are reading now) can always use improvement. There's always code that can be improved. If you ever see something you think should be fixed, you should own it. If you have no idea what to start on, you can browse the issues labeled with help wanted.

As a potential contributor, your changes and ideas are welcome at any hour of the day or night, weekdays, weekends, and holidays. Please do not ever hesitate to ask a question or send a pull request.

Code of conduct.

Documentation

Index

Examples

Constants

View Source
// Compression codec identifiers.
const (
	// CompressionNull is the "null" codec, which simply passes data through uncompressed.
	CompressionNull = "null"
	// CompressionDeflate is the "deflate" codec, which writes the data block using the
	// deflate algorithm as specified in RFC 1951, typically implemented using the zlib library.
	// Note that this format (unlike the "zlib format" in RFC 1950) does not have a checksum.
	CompressionDeflate = "deflate"
	// CompressionSnappy is the "snappy" codec, which uses Google's Snappy compression library.
	// Each compressed block is followed by the 4-byte, big-endian CRC32 checksum of the
	// uncompressed data in the block.
	CompressionSnappy = "snappy"
)

Variables

View Source
// Sentinel errors exposed by the package.
var (
	// ErrUnsupportedType reports that AVRO doesn't support the given type.
	ErrUnsupportedType = errors.New("ErrUnsupportedType - AVRO doesn't support the given type")
	// ErrInvalidSchema reports that the given schema is not an AVRO schema.
	ErrInvalidSchema = errors.New("ErrInvalidSchema - Given schema is not AVRO")
	// ErrUnsupportedCompression reports that AVRO doesn't support the given compression.
	ErrUnsupportedCompression = errors.New("ErrUnsupportedCompression")
)

Functions

This section is empty.

Types

type AnySchema

// AnySchema is a JSON decoding target for a schema of any kind: it implements
// UnmarshalJSON, and the decoded schema is retrieved with its Schema method.
type AnySchema struct {
	// contains filtered or unexported fields
}

AnySchema -

func (*AnySchema) Schema

func (as *AnySchema) Schema() Schema

Schema returns the unmarshalled schema

func (*AnySchema) UnmarshalJSON

func (as *AnySchema) UnmarshalJSON(bytes []byte) error

UnmarshalJSON -

type ArraySchema

// ArraySchema is the JSON object form of an array schema.
type ArraySchema struct {
	// Type is the schema's type name (JSON key "type").
	Type  Type   `json:"type"`
	// Items is the schema stored under the JSON key "items".
	Items Schema `json:"items"`
}

ArraySchema -

func (*ArraySchema) TypeName

func (t *ArraySchema) TypeName() Type

TypeName -

type DerivedPrimitiveSchema

// DerivedPrimitiveSchema is the JSON object form of a type annotated with a
// logical type, e.g. {"type": "bytes", "logicalType": "decimal", ...}.
type DerivedPrimitiveSchema struct {
	// Type is the underlying type name (JSON key "type").
	Type          Type        `json:"type"`
	// Documentation is the optional "doc" text; omitted from JSON when empty.
	Documentation string      `json:"doc,omitempty"`
	// LogicalType is always serialized (no omitempty) under "logicalType".
	LogicalType   LogicalType `json:"logicalType"`
	// Precision and Scale are optional and omitted from JSON when nil; the
	// README's decimal example sets both.
	Precision     *int        `json:"precision,omitempty"`
	Scale         *int        `json:"scale,omitempty"`
}

DerivedPrimitiveSchema -

func (*DerivedPrimitiveSchema) TypeName

func (t *DerivedPrimitiveSchema) TypeName() Type

TypeName -

type EnumSchema

// EnumSchema is the JSON object form of an enum schema.
type EnumSchema struct {
	// Type is the schema's type name (JSON key "type").
	Type          Type     `json:"type"`
	// Name is always serialized; Namespace, Aliases and Documentation are
	// omitted from JSON when empty.
	Name          string   `json:"name"`
	Namespace     string   `json:"namespace,omitempty"`
	Aliases       []string `json:"aliases,omitempty"`
	Documentation string   `json:"doc,omitempty"`
	// Symbols is the list stored under the JSON key "symbols".
	Symbols       []string `json:"symbols"`
}

EnumSchema -

func (*EnumSchema) TypeName

func (t *EnumSchema) TypeName() Type

TypeName -

type FixedSchema

// FixedSchema is the JSON object form of a fixed schema.
type FixedSchema struct {
	// Type is the schema's type name (JSON key "type").
	Type          Type        `json:"type"`
	// LogicalType is optional here and omitted from JSON when empty.
	LogicalType   LogicalType `json:"logicalType,omitempty"`
	// Name is always serialized; Namespace, Aliases and Documentation are
	// omitted from JSON when empty.
	Name          string      `json:"name"`
	Namespace     string      `json:"namespace,omitempty"`
	Aliases       []string    `json:"aliases,omitempty"`
	Documentation string      `json:"doc,omitempty"`
	// Size is the value stored under the JSON key "size".
	Size          int         `json:"size"`
}

FixedSchema -

func (*FixedSchema) TypeName

func (t *FixedSchema) TypeName() Type

TypeName -

type LogicalType

// LogicalType is the name of a logical type; its underlying type is Type.
type LogicalType Type

LogicalType decorates primitive and complex types to represent a derived type

// Logical type names.
const (
	// LogicalTypeDecimal is the "decimal" logical type name.
	LogicalTypeDecimal LogicalType = "decimal"
	// LogicalTypeDate is the "date" logical type name.
	LogicalTypeDate LogicalType = "date"
	// LogicalTypeTime is the "time" logical type name.
	LogicalTypeTime LogicalType = "time"
	// LogicalTypeTimestamp is the "timestamp" logical type name.
	LogicalTypeTimestamp LogicalType = "timestamp"
	// LogialTypeDuration is the "duration" logical type name.
	// NOTE(review): the identifier is spelled "Logial" (missing "c"); it is
	// exported, so renaming it would break existing callers.
	LogialTypeDuration LogicalType = "duration"
)

type MapSchema

// MapSchema is the JSON object form of a map schema.
type MapSchema struct {
	// Type is the schema's type name (JSON key "type").
	Type  Type   `json:"type"`
	// Value is the schema stored under the JSON key "values" (note the plural
	// key versus the singular field name).
	Value Schema `json:"values"`
}

MapSchema -

func (*MapSchema) TypeName

func (t *MapSchema) TypeName() Type

TypeName -

type Order

// Order is the string value of a record field's "order" attribute.
type Order string

Order - specifies how this field impacts sort ordering of this record (optional). Valid values are "ascending" (the default), "descending", or "ignore".

// Valid Order values.
const (
	// Ascending is the "ascending" sort order.
	Ascending Order = "ascending"
	// Descending is the "descending" sort order.
	Descending Order = "descending"
	// Ignore is the "ignore" sort order.
	Ignore Order = "ignore"
)

type RecordFieldSchema

// RecordFieldSchema is the JSON object form of one entry in a record's
// "fields" list.
type RecordFieldSchema struct {
	// Name is always serialized; Aliases and Documentation are omitted from
	// JSON when empty.
	Name          string           `json:"name"`
	Aliases       []string         `json:"aliases,omitempty"`
	Documentation string           `json:"doc,omitempty"`
	// Type is the field's schema (JSON key "type").
	Type          Schema           `json:"type"`
	// Default holds the raw, undecoded JSON of the "default" attribute; it is
	// omitted when nil.
	Default       *json.RawMessage `json:"default,omitempty"`
	// Order is the optional "order" attribute; omitted from JSON when empty.
	Order         Order            `json:"order,omitempty"`
}

RecordFieldSchema -

type RecordSchema

// RecordSchema is the JSON object form of a record schema.
type RecordSchema struct {
	// Type is the schema's type name (JSON key "type").
	Type          Type                `json:"type"`
	// Name is always serialized; Namespace, Aliases and Documentation are
	// omitted from JSON when empty.
	Namespace     string              `json:"namespace,omitempty"`
	Name          string              `json:"name"`
	Aliases       []string            `json:"aliases,omitempty"`
	Documentation string              `json:"doc,omitempty"`
	// Fields is the list stored under the JSON key "fields".
	Fields        []RecordFieldSchema `json:"fields"`
}

RecordSchema has fields

func (*RecordSchema) TypeName

func (t *RecordSchema) TypeName() Type

TypeName -

type Schema

// Schema is the interface shared by the schema representations in this
// package (Type, UnionSchema and the *Schema struct types all provide
// TypeName).
type Schema interface {
	// TypeName returns the schema's Type.
	TypeName() Type
}

Schema -

Example
package main

import (
	"encoding/json"
	"fmt"

	"github.com/khezen/avro"
)

// main round-trips an AVRO record schema: the JSON document is decoded into
// an avro.AnySchema, the concrete Schema is extracted, and that Schema is
// encoded back to JSON and printed.
func main() {
	rawSchema := []byte(
		`{
			"type": "record",
			"namespace": "test",
			"name": "LongList",
			"aliases": [
				"LinkedLongs"
			],
			"doc": "linked list of 64 bits integers",
			"fields": [
				{
					"name": "value",
					"type": "long"
				},
				{
					"name": "next",
					"type": [
						"null",
						"LongList"
					]
				}
			]
		}`,
	)

	// JSON bytes -> Schema interface.
	var decoded avro.AnySchema
	if err := json.Unmarshal(rawSchema, &decoded); err != nil {
		panic(err)
	}

	// Schema interface -> JSON bytes.
	encoded, err := json.Marshal(decoded.Schema())
	if err != nil {
		panic(err)
	}
	fmt.Println(string(encoded))
}
Output:

type Type

// Type is a schema type name; it also implements Schema via TypeName.
type Type string

Type - primitive or derived type name as defined below

// Type names. The first group holds the primitive types; the second group
// holds the remaining (union, record, array, map, enum, fixed) types.
const (

	// TypeNull is the "null" type name.
	TypeNull Type = "null"
	// TypeBoolean is the "boolean" type name.
	TypeBoolean Type = "boolean"
	// TypeInt32 is the "int" type name.
	TypeInt32 Type = "int"
	// TypeInt64 is the "long" type name.
	TypeInt64 Type = "long"
	// TypeFloat32 is the "float" type name.
	TypeFloat32 Type = "float"
	// TypeFloat64 is the "double" type name.
	TypeFloat64 Type = "double"
	// TypeBytes is the "bytes" type name.
	TypeBytes Type = "bytes"
	// TypeString is the "string" type name.
	TypeString Type = "string"

	// TypeUnion is the "union" type name.
	TypeUnion Type = "union"
	// TypeRecord is the "record" type name.
	TypeRecord Type = "record"
	// TypeArray is the "array" type name.
	TypeArray Type = "array"
	// TypeMap is the "map" type name.
	TypeMap Type = "map"
	// TypeEnum is the "enum" type name.
	TypeEnum Type = "enum"
	// TypeFixed is the "fixed" type name.
	TypeFixed Type = "fixed"
)

func (Type) TypeName

func (t Type) TypeName() Type

TypeName -

type UnionSchema

// UnionSchema is a list of member schemas; it implements Schema via TypeName.
type UnionSchema []Schema

UnionSchema - A JSON array, representing a union of embedded types.

func (UnionSchema) TypeName

func (t UnionSchema) TypeName() Type

TypeName -

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL