xlangx

package
v2.41.0 Latest
Warning

This package is not in the latest version of its module.

Published: Aug 17, 2022 License: Apache-2.0, BSD-3-Clause, MIT Imports: 22 Imported by: 0

Documentation

Overview

Package xlangx contains various low-level utilities needed for adding cross-language transforms to the pipeline.

Index

Constants

const (
	// Separator is the canonical separator between a namespace and optional configuration.
	Separator = ":"
	// ClasspathSeparator is the canonical separator between a classpath namespace config string and any other namespace-configuration string.
	ClasspathSeparator = ";"
)

Variables

This section is empty.

Functions

func DecodeStructPayload added in v2.35.0

func DecodeStructPayload(plBytes []byte) (interface{}, error)

DecodeStructPayload takes a marshaled ExternalConfigurationPayload proto and returns a native Go struct, with its type converted from the Schema representation and its value decoded from the Row.

func EncodeStructPayload

func EncodeStructPayload(pl interface{}) ([]byte, error)

EncodeStructPayload takes a native Go struct and returns a marshaled ExternalConfigurationPayload proto, containing a Schema representation of the original type and the original value encoded as a Row. This is intended to be used as the expansion payload for an External transform.

func Expand

func Expand(edge *graph.MultiEdge, ext *graph.ExternalTransform) error

Expand expands an unexpanded graph.ExternalTransform as a graph.ExpandedTransform and assigns it to the ExternalTransform's Expanded field. This requires querying an expansion service based on the configuration details within the ExternalTransform.

For framework use only. Users should call beam.CrossLanguage to access foreign transforms rather than calling this function directly.

func QueryAutomatedExpansionService added in v2.37.0

func QueryAutomatedExpansionService(ctx context.Context, p *HandlerParams) (*jobpb.ExpansionResponse, error)

QueryAutomatedExpansionService submits an external transform to be expanded by the expansion service and then eagerly materializes the artifacts for staging. The given transform should be the external transform, and the components are any additional components necessary for the pipeline snippet.

The address to be queried is determined by the Config field of the HandlerParams after the prefix tag indicating the automated service is in use.

func QueryExpansionService added in v2.35.0

func QueryExpansionService(ctx context.Context, p *HandlerParams) (*jobpb.ExpansionResponse, error)

QueryExpansionService submits an external transform to be expanded by the expansion service. The given transform should be the external transform, and the components are any additional components necessary for the pipeline snippet.

The address to be queried is determined by the Config field of HandlerParams.

This HandlerFunc is exported to simplify building custom handler functions that do end up calling a Beam ExpansionService, either as a fallback or as part of normal flow.

func RegisterHandler added in v2.35.0

func RegisterHandler(namespace string, handler HandlerFunc)

RegisterHandler associates a namespace with a HandlerFunc which can be used to replace calls to a Beam ExpansionService.

Then, expansion addresses of the forms

"<namespace>" or
"<namespace>:<configuration>"

can be used with beam.CrossLanguage. Any configuration after the separator is passed to the HandlerFunc when it is called, for the handler to use at its leisure.
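
As a rough illustration of the two address forms, the sketch below splits an expansion address at the first separator and looks the namespace up in a registry. It uses only the standard library: handlerFunc, registry, splitAddr, and lookup are hypothetical stand-ins, not xlangx APIs, and the fallback for unregistered namespaces (treating the whole address as a service address) is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// separator mirrors xlangx.Separator.
const separator = ":"

// handlerFunc is a hypothetical, simplified stand-in for xlangx.HandlerFunc:
// it receives only the configuration string and returns a description.
type handlerFunc func(config string) string

// registry maps a namespace to its handler.
type registry map[string]handlerFunc

// splitAddr splits "<namespace>" or "<namespace>:<configuration>" at the
// first separator. The configuration is empty when no separator is present.
func splitAddr(addr string) (namespace, config string) {
	namespace, config, _ = strings.Cut(addr, separator)
	return namespace, config
}

// lookup returns the handler and configuration for addr. Assumption: when the
// namespace is not registered, the whole address is handed to a default
// handler, as if it were a host:port pair or a URL.
func (r registry) lookup(addr string, def handlerFunc) (handlerFunc, string) {
	ns, config := splitAddr(addr)
	if h, ok := r[ns]; ok {
		return h, config
	}
	return def, addr
}

func main() {
	reg := registry{
		"inmemory": func(config string) string { return "inmemory handler, config=" + config },
	}
	service := func(addr string) string { return "expansion service at " + addr }

	for _, addr := range []string{"inmemory", "inmemory:verbose", "localhost:8097"} {
		h, config := reg.lookup(addr, service)
		fmt.Println(h(config))
	}
}
```

Splitting at the first separator only means the configuration itself may contain further separators.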

func RegisterOverrideForUrn added in v2.35.0

func RegisterOverrideForUrn(urn, expansionAddr string)

RegisterOverrideForUrn overrides which expansion address is used to expand a specific transform URN. The expansion address must be a URL or be a namespaced handler registered with RegisterHandler.

When the expansion address is for a handler, it may take the forms

"<namespace>" or
"<namespace>:<configuration>"

func Require added in v2.35.0

func Require(expansionAddr string) string

Require takes an expansionAddr and requires cross-language expansion to use it and its associated handler. If the transform's URN has a specific override registered, that override is ignored.

Intended for use by cross language wrappers to permit per-call overrides of the expansion address within a single pipeline, such as for testing purposes.
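
The precedence between a required address and a per-URN override can be pictured with the following standard-library sketch. The requiredTag prefix and the require and resolve functions are hypothetical; how xlangx actually marks a required address is an implementation detail not stated on this page.

```go
package main

import (
	"fmt"
	"strings"
)

// requiredTag is a hypothetical marker used only in this sketch.
const requiredTag = "required:"

// require tags addr so that resolve skips any URN override.
func require(addr string) string { return requiredTag + addr }

// resolve picks the address used to expand urn:
//  1. a required address always wins;
//  2. otherwise a URN-specific override is used, if one is registered;
//  3. otherwise the address is used as given.
func resolve(urn, addr string, overrides map[string]string) string {
	if strings.HasPrefix(addr, requiredTag) {
		return strings.TrimPrefix(addr, requiredTag)
	}
	if o, ok := overrides[urn]; ok {
		return o
	}
	return addr
}

func main() {
	// Hypothetical URN and addresses, for illustration only.
	overrides := map[string]string{"example:transform:v1": "inmemory:test"}

	fmt.Println(resolve("example:transform:v1", "localhost:8097", overrides))
	fmt.Println(resolve("example:transform:v1", require("localhost:8097"), overrides))
	fmt.Println(resolve("example:other:v1", "localhost:8097", overrides))
}
```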

func ResolveArtifacts

func ResolveArtifacts(ctx context.Context, edges []*graph.MultiEdge, p *pipepb.Pipeline)

ResolveArtifacts acquires all dependencies for a cross-language transform.

func ResolveArtifactsWithConfig

func ResolveArtifactsWithConfig(ctx context.Context, edges []*graph.MultiEdge, cfg ResolveConfig) (paths map[string]string, err error)

ResolveArtifactsWithConfig acquires all dependencies for cross-language transforms, with additional configuration of its behavior. By default, this function performs the following steps for each cross-language transform in the list of edges:

  1. Retrieves a list of dependencies needed from the expansion service.
  2. Retrieves each dependency as an artifact and stages it to a default local filepath.
  3. Adds the dependencies to the transform's stored environment proto.

The changes that can be configured are documented in ResolveConfig.

This returns a map of "local path" to "sdk path". By default these are identical, unless ResolveConfig.SdkPath has been set.

func UpdateArtifactTypeFromFileToURL added in v2.39.0

func UpdateArtifactTypeFromFileToURL(edges []*graph.MultiEdge)

UpdateArtifactTypeFromFileToURL changes the type of an artifact from FILE to URL when its file path contains "://", the delimiter that follows a URI scheme.
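
A minimal standard-library sketch of that check follows. The artifact struct and fileToURL function are hypothetical simplifications; the real function operates on the environment protos attached to the given edges.

```go
package main

import (
	"fmt"
	"strings"
)

// artifact is a hypothetical, simplified stand-in for an artifact entry:
// a type tag plus a path.
type artifact struct {
	Type string // "FILE" or "URL"
	Path string
}

// fileToURL retags FILE artifacts whose path contains "://" as URL artifacts
// and leaves every other artifact untouched.
func fileToURL(arts []artifact) {
	for i := range arts {
		if arts[i].Type == "FILE" && strings.Contains(arts[i].Path, "://") {
			arts[i].Type = "URL"
		}
	}
}

func main() {
	arts := []artifact{
		{Type: "FILE", Path: "/tmp/artifacts/beam-io.jar"},
		{Type: "FILE", Path: "https://example.com/beam-io.jar"},
	}
	fileToURL(arts)
	for _, a := range arts {
		fmt.Println(a.Type, a.Path)
	}
}
```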

func UseAutomatedJavaExpansionService added in v2.37.0

func UseAutomatedJavaExpansionService(gradleTarget string, opts ...ExpansionServiceOption) string

UseAutomatedJavaExpansionService takes a gradle target and creates a tagged string to indicate that it should be used to start up an automated expansion service for a cross-language expansion.

Intended for use by cross language wrappers to permit spinning up an expansion service for a user if no expansion service address is provided.

Types

type ExpansionServiceOption added in v2.39.0

type ExpansionServiceOption func(*string)

ExpansionServiceOption provides an option for xlangx.UseAutomatedJavaExpansionService().

func AddClasspaths added in v2.39.0

func AddClasspaths(classpaths []string) ExpansionServiceOption

AddClasspaths is an expansion service option for xlangx.UseAutomatedJavaExpansionService that accepts a slice of classpaths and suffixes the tagged expansion address string with the classpath separator followed by the provided classpaths.
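
Because ExpansionServiceOption is a func(*string), each option simply edits the tagged address in place. The sketch below shows that pattern with the standard library only. The lowercase names are hypothetical mirrors of the documented API, and the "autojava" tag, the Gradle target, and the space used to join classpaths are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

const (
	separator          = ":"        // mirrors xlangx.Separator
	classpathSeparator = ";"        // mirrors xlangx.ClasspathSeparator
	autoTag            = "autojava" // assumed tag; the real value is not documented here
)

// option has the same shape as ExpansionServiceOption.
type option func(*string)

// addClasspaths appends the classpath separator and the classpaths to the
// tagged address. Joining the classpaths with a space is an assumption.
func addClasspaths(classpaths []string) option {
	return func(addr *string) {
		*addr += classpathSeparator + strings.Join(classpaths, " ")
	}
}

// useAutomated builds "<tag>:<gradleTarget>" and then applies each option
// to the resulting string in order.
func useAutomated(gradleTarget string, opts ...option) string {
	addr := autoTag + separator + gradleTarget
	for _, opt := range opts {
		opt(&addr)
	}
	return addr
}

func main() {
	// "example:runExpansionService" is a hypothetical Gradle target.
	fmt.Println(useAutomated("example:runExpansionService"))
	fmt.Println(useAutomated("example:runExpansionService",
		addClasspaths([]string{"/libs/a.jar", "/libs/b.jar"})))
}
```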

type HandlerFunc added in v2.35.0

type HandlerFunc func(context.Context, *HandlerParams) (*jobpb.ExpansionResponse, error)

HandlerFunc abstracts making an ExpansionService request.

type HandlerParams added in v2.35.0

type HandlerParams struct {
	// Additional parameterization string, if any.
	Config string

	Req *jobpb.ExpansionRequest
	// contains filtered or unexported fields
}

HandlerParams is the parameter to an expansion service handler.

func (*HandlerParams) CoderMarshaller added in v2.35.0

func (p *HandlerParams) CoderMarshaller() *graphx.CoderMarshaller

CoderMarshaller returns a coder marshaller initialized with the request's namespace.

func (*HandlerParams) Inputs added in v2.35.0

func (p *HandlerParams) Inputs() []PCol

Inputs returns the provided input PCollections, if any, for the PTransform to expand in this expansion service request.

func (*HandlerParams) Outputs added in v2.35.0

func (p *HandlerParams) Outputs() []PCol

Outputs returns the provided output PCollections, if any, for expected outputs for this expansion service request.

If no collections are returned, none are currently expected, but some may still be provided by the expansion.

type PCol added in v2.35.0

type PCol struct {
	Index   int          // Positional index of this input or output
	Local   string       // Local name of the PCollection (may be used in the cross language PTransform)
	Coder   *coder.Coder // Contains the full type and other coder information.
	Bounded pipepb.IsBounded_Enum
	// contains filtered or unexported fields
}

PCol represents an input or output PCollection of the cross-language transform being expanded.

func (*PCol) ID added in v2.35.0

func (p *PCol) ID() string

ID produces a standard format globally namespaced id for a PCollection from the local identifier.

func (*PCol) WSID added in v2.35.0

func (p *PCol) WSID() string

WSID produces a standard format globally namespaced id for a WindowingStrategy from the local identifier.

func (*PCol) WindowingStrategy added in v2.35.0

func (p *PCol) WindowingStrategy(cm *graphx.CoderMarshaller) (string, *pipepb.WindowingStrategy)

WindowingStrategy returns the ID of this PCollection's windowing strategy, and the associated proto.

TODO: intern windowing strategies.

type ResolveConfig

type ResolveConfig struct {
	// SdkPath replaces the default filepath for dependencies, but only in the
	// external environment proto to be used by the SDK Harness during pipeline
	// execution. This is used to specify alternate staging directories, such
	// as for staging artifacts remotely.
	//
	// Setting an SdkPath does not change staging behavior otherwise. All
	// artifacts still get staged to the default local filepath, and it is the
	// user's responsibility to stage those local artifacts to the SdkPath.
	SdkPath string

	// JoinFn is a function for combining SdkPath and individual artifact names.
	// If not specified, it defaults to using filepath.Join.
	JoinFn func(path, name string) string
}

ResolveConfig contains fields for configuring the behavior for resolving artifacts.

Directories

Path	Synopsis
expansionx	Package expansionx contains utilities for starting expansion services for cross-language transforms.
