v2.31.2+incompatible
Published: Jul 30, 2021. License: Apache-2.0, BSD-3-Clause, MIT.



taxi is an example that uses a cross-language Kafka pipeline to write to and read from Kafka. It reads from the PubSub NYC Taxi stream described at https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes the messages to a given Kafka topic, and then reads them back from the same topic, logging every element. This is a streaming pipeline, so it will not end unless it is stopped externally.

Running this example requires a Kafka cluster accessible to the runner, and a cross-language expansion service that can expand Kafka read and write transforms.

Setting Up a Kafka Cluster

Setting up a Kafka cluster is more involved than can be covered in this example. All this example requires is a Kafka cluster accessible through a bootstrap server address, passed in as a flag. Some instructions for setting up a single-node Kafka cluster on GCE can be found here: https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery
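As a quick sanity check before launching the pipeline, you can verify that the bootstrap server is reachable from your machine. This is a sketch; the address is a placeholder, and a successful connection only proves the port is open, not that Kafka itself is healthy.

```shell
# Placeholder address; use your cluster's bootstrap server.
BOOTSTRAP_SERVER="203.0.113.5:9092"
HOST="${BOOTSTRAP_SERVER%%:*}"   # text before the colon
PORT="${BOOTSTRAP_SERVER##*:}"   # text after the colon
# Attempt a plain TCP connection with a 5-second timeout.
if timeout 5 bash -c "exec 3<>/dev/tcp/${HOST}/${PORT}" 2>/dev/null; then
  echo "bootstrap server reachable"
else
  echo "bootstrap server not reachable"
fi
```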

Running an Expansion Server

These instructions cover running the Java IO Expansion Service, and therefore require a JDK installation in a version supported by Beam. Depending on whether you are running this from a numbered Beam release or from a development environment, there are two sources you may use for the expansion service.

Numbered release: The expansion service jar is vendored as the module
org.apache.beam:beam-sdks-java-io-expansion-service in the Maven repository.
This jar can be executed directly with the following command:

  java -jar <jar_name> <port_number>
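The jar can be fetched from Maven Central. The URL below follows the standard Maven repository layout for the coordinates named above; the version is only an example, so substitute the Beam release you are targeting.

```shell
# Version is an example; substitute the Beam release you use.
BEAM_VERSION="2.31.0"
ARTIFACT="beam-sdks-java-io-expansion-service"
JAR="${ARTIFACT}-${BEAM_VERSION}.jar"
# Standard Maven Central layout for org.apache.beam artifacts.
URL="https://repo1.maven.org/maven2/org/apache/beam/${ARTIFACT}/${BEAM_VERSION}/${JAR}"
curl -fSLO "$URL" || echo "download failed; check the version and your network"
# Then start the service on a port of your choosing:
#   java -jar "$JAR" 8097
```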

Development env: This requires that the JAVA_HOME environment variable
point to your JDK installation. From the root `beam/` directory of the
Apache Beam repository, the jar can be built (or built and run) with the
following commands:
  Build: ./gradlew :sdks:java:io:expansion-service:build
  Build and Run: ./gradlew :sdks:java:io:expansion-service:runExpansionService -PconstructionService.port=<port_num>

Running the Example on GCP

Running this pipeline requires providing an address for the Expansion Service and for the Kafka cluster's bootstrap servers as flags, in addition to the usual flags for pipelines.

An example command for executing this pipeline on GCP is as follows:

export PROJECT="$(gcloud config get-value project)"
export TEMP_LOCATION="gs://MY-BUCKET/temp"
export STAGING_LOCATION="gs://MY-BUCKET/staging"
export REGION="us-central1"
export JOB_NAME="kafka-taxi-`date +%Y%m%d-%H%M%S`"
export BOOTSTRAP_SERVER="123.45.67.89:1234"
export EXPANSION_ADDR="localhost:1234"
go run ./sdks/go/examples/kafka/taxi/taxi.go \
  --runner=DataflowRunner \
  --temp_location=$TEMP_LOCATION \
  --staging_location=$STAGING_LOCATION \
  --project=$PROJECT \
  --region=$REGION \
  --job_name="${JOB_NAME}" \
  --bootstrap_servers=$BOOTSTRAP_SERVER \
  --expansion_addr=$EXPANSION_ADDR \
  --experiments=use_portable_job_submission,use_runner_v2
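Because the command above depends on several exported variables, a small helper can fail fast when one is missing instead of submitting a broken job. `check_env` is a hypothetical helper, not part of the example itself; it uses bash indirect expansion to look each variable up by name.

```shell
# check_env: report which variables used by the `go run` command are unset.
# (Hypothetical helper; ${!var} is bash indirect expansion.)
check_env() {
  local missing=""
  local var
  for var in PROJECT TEMP_LOCATION STAGING_LOCATION REGION JOB_NAME BOOTSTRAP_SERVER EXPANSION_ADDR; do
    [ -n "${!var:-}" ] || missing="$missing $var"
  done
  if [ -n "$missing" ]; then
    echo "missing:$missing"
    return 1
  fi
  echo "all required variables set"
}
```

Run `check_env` right before the `go run` command; a non-zero return means at least one variable still needs an export.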

Running the Example From a Git Clone

When running from a development environment, a custom container will likely need to be provided for the cross-language SDK. First, build and push the SDK container to a container repository, such as Docker Hub.

export DOCKER_ROOT="Your Docker Repository Root"
./gradlew :sdks:java:container:java8:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest
docker push $DOCKER_ROOT/beam_java8_sdk:latest

For runners in local mode, building the container with the default values for docker-repository-root and docker-tag is enough to make it accessible locally.

Additionally, you must provide the location of your custom container to the pipeline with the --sdk_harness_container_image_override flag.
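The flag value is a comma-separated "regex,replacement" pair, where the regex matches the default SDK harness image to swap out; this pair syntax and the ".*java.*" pattern follow Beam's multi-language pipeline convention, and the repository root below is a placeholder. A sketch of composing the flag value:

```shell
# Placeholder; use the repository root the image was pushed to above.
export DOCKER_ROOT="my-docker-repo"
# ".*java.*" matches the default Java SDK harness image name;
# the text after the comma is the replacement image.
OVERRIDE=".*java.*,${DOCKER_ROOT}/beam_java8_sdk:latest"
echo "--sdk_harness_container_image_override=${OVERRIDE}"
# Append the printed flag to the `go run` command shown earlier.
```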