goavro

package module v2.12.1-schema-2022101...
Published: Oct 11, 2022 License: Apache-2.0

README

goavro

Goavro is a library that encodes and decodes Avro data.

Description

  • Encodes to and decodes from both binary and textual JSON Avro data.
  • Codec is stateless and is safe to use by multiple goroutines.

With the exception of features not yet supported, goavro attempts to be fully compliant with the most recent version of the Avro specification.

Dependency Notice

All usage of gopkg.in has been removed in favor of Go modules. Please update your import paths to github.com/linkedin/goavro/v2. v1 users can still use old versions of goavro by adding a constraint to your go.mod or Gopkg.toml file.

go.mod:

require (
    github.com/linkedin/goavro v1.0.5
)

Gopkg.toml:

[[constraint]]
name = "github.com/linkedin/goavro"
version = "=1.0.5"

Major Improvements in v2 over v1

Avro namespaces

The original version of this library was written before I really understood how Avro namespaces ought to work. After using Avro for a long time now, and after a lot of research, I think I grok Avro namespaces properly, and the library now correctly handles every namespace test case in the Apache Avro distribution, including being able to refer to a previously defined data type later on in the same schema.

Getting Data into and out of Records

The original version of this library required creating goavro.Record instances and using getters and setters to access a record's fields. When schemas were complex, this took a lot of work to debug and get right. The original version also required users to break schemas into chunks and maintain a different schema for each record type. This was cumbersome, annoying, and error prone.

The new version of this library eliminates the goavro.Record type and accepts a native Go map for all records to be encoded: keys are the field names, and values are the field values. Nothing could be easier. Conversely, decoding Avro data yields a native Go map from which the upstream client can pull the data back out.

Furthermore, there is never a reason to break your schema into separate record schemas. Simply feed the entire schema to the NewCodec function once, when you create the Codec, then use it. This library knows how to parse the data provided to it and ensures data values for records and their fields are properly encoded and decoded.

3x--4x Performance Improvement

The original version of this library was written with Go's idea of io.Reader and io.Writer composition in mind. Although composition is a powerful tool, the original library had to pull bytes off the io.Reader, often one byte at a time, check for read errors, decode the bytes, and repeat. Because this version operates on a native Go byte slice instead, both decoding and encoding complex Avro data here at LinkedIn is between three and four times faster than before.

Avro JSON Support

The original version of this library did not support JSON encoding or decoding, because it wasn't deemed useful for our internal use at the time. When writing the new version of the library I decided to tackle this issue once and for all, because so many engineers needed this functionality for their work.

Better Handling of Record Field Default Values

The original version of this library did not handle default values for record fields well. This version uses a record field's default value when encoding from native Go data to Avro data and the field is not specified. Additionally, when decoding from Avro JSON data to native Go data and a field is not specified, the default value is used to populate the field.

Contrast With Code Generation Tools

If you have the ability to rebuild and redeploy your software whenever data schemas change, code generation tools might be the best solution for your application.

There are numerous excellent tools for generating source code to translate data between native and Avro binary or textual data. One such tool is linked below. If a particular application is designed to work with a rarely changing schema, programs that use code generated functions can potentially be more performant than a program that uses goavro to create a Codec dynamically at run time.

I recommend benchmarking the resultant programs on typical data, using both the code-generated functions and goavro, to see which performs better. Not all code-generated functions will outperform goavro for all data corpuses.

If you don't have the ability to rebuild and redeploy software whenever a data schema changes, goavro could be a great fit for your needs. With goavro, your program can be given a new schema while running, compile it into a Codec on the fly, and immediately start encoding or decoding data using that Codec. Because the Avro specification requires encoded data to be accompanied by its schema, this is not usually a problem. If the schema change is backwards compatible, and the portion of your program that handles the decoded data can still reference the decoded fields, nothing needs to be done when your program detects the schema change while using goavro Codec instances to encode or decode data.

Resources

Usage

Documentation is available via GoDoc.

package main

import (
    "fmt"

    "github.com/linkedin/goavro/v2"
)

func main() {
    codec, err := goavro.NewCodec(`
        {
          "type": "record",
          "name": "LongList",
          "fields" : [
            {"name": "next", "type": ["null", "LongList"], "default": null}
          ]
        }`)
    if err != nil {
        fmt.Println(err)
    }

    // NOTE: May omit fields when using default value
    textual := []byte(`{"next":{"LongList":{}}}`)

    // Convert textual Avro data (in Avro JSON format) to native Go form
    native, _, err := codec.NativeFromTextual(textual)
    if err != nil {
        fmt.Println(err)
    }

    // Convert native Go form to binary Avro data
    binary, err := codec.BinaryFromNative(nil, native)
    if err != nil {
        fmt.Println(err)
    }

    // Convert binary Avro data back to native Go form
    native, _, err = codec.NativeFromBinary(binary)
    if err != nil {
        fmt.Println(err)
    }

    // Convert native Go form to textual Avro data
    textual, err = codec.TextualFromNative(nil, native)
    if err != nil {
        fmt.Println(err)
    }

    // NOTE: Textual encoding will show all fields, even those with values that
    // match their default values
    fmt.Println(string(textual))
    // Output: {"next":{"LongList":{"next":null}}}
}

Also please see the example programs in the examples directory for reference.

OCF file reading and writing

This library supports reading and writing data in the Object Container File (OCF) format.

package main

import (
	"bytes"
	"fmt"
	"strings"

	"github.com/linkedin/goavro/v2"
)

func main() {
	avroSchema := `
	{
	  "type": "record",
	  "name": "test_schema",
	  "fields": [
		{
		  "name": "time",
		  "type": "long"
		},
		{
		  "name": "customer",
		  "type": "string"
		}
	  ]
	}`

	// Writing OCF data
	var ocfFileContents bytes.Buffer
	writer, err := goavro.NewOCFWriter(goavro.OCFConfig{
		W:      &ocfFileContents,
		Schema: avroSchema,
	})
	if err != nil {
		fmt.Println(err)
	}
	err = writer.Append([]map[string]interface{}{
		{
			"time":     1617104831727,
			"customer": "customer1",
		},
		{
			"time":     1717104831727,
			"customer": "customer2",
		},
	})
	if err != nil {
		fmt.Println(err)
	}
	fmt.Println("ocfFileContents", ocfFileContents.String())

	// Reading OCF data
	ocfReader, err := goavro.NewOCFReader(strings.NewReader(ocfFileContents.String()))
	if err != nil {
		fmt.Println(err)
	}
	fmt.Println("Records in OCF File")
	for ocfReader.Scan() {
		record, err := ocfReader.Read()
		if err != nil {
			fmt.Println(err)
		}
		fmt.Println("record", record)
	}
}

The above code in go playground

ab2t

The ab2t program is similar to the reference standard avrocat program and converts Avro OCF files to Avro JSON encoding.

arw

The Avro-ReWrite program, arw, can be used to rewrite an Avro OCF file, optionally changing the block counts and the compression algorithm. arw can also upgrade the schema, provided the existing datum values can be encoded with the newly provided schema.

avroheader

The Avro Header program, avroheader, can be used to print various header information from an OCF file.

splice

The splice program can be used to splice together an OCF file from an Avro schema file and a raw Avro binary data file.

Translating Data

A Codec provides four methods for translating between a byte slice of either binary or textual Avro data and native Go data.

The following methods convert data between native Go data and byte slices of the binary Avro representation:

BinaryFromNative
NativeFromBinary

The following methods convert data between native Go data and byte slices of the textual Avro representation:

NativeFromTextual
TextualFromNative

Each Codec also exposes the Schema method to return a simplified version of the JSON schema string used to create the Codec.

Translating From Avro to Go Data

Goavro does not use Go's structure tags to translate data between native Go types and Avro encoded data.

When translating from either binary or textual Avro to native Go data, goavro returns primitive Go data values for corresponding Avro data values. The table below shows how goavro translates Avro types to Go types.

Avro              Go
null              nil
boolean           bool
bytes             []byte
float             float32
double            float64
long              int64
int               int32
string            string
array             []interface{}
enum              string
fixed             []byte
map and record    map[string]interface{}
union             see below

Because of the encoding rules for Avro unions, when a union's value is null, a simple Go nil is returned. However, when a union's value is non-nil, a Go map[string]interface{} with a single key is returned for the union. The map's single key is the Avro type name and its value is the datum's value.

Translating From Go to Avro Data

Goavro does not use Go's structure tags to translate data between native Go types and Avro encoded data.

When translating from native Go to either binary or textual Avro data, goavro generally requires the same native Go data types as the decoder would provide, with some exceptions for programmer convenience. Goavro will accept any numerical data type provided there is no precision lost when encoding the value. For instance, providing float64(3.0) to an encoder expecting an Avro int would succeed, while sending float64(3.5) to the same encoder would return an error.

When providing a slice of items for an encoder, the encoder will accept either []interface{}, or any slice of the required type. For instance, when the Avro schema specifies: {"type":"array","items":"string"}, the encoder will accept either []interface{}, or []string. If given []int, the encoder will return an error when it attempts to encode the first non-string array value using the string encoder.

When providing a value for an Avro union, the encoder will accept nil for a null value. If the value is non-nil, it must be a map[string]interface{} with a single key-value pair, where the key is the Avro type name and the value is the datum's value. As a convenience, the Union function wraps any datum value in a map as specified above.

func ExampleUnion() {
    codec, err := goavro.NewCodec(`["null","string","int"]`)
    if err != nil {
        fmt.Println(err)
    }
    buf, err := codec.TextualFromNative(nil, goavro.Union("string", "some string"))
    if err != nil {
        fmt.Println(err)
    }
    fmt.Println(string(buf))
    // Output: {"string":"some string"}
}

Limitations

Goavro is a fully featured encoder and decoder of binary and textual JSON Avro data. It fully supports recursive data structures, unions, and namespacing. However, a few features have yet to be implemented.

Aliases

The Avro specification allows an implementation to optionally map a writer's schema to a reader's schema using aliases. Although goavro can compile schemas with aliases, it does not yet implement this feature.

Kafka Streams

Kafka is the reason goavro was written. Just as Avro Object Container Files are a layer of abstraction above the Avro Data Serialization format, Kafka's use of Avro is also a layer of abstraction above the Avro Data Serialization format, but with its own schema. This support had been implemented, but was removed until the API can be improved.

Default Maximum Block Counts, and Block Sizes

When decoding arrays, maps, and OCF files, the Avro specification states that the binary includes block counts and block sizes that specify how many items are in the next block, and how many bytes are in the next block. To prevent possible denial-of-service attacks on clients that use this library caused by attempting to decode maliciously crafted data, decoded block counts and sizes are compared against public library variables MaxBlockCount and MaxBlockSize. When the decoded values exceed these values, the decoder returns an error.

Because not every upstream client is the same, we've chosen sane defaults for these values, but left them as mutable variables so that clients can override them if deemed necessary for their purposes. Both initial default values are math.MaxInt32 (roughly 2.1 GB in the case of MaxBlockSize).
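For instance, a client that decodes untrusted data might tighten these limits at startup. This is a configuration sketch; the specific values are illustrative, not recommendations:

```go
package config

import "github.com/linkedin/goavro/v2"

func init() {
	// Tighten decoder limits before creating any Codec or OCF reader.
	// Choose limits that fit your application's actual data.
	goavro.MaxBlockCount = int64(1 << 20)         // at most ~1M items per block
	goavro.MaxBlockSize = int64(64 * 1024 * 1024) // at most 64 MiB per block
}
```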

Schema Evolution

Please see my reasons why schema evolution is broken for Avro 1.x.

License

Goavro license

Copyright 2017 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

Google Snappy license

Copyright (c) 2011 The Snappy-Go Authors. All rights reserved.

Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:

  • Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
  • Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
  • Neither the name of Google Inc. nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

Third Party Dependencies

Google Snappy

Goavro links with Google Snappy to provide Snappy compression and decompression support.

Documentation

Overview

Package goavro is a library that encodes and decodes Avro data.

Goavro provides methods to encode native Go data into both binary and textual JSON Avro data, and methods to decode both binary and textual JSON Avro data to native Go data.

Goavro also provides methods to read and write Object Container File (OCF) formatted files, and the library contains example programs to read and write OCF files.

Usage Example:

    package main

    import (
        "fmt"

        "github.com/linkedin/goavro/v2"
    )

    func main() {
        codec, err := goavro.NewCodec(`
            {
              "type": "record",
              "name": "LongList",
              "fields" : [
                {"name": "next", "type": ["null", "LongList", {"type": "long", "logicalType": "timestamp-millis"}], "default": null}
              ]
            }`)
        if err != nil {
            fmt.Println(err)
        }

        // NOTE: May omit fields when using default value
        textual := []byte(`{"next":{"LongList":{}}}`)

        // Convert textual Avro data (in Avro JSON format) to native Go form
        native, _, err := codec.NativeFromTextual(textual)
        if err != nil {
            fmt.Println(err)
        }

        // Convert native Go form to binary Avro data
        binary, err := codec.BinaryFromNative(nil, native)
        if err != nil {
            fmt.Println(err)
        }

        // Convert binary Avro data back to native Go form
        native, _, err = codec.NativeFromBinary(binary)
        if err != nil {
            fmt.Println(err)
        }

        // Convert native Go form to textual Avro data
        textual, err = codec.TextualFromNative(nil, native)
        if err != nil {
            fmt.Println(err)
        }

        // NOTE: Textual encoding will show all fields, even those with values that
        // match their default values
        fmt.Println(string(textual))
        // Output: {"next":{"LongList":{"next":null}}}
    }

Index

Examples

Constants

View Source
const (
	// CompressionNullLabel is used when OCF blocks are not compressed.
	CompressionNullLabel = "null"

	// CompressionDeflateLabel is used when OCF blocks are compressed using the
	// deflate algorithm.
	CompressionDeflateLabel = "deflate"

	// CompressionSnappyLabel is used when OCF blocks are compressed using the
	// snappy algorithm.
	CompressionSnappyLabel = "snappy"
)

Variables

View Source
var (
	// MaxBlockCount is the maximum number of data items allowed in a single
	// block that will be decoded from a binary stream, whether when reading
	// blocks to decode an array or a map, or when reading blocks from an OCF
	// stream. This check is to ensure decoding binary data will not cause the
	// library to over allocate RAM, potentially creating a denial of service on
	// the system.
	//
	// If a particular application needs to decode binary Avro data that
	// potentially has more data items in a single block, then this variable may
	// be modified at your discretion.
	MaxBlockCount = int64(math.MaxInt32)

	// MaxBlockSize is the maximum number of bytes that will be allocated for a
	// single block of data items when decoding from a binary stream. This check
	// is to ensure decoding binary data will not cause the library to over
	// allocate RAM, potentially creating a denial of service on the system.
	//
	// If a particular application needs to decode binary Avro data that
	// potentially has more bytes in a single block, then this variable may be
	// modified at your discretion.
	MaxBlockSize = int64(math.MaxInt32)
)
View Source
var (
	// RelaxedNameValidation causes name validation to allow the first component
	// of an Avro namespace to be the empty string.
	RelaxedNameValidation bool
)

Functions

func FingerprintFromSOE

func FingerprintFromSOE(buf []byte) (uint64, []byte, error)

FingerprintFromSOE returns the unsigned 64-bit Rabin fingerprint from the header of a buffer that encodes a Single-Object Encoded datum. This function is designed to be used to lookup a Codec that can decode the contents of the buffer. Once a Codec is found that has the matching Rabin fingerprint, its NativeFromBinary method may be used to decode the remaining bytes returned as the second return value. On failure this function returns an ErrNotSingleObjectEncoded error.

func decode(codex map[uint64]*goavro.Codec, buf []byte) error {
    // Perform a sanity check on the buffer, then return the Rabin fingerprint
    // of the schema used to encode the data.
    fingerprint, newBuf, err := goavro.FingerprintFromSOE(buf)
    if err != nil {
        return err
    }

    // Get a previously stored Codec from the codex map.
    codec, ok := codex[fingerprint]
    if !ok {
        return fmt.Errorf("unknown codec: %#x", fingerprint)
    }

    // Use the fetched Codec to decode the buffer as a SOE.
    //
    // Faster because SOE magic prefix and schema fingerprint already
    // checked and used to fetch the Codec.  Just need to decode the binary
    // bytes remaining after the prefix were removed.
    datum, _, err := codec.NativeFromBinary(newBuf)
    if err != nil {
        return err
    }

    _, err = fmt.Println(datum)
    return err
}

func Union

func Union(name string, datum interface{}) interface{}

Union wraps a datum value in a map for encoding as a Union, as required by Union encoder.

When providing a value for an Avro union, the encoder will accept `nil` for a `null` value. If the value is non-`nil`, it must be a `map[string]interface{}` with a single key-value pair, where the key is the Avro type name and the value is the datum's value. As a convenience, the `Union` function wraps any datum value in a map as specified above.

func ExampleUnion() {
   codec, err := goavro.NewCodec(`["null","string","int"]`)
   if err != nil {
       fmt.Println(err)
   }
   buf, err := codec.TextualFromNative(nil, goavro.Union("string", "some string"))
   if err != nil {
       fmt.Println(err)
   }
   fmt.Println(string(buf))
   // Output: {"string":"some string"}
}
Example (LogicalType)
// Supported logical types and their native go types:
// * timestamp-millis - time.Time
// * timestamp-micros - time.Time
// * time-millis      - time.Duration
// * time-micros      - time.Duration
// * date             - int
// * decimal          - big.Rat
codec, err := NewCodec(`["null", {"type": "long", "logicalType": "timestamp-millis"}]`)
if err != nil {
	fmt.Println(err)
}

// Note the use of type.logicalType, e.g. `long.timestamp-millis`, to denote the type in a union. This is due to the
// single-string naming format used by goavro. Decimal can be either bytes.decimal or fixed.decimal.
bytes, err := codec.BinaryFromNative(nil, map[string]interface{}{"long.timestamp-millis": time.Date(2006, 1, 2, 15, 4, 5, 0, time.UTC)})
if err != nil {
	fmt.Println(err)
}

decoded, _, err := codec.NativeFromBinary(bytes)
if err != nil {
	fmt.Println(err)
}
out := decoded.(map[string]interface{})
fmt.Printf("%#v\n", out["long.timestamp-millis"].(time.Time).String())
Output:

"2006-01-02 15:04:05 +0000 UTC"

Types

type Codec

type Codec struct {
	Rabin uint64
	// contains filtered or unexported fields
}

Codec supports decoding binary and text Avro data to Go native data types, and conversely encoding Go native data types to binary or text Avro data. A Codec is created as a stateless structure that can be safely used in multiple goroutines simultaneously.

func NewCodec

func NewCodec(schemaSpecification string) (*Codec, error)

NewCodec returns a Codec used to translate between a byte slice of either binary or textual Avro data and native Go data.

Creating a `Codec` is fast, but ought to be performed exactly once per Avro schema to process. Once a `Codec` is created, it may be used multiple times to convert data between native form and binary Avro representation, or between native form and textual Avro representation.

A particular `Codec` can work with only one Avro schema. However, there is no practical limit to how many `Codec`s may be created and used in a program. Internally a `Codec` is merely a named tuple of four function pointers and maintains no runtime state that is mutated after instantiation. In other words, `Codec`s may be safely used by many goroutines simultaneously, as your program requires.

codec, err := goavro.NewCodec(`
    {
      "type": "record",
      "name": "LongList",
      "fields" : [
        {"name": "next", "type": ["null", "LongList"], "default": null}
      ]
    }`)
if err != nil {
        fmt.Println(err)
}

func NewCodecForStandardJSON

func NewCodecForStandardJSON(schemaSpecification string) (*Codec, error)

NewCodecForStandardJSON returns a codec that uses special union-processing code to allow normal JSON to be ingested via an Avro schema, by inferring the "type" intended for union types.

This codec is one-way only: it gets such JSON into the Avro system, converting JSON into Avro JSON; deserialization is not supported.

The "type" inference works by taking the types specified as potentially acceptable for the union and trying to unpack the incoming JSON into each of them. See union.go +/Standard JSON/ for a general description of the problem; details of the solution are in union.go +/nativeAvroFromTextualJson/

For a general description of a codec, see the comment for NewCodec above.

The following is the exact same schema used in the above code for NewCodec:

codec, err := goavro.NewCodecForStandardJSON(`
    {
      "type": "record",
      "name": "LongList",
      "fields" : [
        {"name": "next", "type": ["null", "LongList"], "default": null}
      ]
    }`)
if err != nil {
        fmt.Println(err)
}

The above will take json of this sort:

{"next": null}

{"next":{"next":null}}

{"next":{"next":{"next":null}}}

For more examples see the test cases in union_test.go

func NewCodecForStandardJSONFull

func NewCodecForStandardJSONFull(schemaSpecification string) (*Codec, error)

NewCodecForStandardJSONFull provides full serialization/deserialization for JSON that meets the expectations of regular internet JSON, viewed as something distinct from Avro JSON, which has special handling for union types. For details see the comments above.

With this `codec` you can expect a JSON string like this:

"Follow your bliss."

to deserialize back into the same JSON structure:

"Follow your bliss."

func NewCodecForStandardJSONOneWay

func NewCodecForStandardJSONOneWay(schemaSpecification string) (*Codec, error)

NewCodecForStandardJSONOneWay is an alias for NewCodecForStandardJSON, added to make the transition to two-way JSON handling smoother.

This unambiguously provides one-way Avro encoding for standard internet JSON. It takes in internet JSON and brings it into the Avro world, but deserialization retains the usual Avro-friendly JSON form, where unions have their types specified in the stream, as in this example from the official docs: https://avro.apache.org/docs/1.11.1/api/c/

`{"string": "Follow your bliss."}`

To be clear this means the incoming json string:

"Follow your bliss."

would deserialize according to the avro-json expectations to:

`{"string": "Follow your bliss."}`

To get full two-way support, see NewCodecForStandardJSONFull.

func NewCodecFrom

func NewCodecFrom(schemaSpecification string, cb *codecBuilder) (*Codec, error)

func (*Codec) BinaryFromNative

func (c *Codec) BinaryFromNative(buf []byte, datum interface{}) ([]byte, error)

BinaryFromNative appends the binary encoded byte slice representation of the provided native datum value to the provided byte slice in accordance with the Avro schema supplied when creating the Codec. It is supplied a byte slice to which to append the binary encoded data along with the actual data to encode. On success, it returns a new byte slice with the encoded bytes appended, and a nil error value. On error, it returns the original byte slice, and the error message.

func ExampleBinaryFromNative() {
    codec, err := goavro.NewCodec(`
        {
          "type": "record",
          "name": "LongList",
          "fields" : [
            {"name": "next", "type": ["null", "LongList"], "default": null}
          ]
        }`)
    if err != nil {
        fmt.Println(err)
    }

    // Convert native Go form to binary Avro data
    binary, err := codec.BinaryFromNative(nil, map[string]interface{}{
        "next": map[string]interface{}{
            "LongList": map[string]interface{}{
                "next": map[string]interface{}{
                    "LongList": map[string]interface{}{
                    // NOTE: May omit fields when using default value
                    },
                },
            },
        },
    })
    if err != nil {
        fmt.Println(err)
    }

    fmt.Printf("%#v", binary)
    // Output: []byte{0x2, 0x2, 0x0}
}
Example (Avro)
codec, err := NewCodec(`
{
  "type": "record",
  "name": "LongList",
  "fields" : [
    {"name": "next", "type": ["null", "LongList"], "default": null}
  ]
}
`)
if err != nil {
	fmt.Println(err)
}

// Convert native Go form to binary Avro data
binary, err := codec.BinaryFromNative(nil, map[string]interface{}{
	"next": map[string]interface{}{
		"LongList": map[string]interface{}{
			"next": map[string]interface{}{
				"LongList": map[string]interface{}{
					// NOTE: May omit fields when using default value
				},
			},
		},
	},
})
if err != nil {
	fmt.Println(err)
}

fmt.Printf("%#v", binary)
Output:

[]byte{0x2, 0x2, 0x0}

func (*Codec) CanonicalSchema

func (c *Codec) CanonicalSchema() string

CanonicalSchema returns the Parsing Canonical Form of the schema according to the Avro specification.

Example
schema := `{"type":"map","values":{"type":"enum","name":"foo","symbols":["alpha","bravo"]}}`
codec, err := NewCodec(schema)
if err != nil {
	fmt.Println(err)
} else {
	fmt.Println(codec.CanonicalSchema())
}
Output:

{"type":"map","values":{"name":"foo","type":"enum","symbols":["alpha","bravo"]}}

func (*Codec) NativeFromBinary

func (c *Codec) NativeFromBinary(buf []byte) (interface{}, []byte, error)

NativeFromBinary returns a native datum value from the binary encoded byte slice in accordance with the Avro schema supplied when creating the Codec. On success, it returns the decoded datum, a byte slice containing the remaining undecoded bytes, and a nil error value. On error, it returns nil for the datum value, the original byte slice, and the error message.

func ExampleNativeFromBinary() {
    codec, err := goavro.NewCodec(`
        {
          "type": "record",
          "name": "LongList",
          "fields" : [
            {"name": "next", "type": ["null", "LongList"], "default": null}
          ]
        }`)
    if err != nil {
        fmt.Println(err)
    }

    // Convert native Go form to binary Avro data
    binary := []byte{0x2, 0x2, 0x0}

    native, _, err := codec.NativeFromBinary(binary)
    if err != nil {
        fmt.Println(err)
    }

    fmt.Printf("%v", native)
    // Output: map[next:map[LongList:map[next:map[LongList:map[next:<nil>]]]]]
}
Example (Avro)
codec, err := NewCodec(`
{
  "type": "record",
  "name": "LongList",
  "fields" : [
    {"name": "next", "type": ["null", "LongList"], "default": null}
  ]
}
`)
if err != nil {
	fmt.Println(err)
}

// Convert native Go form to binary Avro data
binary := []byte{0x2, 0x2, 0x0}

native, _, err := codec.NativeFromBinary(binary)
if err != nil {
	fmt.Println(err)
}

fmt.Printf("%v", native)
Output:

map[next:map[LongList:map[next:map[LongList:map[next:<nil>]]]]]
Example (SingleItemDecoding)
codec1, err := NewCodec(`"int"`)
if err != nil {
	fmt.Fprintf(os.Stderr, "%s\n", err)
	return
}

// Create a map of fingerprint values to corresponding Codec instances.
codex := make(map[uint64]*Codec)
codex[codec1.Rabin] = codec1

// Later on when you want to decode such a slice of bytes as a Single-Object
// Encoding, obtain the Rabin fingerprint of the schema used to encode the
// data.
buf := []byte{195, 1, 143, 92, 57, 63, 26, 213, 117, 114, 6}

fingerprint, newBuf, err := FingerprintFromSOE(buf)
if err != nil {
	fmt.Fprintf(os.Stderr, "%s\n", err)
	return
}

// Get a previously stored Codec from the codex map.
codec2, ok := codex[fingerprint]
if !ok {
	fmt.Fprintf(os.Stderr, "unknown codec: %d\n", fingerprint)
	return
}

// Use the fetched Codec to decode the buffer as a SOE.
datum, _, err := codec2.NativeFromBinary(newBuf)
if err != nil {
	fmt.Fprintf(os.Stderr, "%s\n", err)
	return
}
fmt.Println(datum)
Output:

3

func (*Codec) NativeFromSingle

func (c *Codec) NativeFromSingle(buf []byte) (interface{}, []byte, error)

NativeFromSingle converts Avro data from Single-Object-Encoded format from the provided byte slice to Go native data types in accordance with the Avro schema supplied when creating the Codec. On success, it returns the decoded datum, along with a new byte slice with the decoded bytes consumed, and a nil error value. On error, it returns nil for the datum value, the original byte slice, and the error message.

func decode(codec *goavro.Codec, buf []byte) error {
    datum, _, err := codec.NativeFromSingle(buf)
    if err != nil {
        return err
    }
    _, err = fmt.Println(datum)
    return err
}

func (*Codec) NativeFromTextual

func (c *Codec) NativeFromTextual(buf []byte) (interface{}, []byte, error)

NativeFromTextual converts Avro data in JSON text format from the provided byte slice to Go native data types in accordance with the Avro schema supplied when creating the Codec. On success, it returns the decoded datum, along with a new byte slice with the decoded bytes consumed, and a nil error value. On error, it returns nil for the datum value, the original byte slice, and the error message.

func ExampleNativeFromTextual() {
    codec, err := goavro.NewCodec(`
        {
          "type": "record",
          "name": "LongList",
          "fields" : [
            {"name": "next", "type": ["null", "LongList"], "default": null}
          ]
        }`)
    if err != nil {
        fmt.Println(err)
    }

    // Textual Avro data to be decoded into native Go form
    text := []byte(`{"next":{"LongList":{"next":{"LongList":{"next":null}}}}}`)

    native, _, err := codec.NativeFromTextual(text)
    if err != nil {
        fmt.Println(err)
    }

    fmt.Printf("%v", native)
    // Output: map[next:map[LongList:map[next:map[LongList:map[next:<nil>]]]]]
}
Example (CheckSolutionGH233)
const avroSchema = `
	{
		"type": "record",
		"name": "FooBar",
		"namespace": "com.foo.bar",
		"fields": [
		  {
				"name": "event",
				"type": [
					"null",
					{
						"type": "enum",
						"name": "FooBarEvent",
						"symbols": ["CREATED", "UPDATED"]
					}
				]
			}
		]
	}
	`
codec, _ := NewCodec(avroSchema)

const avroJSON = `{"event":{"com.foo.bar.FooBarEvent":"CREATED"}}`

native, _, err := codec.NativeFromTextual([]byte(avroJSON))
if err != nil {
	panic(err)
}

blob, err := json.Marshal(native)
if err != nil {
	panic(err)
}
fmt.Println(string(blob))
Output:

{"event":{"com.foo.bar.FooBarEvent":"CREATED"}}
Example (Json)
codec, err := NewCodecFrom(`["null","string","int"]`, &codecBuilder{
	buildCodecForTypeDescribedByMap,
	buildCodecForTypeDescribedByString,
	buildCodecForTypeDescribedBySliceOneWayJSON,
})
if err != nil {
	fmt.Println(err)
}
// Send in a valid JSON string.
t, _, err := codec.NativeFromTextual([]byte("\"some string one\""))
if err != nil {
	fmt.Println(err)
}
// The value parses into a map, just as the Avro codec produces.
o, ok := t.(map[string]interface{})
if !ok {
	fmt.Printf("it's a %T, not a map[string]interface{}", t)
}

// Pull the string back out to show the round trip worked.
v := o["string"].(string)
fmt.Println(v)
Output:

some string one
Example (RoundTrip)
codec, err := NewCodec(`
{
  "type": "record",
  "name": "LongList",
  "fields" : [
	{"name": "next", "type": ["null", "LongList"], "default": null}
  ]
}
`)
if err != nil {
	fmt.Println(err)
}

// NOTE: May omit fields when using default value
textual := []byte(`{"next":{"LongList":{"next":{"LongList":{}}}}}`)

// Convert textual Avro data (in Avro JSON format) to native Go form
native, _, err := codec.NativeFromTextual(textual)
if err != nil {
	fmt.Println(err)
}

// Convert native Go form to binary Avro data
binary, err := codec.BinaryFromNative(nil, native)
if err != nil {
	fmt.Println(err)
}

// Convert binary Avro data back to native Go form
native, _, err = codec.NativeFromBinary(binary)
if err != nil {
	fmt.Println(err)
}

// Convert native Go form to textual Avro data
textual, err = codec.TextualFromNative(nil, native)
if err != nil {
	fmt.Println(err)
}

// NOTE: Textual encoding will show all fields, even those with values that
// match their default values
fmt.Println(string(textual))
Output:

{"next":{"LongList":{"next":{"LongList":{"next":null}}}}}

func (*Codec) Schema

func (c *Codec) Schema() string

Schema returns the original schema used to create the Codec.

func (*Codec) SchemaCRC64Avro deprecated

func (c *Codec) SchemaCRC64Avro() int64

SchemaCRC64Avro returns a signed 64-bit integer Rabin fingerprint for the canonical schema. This method returns the signed 64-bit cast of the unsigned 64-bit schema Rabin fingerprint.

Deprecated: This method has been replaced by the Rabin structure Codec field and is provided for backward compatibility only.

func (*Codec) SingleFromNative

func (c *Codec) SingleFromNative(buf []byte, datum interface{}) ([]byte, error)

SingleFromNative appends the single-object-encoding byte slice representation of the provided native datum value to the provided byte slice in accordance with the Avro schema supplied when creating the Codec. It is supplied a byte slice to which to append the header and binary encoded data, along with the actual data to encode. On success, it returns a new byte slice with the encoded bytes appended, and a nil error value. On error, it returns the original byte slice, and the error message.

func ExampleSingleItemEncoding() {
    codec, err := goavro.NewCodec(`"int"`)
    if err != nil {
        fmt.Fprintf(os.Stderr, "%s\n", err)
        return
    }

    buf, err := codec.SingleFromNative(nil, 3)
    if err != nil {
        fmt.Fprintf(os.Stderr, "%s\n", err)
        return
    }

    fmt.Println(buf)
    // Output: [195 1 143 92 57 63 26 213 117 114 6]
}

func (*Codec) TextualFromNative

func (c *Codec) TextualFromNative(buf []byte, datum interface{}) ([]byte, error)

TextualFromNative converts Go native data types to Avro data in JSON text format in accordance with the Avro schema supplied when creating the Codec. It is supplied a byte slice to which to append the encoded data and the actual data to encode. On success, it returns a new byte slice with the encoded bytes appended, and a nil error value. On error, it returns the original byte slice, and the error message.

func ExampleTextualFromNative() {
    codec, err := goavro.NewCodec(`
        {
          "type": "record",
          "name": "LongList",
          "fields" : [
            {"name": "next", "type": ["null", "LongList"], "default": null}
          ]
        }`)
    if err != nil {
        fmt.Println(err)
    }

    // Convert native Go form to text Avro data
    text, err := codec.TextualFromNative(nil, map[string]interface{}{
        "next": map[string]interface{}{
            "LongList": map[string]interface{}{
                "next": map[string]interface{}{
                    "LongList": map[string]interface{}{
                    // NOTE: May omit fields when using default value
                    },
                },
            },
        },
    })
    if err != nil {
        fmt.Println(err)
    }

    fmt.Printf("%s", text)
    // Output: {"next":{"LongList":{"next":{"LongList":{"next":null}}}}}
}
Example

Show how to use the default codec via the NewCodecFrom mechanism.

codec, err := NewCodecFrom(`"string"`, &codecBuilder{
	buildCodecForTypeDescribedByMap,
	buildCodecForTypeDescribedByString,
	buildCodecForTypeDescribedBySlice,
})
if err != nil {
	fmt.Println(err)
}
buf, err := codec.TextualFromNative(nil, "some string 22")
if err != nil {
	fmt.Println(err)
}
fmt.Println(string(buf))
Output:

"some string 22"
Example (Json)

Use the standard JSON codec instead

codec, err := NewCodecFrom(`["null","string","int"]`, &codecBuilder{
	buildCodecForTypeDescribedByMap,
	buildCodecForTypeDescribedByString,
	buildCodecForTypeDescribedBySliceOneWayJSON,
})
if err != nil {
	fmt.Println(err)
}
buf, err := codec.TextualFromNative(nil, Union("string", "some string"))
if err != nil {
	fmt.Println(err)
}
fmt.Println(string(buf))
Output:

{"string":"some string"}
Example (Union)
codec, err := NewCodec(`["null","string","int"]`)
if err != nil {
	fmt.Println(err)
}
buf, err := codec.TextualFromNative(nil, Union("string", "some string"))
if err != nil {
	fmt.Println(err)
}
fmt.Println(string(buf))
Output:

{"string":"some string"}
Example (Union_json)
// Imagine a record field with the following union type. I have seen this
// sort of type in many schemas. I have been told the reasoning behind it is
// when the writer desires to encode data to JSON that cannot be written as
// a JSON number, then to encode it as a string and allow the reader to
// parse the string accordingly.
codec, err := NewCodec(`["null","double","string"]`)
if err != nil {
	fmt.Println(err)
}

native, _, err := codec.NativeFromTextual([]byte(`{"string":"NaN"}`))
if err != nil {
	fmt.Println(err)
}

value := math.NaN()
if native == nil {
	fmt.Print("decoded null: ")
} else {
	for k, v := range native.(map[string]interface{}) {
		switch k {
		case "double":
			fmt.Print("decoded double: ")
			value = v.(float64)
		case "string":
			fmt.Print("decoded string: ")
			s := v.(string)
			switch s {
			case "NaN":
				value = math.NaN()
			case "+Infinity":
				value = math.Inf(1)
			case "-Infinity":
				value = math.Inf(-1)
			default:
				var err error
				value, err = strconv.ParseFloat(s, 64)
				if err != nil {
					fmt.Println(err)
				}
			}
		}
	}
}
fmt.Println(value)
Output:

decoded string: NaN

type ErrInvalidName

type ErrInvalidName struct {
	Message string
}

ErrInvalidName is the error returned when one or more parts of an Avro name is invalid.

func (ErrInvalidName) Error

func (e ErrInvalidName) Error() string

type ErrNotSingleObjectEncoded

type ErrNotSingleObjectEncoded string

ErrNotSingleObjectEncoded is returned when an attempt is made to decode a single-object encoded value from a buffer that does not have the correct magic prefix.

func (ErrNotSingleObjectEncoded) Error

func (e ErrNotSingleObjectEncoded) Error() string

type ErrWrongCodec

type ErrWrongCodec uint64

ErrWrongCodec is returned when an attempt is made to decode a single-object encoded value using the wrong codec.

func (ErrWrongCodec) Error

func (e ErrWrongCodec) Error() string

type OCFConfig

type OCFConfig struct {
	// W specifies the `io.Writer` to which to send the encoded data,
	// (required). If W is `*os.File`, then creating an OCF for writing will
	// attempt to read any existing OCF header and use the schema and
	// compression codec specified by the existing header, then advance the file
	// position to the tail end of the file for appending.
	W io.Writer

	// Codec specifies the Codec to use for the new OCFWriter, (optional). If
	// the W parameter above is an `*os.File` which contains a Codec, the Codec
	// in the existing file will be used instead. Otherwise if this Codec
	// parameter is specified, it will be used. If neither the W parameter above
	// is an `*os.File` with an existing Codec, nor this Codec parameter is
	// specified, the OCFWriter will create a new Codec from the schema string
	// specified by the Schema parameter below.
	Codec *Codec

	// Schema specifies the Avro schema for the data to be encoded, (optional).
	// If neither the W parameter above is an `*os.File` with an existing Codec,
	// nor the Codec parameter above is specified, the OCFWriter will create a
	// new Codec from the schema string specified by this Schema parameter.
	Schema string

	// CompressionName specifies the compression codec used, (optional). If
	// omitted, defaults to "null" codec. When appending to an existing OCF,
	// this field is ignored.
	CompressionName string

	// MetaData specifies application specific meta data to be added to
	// the OCF file.  When appending to an existing OCF, this field
	// is ignored.
	MetaData map[string][]byte
}

OCFConfig is used to specify creation parameters for OCFWriter.

type OCFReader

type OCFReader struct {
	// contains filtered or unexported fields
}

OCFReader structure is used to read Object Container Files (OCF).

func NewOCFReader

func NewOCFReader(ior io.Reader) (*OCFReader, error)

NewOCFReader initializes and returns a new structure used to read an Avro Object Container File (OCF).

func example(ior io.Reader) error {
    // NOTE: Wrap provided io.Reader in a buffered reader, which improves the
    // performance of streaming file data.
    br := bufio.NewReader(ior)
    ocfr, err := goavro.NewOCFReader(br)
    if err != nil {
        return err
    }
    for ocfr.Scan() {
        datum, err := ocfr.Read()
        if err != nil {
            return err
        }
        fmt.Println(datum)
    }
    return ocfr.Err()
}

func (*OCFReader) Codec

func (ocfr *OCFReader) Codec() *Codec

Codec returns the codec found within the OCF file.

func (*OCFReader) CompressionName

func (ocfr *OCFReader) CompressionName() string

CompressionName returns the name of the compression algorithm found within the OCF file.

func (*OCFReader) Err

func (ocfr *OCFReader) Err() error

Err returns the last error encountered while reading the OCF file. See `NewOCFReader` documentation for an example.

func (*OCFReader) MetaData

func (ocfr *OCFReader) MetaData() map[string][]byte

MetaData returns the file metadata map found within the OCF file.

func (*OCFReader) Read

func (ocfr *OCFReader) Read() (interface{}, error)

Read consumes one datum value from the Avro OCF stream and returns it. Read is designed to be called only once after each invocation of the Scan method. See `NewOCFReader` documentation for an example.

func (*OCFReader) RemainingBlockItems

func (ocfr *OCFReader) RemainingBlockItems() int64

RemainingBlockItems returns the number of items remaining in the block being processed.

func (*OCFReader) Scan

func (ocfr *OCFReader) Scan() bool

Scan returns true when there is at least one more data item to be read from the Avro OCF. Scan ought to be called prior to calling the Read method each time the Read method is invoked. See `NewOCFReader` documentation for an example.

func (*OCFReader) SkipThisBlockAndReset

func (ocfr *OCFReader) SkipThisBlockAndReset()

SkipThisBlockAndReset can be called after an error occurs while reading or decoding datum values from an OCF stream. The OCF specification requires each OCF stream to contain one or more blocks of data. Each block consists of a count of datum values in the block and the number of bytes for the block, followed by the possibly compressed block itself. Inside each decompressed block, all of the binary encoded datum values are concatenated together. In other words, OCF framing is at the block level rather than the datum level. If there is an error while reading or decoding a datum, the reader is not able to skip to the next datum value, because OCF has no markers for where each datum ends and the next one begins. Therefore, the reader can only skip this datum value and all subsequent datum values in the current block, move to the next block, and start decoding datum values there.

type OCFWriter

type OCFWriter struct {
	// contains filtered or unexported fields
}

OCFWriter is used to create a new or append to an existing Avro Object Container File (OCF).

func NewOCFWriter

func NewOCFWriter(config OCFConfig) (*OCFWriter, error)

NewOCFWriter returns a new OCFWriter instance that may be used for appending binary Avro data, either by appending to an existing OCF file or creating a new OCF file.

func (*OCFWriter) Append

func (ocfw *OCFWriter) Append(data interface{}) error

Append appends one or more data items to an OCF file in a block. If there are more data items in the slice than MaxBlockCount allows, the data slice will be chunked into multiple blocks, each not having more than MaxBlockCount items.

func (*OCFWriter) Codec

func (ocfw *OCFWriter) Codec() *Codec

Codec returns the codec used by OCFWriter. This function is provided because upstream may be appending to an existing OCF that uses a different schema than the one requested during instantiation.

func (*OCFWriter) CompressionName

func (ocfw *OCFWriter) CompressionName() string

CompressionName returns the name of the compression algorithm used by OCFWriter. This function is provided because upstream may be appending to an existing OCF that uses a different compression algorithm than the one requested during instantiation.

Directories

Path Synopsis
examples
arw
soe
