integration

package
Version: v2.39.0-RC1

This package is not in the latest version of its module.

Published: May 13, 2022 License: Apache-2.0, BSD-3-Clause, MIT Imports: 11 Imported by: 0

Documentation

Overview

Package integration provides functionality that needs to be shared between all integration tests.

Integration tests are implemented through Go's test framework, as test functions that create and execute pipelines using the ptest package. Tests should be placed in smaller sub-packages for organizational purposes and parallelism (tests are only run in parallel across different packages). Integration tests should always begin with a call to CheckFilters to ensure test filters can be applied, and each package containing integration tests should call ptest.Main in a TestMain function if it uses ptest.

Integration tests can be run with go test, passing any flags required by the test pipelines, such as --runner or --endpoint. Example:

go test -v ./sdks/go/test/integration/... --runner=portable --endpoint=localhost:8099

Alternatively, tests can be executed by running the run_validatesrunner_tests.sh script, which also performs much of the environment setup, or by calling gradle commands in :sdks:go:test.

Variables

var (
	// BootstrapServers is the address of the bootstrap servers for a Kafka
	// cluster, used for Kafka IO tests.
	BootstrapServers = flag.String("bootstrap_servers", "",
		"URL of the bootstrap servers for the Kafka cluster. Should be accessible by the runner.")

	// KafkaJar is a filepath to a jar for starting a Kafka cluster, used for
	// Kafka IO tests.
	KafkaJar = flag.String("kafka_jar", "",
		"The filepath to a jar for starting up a Kafka cluster. Only used if bootstrap_servers is unspecified.")

	// KafkaJarTimeout attempts to apply an auto-shutdown timeout to the Kafka
	// cluster jar. Only used for Kafka IO tests.
	KafkaJarTimeout = flag.String("kafka_jar_timeout", "10m",
		"Sets an auto-shutdown timeout to the Kafka cluster. "+
			"Requires the timeout command to be present in Path, unless the value is set to \"\".")

	// ExpansionJars contains elements in the form "label:jar" describing jar
	// filepaths for expansion services to use in integration tests, and the
	// corresponding labels. Once provided through this flag, those jars can
	// be used in tests via the ExpansionServices struct.
	ExpansionJars stringSlice

	// ExpansionAddrs contains elements in the form "label:address" describing
	// endpoints for expansion services to use in integration tests, and the
	// corresponding labels. Once provided through this flag, those addresses
	// can be used in tests via the ExpansionServices struct.
	ExpansionAddrs stringSlice

	// ExpansionTimeout attempts to apply an auto-shutdown timeout to any
	// expansion services started by integration tests.
	ExpansionTimeout = flag.Duration("expansion_timeout", 0,
		"Sets an auto-shutdown timeout to any started expansion services. "+
			"Requires the timeout command to be present in Path, unless the value is set to 0.")
)

The flags above are used in one or more integration tests and may be set by scripts that execute "go test ./sdks/go/test/integration/...". Because any flag passed to that command is applied to every package, every integration test package must import these flags, even if it does not use them.
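
The timeout flags above mention a dependency on the timeout command. The following is a minimal sketch of how a jar invocation might be wrapped with that command, not the package's actual implementation. The names jarArgv and timeoutBin, the jar name, and the fallback to running without a timeout when the command is missing are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"os/exec"
	"strconv"
	"time"
)

// jarArgv builds the argument vector for running a jar. If timeout is
// positive and timeoutBin is non-empty, the java command is wrapped in the
// timeout command. This helper is hypothetical.
func jarArgv(timeoutBin string, timeout time.Duration, jar string, args ...string) []string {
	argv := []string{"java", "-jar", jar}
	argv = append(argv, args...)
	if timeout <= 0 || timeoutBin == "" {
		return argv // No auto-shutdown requested or available.
	}
	// Express the duration in whole seconds, so 10 minutes becomes "600s".
	secs := strconv.FormatInt(int64(timeout/time.Second), 10) + "s"
	return append([]string{timeoutBin, secs}, argv...)
}

func main() {
	// Look up the timeout command on PATH; it may be absent on some systems.
	bin, err := exec.LookPath("timeout")
	if err != nil {
		bin = "" // Assumption for this sketch: run without a timeout.
	}
	fmt.Println(jarArgv(bin, 10*time.Minute, "kafka.jar", "9092"))
}
```

A value of 0 leaves the java command unwrapped, which matches the documented behavior that a zero timeout does not require the timeout command.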

Functions

func CheckFilters

func CheckFilters(t *testing.T)

CheckFilters checks whether an integration test should be skipped, either because the intended runner does not support it or because the test is sickbayed. This function should be called at the beginning of every integration test. If t.Run is used, CheckFilters should be called within the t.Run callback, so that sub-tests can be skipped individually.
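
To illustrate the idea of name-based filtering, here is a rough sketch of how a skip decision could be derived from a test name and a runner. It is not the package's implementation: the filter tables, the patterns in them, and the helpers matchesAny and skipReason are hypothetical.

```go
package main

import (
	"fmt"
	"regexp"
)

// Hypothetical filter tables, for illustration only.
var (
	sickbay       = []string{"TestFlaky.*"}
	runnerFilters = map[string][]string{
		"portable": {"TestKafkaIO.*"},
	}
)

// matchesAny reports whether name matches any of the regular expressions.
func matchesAny(name string, patterns []string) bool {
	for _, p := range patterns {
		if ok, err := regexp.MatchString(p, name); err == nil && ok {
			return true
		}
	}
	return false
}

// skipReason returns a non-empty reason if the named test should be skipped
// on the given runner, and "" if it should run.
func skipReason(runner, name string) string {
	if matchesAny(name, sickbay) {
		return "sickbayed"
	}
	if matchesAny(name, runnerFilters[runner]) {
		return "not supported on runner " + runner
	}
	return ""
}

func main() {
	// Sub-test names have the form "Parent/child", so a filter can target one.
	for _, name := range []string{"TestParDo", "TestKafkaIO/basic", "TestFlakyWindow"} {
		fmt.Printf("%s: %q\n", name, skipReason("portable", name))
	}
}
```

In a real test, a non-empty reason would typically be passed to t.Skip. Because t.Name() inside a t.Run callback includes the sub-test name, calling the check there lets a filter skip one sub-test without skipping its siblings.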

func GetExpansionAddrs added in v2.38.0

func GetExpansionAddrs() map[string]string

GetExpansionAddrs gets all the addresses given to --expansion_addr as a map of label to address.

func GetExpansionJars added in v2.38.0

func GetExpansionJars() map[string]string

GetExpansionJars gets all the jars given to --expansion_jar as a map of label to jar location.
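
As an illustration of the "label:jar" and "label:address" forms, the sketch below shows one way a repeatable flag could be collected and turned into a map of label to value. The method bodies on stringSlice and the helper toMap are assumptions, as are the example labels and ports; only the flag name and the element format come from the documentation above.

```go
package main

import (
	"flag"
	"fmt"
	"strings"
)

// stringSlice is a flag.Value that collects every occurrence of a flag.
// This implementation is a guess at what such a type could look like.
type stringSlice []string

func (s *stringSlice) String() string { return strings.Join(*s, ",") }

func (s *stringSlice) Set(v string) error {
	*s = append(*s, v)
	return nil
}

// toMap splits each "label:value" entry on the first colon only, so values
// such as "localhost:8097" keep their own colon.
func toMap(entries []string) map[string]string {
	m := make(map[string]string)
	for _, e := range entries {
		parts := strings.SplitN(e, ":", 2)
		if len(parts) != 2 {
			continue // Skip malformed entries that contain no colon.
		}
		m[parts[0]] = parts[1]
	}
	return m
}

func main() {
	var addrs stringSlice
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	fs.Var(&addrs, "expansion_addr", "label:address pair; may be repeated")
	args := []string{"--expansion_addr=io:localhost:8097", "--expansion_addr=schema:localhost:8098"}
	if err := fs.Parse(args); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(toMap(addrs))
}
```

Splitting on the first colon matters for addresses, since a host and port pair contains a colon of its own.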

Types

type ExpansionServices added in v2.38.0

type ExpansionServices struct {
	// contains filtered or unexported fields
}

ExpansionServices is a struct used for getting addresses and starting expansion services, based on the --expansion_jar and --expansion_addr flags in this package. The main reason to use this instead of accessing the flags directly is to let it handle jar startup and shutdown.

Usage

Create an ExpansionServices object in TestMain with NewExpansionServices. Then use GetAddr for every expansion service needed for the test. Call Shutdown on it before finishing TestMain (or simply defer a call to it).

ExpansionServices is not concurrency safe, and so a single instance should not be used within multiple individual tests, due to the possibility of those tests being run concurrently. It is recommended to only use ExpansionServices in TestMain to avoid this.

Example:

flag.Parse()
beam.Init()
services := integration.NewExpansionServices()
defer services.Shutdown() // Stop any jars started by GetAddr.
addr, err := services.GetAddr("example")
if err != nil {
	panic(err)
}
expansionAddr = addr // Save address to a package-level variable used by tests.
ptest.MainRet(m)
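
The example above relies on a deferred Shutdown call. A general Go detail worth keeping in mind (stated here as background, not as something this package documents) is that deferred calls do not run if os.Exit is called in the same function. The sketch below shows the usual pattern of deferring cleanup inside a function that returns the exit code. The functions runTests and mainRet are hypothetical stand-ins, not part of this package.

```go
package main

import (
	"fmt"
	"os"
)

// events records what happened, in order, for illustration.
var events []string

// runTests is a hypothetical stand-in for a call such as ptest.MainRet(m).
func runTests() int {
	events = append(events, "run tests")
	return 0
}

// mainRet sets up, defers cleanup, runs the tests, and returns the exit code.
func mainRet() int {
	events = append(events, "start services")
	defer func() { events = append(events, "shutdown") }()
	return runTests()
}

func main() {
	code := mainRet() // Deferred cleanup has finished by the time this returns.
	fmt.Println(events)
	os.Exit(code)
}
```

Calling os.Exit directly inside mainRet would end the process before the deferred function ran, which in the setting above would leave started jars without an explicit shutdown.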

func NewExpansionServices added in v2.38.0

func NewExpansionServices() *ExpansionServices

NewExpansionServices creates and initializes an ExpansionServices instance.

func (*ExpansionServices) GetAddr added in v2.38.0

func (es *ExpansionServices) GetAddr(label string) (string, error)

GetAddr gets the address for the expansion service with the given label. The label corresponds to the labels used in the --expansion_jar and --expansion_addr flags. If an expansion service is provided as a jar, then that jar will be run to retrieve the address, and the jars are not guaranteed to be shut down unless Shutdown is called.

Note: If this function starts a jar, it waits a few seconds for it to initialize. Do not use this function if the possibility of a few seconds of latency is not acceptable.

func (*ExpansionServices) Shutdown added in v2.38.0

func (es *ExpansionServices) Shutdown()

Shutdown shuts down any jars started by the ExpansionServices struct. It should be called whenever the struct has been used.

Directories

Path Synopsis
The integration driver provides a suite of tests to run against a registered runner.
io
xlang/debezium
Package debezium contains integration tests for cross-language Debezium IO transforms.
xlang/jdbc
Package jdbc contains integration tests for cross-language JDBC IO transforms.
xlang/kafka
Package kafka contains integration tests for cross-language Kafka IO transforms.
Package primitives contains integration tests for primitives in beam.
Package synthetic contains pipelines for testing synthetic steps and sources.
Package wordcount contains transforms for wordcount.
Package xlang contains integration tests for cross-language transforms.
internal
jars
Package jars contains functionality for running jars for integration tests.
ports
Package ports contains utilities for handling ports needed for integration tests.
