Overview

    Package coder contains coder representation and utilities. Coders describe how to serialize and deserialize pipeline data and may be provided by users.

Constants

    This section is empty.

Variables

    var ErrVarIntTooLong = errors.New("varint too long")

    ErrVarIntTooLong indicates a data corruption issue that needs special handling by callers of decode. TODO(herohde): have callers perform this special handling.

Functions

func DecodeBool

    func DecodeBool(r io.Reader) (bool, error)

    DecodeBool decodes a boolean according to the Beam protocol.

func DecodeByte

    func DecodeByte(r io.Reader) (byte, error)

    DecodeByte decodes a single byte.

func DecodeBytes

    func DecodeBytes(r io.Reader) ([]byte, error)

    DecodeBytes decodes a length-prefixed []byte according to the Beam protocol.

func DecodeDouble

    func DecodeDouble(r io.Reader) (float64, error)

    DecodeDouble decodes a float64 in big-endian format.

func DecodeEventTime

    func DecodeEventTime(r io.Reader) (typex.EventTime, error)

    DecodeEventTime decodes an EventTime.

func DecodeInt32

    func DecodeInt32(r io.Reader) (int32, error)

    DecodeInt32 decodes an int32 in big-endian format.

func DecodeStringUTF8

    func DecodeStringUTF8(r io.Reader) (string, error)

    DecodeStringUTF8 decodes a length-prefixed UTF-8 string.

func DecodeUint32

    func DecodeUint32(r io.Reader) (uint32, error)

    DecodeUint32 decodes a uint32 in big-endian format.

func DecodeUint64

    func DecodeUint64(r io.Reader) (uint64, error)

    DecodeUint64 decodes a uint64 in big-endian format.

func DecodeVarInt

    func DecodeVarInt(r io.Reader) (int64, error)

    DecodeVarInt decodes a variable-length encoded int64.

func DecodeVarUint64

    func DecodeVarUint64(r io.Reader) (uint64, error)

    DecodeVarUint64 decodes a variable-length encoded uint64.

func EncodeBool

    func EncodeBool(v bool, w io.Writer) error

    EncodeBool encodes a boolean according to the Beam protocol.

func EncodeByte

    func EncodeByte(v byte, w io.Writer) error

    EncodeByte encodes a single byte.

func EncodeBytes

    func EncodeBytes(v []byte, w io.Writer) error

    EncodeBytes encodes a []byte with a length prefix per the Beam protocol.

func EncodeDouble

    func EncodeDouble(value float64, w io.Writer) error

    EncodeDouble encodes a float64 in big-endian format.

func EncodeEventTime

    func EncodeEventTime(et typex.EventTime, w io.Writer) error

    EncodeEventTime encodes an EventTime as a uint64. The encoding is milliseconds since the epoch, shifted so that the byte representations of negative values sort lexicographically before those of positive values.

func EncodeInt32

    func EncodeInt32(value int32, w io.Writer) error

    EncodeInt32 encodes an int32 in big-endian format.

func EncodeStringUTF8

    func EncodeStringUTF8(s string, w io.Writer) error

    EncodeStringUTF8 encodes a UTF-8 string with a length prefix.

func EncodeUint32

    func EncodeUint32(value uint32, w io.Writer) error

    EncodeUint32 encodes a uint32 in big-endian format.

func EncodeUint64

    func EncodeUint64(value uint64, w io.Writer) error

    EncodeUint64 encodes a uint64 in big-endian format.

func EncodeVarInt

    func EncodeVarInt(value int64, w io.Writer) error

    EncodeVarInt encodes an int64 using a variable-length encoding.
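The documentation does not say how the sign is handled. If EncodeVarInt simply reinterprets the int64 as a uint64 and writes an unsigned varint (an assumption of this sketch), small negative numbers are costly: -1 becomes 0xFFFFFFFFFFFFFFFF and needs the maximum ten bytes. Go's own `binary.PutVarint` uses zig-zag encoding instead and writes -1 in a single byte, so the two formats are not interchangeable. `encodeVarInt` and `decodeVarInt` are hypothetical names.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeVarInt casts to uint64 (two's complement) and writes an unsigned
// varint. This cast-based scheme is an assumption, not confirmed behavior.
func encodeVarInt(v int64) []byte {
	buf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutUvarint(buf, uint64(v))
	return buf[:n]
}

// decodeVarInt reverses encodeVarInt.
func decodeVarInt(b []byte) (int64, error) {
	u, n := binary.Uvarint(b)
	if n <= 0 {
		return 0, fmt.Errorf("bad varint (n=%d)", n)
	}
	return int64(u), nil
}

func main() {
	for _, v := range []int64{0, 1, 300, -1} {
		fmt.Println(v, len(encodeVarInt(v)))
	}
	// 0 1
	// 1 1
	// 300 2
	// -1 10

	// Zig-zag (binary.PutVarint) is a different, incompatible format.
	zigzag := make([]byte, binary.MaxVarintLen64)
	fmt.Println(binary.PutVarint(zigzag, -1)) // 1
}
```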

func EncodeVarUint64

    func EncodeVarUint64(value uint64, w io.Writer) error

    EncodeVarUint64 encodes a uint64 using a variable-length encoding.

func IsCoGBK

    func IsCoGBK(c *Coder) bool

    IsCoGBK returns true iff the coder is for a CoGBK type.

func IsKV

    func IsKV(c *Coder) bool

    IsKV returns true iff the coder is for key-value pairs.

func IsW

    func IsW(c *Coder) bool

    IsW returns true iff the coder is for a WindowedValue.

func RegisterCoder

    func RegisterCoder(t reflect.Type, enc, dec interface{})

    RegisterCoder registers a user-defined coder for a given type. The coder is used only if there is no Beam coder for that type. RegisterCoder must be called before beam.Init(), preferably in an init() function.

    Coders are encoder and decoder pairs that operate on []byte values.

    The coder used for a given type is chosen in this order:

    1. Coders for known Beam types
    2. Coders registered for specific types
    3. Coders registered for interface types
    4. The default coder (JSON)

    Types of kind Interface are handled specially by the registry so that they can be iterated over to check whether element types implement them.

    Repeated registrations of the same type override prior ones.

func RowDecoderForStruct

    func RowDecoderForStruct(rt reflect.Type) (func(io.Reader) (interface{}, error), error)

    RowDecoderForStruct returns a decoding function that decodes the Beam row encoding into the given type.

    It returns an error if the given type is invalid or cannot be decoded from a Beam schema row.

func RowEncoderForStruct

    func RowEncoderForStruct(rt reflect.Type) (func(interface{}, io.Writer) error, error)

    RowEncoderForStruct returns an encoding function that encodes a struct type, or a pointer to a struct type, using the Beam row encoding.

    It returns an error if the given type is invalid or cannot be encoded to a Beam schema row.

func Types

    func Types(list []*Coder) []typex.FullType

    Types returns a slice of the types used by the supplied coders.

Types

type Coder

    type Coder struct {
    	Kind Kind
    	T    typex.FullType

    	Components []*Coder     // WindowedValue, KV, CoGBK
    	Custom     *CustomCoder // Custom
    	Window     *WindowCoder // WindowedValue

    	ID string // (optional) This coder's ID if translated from a pipeline proto.
    }

    Coder is a description of how to encode and decode values of a given type. Except for the "custom" kind, coders are built in and must adhere to the (unwritten) Beam specification.

func CoderFrom

    func CoderFrom(c *CustomCoder) *Coder

    CoderFrom is a helper that creates a Coder from a CustomCoder.

func NewBool

    func NewBool() *Coder

    NewBool returns a new bool coder using the built-in scheme.

func NewBytes

    func NewBytes() *Coder

    NewBytes returns a new []byte coder using the built-in scheme. It is always nested, for now.

func NewCoGBK

    func NewCoGBK(components []*Coder) *Coder

    NewCoGBK returns a coder for CoGBK elements.

func NewDouble

    func NewDouble() *Coder

    NewDouble returns a new double coder using the built-in scheme.

func NewI

    func NewI(c *Coder) *Coder

    NewI returns an iterable coder in the form of a slice.

func NewKV

    func NewKV(components []*Coder) *Coder

    NewKV returns a coder for key-value pairs.

func NewPW

    func NewPW(c *Coder, w *WindowCoder) *Coder

    NewPW returns a ParamWindowedValue coder for the window of elements.

func NewR

    func NewR(t typex.FullType) *Coder

    NewR returns a schema row coder for the type.

func NewString

    func NewString() *Coder

    NewString returns a new string coder using the built-in scheme.

func NewT

    func NewT(c *Coder, w *WindowCoder) *Coder

    NewT returns a timer coder for the window of elements.

func NewVarInt

    func NewVarInt() *Coder

    NewVarInt returns a new int64 coder using the built-in scheme.

func NewW

    func NewW(c *Coder, w *WindowCoder) *Coder

    NewW returns a WindowedValue coder for the window of elements.

func SkipW

    func SkipW(c *Coder) *Coder

    SkipW returns the data coder used by a WindowedValue coder, or the coder itself if it is not a WindowedValue coder. This lets code traverse WindowedValues seamlessly, without additional conditional code.
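To show how SkipW and IsW let code ignore the windowing layer, the sketch below uses a trimmed, hypothetical copy of Coder that keeps only Kind and Components. It assumes that a WindowedValue coder stores its data coder in Components[0], which the struct's field comment suggests but the documentation does not state outright.

```go
package main

import "fmt"

type Kind string

const (
	VarInt        Kind = "varint"
	Bytes         Kind = "bytes"
	KV            Kind = "KV"
	WindowedValue Kind = "W"
)

// Coder is a hypothetical, trimmed-down mirror of coder.Coder.
type Coder struct {
	Kind       Kind
	Components []*Coder
}

func isW(c *Coder) bool  { return c.Kind == WindowedValue }
func isKV(c *Coder) bool { return c.Kind == KV }

// skipW unwraps a WindowedValue coder to its data coder (assumed to be
// Components[0]); any other coder is returned unchanged, so callers need no
// special case.
func skipW(c *Coder) *Coder {
	if isW(c) {
		return c.Components[0]
	}
	return c
}

func main() {
	kv := &Coder{Kind: KV, Components: []*Coder{{Kind: Bytes}, {Kind: VarInt}}}
	windowed := &Coder{Kind: WindowedValue, Components: []*Coder{kv}}

	// Both the windowed and the bare coder expose the same KV data coder.
	for _, c := range []*Coder{windowed, kv} {
		fmt.Println(isW(c), isKV(skipW(c)))
	}
	// true true
	// false true
}
```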

func (*Coder) Equals

    func (c *Coder) Equals(o *Coder) bool

    Equals returns true iff the two coders are equal. It assumes that functions with the same name and types are identical.

func (*Coder) String

    func (c *Coder) String() string

type CustomCoder

    type CustomCoder struct {
    	// Name is the coder name. Informational only.
    	Name string
    	// Type is the underlying concrete type that is being coded. It is
    	// available to Enc and Dec. It must be a concrete type.
    	Type reflect.Type

    	// Enc is the encoding function: T -> []byte. It may optionally take a
    	// reflect.Type parameter and return an error as well.
    	Enc *funcx.Fn
    	// Dec is the decoding function: []byte -> T. It may optionally take a
    	// reflect.Type parameter and return an error as well.
    	Dec *funcx.Fn

    	ID string // (optional) This coder's ID if translated from a pipeline proto.
    }

    CustomCoder contains possibly untyped encode/decode user functions that are type-bound at runtime. Universal coders can thus be used for many different types, but each CustomCoder instance is bound to a specific type.

func LookupCustomCoder

    func LookupCustomCoder(t reflect.Type) *CustomCoder

    LookupCustomCoder returns the custom coder for the type, if any. It first checks for a coder registered for that exact type, then iterates through registered interface coders in reverse registration order.

func NewCustomCoder

    func NewCustomCoder(id string, t reflect.Type, encode, decode interface{}) (*CustomCoder, error)

    NewCustomCoder creates a coder for the supplied parameters defining a particular encoding strategy.

func (*CustomCoder) Equals

    func (c *CustomCoder) Equals(o *CustomCoder) bool

    Equals returns true iff the two custom coders are equal. It assumes that functions with the same name and types are identical.

func (*CustomCoder) String

    func (c *CustomCoder) String() string

type ElementDecoder

    type ElementDecoder interface {
    	Decode(r io.Reader) (interface{}, error)
    }

    ElementDecoder encapsulates the ability to decode an element from a reader.

type ElementEncoder

    type ElementEncoder interface {
    	Encode(element interface{}, w io.Writer) error
    }

    ElementEncoder encapsulates the ability to encode an element into a writer.

type Kind

    type Kind string

    Kind represents the type of coder used.

    const (
    	Custom             Kind = "Custom" // Implicitly length-prefixed
    	Bytes              Kind = "bytes"  // Implicitly length-prefixed as part of the encoding
    	String             Kind = "string" // Implicitly length-prefixed as part of the encoding.
    	Bool               Kind = "bool"
    	VarInt             Kind = "varint"
    	Double             Kind = "double"
    	Row                Kind = "R"
    	Timer              Kind = "T"
    	WindowedValue      Kind = "W"
    	ParamWindowedValue Kind = "PW"
    	Iterable           Kind = "I"
    	KV                 Kind = "KV"
    	LP                 Kind = "LP" // Explicitly length prefixed, likely at the runner's direction.

    	Window Kind = "window" // A debug wrapper around a window coder.

    	// CoGBK is currently equivalent to either
    	//
    	//     KV<X,Iterable<Y>>         (if GBK)
    	//     KV<X,Iterable<KV<int,Y>>> (if CoGBK, using a tagged union encoding)
    	//
    	// It requires special handling in translation to the model pipeline in the latter case
    	// to add the incoming index for each input.
    	//
    	// TODO(BEAM-490): once this JIRA is done, this coder should become the new thing.
    	CoGBK Kind = "CoGBK"
    )

    Tags for the various Beam encoding strategies. https://beam.apache.org/documentation/programming-guide/#coders documents the usage of coders in the Beam environment.
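The CoGBK comment in the constant block describes a tagged-union form in which each grouped value is paired with the index of the input it came from (the KV<int,Y> elements). The sketch below models only that shape with plain Go values, not the byte-level encoding; `tagged`, `tag`, and `untag` are hypothetical names.

```go
package main

import "fmt"

// tagged pairs a value with the index of the input collection it came from,
// mirroring the KV<int,Y> element of the tagged-union form.
type tagged struct {
	Input int
	Value interface{}
}

// tag flattens the per-input values for one key into a single iterable.
func tag(inputs ...[]interface{}) []tagged {
	var out []tagged
	for i, values := range inputs {
		for _, v := range values {
			out = append(out, tagged{Input: i, Value: v})
		}
	}
	return out
}

// untag splits the tagged iterable back into one slice per input.
func untag(n int, union []tagged) [][]interface{} {
	out := make([][]interface{}, n)
	for _, t := range union {
		out[t.Input] = append(out[t.Input], t.Value)
	}
	return out
}

func main() {
	emails := []interface{}{"a@example.com"}
	phones := []interface{}{"555-0100", "555-0101"}

	union := tag(emails, phones)
	fmt.Println(union)           // [{0 a@example.com} {1 555-0100} {1 555-0101}]
	fmt.Println(untag(2, union)) // [[a@example.com] [555-0100 555-0101]]
}
```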

type WindowCoder

    type WindowCoder struct {
    	Kind    WindowKind
    	Payload string // Payload is only populated for parameterized window coders.
    }

    WindowCoder represents a Window coder.

func NewGlobalWindow

    func NewGlobalWindow() *WindowCoder

    NewGlobalWindow returns a window coder for the global window.

func NewIntervalWindow

    func NewIntervalWindow() *WindowCoder

    NewIntervalWindow returns a window coder for interval windows.

func (*WindowCoder) Equals

    func (w *WindowCoder) Equals(o *WindowCoder) bool

    Equals reports whether the passed-in WindowCoder has the same Kind and Payload as this WindowCoder.

func (*WindowCoder) String

    func (w *WindowCoder) String() string

type WindowKind

    type WindowKind string

    WindowKind represents a kind of window coder.

    const (
    	GlobalWindow   WindowKind = "GWC"
    	IntervalWindow WindowKind = "IWC"
    )

    Available window coders. The same coder could be used for more than one kind of windowing strategy.