Documentation ¶
Index ¶
Constants ¶
const ( // ErrReadingSchemaWrapper wraps errors returned while reading the schema file ErrReadingSchemaWrapper = "error reading schema %s" // ErrCreatingCodecWrapper wraps errors returned while creating the go-avro codec ErrCreatingCodecWrapper = "error creating codec for schema %s" // ErrDecodingMessageWrapper wraps errors returned decoding the message ErrDecodingMessageWrapper = "error decoding message" )
Variables ¶
var ( // ErrInvalidSchema denotes that the schema file is not in the correct format ErrInvalidSchema = errors.New("invalid schema, schemas must be .avsc files") // ErrNoCodec denotes that Decode has been called, but no go-avro codec has been created for it ErrNoCodec = errors.New("could not find codec. Was ValidateSchemas called yet?") // ErrAssertingType denotes that one or more fields failed type assertion ErrAssertingType = errors.New("could not decode message, type assertion failed") )
Functions ¶
This section is empty.
Types ¶
type AvroDecoder ¶
type AvroDecoder struct { Converter Converter // contains filtered or unexported fields }
AvroDecoder implements the decoder interface and can should be able to decode messages for most schemas
func (*AvroDecoder) Decode ¶
func (a *AvroDecoder) Decode(msg []byte) (interface{}, error)
Decode takes in an Avro message's value and uses the codec created in ValidateSchemas to decode the message
func (*AvroDecoder) ValidateSchemas ¶
func (a *AvroDecoder) ValidateSchemas(schemas string) error
ValidateSchemas takes in a single Avro schema, validates the schema and initializes an go-avro codec to process messages
type Converter ¶
Converter is an interface type for converting individual fields inside of a decoded Avro message to more easily readable types. For example, if a field is []byte but is supposed to be displayed as JSON, it needs to be converted to json.RawMessage so it can be printed properly.
type JSONDecoder ¶
JSONDecoder implements the Decoder interface and is used to decode Kafka messages to JSON
func (*JSONDecoder) Decode ¶
func (j *JSONDecoder) Decode(msg []byte) (interface{}, error)
Decode takes a sarama consumermessage
func (*JSONDecoder) ValidateSchemas ¶
func (j *JSONDecoder) ValidateSchemas(schemas string) error
ValidateSchemas returns nil since JSON has no defined schema
type MsgPackDecoder ¶
type MsgPackDecoder struct{}
MsgPackDecoder decodes kafka messages written in MsgPack
func (*MsgPackDecoder) Decode ¶
func (m *MsgPackDecoder) Decode(msg []byte) (interface{}, error)
Decode returns a decoded MessagePack message
func (*MsgPackDecoder) ValidateSchemas ¶
func (m *MsgPackDecoder) ValidateSchemas(schemas string) error
ValidateSchemas returns nil since schemas are not required for MsgPack