Overview
Package graphx provides facilities for serializing pipelines into a graph structure suitable for the worker.
The registry's Register function is used by transform authors to make their type's methods available for remote invocation. The runner then uses the registry's Key and Lookup methods to access information supplied by transform authors.
The Encode* and Decode* methods are used to handle serialization of both regular Go data and the specific Beam data types. The Encode* methods are used after pipeline construction to turn the plan into a serializable form that can be sent for remote execution. The Decode* methods are used by the runner to recover the execution plan from the serialized form.
Index
- Constants
- func AddFakeImpulses(p *pipepb.Pipeline)
- func CreateEnvironment(ctx context.Context, urn string, ...) (*pipepb.Environment, error)
- func DecodeCoder(data string) (*coder.Coder, error)
- func DecodeCoderRef(c *CoderRef) (*coder.Coder, error)
- func DecodeCoderRefs(list []*CoderRef) ([]*coder.Coder, error)
- func DecodeFn(data string) (reflectx.Func, error)
- func DecodeMultiEdge(edge *v1pb.MultiEdge) (graph.Opcode, *graph.Fn, *window.Fn, []*graph.Inbound, []*graph.Outbound, ...)
- func DecodeType(data string) (reflect.Type, error)
- func EncodeCoder(c *coder.Coder) (string, error)
- func EncodeFn(fn reflectx.Func) (string, error)
- func EncodeMultiEdge(edge *graph.MultiEdge) (*v1pb.MultiEdge, error)
- func EncodeType(t reflect.Type) (string, error)
- func ExpandedComponents(exp *graph.ExpandedTransform) (*pipepb.Components, error)
- func ExpandedTransform(exp *graph.ExpandedTransform) (*pipepb.PTransform, error)
- func ExternalInputs(e *graph.MultiEdge) map[string]*graph.Node
- func ExternalOutputs(e *graph.MultiEdge) map[string]*graph.Node
- func InboundTagToNode(inputsMap map[string]int, inbound []*graph.Inbound) map[string]*graph.Node
- func MakeGBKUnionCoder(gbk *graph.MultiEdge) (*coder.Coder, error)
- func MakeKVUnionCoder(gbk *graph.MultiEdge) (*coder.Coder, error)
- func Marshal(edges []*graph.MultiEdge, opt *Options) (*pipepb.Pipeline, error)
- func MarshalCoders(coders []*coder.Coder) ([]string, map[string]*pipepb.Coder, error)
- func MarshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) (*pipepb.WindowingStrategy, error)
- func OutboundTagToNode(outputsMap map[string]int, outbound []*graph.Outbound) map[string]*graph.Node
- func RemoveFakeImpulses(c *pipepb.Components, ext *pipepb.PTransform)
- func ResolveOutputIsBounded(e *graph.MultiEdge, isBoundedUpdater func(*graph.Node, bool))
- func UnmarshalCoders(ids []string, m map[string]*pipepb.Coder) ([]*coder.Coder, error)
- func UpdateDefaultEnvWorkerType(typeUrn string, pyld []byte, p *pipepb.Pipeline) error
- func UserStateCoderID(ps state.PipelineState) string
- func UserStateKeyCoderID(ps state.PipelineState) string
- func VerifyNamedOutputs(ext *graph.ExternalTransform)
- type CoderMarshaller
- type CoderRef
- type CoderUnmarshaller
- type NamedEdge
- type NamedScope
- type Options
- type ScopeTree
Constants
const (
	URNInject = "beam:go:transform:inject:v1"
	URNExpand = "beam:go:transform:expand:v1"
)

const (
	URNImpulse = "beam:transform:impulse:v1"
	URNParDo = "beam:transform:pardo:v1"
	URNFlatten = "beam:transform:flatten:v1"
	URNGBK = "beam:transform:group_by_key:v1"
	URNReshuffle = "beam:transform:reshuffle:v1"
	URNCombinePerKey = "beam:transform:combine_per_key:v1"
	URNWindow = "beam:transform:window_into:v1"
	URNMapWindows = "beam:transform:map_windows:v1"
	URNToString = "beam:transform:to_string:v1"

	URNIterableSideInput = "beam:side_input:iterable:v1"
	URNMultimapSideInput = "beam:side_input:multimap:v1"

	URNGlobalWindowsWindowFn = "beam:window_fn:global_windows:v1"
	URNFixedWindowsWindowFn = "beam:window_fn:fixed_windows:v1"
	URNSlidingWindowsWindowFn = "beam:window_fn:sliding_windows:v1"
	URNSessionsWindowFn = "beam:window_fn:session_windows:v1"

	// SDK constants
	URNDoFn = "beam:go:transform:dofn:v1"
	URNIterableSideInputKey = "beam:go:transform:iterablesideinputkey:v1"
	URNReshuffleInput = "beam:go:transform:reshuffleinput:v1"
	URNReshuffleOutput = "beam:go:transform:reshuffleoutput:v1"
	URNWindowMappingGlobal = "beam:go:windowmapping:global:v1"
	URNWindowMappingFixed = "beam:go:windowmapping:fixed:v1"
	URNWindowMappingSliding = "beam:go:windowmapping:sliding:v1"
	URNProgressReporting = "beam:protocol:progress_reporting:v1"
	URNMultiCore = "beam:protocol:multi_core_bundle_processing:v1"
	URNWorkerStatus = "beam:protocol:worker_status:v1"
	URNMonitoringInfoShortID = "beam:protocol:monitoring_info_short_ids:v1"
	URNDataSampling = "beam:protocol:data_sampling:v1"
	URNSDKConsumingReceivedData = "beam:protocol:sdk_consuming_received_data:v1"
	URNRequiresSplittableDoFn = "beam:requirement:pardo:splittable_dofn:v1"
	URNRequiresBundleFinalization = "beam:requirement:pardo:finalization:v1"
	URNRequiresStatefulProcessing = "beam:requirement:pardo:stateful:v1"
	URNTruncate = "beam:transform:sdf_truncate_sized_restrictions:v1"

	// Deprecated: Determine worker binary based on GoWorkerBinary Role instead.
	URNArtifactGoWorker = "beam:artifact:type:go_worker_binary:v1"

	URNArtifactFileType = "beam:artifact:type:file:v1"
	URNArtifactURLType = "beam:artifact:type:url:v1"
	URNArtifactGoWorkerRole = "beam:artifact:role:go_worker_binary:v1"

	// Environment URNs.
	URNEnvProcess = "beam:env:process:v1"
	URNEnvExternal = "beam:env:external:v1"
	URNEnvDocker = "beam:env:docker:v1"

	// Userstate URNs.
	URNBagUserState = "beam:user_state:bag:v1"
	URNMultiMapUserState = "beam:user_state:multimap:v1"

	// Base version URNs are to allow runners to make distinctions between different releases
	// in a way that won't change based on actual releases, in particular for FnAPI behaviors.
	URNBaseVersionGo = "beam:version:sdk_base:go:" + core.DefaultDockerImage
)
Model constants for interfacing with a Beam runner.
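The URN values above follow a prefix convention: model transforms use `beam:transform:`, Go SDK-internal URNs use `beam:go:`, pipeline requirements use `beam:requirement:`, and environments use `beam:env:`. The sketch below classifies a few of the constants by prefix; the `category` helper is hypothetical and only illustrates the naming scheme, it is not part of graphx.

```go
package main

import (
	"fmt"
	"strings"
)

// A few URN values copied from the constants above.
const (
	URNImpulse                    = "beam:transform:impulse:v1"
	URNGBK                        = "beam:transform:group_by_key:v1"
	URNDoFn                       = "beam:go:transform:dofn:v1"
	URNRequiresStatefulProcessing = "beam:requirement:pardo:stateful:v1"
	URNEnvDocker                  = "beam:env:docker:v1"
)

// category is a hypothetical helper that classifies a URN by its prefix.
// "beam:go:" is checked first so Go-specific transform URNs are not
// mistaken for model transforms.
func category(urn string) string {
	switch {
	case strings.HasPrefix(urn, "beam:go:"):
		return "Go SDK-specific"
	case strings.HasPrefix(urn, "beam:transform:"):
		return "model transform"
	case strings.HasPrefix(urn, "beam:requirement:"):
		return "pipeline requirement"
	case strings.HasPrefix(urn, "beam:env:"):
		return "environment"
	default:
		return "other"
	}
}

func main() {
	for _, u := range []string{URNImpulse, URNGBK, URNDoFn, URNRequiresStatefulProcessing, URNEnvDocker} {
		fmt.Printf("%-40s %s\n", u, category(u))
	}
}
```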
Functions
func AddFakeImpulses
func AddFakeImpulses(p *pipepb.Pipeline)
AddFakeImpulses adds an impulse transform as the producer for each input to the root transform. Inputs need producers to form a correct pipeline.
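The idea can be sketched on a simplified component model: for every input tag of the root transform, add an impulse transform whose only output is that input's PCollection. The `Transform` type, the `addFakeImpulses` helper, and the `fake_impulse_<tag>` id scheme are hypothetical stand-ins; the real function operates on a `*pipepb.Pipeline` and may name things differently.

```go
package main

import "fmt"

// Transform is a hypothetical, simplified stand-in for pipepb.PTransform:
// Inputs and Outputs map local tags to PCollection ids.
type Transform struct {
	URN     string
	Inputs  map[string]string
	Outputs map[string]string
}

const urnImpulse = "beam:transform:impulse:v1"

// addFakeImpulses adds one impulse transform per input of the root
// transform, making that impulse the producer of the input PCollection.
func addFakeImpulses(transforms map[string]*Transform, rootID string) {
	root := transforms[rootID]
	for tag, pc := range root.Inputs {
		transforms["fake_impulse_"+tag] = &Transform{
			URN:     urnImpulse,
			Outputs: map[string]string{"out": pc},
		}
	}
}

func main() {
	transforms := map[string]*Transform{
		"root": {Inputs: map[string]string{"main": "pc1", "side": "pc2"}},
	}
	addFakeImpulses(transforms, "root")
	fmt.Println(len(transforms)) // 3
}
```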
func CreateEnvironment
func CreateEnvironment(ctx context.Context, urn string, extractEnvironmentConfig func(context.Context) string) (*pipepb.Environment, error)
CreateEnvironment produces the appropriate payload for the type of environment.
func DecodeCoder
func DecodeCoder(data string) (*coder.Coder, error)
DecodeCoder decodes a coder. Any custom coder function symbol must be resolvable via the runtime.GlobalSymbolResolver. The types must be encodable.
func DecodeCoderRef
func DecodeCoderRef(c *CoderRef) (*coder.Coder, error)
DecodeCoderRef extracts a usable coder from the encoded runner form.
func DecodeCoderRefs
func DecodeCoderRefs(list []*CoderRef) ([]*coder.Coder, error)
DecodeCoderRefs extracts usable coders from the encoded runner form.
func DecodeFn
func DecodeFn(data string) (reflectx.Func, error)
DecodeFn decodes a function. The function symbol must be resolvable via the runtime.GlobalSymbolResolver. The parameter types must be encodable.
func DecodeMultiEdge
func DecodeMultiEdge(edge *v1pb.MultiEdge) (graph.Opcode, *graph.Fn, *window.Fn, []*graph.Inbound, []*graph.Outbound, error)
DecodeMultiEdge converts the wire representation into the preprocessed components representing that edge. We deserialize to components to avoid inserting the edge into a graph or creating a detached edge.
func DecodeType
func DecodeType(data string) (reflect.Type, error)
DecodeType decodes a type. Unless registered, the decoded type is only guaranteed to be isomorphic to the input with regard to data members. The returned type will have no methods.
func EncodeCoder
func EncodeCoder(c *coder.Coder) (string, error)
EncodeCoder encodes a coder as a string. Any custom coder function symbol must be resolvable via the runtime.GlobalSymbolResolver. The types must be encodable.
func EncodeFn
func EncodeFn(fn reflectx.Func) (string, error)
EncodeFn encodes a function and parameter types as a string. The function symbol must be resolvable via the runtime.GlobalSymbolResolver. The types must be encodable.
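"Resolvable via a symbol resolver" means a function can be recorded by name and looked up again on the worker. The sketch below takes a function's symbol name with `runtime.FuncForPC` and resolves it through a map. The `resolver` map and `register` helper are hypothetical stand-ins for runtime.GlobalSymbolResolver, and the sketch ignores the parameter-type encoding that EncodeFn also performs.

```go
package main

import (
	"fmt"
	"reflect"
	"runtime"
)

func double(x int) int { return 2 * x }

// symbolName returns the symbol name of a top-level function, such as
// "main.double", which a resolver can map back to code.
func symbolName(fn any) string {
	return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
}

// resolver is a hypothetical symbol table from names to function values.
var resolver = map[string]any{}

func register(fn any) { resolver[symbolName(fn)] = fn }

func main() {
	register(double)
	name := symbolName(double)
	fn := resolver[name].(func(int) int)
	fmt.Println(name, fn(21)) // main.double 42
}
```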
func EncodeMultiEdge
func EncodeMultiEdge(edge *graph.MultiEdge) (*v1pb.MultiEdge, error)
EncodeMultiEdge converts the preprocessed representation into the wire representation of the multiedge, capturing input and output type information.
func EncodeType
func EncodeType(t reflect.Type) (string, error)
EncodeType encodes a type as a string. Unless the type is registered, the type later recovered by DecodeType is only guaranteed to be isomorphic to the input with regard to data members, and it will have no methods.
func ExpandedComponents
func ExpandedComponents(exp *graph.ExpandedTransform) (*pipepb.Components, error)
ExpandedComponents type-asserts the Components field, which is stored with type any, and returns its pipeline component proto representation.
func ExpandedTransform
func ExpandedTransform(exp *graph.ExpandedTransform) (*pipepb.PTransform, error)
ExpandedTransform type-asserts the Transform field, which is stored with type any, and returns its pipeline PTransform proto representation.
func ExternalInputs
func ExternalInputs(e *graph.MultiEdge) map[string]*graph.Node
ExternalInputs returns a map from tag to the graph node representing the input PCollection, resolved through the map of named inputs (tag to index of the Inbound in MultiEdge.Input).
func ExternalOutputs
func ExternalOutputs(e *graph.MultiEdge) map[string]*graph.Node
ExternalOutputs returns a map from tag to the graph node representing the output PCollection, resolved through the map of named outputs (tag to index of the Outbound in MultiEdge.Output).
func InboundTagToNode
func InboundTagToNode(inputsMap map[string]int, inbound []*graph.Inbound) map[string]*graph.Node
InboundTagToNode relates the tags from inbound links to their respective nodes.
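The tag-to-node resolution used by ExternalInputs and InboundTagToNode is a two-step lookup: tag to index, then index to the node at that position. A minimal sketch, with hypothetical `Node` and `Inbound` types standing in for `*graph.Node` and `*graph.Inbound`:

```go
package main

import "fmt"

// Node is a hypothetical stand-in for *graph.Node (a PCollection).
type Node struct{ ID int }

// Inbound is a hypothetical stand-in for *graph.Inbound.
type Inbound struct{ From *Node }

// inboundTagToNode resolves each tag to an index into inbound and
// returns the node that the link at that index comes from.
func inboundTagToNode(inputsMap map[string]int, inbound []*Inbound) map[string]*Node {
	m := make(map[string]*Node, len(inputsMap))
	for tag, idx := range inputsMap {
		m[tag] = inbound[idx].From
	}
	return m
}

func main() {
	in := []*Inbound{{From: &Node{ID: 7}}, {From: &Node{ID: 9}}}
	m := inboundTagToNode(map[string]int{"main": 0, "side": 1}, in)
	fmt.Println(m["main"].ID, m["side"].ID) // 7 9
}
```

The outbound variant has the same structure over the output links.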
func MakeGBKUnionCoder
func MakeGBKUnionCoder(gbk *graph.MultiEdge) (*coder.Coder, error)
MakeGBKUnionCoder returns CoGBK<K,KV<int,[]byte>> for a given CoGBK.
func MakeKVUnionCoder
func MakeKVUnionCoder(gbk *graph.MultiEdge) (*coder.Coder, error)
MakeKVUnionCoder returns KV<K,KV<int,[]byte>> for a given CoGBK.
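The union coders reflect how a CoGBK over several inputs can be carried by a single GBK: each element is tagged with the index of its input and its encoded bytes (the `KV<int,[]byte>` part), grouped by key, and then split back by index. The sketch below shows that tagging scheme on in-memory maps. The `tagged` type, the `unionGroup` and `split` helpers, and the use of `[]byte(v)` in place of a real element coder are assumptions for illustration only.

```go
package main

import "fmt"

// tagged mirrors the KV<int,[]byte> union value: the index of the input
// collection and the element's encoded bytes.
type tagged struct {
	Index int
	Data  []byte
}

// unionGroup tags every element with its input index and groups all
// inputs by key, as one GBK over the union would.
func unionGroup(inputs []map[string][]string) map[string][]tagged {
	out := map[string][]tagged{}
	for i, in := range inputs {
		for k, vs := range in {
			for _, v := range vs {
				out[k] = append(out[k], tagged{Index: i, Data: []byte(v)})
			}
		}
	}
	return out
}

// split recovers the per-input values for one key from its union values.
func split(n int, vals []tagged) [][]string {
	res := make([][]string, n)
	for _, t := range vals {
		res[t.Index] = append(res[t.Index], string(t.Data))
	}
	return res
}

func main() {
	emails := map[string][]string{"alice": {"a@x"}}
	phones := map[string][]string{"alice": {"555"}, "bob": {"777"}}
	g := unionGroup([]map[string][]string{emails, phones})
	fmt.Println(split(2, g["alice"])) // [[a@x] [555]]
	fmt.Println(split(2, g["bob"]))   // [[] [777]]
}
```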
func MarshalCoders
func MarshalCoders(coders []*coder.Coder) ([]string, map[string]*pipepb.Coder, error)
MarshalCoders marshals a list of coders into model coders.
func MarshalWindowingStrategy (added in v2.35.0)
func MarshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) (*pipepb.WindowingStrategy, error)
MarshalWindowingStrategy marshals the given windowing strategy in the given coder context.
func OutboundTagToNode
func OutboundTagToNode(outputsMap map[string]int, outbound []*graph.Outbound) map[string]*graph.Node
OutboundTagToNode relates the tags from outbound links to their respective nodes.
func RemoveFakeImpulses
func RemoveFakeImpulses(c *pipepb.Components, ext *pipepb.PTransform)
RemoveFakeImpulses removes the fake impulse added for each input to the transform, because an input cannot have multiple producers.
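A sketch of the cleanup on a simplified component model: any impulse transform whose output is one of the external transform's inputs is deleted, leaving the real producer in place. Identifying fakes by their impulse URN is an assumption of this sketch, as are the `Transform` type, the `removeFakeImpulses` helper, and the `example:read` URN used for the real producer.

```go
package main

import "fmt"

// Transform is a hypothetical, simplified stand-in for pipepb.PTransform.
type Transform struct {
	URN     string
	Inputs  map[string]string
	Outputs map[string]string
}

const urnImpulse = "beam:transform:impulse:v1"

// removeFakeImpulses deletes every impulse that produces one of ext's
// input PCollections. Deleting map entries while ranging is safe in Go.
func removeFakeImpulses(transforms map[string]*Transform, ext *Transform) {
	inputs := map[string]bool{}
	for _, pc := range ext.Inputs {
		inputs[pc] = true
	}
	for id, t := range transforms {
		if t.URN != urnImpulse {
			continue
		}
		for _, pc := range t.Outputs {
			if inputs[pc] {
				delete(transforms, id)
				break
			}
		}
	}
}

func main() {
	transforms := map[string]*Transform{
		"read": {URN: "example:read", Outputs: map[string]string{"out": "pc1"}},
		"fake": {URN: urnImpulse, Outputs: map[string]string{"out": "pc1"}},
		"ext":  {Inputs: map[string]string{"in": "pc1"}},
	}
	removeFakeImpulses(transforms, transforms["ext"])
	fmt.Println(len(transforms)) // 2
}
```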
func ResolveOutputIsBounded
func ResolveOutputIsBounded(e *graph.MultiEdge, isBoundedUpdater func(*graph.Node, bool))
ResolveOutputIsBounded updates each output node, based on the received expanded components, to reflect whether it is bounded.
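func UnmarshalCoders
func UnmarshalCoders(ids []string, m map[string]*pipepb.Coder) ([]*coder.Coder, error)
UnmarshalCoders unmarshals the coders with the given ids from the model coder map m.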
func UpdateDefaultEnvWorkerType (added in v2.37.0)
func UpdateDefaultEnvWorkerType(typeUrn string, pyld []byte, p *pipepb.Pipeline) error
UpdateDefaultEnvWorkerType lets runners update the pipeline's default environment with the correct artifact type and payload for the Go worker binary.
func UserStateCoderID (added in v2.42.0)
func UserStateCoderID(ps state.PipelineState) string
UserStateCoderID returns the coder id of a user state.
func UserStateKeyCoderID (added in v2.42.0)
func UserStateKeyCoderID(ps state.PipelineState) string
UserStateKeyCoderID returns the key coder id of a user state.
func VerifyNamedOutputs
func VerifyNamedOutputs(ext *graph.ExternalTransform)
VerifyNamedOutputs ensures that the expanded outputs correspond to the expected named outputs.
Types
type CoderMarshaller
type CoderMarshaller struct {
	Namespace string // Namespace for xlang coders.
	// contains filtered or unexported fields
}
CoderMarshaller incrementally builds a compact model representation of a set of coders. Identical coders are shared.
func NewCoderMarshaller
func NewCoderMarshaller() *CoderMarshaller
NewCoderMarshaller returns a new CoderMarshaller.
func (*CoderMarshaller) Add
func (b *CoderMarshaller) Add(c *coder.Coder) (string, error)
Add adds the given coder to the set and returns its id. Idempotent.
func (*CoderMarshaller) AddMulti
func (b *CoderMarshaller) AddMulti(list []*coder.Coder) ([]string, error)
AddMulti adds the given coders to the set and returns their ids. Idempotent.
func (*CoderMarshaller) AddWindowCoder
func (b *CoderMarshaller) AddWindowCoder(w *coder.WindowCoder) (string, error)
AddWindowCoder adds a window coder.
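Sharing identical coders and making Add idempotent can be achieved by interning: components are added first, the coder is reduced to a canonical key built from its URN and component ids, and the key is mapped to a single id. The sketch below is a hypothetical illustration of that technique; the `Coder` and `Marshaller` types are simplified stand-ins, not the CoderMarshaller internals, although the two URNs are standard Beam coder URNs.

```go
package main

import (
	"fmt"
	"strings"
)

// Coder is a hypothetical, simplified coder tree; it is not *coder.Coder.
type Coder struct {
	URN        string
	Components []*Coder
}

// Marshaller maps structurally identical coders to one canonical key
// and therefore to one id.
type Marshaller struct {
	ids map[string]string // canonical key -> id
}

func NewMarshaller() *Marshaller {
	return &Marshaller{ids: map[string]string{}}
}

// Add registers c's components first, then c itself, and returns c's id.
// Adding a structurally identical coder again returns the existing id.
func (m *Marshaller) Add(c *Coder) string {
	comps := make([]string, len(c.Components))
	for i, sub := range c.Components {
		comps[i] = m.Add(sub)
	}
	key := c.URN + "(" + strings.Join(comps, ",") + ")"
	if id, ok := m.ids[key]; ok {
		return id
	}
	id := fmt.Sprintf("c%d", len(m.ids))
	m.ids[key] = id
	return id
}

func main() {
	bytesCoder := &Coder{URN: "beam:coder:bytes:v1"}
	kv := &Coder{URN: "beam:coder:kv:v1", Components: []*Coder{bytesCoder, bytesCoder}}
	m := NewMarshaller()
	a := m.Add(kv)
	// A separately built but identical KV coder gets the same id.
	b := m.Add(&Coder{URN: "beam:coder:kv:v1", Components: []*Coder{{URN: "beam:coder:bytes:v1"}, {URN: "beam:coder:bytes:v1"}}})
	fmt.Println(a, b, len(m.ids)) // c1 c1 2
}
```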
type CoderRef
type CoderRef struct {
	Type                 string      `json:"@type,omitempty"`
	Components           []*CoderRef `json:"component_encodings,omitempty"`
	IsWrapper            bool        `json:"is_wrapper,omitempty"`
	IsPairLike           bool        `json:"is_pair_like,omitempty"`
	IsStreamLike         bool        `json:"is_stream_like,omitempty"`
	PipelineProtoCoderID string      `json:"pipeline_proto_coder_id,omitempty"`
}
CoderRef defines the (structured) Coder in serializable form. It is an artifact of the CloudObject encoding.
func EncodeCoderRef
EncodeCoderRef returns the encoded form understood by the runner.
func EncodeCoderRefs
EncodeCoderRefs returns the encoded forms understood by the runner.
func WrapIterable
WrapIterable adds an iterable (stream) coder for Dataflow side inputs.
func WrapWindowed
func WrapWindowed(c *CoderRef, wc *coder.WindowCoder) (*CoderRef, error)
WrapWindowed adds a windowed coder for Dataflow collections.
type CoderUnmarshaller
type CoderUnmarshaller struct {
// contains filtered or unexported fields
}
CoderUnmarshaller is an incremental unmarshaller of model coders. Identical coders are shared.
func NewCoderUnmarshaller
func NewCoderUnmarshaller(m map[string]*pipepb.Coder) *CoderUnmarshaller
NewCoderUnmarshaller returns a new CoderUnmarshaller.
func (*CoderUnmarshaller) Coder
func (b *CoderUnmarshaller) Coder(id string) (*coder.Coder, error)
Coder unmarshals a coder with the given id.
func (*CoderUnmarshaller) Coders
func (b *CoderUnmarshaller) Coders(ids []string) ([]*coder.Coder, error)
Coders unmarshals a list of coder ids.
func (*CoderUnmarshaller) WindowCoder
func (b *CoderUnmarshaller) WindowCoder(id string) (*coder.WindowCoder, error)
WindowCoder unmarshals a window coder with the given id.
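An incremental unmarshaller that shares identical coders can be built with memoization: each id is decoded at most once, components are decoded recursively, and every later reference to the same id returns the cached value. The sketch below is a hypothetical illustration of that approach; `model`, `Coder`, and `Unmarshaller` are simplified stand-ins for `pipepb.Coder`, `*coder.Coder`, and CoderUnmarshaller. It assumes the coder graph is acyclic, as a cycle would recurse forever.

```go
package main

import "fmt"

// model is a hypothetical stand-in for pipepb.Coder: a URN plus component ids.
type model struct {
	URN        string
	Components []string
}

// Coder is a hypothetical decoded coder.
type Coder struct {
	URN        string
	Components []*Coder
}

// Unmarshaller decodes coders on demand and caches them by id.
type Unmarshaller struct {
	models map[string]model
	cache  map[string]*Coder
}

func NewUnmarshaller(m map[string]model) *Unmarshaller {
	return &Unmarshaller{models: m, cache: map[string]*Coder{}}
}

// Coder returns the decoded coder for id, reusing cached results so that
// every reference to the same id yields the same *Coder.
func (u *Unmarshaller) Coder(id string) (*Coder, error) {
	if c, ok := u.cache[id]; ok {
		return c, nil
	}
	m, ok := u.models[id]
	if !ok {
		return nil, fmt.Errorf("coder %q not found", id)
	}
	c := &Coder{URN: m.URN}
	for _, cid := range m.Components {
		sub, err := u.Coder(cid)
		if err != nil {
			return nil, err
		}
		c.Components = append(c.Components, sub)
	}
	u.cache[id] = c
	return c, nil
}

func main() {
	u := NewUnmarshaller(map[string]model{
		"c0": {URN: "beam:coder:bytes:v1"},
		"c1": {URN: "beam:coder:kv:v1", Components: []string{"c0", "c0"}},
	})
	kv, err := u.Coder("c1")
	if err != nil {
		panic(err)
	}
	fmt.Println(kv.Components[0] == kv.Components[1]) // true
	_, err = u.Coder("missing")
	fmt.Println(err) // coder "missing" not found
}
```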
type NamedScope
NamedScope is a named Scope.
type Options
type Options struct {
	// Environment used to run the user code.
	Environment *pipepb.Environment

	// PipelineResourceHints for setting defaults across the whole pipeline.
	PipelineResourceHints resource.Hints

	// ContextReg is an override for the context extractor registry for testing.
	ContextReg *contextreg.Registry
}
Options for marshalling a graph into a model pipeline.
func (*Options) GetContextReg (added in v2.53.0)
func (opts *Options) GetContextReg() *contextreg.Registry
GetContextReg returns the ContextReg field if it is set, and the default context registry otherwise.
type ScopeTree
type ScopeTree struct {
	// Scope is the named scope at the root of the (sub)tree.
	Scope NamedScope
	// Edges are the named edges directly under this scope.
	Edges []NamedEdge
	// Children are the scopes directly under this scope.
	Children []*ScopeTree
}
ScopeTree is a convenient representation of the Scope-structure as a tree. Each ScopeTree may also be a subtree of another ScopeTree. The tree structure respects the underlying Scope structure, i.e., if Scope 'a' has a parent 'b' then the ScopeTree for 'b' must have the ScopeTree for 'a' as a child.
func NewScopeTree
NewScopeTree computes the ScopeTree for a set of edges.
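The invariant described for ScopeTree (if scope `a` has parent `b`, the tree node for `b` has the node for `a` as a child) can be maintained by creating tree nodes on demand and walking up parent links. The sketch below is a hypothetical illustration that omits edges; the `Scope` and `Tree` types and the `buildTree` helper are simplified stand-ins, not the graphx implementation.

```go
package main

import "fmt"

// Scope is a hypothetical stand-in for a graph scope: a label and a parent.
type Scope struct {
	Label  string
	Parent *Scope
}

// Tree mirrors the documented ScopeTree shape, without edges.
type Tree struct {
	Scope    *Scope
	Children []*Tree
}

// buildTree returns the tree node for s, creating nodes for s and any
// missing ancestors and linking each new node under its parent's node.
func buildTree(s *Scope, nodes map[*Scope]*Tree) *Tree {
	if t, ok := nodes[s]; ok {
		return t
	}
	t := &Tree{Scope: s}
	nodes[s] = t
	if s.Parent != nil {
		p := buildTree(s.Parent, nodes)
		p.Children = append(p.Children, t)
	}
	return t
}

func main() {
	root := &Scope{Label: "root"}
	a := &Scope{Label: "a", Parent: root}
	b := &Scope{Label: "b", Parent: a}
	c := &Scope{Label: "c", Parent: root}

	nodes := map[*Scope]*Tree{}
	for _, s := range []*Scope{b, c} { // for example, the scopes of a set of edges
		buildTree(s, nodes)
	}
	rt := nodes[root]
	fmt.Println(len(rt.Children), rt.Children[0].Scope.Label, rt.Children[0].Children[0].Scope.Label) // 2 a b
}
```

Memoizing nodes by scope ensures each scope appears exactly once in the tree, even when many edges share it.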