Documentation

Overview

    Package graphx provides facilities for serializing pipelines into a graph structure suitable for the worker.

    The registry's Register function is used by transform authors to make their type's methods available for remote invocation. The runner then uses the registry's Key and Lookup methods to access information supplied by transform authors.

    The Encode* and Decode* functions handle serialization of both regular Go data and Beam-specific data types. The Encode* functions are used after pipeline construction to turn the plan into a serializable form that can be sent for remote execution. The Decode* functions are used by the runner to recover the execution plan from the serialized form.

    Constants

    const (
    	URNInject = "beam:go:transform:inject:v1"
    	URNExpand = "beam:go:transform:expand:v1"
    )
    const (
    	URNImpulse       = "beam:transform:impulse:v1"
    	URNParDo         = "beam:transform:pardo:v1"
    	URNFlatten       = "beam:transform:flatten:v1"
    	URNGBK           = "beam:transform:group_by_key:v1"
    	URNReshuffle     = "beam:transform:reshuffle:v1"
    	URNCombinePerKey = "beam:transform:combine_per_key:v1"
    	URNWindow        = "beam:transform:window:v1"
    
    	// URNIterableSideInput = "beam:side_input:iterable:v1"
    	URNMultimapSideInput = "beam:side_input:multimap:v1"
    
    	URNGlobalWindowsWindowFn  = "beam:window_fn:global_windows:v1"
    	URNFixedWindowsWindowFn   = "beam:window_fn:fixed_windows:v1"
    	URNSlidingWindowsWindowFn = "beam:window_fn:sliding_windows:v1"
    	URNSessionsWindowFn       = "beam:window_fn:session_windows:v1"
    
    	// SDK constants
    	URNDoFn = "beam:go:transform:dofn:v1"
    
    	URNIterableSideInputKey = "beam:go:transform:iterablesideinputkey:v1"
    	URNReshuffleInput       = "beam:go:transform:reshuffleinput:v1"
    	URNReshuffleOutput      = "beam:go:transform:reshuffleoutput:v1"
    
    	URNLegacyProgressReporting = "beam:protocol:progress_reporting:v0"
    	URNMultiCore               = "beam:protocol:multi_core_bundle_processing:v1"
    
    	URNRequiresSplittableDoFn = "beam:requirement:pardo:splittable_dofn:v1"
    
    	URNArtifactGoWorker  = "beam:artifact:type:go_worker_binary:v1"
    	URNArtifactStagingTo = "beam:artifact:role:staging_to:v1"
    )

      Model constants for interfacing with a Beam runner. TODO(lostluck): 2018/05/28 Extract these from their enum descriptors in the pipeline_v1 proto
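
      Judging only from the values listed above, the Go-specific URNs share the prefix "beam:go:", while the model URNs do not. The sketch below relies on that observation; the helper isGoSpecific is hypothetical and not part of the package, and the two constants are redeclared locally so the example runs on its own.

```go
package main

import (
	"fmt"
	"strings"
)

// Two of the constants listed above, redeclared for a standalone example.
const (
	URNParDo = "beam:transform:pardo:v1"
	URNDoFn  = "beam:go:transform:dofn:v1"
)

// isGoSpecific reports whether urn carries the "beam:go:" prefix that the
// Go SDK constants in the listing share. The prefix rule is an assumption
// inferred from the listed values, not a documented guarantee.
func isGoSpecific(urn string) bool {
	return strings.HasPrefix(urn, "beam:go:")
}

func main() {
	for _, urn := range []string{URNParDo, URNDoFn} {
		fmt.Printf("%s go-specific=%t\n", urn, isGoSpecific(urn))
	}
}
```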

      Variables

      This section is empty.

      Functions

      func AddFakeImpulses

      func AddFakeImpulses(p *pipepb.Pipeline)

        AddFakeImpulses adds an impulse transform as the producer for each input to the root transform. Inputs need producers to form a correct pipeline.

        func CreateEnvironment

        func CreateEnvironment(ctx context.Context, urn string, extractEnvironmentConfig func(context.Context) string) (*pipepb.Environment, error)

          CreateEnvironment produces the appropriate payload for the type of environment.

          func DecodeCoder

          func DecodeCoder(data string) (*coder.Coder, error)

            DecodeCoder decodes a coder. Any custom coder function symbol must be resolvable via the runtime.GlobalSymbolResolver. The types must be encodable.

            func DecodeCoderRef

            func DecodeCoderRef(c *CoderRef) (*coder.Coder, error)

              DecodeCoderRef extracts a usable coder from the encoded runner form.

              func DecodeCoderRefs

              func DecodeCoderRefs(list []*CoderRef) ([]*coder.Coder, error)

                DecodeCoderRefs extracts usable coders from the encoded runner form.

                func DecodeFn

                func DecodeFn(data string) (reflectx.Func, error)

                  DecodeFn decodes a function. The function symbol must be resolvable via the runtime.GlobalSymbolResolver. The parameter types must be encodable.

                  func DecodeMultiEdge

                  func DecodeMultiEdge(edge *v1pb.MultiEdge) (graph.Opcode, *graph.Fn, *window.Fn, []*graph.Inbound, []*graph.Outbound, error)

                    DecodeMultiEdge converts the wire representation into the preprocessed components representing that edge. We deserialize to components to avoid inserting the edge into a graph or creating a detached edge.

                    func DecodeType

                    func DecodeType(data string) (reflect.Type, error)

                      DecodeType decodes a type. Unless registered, the decoded type is only guaranteed to be isomorphic to the input with regard to data members. The returned type will have no methods.
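
                      The caveat about methods can be illustrated with the standard reflect package. This is a sketch of the general idea only, not of how DecodeType works internally: the helpers rebuild and rebuildOf and the Point type are hypothetical. A struct type rebuilt from its field descriptions has the same data members as the original but no methods.

```go
package main

import (
	"fmt"
	"reflect"
)

// Point is an illustrative type with exported fields and one method.
type Point struct {
	X, Y int
}

func (Point) String() string { return "point" }

// rebuild constructs a fresh struct type with the same exported fields as t.
// The result is structurally equivalent to t but carries none of its methods.
func rebuild(t reflect.Type) reflect.Type {
	fields := make([]reflect.StructField, 0, t.NumField())
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		fields = append(fields, reflect.StructField{Name: f.Name, Type: f.Type, Tag: f.Tag})
	}
	return reflect.StructOf(fields)
}

// rebuildOf returns the type of v together with its rebuilt counterpart.
func rebuildOf(v interface{}) (orig, rebuilt reflect.Type) {
	orig = reflect.TypeOf(v)
	return orig, rebuild(orig)
}

func main() {
	orig, got := rebuildOf(Point{})
	fmt.Println("fields:", orig.NumField(), got.NumField())
	fmt.Println("methods:", orig.NumMethod(), got.NumMethod())
}
```

                      Registering a type avoids this loss, since the decoder can then return the original type instead of a structural copy.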

                      func EncodeCoder

                      func EncodeCoder(c *coder.Coder) (string, error)

                        EncodeCoder encodes a coder as a string. Any custom coder function symbol must be resolvable via the runtime.GlobalSymbolResolver. The types must be encodable.

                        func EncodeFn

                        func EncodeFn(fn reflectx.Func) (string, error)

                          EncodeFn encodes a function and parameter types as a string. The function symbol must be resolvable via the runtime.GlobalSymbolResolver. The types must be encodable.

                          func EncodeMultiEdge

                          func EncodeMultiEdge(edge *graph.MultiEdge) (*v1pb.MultiEdge, error)

                            EncodeMultiEdge converts the preprocessed representation into the wire representation of the multiedge, capturing input and output type information.

                            func EncodeType

                            func EncodeType(t reflect.Type) (string, error)

                              EncodeType encodes a type as a string. Unless registered, the decoded type is only guaranteed to be isomorphic to the input with regard to data members. The returned type will have no methods.

                              func ExpandedComponents

                              func ExpandedComponents(exp *graph.ExpandedTransform) (*pipepb.Components, error)

                                ExpandedComponents type-asserts the Components field, which is declared as interface{}, and returns it as a pipeline Components proto.

                                func ExpandedTransform

                                func ExpandedTransform(exp *graph.ExpandedTransform) (*pipepb.PTransform, error)

                                  ExpandedTransform type-asserts the Transform field, which is declared as interface{}, and returns it as a pipeline PTransform proto.

                                  func ExternalInputs

                                  func ExternalInputs(e *graph.MultiEdge) map[string]*graph.Node

                                    ExternalInputs returns a map from tag to the graph node representing the input PCollection. It is derived from the map of named inputs (tag -> index of Inbound in MultiEdge.Input).

                                    func ExternalOutputs

                                    func ExternalOutputs(e *graph.MultiEdge) map[string]*graph.Node

                                      ExternalOutputs returns a map from tag to the graph node representing the output PCollection. It is derived from the map of named outputs (tag -> index of Outbound in MultiEdge.Output).
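
                                      The tag-to-index-to-node resolution described for ExternalInputs and ExternalOutputs can be sketched as follows. The types node, inbound, and multiEdge are simplified, hypothetical stand-ins for the graph package types, and the field name InputsMap is an assumption made for this example.

```go
package main

import "fmt"

// node stands in for *graph.Node (a PCollection in the graph).
type node struct{ ID int }

// inbound stands in for *graph.Inbound.
type inbound struct{ From *node }

// multiEdge stands in for *graph.MultiEdge, reduced to the input side.
type multiEdge struct {
	Input     []*inbound
	InputsMap map[string]int // tag -> index into Input (assumed field name)
}

// externalInputs resolves each tag to the node feeding the tagged input.
func externalInputs(e *multiEdge) map[string]*node {
	out := make(map[string]*node, len(e.InputsMap))
	for tag, idx := range e.InputsMap {
		out[tag] = e.Input[idx].From
	}
	return out
}

func main() {
	e := &multiEdge{
		Input:     []*inbound{{From: &node{ID: 7}}, {From: &node{ID: 9}}},
		InputsMap: map[string]int{"main": 0, "side": 1},
	}
	for tag, n := range externalInputs(e) {
		fmt.Println(tag, n.ID)
	}
}
```

                                      The output side would follow the same pattern, with the index pointing into the list of outbound links.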

                                      func MakeGBKUnionCoder

                                      func MakeGBKUnionCoder(gbk *graph.MultiEdge) (*coder.Coder, error)

                                        MakeGBKUnionCoder returns CoGBK<K,KV<int,[]byte>> for a given CoGBK.

                                        func MakeKVUnionCoder

                                        func MakeKVUnionCoder(gbk *graph.MultiEdge) (*coder.Coder, error)

                                          MakeKVUnionCoder returns KV<K,KV<int,[]byte>> for a given CoGBK.

                                          func Marshal

                                          func Marshal(edges []*graph.MultiEdge, opt *Options) (*pipepb.Pipeline, error)

                                            Marshal converts a graph to a model pipeline.

                                            func MarshalCoders

                                            func MarshalCoders(coders []*coder.Coder) ([]string, map[string]*pipepb.Coder, error)

                                              MarshalCoders marshals a list of coders into model coders.

                                              func RemoveFakeImpulses

                                              func RemoveFakeImpulses(c *pipepb.Components, ext *pipepb.PTransform)

                                                RemoveFakeImpulses removes each fake impulse per input to the transform. Multiple producers for one Input cannot be present.
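
                                                The add-then-remove round trip performed by AddFakeImpulses and RemoveFakeImpulses can be sketched on a simplified model. The transform type, the "fake_impulse_" id scheme, and both helper functions are hypothetical; the real functions operate on pipeline protos.

```go
package main

import "fmt"

const urnImpulse = "beam:transform:impulse:v1"

// transform is a simplified stand-in for *pipepb.PTransform.
type transform struct {
	URN     string
	Inputs  map[string]string // tag -> PCollection id
	Outputs map[string]string // tag -> PCollection id
}

// addFakeImpulses gives every input of root a producing impulse transform,
// so that each input PCollection has a producer.
func addFakeImpulses(transforms map[string]*transform, root *transform) {
	for tag, pcol := range root.Inputs {
		transforms["fake_impulse_"+tag] = &transform{
			URN:     urnImpulse,
			Outputs: map[string]string{"out": pcol},
		}
	}
}

// removeFakeImpulses deletes the impulse transforms that produce root's
// inputs. This sketch does not distinguish fake impulses from genuine ones.
func removeFakeImpulses(transforms map[string]*transform, root *transform) {
	inputs := make(map[string]bool)
	for _, pcol := range root.Inputs {
		inputs[pcol] = true
	}
	for id, t := range transforms {
		if t.URN != urnImpulse {
			continue
		}
		for _, pcol := range t.Outputs {
			if inputs[pcol] {
				delete(transforms, id)
				break
			}
		}
	}
}

func main() {
	root := &transform{URN: "example:external", Inputs: map[string]string{"in0": "pc0", "in1": "pc1"}}
	transforms := map[string]*transform{"ext": root}
	addFakeImpulses(transforms, root)
	fmt.Println("after add:", len(transforms))
	removeFakeImpulses(transforms, root)
	fmt.Println("after remove:", len(transforms))
}
```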

                                                func ResolveOutputIsBounded

                                                func ResolveOutputIsBounded(e *graph.MultiEdge, isBoundedUpdater func(*graph.Node, bool))

                                                  ResolveOutputIsBounded updates each Output node, based on the received expanded components, to reflect whether it is bounded.

                                                  func UnmarshalCoders

                                                  func UnmarshalCoders(ids []string, m map[string]*pipepb.Coder) ([]*coder.Coder, error)

                                                    UnmarshalCoders unmarshals coders.

                                                    func VerifyNamedOutputs

                                                    func VerifyNamedOutputs(ext *graph.ExternalTransform)

                                                      VerifyNamedOutputs ensures that the expanded outputs correspond to the expected named outputs.

                                                      Types

                                                      type CoderMarshaller

                                                      type CoderMarshaller struct {
                                                      	// contains filtered or unexported fields
                                                      }

                                                        CoderMarshaller incrementally builds a compact model representation of a set of coders. Identical coders are shared.

                                                        func NewCoderMarshaller

                                                        func NewCoderMarshaller() *CoderMarshaller

                                                          NewCoderMarshaller returns a new CoderMarshaller.

                                                          func (*CoderMarshaller) Add

                                                          func (b *CoderMarshaller) Add(c *coder.Coder) (string, error)

                                                            Add adds the given coder to the set and returns its id. Idempotent.

                                                            func (*CoderMarshaller) AddMulti

                                                            func (b *CoderMarshaller) AddMulti(list []*coder.Coder) ([]string, error)

                                                              AddMulti adds the given coders to the set and returns their ids. Idempotent.

                                                              func (*CoderMarshaller) AddWindowCoder

                                                              func (b *CoderMarshaller) AddWindowCoder(w *coder.WindowCoder) (string, error)

                                                                AddWindowCoder adds a window coder.

                                                                func (*CoderMarshaller) Build

                                                                func (b *CoderMarshaller) Build() map[string]*pipepb.Coder

                                                                  Build returns the set of model coders. Note that the map may be larger than the number of coders added, because component coders are included.
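
                                                                  The sharing and idempotence described for CoderMarshaller can be sketched with stand-in types. The coder and marshaller types, the structural key, and the "c0", "c1", ... id scheme are assumptions made for this example and do not reflect the actual implementation.

```go
package main

import "fmt"

// coder is a simplified stand-in for *coder.Coder.
type coder struct {
	Kind       string
	Components []*coder
}

// marshaller is a hypothetical stand-in for CoderMarshaller.
type marshaller struct {
	ids    map[string]string   // structural key -> assigned id
	coders map[string][]string // id -> component ids (stand-in for the model coder)
}

func newMarshaller() *marshaller {
	return &marshaller{ids: map[string]string{}, coders: map[string][]string{}}
}

// key builds a structural key so that identical coders map to the same entry.
func key(c *coder) string {
	s := c.Kind + "("
	for i, comp := range c.Components {
		if i > 0 {
			s += ","
		}
		s += key(comp)
	}
	return s + ")"
}

// Add registers c and its components and returns the id of c.
// Adding a structurally identical coder again returns the same id.
func (m *marshaller) Add(c *coder) string {
	k := key(c)
	if id, ok := m.ids[k]; ok {
		return id
	}
	var comps []string
	for _, comp := range c.Components {
		comps = append(comps, m.Add(comp))
	}
	id := fmt.Sprintf("c%d", len(m.ids))
	m.ids[k] = id
	m.coders[id] = comps
	return id
}

// Build returns every coder seen so far, component coders included.
func (m *marshaller) Build() map[string][]string { return m.coders }

func main() {
	m := newMarshaller()
	kv := &coder{Kind: "kv", Components: []*coder{{Kind: "bytes"}, {Kind: "varint"}}}
	first := m.Add(kv)
	second := m.Add(kv)
	fmt.Println(first == second, len(m.Build()))
}
```

                                                                  Only one coder is added explicitly here, yet the built map also holds its two components, which is the effect the note on Build describes.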

                                                                  type CoderRef

                                                                  type CoderRef struct {
                                                                  	Type                 string      `json:"@type,omitempty"`
                                                                  	Components           []*CoderRef `json:"component_encodings,omitempty"`
                                                                  	IsWrapper            bool        `json:"is_wrapper,omitempty"`
                                                                  	IsPairLike           bool        `json:"is_pair_like,omitempty"`
                                                                  	IsStreamLike         bool        `json:"is_stream_like,omitempty"`
                                                                  	PipelineProtoCoderID string      `json:"pipeline_proto_coder_id,omitempty"`
                                                                  }

                                                                    CoderRef defines the (structured) Coder in serializable form. It is an artifact of the CloudObject encoding.
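
                                                                    To show what the struct tags imply for the serialized form, the sketch below redeclares CoderRef locally (so it runs without the Beam SDK) and marshals a nested value with encoding/json. The type strings such as "example:pair" are placeholders invented for this example, and encodeRef is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// CoderRef mirrors the struct documented above; it is redeclared here so
// that the sketch compiles on its own.
type CoderRef struct {
	Type                 string      `json:"@type,omitempty"`
	Components           []*CoderRef `json:"component_encodings,omitempty"`
	IsWrapper            bool        `json:"is_wrapper,omitempty"`
	IsPairLike           bool        `json:"is_pair_like,omitempty"`
	IsStreamLike         bool        `json:"is_stream_like,omitempty"`
	PipelineProtoCoderID string      `json:"pipeline_proto_coder_id,omitempty"`
}

// encodeRef marshals c to JSON. Fields left at their zero value are omitted
// because every tag carries omitempty.
func encodeRef(c *CoderRef) (string, error) {
	b, err := json.Marshal(c)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	ref := &CoderRef{
		Type:       "example:pair", // placeholder type string
		IsPairLike: true,
		Components: []*CoderRef{{Type: "example:bytes"}, {Type: "example:varint"}},
	}
	s, err := encodeRef(ref)
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
}
```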

                                                                    func EncodeCoderRef

                                                                    func EncodeCoderRef(c *coder.Coder) (*CoderRef, error)

                                                                      EncodeCoderRef returns the encoded form understood by the runner.

                                                                      func EncodeCoderRefs

                                                                      func EncodeCoderRefs(list []*coder.Coder) ([]*CoderRef, error)

                                                                        EncodeCoderRefs returns the encoded forms understood by the runner.

                                                                        func WrapIterable

                                                                        func WrapIterable(c *CoderRef) *CoderRef

                                                                          WrapIterable adds an iterable (stream) coder for Dataflow side input.

                                                                          func WrapWindowed

                                                                          func WrapWindowed(c *CoderRef, wc *coder.WindowCoder) (*CoderRef, error)

                                                                            WrapWindowed adds a windowed coder for Dataflow collections.

                                                                            type CoderUnmarshaller

                                                                            type CoderUnmarshaller struct {
                                                                            	// contains filtered or unexported fields
                                                                            }

                                                                              CoderUnmarshaller is an incremental unmarshaller of model coders. Identical coders are shared.

                                                                              func NewCoderUnmarshaller

                                                                              func NewCoderUnmarshaller(m map[string]*pipepb.Coder) *CoderUnmarshaller

                                                                                NewCoderUnmarshaller returns a new CoderUnmarshaller.

                                                                                func (*CoderUnmarshaller) Coder

                                                                                func (b *CoderUnmarshaller) Coder(id string) (*coder.Coder, error)

                                                                                  Coder unmarshals a coder with the given id.

                                                                                  func (*CoderUnmarshaller) Coders

                                                                                  func (b *CoderUnmarshaller) Coders(ids []string) ([]*coder.Coder, error)

                                                                                    Coders unmarshals a list of coder ids.

                                                                                    func (*CoderUnmarshaller) WindowCoder

                                                                                    func (b *CoderUnmarshaller) WindowCoder(id string) (*coder.WindowCoder, error)

                                                                                      WindowCoder unmarshals a window coder with the given id.
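
                                                                                      The incremental, shared unmarshalling described for CoderUnmarshaller can be sketched with a cache keyed by coder id. The modelCoder, coder, and unmarshaller types are hypothetical stand-ins, and the sketch assumes the coder references form no cycles.

```go
package main

import "fmt"

// modelCoder is a simplified stand-in for *pipepb.Coder.
type modelCoder struct {
	Kind         string
	ComponentIDs []string
}

// coder is a simplified stand-in for *coder.Coder.
type coder struct {
	Kind       string
	Components []*coder
}

// unmarshaller is a hypothetical stand-in for CoderUnmarshaller.
type unmarshaller struct {
	models map[string]*modelCoder
	cache  map[string]*coder
}

func newUnmarshaller(m map[string]*modelCoder) *unmarshaller {
	return &unmarshaller{models: m, cache: map[string]*coder{}}
}

// Coder unmarshals the coder with the given id, reusing earlier results so
// that a coder referenced several times is built only once.
func (u *unmarshaller) Coder(id string) (*coder, error) {
	if c, ok := u.cache[id]; ok {
		return c, nil
	}
	m, ok := u.models[id]
	if !ok {
		return nil, fmt.Errorf("coder with id %q not found", id)
	}
	c := &coder{Kind: m.Kind}
	for _, cid := range m.ComponentIDs {
		comp, err := u.Coder(cid)
		if err != nil {
			return nil, err
		}
		c.Components = append(c.Components, comp)
	}
	u.cache[id] = c
	return c, nil
}

func main() {
	u := newUnmarshaller(map[string]*modelCoder{
		"c0": {Kind: "bytes"},
		"c1": {Kind: "kv", ComponentIDs: []string{"c0", "c0"}},
	})
	kv, err := u.Coder("c1")
	if err != nil {
		panic(err)
	}
	fmt.Println(kv.Kind, len(kv.Components), kv.Components[0] == kv.Components[1])
}
```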

                                                                                      type NamedEdge

                                                                                      type NamedEdge struct {
                                                                                      	Name string
                                                                                      	Edge *graph.MultiEdge
                                                                                      }

                                                                                        NamedEdge is a named MultiEdge.

                                                                                        type NamedScope

                                                                                        type NamedScope struct {
                                                                                        	Name  string
                                                                                        	Scope *graph.Scope
                                                                                        }

                                                                                          NamedScope is a named Scope.

                                                                                          type Options

                                                                                          type Options struct {
                                                                                          	// Environment used to run the user code.
                                                                                          	Environment *pipepb.Environment
                                                                                          }

                                                                                            Options for marshalling a graph into a model pipeline.

                                                                                            type ScopeTree

                                                                                            type ScopeTree struct {
                                                                                            	// Scope is the named scope at the root of the (sub)tree.
                                                                                            	Scope NamedScope
                                                                                            	// Edges are the named edges directly under this scope.
                                                                                            	Edges []NamedEdge
                                                                                            
                                                                                            	// Children are the scopes directly under this scope.
                                                                                            	Children []*ScopeTree
                                                                                            }

                                                                                              ScopeTree is a convenient representation of the Scope-structure as a tree. Each ScopeTree may also be a subtree of another ScopeTree. The tree structure respects the underlying Scope structure, i.e., if Scope 'a' has a parent 'b' then the ScopeTree for 'b' must have the ScopeTree for 'a' as a child.

                                                                                              func NewScopeTree

                                                                                              func NewScopeTree(edges []*graph.MultiEdge) *ScopeTree

                                                                                                NewScopeTree computes the ScopeTree for a set of edges.

                                                                                                Directories

                                                                                                Path Synopsis
                                                                                                Package schema contains utility functions for relating Go types and Beam Schemas.
                                                                                                Package v1 is a generated protocol buffer package.