pbeam

package
v3.0.0
Published: Mar 12, 2024 License: Apache-2.0, BSD-3-Clause Imports: 25 Imported by: 0

Documentation

Overview

Package pbeam provides an API for building differentially private data processing pipelines using Apache Beam (https://beam.apache.org) with its Go SDK (https://godoc.org/github.com/apache/beam/sdks/v2/go/pkg/beam).

It introduces the concept of a PrivatePCollection, an interface mirroring Apache Beam's PCollection concept. PrivatePCollection implements additional restrictions and aggregations to facilitate differentially private analysis. This API is meant to be used by developers without differential privacy expertise.

For a step-by-step introduction to differential privacy, Apache Beam, and example usage of this library, see the codelab at https://codelabs.developers.google.com/codelabs/privacy-on-beam/index.html. It is meant for developers who want to get started with this library and generate differentially private metrics.

The rest of this package-level comment goes into more detail about the precise guarantees offered by this API, and assumes some familiarity with the Apache Beam model, its Go SDK, and differential privacy.

To understand the main API contract provided by PrivatePCollection, consider the following example pipeline.

	p := beam.NewPipeline()
	s := p.Root()
	// The input is a series of files in which each line contains the data of a privacy unit (e.g. an individual).
	input := textio.Read(s, "/path/to/files/*.txt") // input is a PCollection<string>
	// Extracts the privacy ID and the data associated with each line: extractID is a func(string) (userID,data).
	icol := beam.ParDo(s, extractID, input) // icol is a PCollection<privacyUnitID,data>
	// Transforms the input PCollection into a PrivatePCollection with parameters ε=1 and δ=10⁻¹⁰.
	// The privacy ID is "hidden" by the operation: pcol behaves as if it were a PCollection<data>.
	spec, err := pbeam.NewPrivacySpec(pbeam.PrivacySpecParams{
		AggregationEpsilon:        0.5,
		PartitionSelectionEpsilon: 0.5,
		PartitionSelectionDelta:   1e-10,
	})
	if err != nil {
		// Handle error.
	}
	pcol := pbeam.MakePrivate(s, icol, spec) // pcol is a PrivatePCollection<data>
	// Arbitrary transformations can be applied to the data…
	pcol = pbeam.ParDo(s, someDoFn, pcol)
	pcol = pbeam.ParDo(s, otherDoFn, pcol)
	// …and to retrieve PCollection outputs, differentially private aggregations must be used.
	// For example, assuming pcol is now a PrivatePCollection<field,float64>:
	sumParams := pbeam.SumParams{MaxPartitionsContributed: 10, MaxValue: 5}
	ocol := pbeam.SumPerKey(s, pcol, sumParams) // ocol is a PCollection<field,float64>
	// And it is now possible to output this data.
	textio.Write(s, "/path/to/output/file", ocol)

The behavior of PrivatePCollection is similar to the behavior of PCollection. In particular, it implements arbitrary per-record transformations via ParDo. However, the contents of a PrivatePCollection cannot be written to disk. For example, there is no equivalent of:

	textio.Write(s, "/path/to/output/file", pcol)

In order to retrieve data encapsulated in a PrivatePCollection, it is necessary to use one of the differentially private aggregations provided with this library (e.g., count, sum, mean), which transforms the PrivatePCollection back into a PCollection.

This is because of the API contract provided by this library: once data is encapsulated in a PrivatePCollection, all its outputs are differentially private. More precisely, suppose a PrivatePCollection pcol is created from a PCollection<K,V> icol with privacy parameters (ε,δ), and output in one or several PCollections (ocol1, ocol2, ocol3). Let f be the corresponding randomized transformation, associating icol with (ocol1, ocol2, ocol3). Then f is (ε,δ)-differentially private in the following sense. Let icol' be the PCollection obtained by removing all records associated with a given value of K in icol. Then, for any set S of possible outputs:

P[f(icol) ∈ S] ≤ exp(ε) * P[f(icol') ∈ S] + δ.
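As a concrete illustration of this inequality, the sketch below checks it numerically for the simplest textbook case: a single count released with Laplace noise of scale 1/ε, where removing one privacy identifier changes the true count by at most 1, and δ = 0. This is a toy calculation under those assumptions, not a description of how pbeam is implemented; laplaceCDF, intervalProb and dpHolds are hypothetical helper names.

```go
package main

import (
	"fmt"
	"math"
)

// laplaceCDF returns P[X <= x] for a Laplace distribution with the given
// mean and scale.
func laplaceCDF(x, mean, scale float64) float64 {
	if x < mean {
		return 0.5 * math.Exp((x-mean)/scale)
	}
	return 1 - 0.5*math.Exp(-(x-mean)/scale)
}

// intervalProb returns P[lo < count + noise <= hi], where noise is Laplace
// distributed with scale 1/epsilon.
func intervalProb(count, lo, hi, epsilon float64) float64 {
	scale := 1 / epsilon
	return laplaceCDF(hi, count, scale) - laplaceCDF(lo, count, scale)
}

// dpHolds reports whether P[f(icol) ∈ S] <= exp(ε) * P[f(icol') ∈ S] for the
// output set S = (lo, hi], allowing a tiny tolerance for rounding.
func dpHolds(count, countWithout, lo, hi, epsilon float64) bool {
	p := intervalProb(count, lo, hi, epsilon)
	q := intervalProb(countWithout, lo, hi, epsilon)
	return p <= math.Exp(epsilon)*q+1e-12
}

func main() {
	const epsilon = 1.0
	// The true count is 10 with a given user and 9 once that user is removed.
	p := intervalProb(10, 9.5, 10.5, epsilon)
	q := intervalProb(9, 9.5, 10.5, epsilon)
	fmt.Printf("P[f(icol) in S]           = %.4f\n", p)
	fmt.Printf("exp(ε) * P[f(icol') in S] = %.4f\n", math.Exp(epsilon)*q)
	fmt.Println("inequality holds:", dpHolds(10, 9, 9.5, 10.5, epsilon))
}
```

If the two counts differ by more than the noise was calibrated for (for example 10 versus 5), the same check fails, which is why contributions have to be bounded before noise is added.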

The K, in the example above, is userID, representing a user identifier. This means that the full list of contributions of any given user is protected. However, this does not need to be the case; the protected property might be different than a user identifier. In this library, we use the more general terminology of "privacy unit" to refer to the type of this identifier (for example, user ID, event ID, a pair (user ID, day)); and "privacy identifier" to refer to a particular instance of this identifier (for example, user n°4217, event n°99, or the pair (user n°4127,2020-06-24)).

Note that the interface contract of PrivatePCollection has limitations. This library assumes that the user of the library is trusted with access to the underlying raw data. The intended user is a well-meaning developer trying to produce anonymized metrics about data using differential privacy. The API tries to make it easy to produce anonymized metrics that are safe to publish to untrusted parties, and difficult to break the differential privacy guarantees by mistake.

However, this API does not attempt to protect against malicious library users. In particular, nothing prevents a user of this library from adding a side-effect to a ParDo function to leak raw data and bypass differential privacy guarantees. Similarly, ParDo functions are allowed to return errors that crash the pipeline, which could be abused to leak raw data. There is no protection against timing or side-channel attacks, as we assume that the only thing malicious users have access to is the output data.

Example

This example computes the "Sum-up revenue per day of the week" example from the Go Differential Privacy Library documentation, available at https://github.com/google/differential-privacy/go/README.md.

It assumes that the input file, "week_data.csv", has the same format as the data used in the above example: https://github.com/google/differential-privacy/go/examples/data/week_data.csv

package main

import (
	"context"
	"fmt"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct"
	"github.com/google/differential-privacy/privacy-on-beam/v3/pbeam"
)

func main() {
	// visit contains the data corresponding to a single restaurant visit.
	type visit struct {
		visitorID  string
		eurosSpent int
		weekday    int
	}

	// Initialize the pipeline.
	beam.Init()
	p := beam.NewPipeline()
	s := p.Root()

	// Load the data and parse each visit, ignoring parsing errors.
	icol := textio.Read(s, "week_data.csv")
	icol = beam.ParDo(s, func(s string, emit func(visit)) {
		var visitorID string
		var euros, weekday int
		_, err := fmt.Sscanf(s, "%s, %d, %d", &visitorID, &euros, &weekday)
		if err != nil {
			return
		}
		emit(visit{visitorID, euros, weekday})
	}, icol)

	// Transform the input PCollection into a PrivatePCollection.

	// ε and δ are the differential privacy parameters that quantify the privacy
	// provided by the pipeline.
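	// ε is written as 1.0 so that ε / 2 below is a floating-point division
	// (with an untyped integer constant, 1 / 2 would evaluate to 0).
	const ε, δ = 1.0, 1e-3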

	privacySpec, err := pbeam.NewPrivacySpec(pbeam.PrivacySpecParams{
		AggregationEpsilon:        ε / 2,
		PartitionSelectionEpsilon: ε / 2,
		PartitionSelectionDelta:   δ,
	})
	if err != nil {
		// Handle error.
	}
	pcol := pbeam.MakePrivateFromStruct(s, icol, privacySpec, "visitorID")
	// pcol is now a PrivatePCollection<visit>.

	// Compute the differentially private sum-up revenue per weekday. To do so,
	// we extract a KV pair, where the key is weekday and the value is the
	// money spent.
	pWeekdayEuros := pbeam.ParDo(s, func(v visit) (int, int) {
		return v.weekday, v.eurosSpent
	}, pcol)
	sumParams := pbeam.SumParams{
		// There is only a single differentially private aggregation in this
		// pipeline, so the entire privacy budget will be consumed (ε=1 and
		// δ=10⁻³). If multiple aggregations are present, we would need to
		// manually specify the privacy budget used by each.

		// If a visitor of the restaurant is present in more than 4 weekdays,
		// some of these contributions will be randomly dropped.
		// Larger values let you keep more contributions (more of the raw data)
		// but lead to more noise in the output because the noise will be scaled
		// by the value. See the relevant section in the codelab for details:
		// https://codelabs.developers.google.com/codelabs/privacy-on-beam/#8
		MaxPartitionsContributed: 4,

		// If a visitor of the restaurant spends more than 50 euros, or less
		// than 0 euros, their contribution will be clamped.
		// Similar to MaxPartitionsContributed, a larger interval lets you keep more
		// of the raw data but leads to more noise in the output because the noise
		// will be scaled by max(|MinValue|,|MaxValue|).
		MinValue: 0,
		MaxValue: 50,
	}
	ocol := pbeam.SumPerKey(s, pWeekdayEuros, sumParams)

	// ocol is a regular PCollection; it can be written to disk.
	formatted := beam.ParDo(s, func(weekday int, sum int64) string {
		return fmt.Sprintf("Weekday n°%d: total spend is %d euros", weekday, sum)
	}, ocol)
	textio.Write(s, "spend_per_weekday.txt", formatted)

	// Execute the pipeline.
	if _, err := direct.Execute(context.Background(), p); err != nil {
		fmt.Printf("Pipeline failed: %v", err)
	}
}

Example (TestPipelines)

This example demonstrates how to write test pipelines for pbeam using test modes where pbeam does not add any noise, disables partition selection and might disable or enable contribution bounding depending on the particular test mode used.

This mirrors the default example with two differences: 1. TestMode is specified when creating the PrivacySpec. 2. Code comments are different.

Using Privacy on Beam with test mode enabled does not provide any privacy guarantees and is only meant to be used in test code or for performing an analysis of the utility of differential privacy by comparing "true" results with "private" results. DO NOT use this for production pipelines.

package main

import (
	"context"
	"fmt"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct"
	"github.com/google/differential-privacy/privacy-on-beam/v3/pbeam"
)

func main() {
	// This example computes the "Sum-up revenue per day of the week" example
	// from the Go Differential Privacy Library documentation, available at
	// https://github.com/google/differential-privacy/go/README.md.
	//
	// It assumes that the input file, "week_data.csv", has the same format as
	// the data used in the above example:
	// https://github.com/google/differential-privacy/go/examples/data/week_data.csv

	// visit contains the data corresponding to a single restaurant visit.
	type visit struct {
		visitorID  string
		eurosSpent int
		weekday    int
	}

	// Initialize the pipeline.
	beam.Init()
	p := beam.NewPipeline()
	s := p.Root()

	// Load the data and parse each visit, ignoring parsing errors.
	icol := textio.Read(s, "week_data.csv")
	icol = beam.ParDo(s, func(s string, emit func(visit)) {
		var visitorID string
		var euros, weekday int
		_, err := fmt.Sscanf(s, "%s, %d, %d", &visitorID, &euros, &weekday)
		if err != nil {
			return
		}
		emit(visit{visitorID, euros, weekday})
	}, icol)

	// Transform the input PCollection into a PrivatePCollection.

	// ε and δ are the differential privacy parameters that quantify the privacy
	// provided by the pipeline. Even though noise will not be added since we are using
	// test mode, ε and δ will still be used for validation of parameters; so use the
	// same parameters you use for production.
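	// ε is written as 1.0 so that ε / 2 below is a floating-point division
	// (with an untyped integer constant, 1 / 2 would evaluate to 0).
	const ε, δ = 1.0, 1e-3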

	// We enable test mode by setting the TestMode field to
	// pbeam.TestModeWithContributionBounding. This is the only difference from a
	// production pipeline that uses pbeam.NewPrivacySpec(); everything else
	// remains the same.
	// This enables per-partition and cross-partition contribution bounding. If you
	// wish to disable both types of contribution bounding altogether, use
	// pbeam.TestModeWithoutContributionBounding instead.
	privacySpec, err := pbeam.NewPrivacySpec(pbeam.PrivacySpecParams{
		AggregationEpsilon:        ε / 2,
		PartitionSelectionEpsilon: ε / 2,
		PartitionSelectionDelta:   δ,
		TestMode:                  pbeam.TestModeWithContributionBounding,
	})
	if err != nil {
		fmt.Printf("Couldn't create PrivacySpec: %v", err)
	}
	pcol := pbeam.MakePrivateFromStruct(s, icol, privacySpec, "visitorID")
	// pcol is now a PrivatePCollection<visit>.

	// Compute a non-private sum-up revenue per weekday. To do so, we extract a
	// KV pair, where the key is weekday and the value is the money spent.
	pWeekdayEuros := pbeam.ParDo(s, func(v visit) (int, int) {
		return v.weekday, v.eurosSpent
	}, pcol)
	sumParams := pbeam.SumParams{
		// There is only a single differentially private aggregation in this
		// pipeline, so the entire privacy budget will be consumed (ε=1 and
		// δ=10⁻³). If multiple aggregations are present, we would need to
		// manually specify the privacy budget used by each.

		// If a visitor of the restaurant is present in more than 4 weekdays,
		// some of these contributions will be randomly dropped.
		// Larger values let you keep more contributions (more of the raw data)
		// but lead to more noise in the output because the noise will be scaled
		// by the value. See the relevant section in the codelab for details:
		// https://codelabs.developers.google.com/codelabs/privacy-on-beam/#8
		MaxPartitionsContributed: 4,

		// If a visitor of the restaurant spends more than 50 euros, or less
		// than 0 euros, their contribution will be clamped.
		// Similar to MaxPartitionsContributed, a larger interval lets you keep more
		// of the raw data but leads to more noise in the output because the noise
		// will be scaled by max(|MinValue|,|MaxValue|).
		MinValue: 0,
		MaxValue: 50,
	}
	// Since test mode is used, this will produce a non-differentially private
	// sum of revenue per day.
	ocol := pbeam.SumPerKey(s, pWeekdayEuros, sumParams)

	// ocol is a regular PCollection; it can be written to disk.
	formatted := beam.ParDo(s, func(weekday int, sum int64) string {
		return fmt.Sprintf("Weekday n°%d: total spend is %d euros", weekday, sum)
	}, ocol)
	textio.Write(s, "spend_per_weekday.txt", formatted)

	// Execute the pipeline.
	if _, err := direct.Execute(context.Background(), p); err != nil {
		fmt.Printf("Pipeline failed: %v", err)
	}
}

Functions

func Count

func Count(s beam.Scope, pcol PrivatePCollection, params CountParams) beam.PCollection

Count counts the number of times a value appears in a PrivatePCollection, adding differentially private noise to the counts and doing pre-aggregation thresholding to remove counts with a low number of distinct privacy identifiers.

It is also possible to manually specify the list of partitions present in the output, in which case the partition selection/thresholding step is skipped.

Note: Do not use when your results may cause overflows for int64 values. This aggregation is not hardened for such applications yet.

Count transforms a PrivatePCollection<V> into a PCollection<V, int64>.
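The effect of manually specified partitions can be pictured with the following sketch. It only illustrates the rule that a partition appears in the output if and only if it is in the public list: partitions outside the list are dropped, and listed partitions with no data are still emitted with a raw value of 0, to which noise would then be added. restrictToPublic is a hypothetical helper operating on plain maps, not part of pbeam.

```go
package main

import (
	"fmt"
	"sort"
)

// restrictToPublic returns one entry per public partition. Partitions absent
// from counts get a raw count of 0; partitions not listed as public are dropped.
func restrictToPublic(counts map[string]int64, public []string) map[string]int64 {
	out := make(map[string]int64, len(public))
	for _, p := range public {
		out[p] = counts[p] // a missing key yields 0
	}
	return out
}

func main() {
	raw := map[string]int64{"mon": 12, "wed": 7, "sun": 1}
	public := []string{"mon", "tue", "wed"}
	out := restrictToPublic(raw, public)

	// Print in a stable order; Go map iteration order is random.
	keys := make([]string, 0, len(out))
	for k := range out {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Printf("%s: %d\n", k, out[k])
	}
}
```

Emitting empty public partitions matters for privacy: if they were omitted, the mere presence of a partition in the output would reveal that some privacy identifier contributed to it.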

func DistinctPerKey

func DistinctPerKey(s beam.Scope, pcol PrivatePCollection, params DistinctPerKeyParams) beam.PCollection

DistinctPerKey estimates the number of distinct values associated with each key in a PrivatePCollection, adding differentially private noise to the estimates and doing pre-aggregation thresholding to remove estimates with a low number of distinct privacy identifiers.

It is also possible to manually specify the list of partitions present in the output, in which case the partition selection/thresholding step is skipped.

DistinctPerKey transforms a PrivatePCollection<K,V> into a PCollection<K,int64>.

func DistinctPrivacyID

func DistinctPrivacyID(s beam.Scope, pcol PrivatePCollection, params DistinctPrivacyIDParams) beam.PCollection

DistinctPrivacyID counts the number of distinct privacy identifiers associated with each value in a PrivatePCollection, adding differentially private noise to the counts and doing post-aggregation thresholding to remove low counts. It is conceptually equivalent to calling Count with MaxValue=1, but is specifically optimized for this use case.

It is also possible to manually specify the list of partitions present in the output, in which case the partition selection/thresholding step is skipped.

DistinctPrivacyID transforms a PrivatePCollection<V> into a PCollection<V,int64>.

Note: Do not use when your results may cause overflows for int64 values. This aggregation is not hardened for such applications yet.
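The two steps described above, counting distinct privacy identifiers per value and then thresholding the noisy counts, can be sketched as follows. This is a simplified illustration, not the library's implementation: the row type, the injected noise function, and the threshold value are all hypothetical, and in a real pipeline the noise distribution and threshold would be derived from the privacy budget.

```go
package main

import "fmt"

// row is a hypothetical (privacy identifier, value) record.
type row struct {
	privacyID string
	value     string
}

// distinctPrivacyIDs counts, for each value, how many distinct privacy
// identifiers are associated with it. Repeated rows count only once.
func distinctPrivacyIDs(rows []row) map[string]int64 {
	seen := make(map[row]bool)
	counts := make(map[string]int64)
	for _, r := range rows {
		if !seen[r] {
			seen[r] = true
			counts[r.value]++
		}
	}
	return counts
}

// thresholded adds noise to each count and keeps only the noisy counts that
// reach the threshold (post-aggregation thresholding).
func thresholded(counts map[string]int64, noise func() float64, threshold float64) map[string]float64 {
	out := make(map[string]float64)
	for value, c := range counts {
		noisy := float64(c) + noise()
		if noisy >= threshold {
			out[value] = noisy
		}
	}
	return out
}

func main() {
	rows := []row{
		{"alice", "mon"}, {"alice", "mon"}, {"bob", "mon"}, {"carol", "mon"},
		{"alice", "tue"},
	}
	// Noise is disabled here so the effect of thresholding is easy to see.
	noNoise := func() float64 { return 0 }
	fmt.Println(thresholded(distinctPrivacyIDs(rows), noNoise, 2))
}
```

With the noise switched off and a threshold of 2, only "mon" (three distinct identifiers) survives; "tue" has a single contributor and is removed.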

func MeanPerKey

func MeanPerKey(s beam.Scope, pcol PrivatePCollection, params MeanParams) beam.PCollection

MeanPerKey obtains the mean of the values associated with each key in a PrivatePCollection<K,V>, adding differentially private noise to the means and doing pre-aggregation thresholding to remove means with a low number of distinct privacy identifiers.

It is also possible to manually specify the list of partitions present in the output, in which case the partition selection/thresholding step is skipped.

MeanPerKey transforms a PrivatePCollection<K,V> into a PCollection<K,float64>.

Note: Do not use when your results may cause overflows for float64 values. This aggregation is not hardened for such applications yet.

func QuantilesPerKey

func QuantilesPerKey(s beam.Scope, pcol PrivatePCollection, params QuantilesParams) beam.PCollection

QuantilesPerKey computes one or multiple quantiles of the values associated with each key in a PrivatePCollection<K,V>, adding differentially private noise to the quantiles and doing pre-aggregation thresholding to remove partitions with a low number of distinct privacy identifiers.

It is also possible to manually specify the list of partitions present in the output, in which case the partition selection/thresholding step is skipped.

QuantilesPerKey transforms a PrivatePCollection<K,V> into a PCollection<K,[]float64>.

Note that due to the implementation details of the internal Quantiles algorithm, using pbeamtest with QuantilesPerKey has two caveats:

  1. Even without DP noise, the output will be slightly noisy. You can use pbeamtest.QuantilesTolerance() to account for that noise.
  2. Input values are always clamped, even when using pbeamtest.NewPrivacySpecNoNoiseWithoutContributionBounding(), so clamping to Min/MaxValue will still be applied. However, MaxContributionsPerPartition and MaxPartitionsContributed contribution bounding will be disabled.

func SelectPartitions

func SelectPartitions(s beam.Scope, pcol PrivatePCollection, params SelectPartitionsParams) beam.PCollection

SelectPartitions performs differentially private partition selection using dpagg.PreAggSelectPartitions and returns the list of partitions to keep as a PCollection.

In a PrivatePCollection<K,V>, K is the partition key and in a PrivatePCollection<V>, V is the partition key. SelectPartitions transforms a PrivatePCollection<K,V> into a PCollection<K> and a PrivatePCollection<V> into a PCollection<V>.

func SumPerKey

func SumPerKey(s beam.Scope, pcol PrivatePCollection, params SumParams) beam.PCollection

SumPerKey sums the values associated with each key in a PrivatePCollection<K,V>, adding differentially private noise to the sums and doing pre-aggregation thresholding to remove sums with a low number of distinct privacy identifiers. Client can also specify a PCollection of partitions.

It is also possible to manually specify the list of partitions present in the output, in which case the partition selection/thresholding step is skipped.

SumPerKey transforms a PrivatePCollection<K,V> either into a PCollection<K,int64> or a PCollection<K,float64>, depending on whether its input is an integer type or a float type.

Note: Do not use when your results may cause overflows for int64 and float64 values. This aggregation is not hardened for such applications yet.
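To make the trade-off behind the sum bounds more tangible, the sketch below clamps individual contributions to [MinValue, MaxValue] and computes a Laplace noise scale using the textbook formula MaxPartitionsContributed * max(|MinValue|,|MaxValue|) / ε. The formula is an assumption based on the standard Laplace mechanism with one contribution per partition; the library's exact noise calibration may differ, and clamp and laplaceScale are hypothetical helpers.

```go
package main

import (
	"fmt"
	"math"
)

// clamp restricts v to the interval [minValue, maxValue].
func clamp(v, minValue, maxValue float64) float64 {
	return math.Max(minValue, math.Min(maxValue, v))
}

// laplaceScale returns the Laplace noise scale for a sum in which each privacy
// identifier contributes one clamped value to at most maxPartitionsContributed
// partitions (assumed textbook calibration: L1 sensitivity / epsilon).
func laplaceScale(maxPartitionsContributed int64, minValue, maxValue, epsilon float64) float64 {
	perPartition := math.Max(math.Abs(minValue), math.Abs(maxValue))
	return float64(maxPartitionsContributed) * perPartition / epsilon
}

func main() {
	// Contributions outside [0, 50] are clamped to the nearest bound.
	for _, spend := range []float64{-3, 20, 75} {
		fmt.Printf("%v -> %v\n", spend, clamp(spend, 0, 50))
	}
	// Widening the interval or raising MaxPartitionsContributed increases the noise.
	fmt.Println("scale for [0, 50]: ", laplaceScale(4, 0, 50, 0.5))
	fmt.Println("scale for [0, 100]:", laplaceScale(4, 0, 100, 0.5))
}
```

Doubling the upper bound doubles the noise scale, which is the cost paid for clamping fewer large contributions.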

Types

type CountParams

type CountParams struct {
	// Noise type (which is either LaplaceNoise{} or GaussianNoise{}).
	//
	// Defaults to LaplaceNoise{}.
	NoiseKind NoiseKind
	// Differential privacy budget consumed by this aggregation. If there is
	// only one aggregation, both epsilon and delta can be left 0; in that case
	// the entire budget reserved for aggregation in the PrivacySpec is consumed.
	AggregationEpsilon, AggregationDelta float64
	// Differential privacy budget consumed by partition selection of this
	// aggregation.
	//
	// If PublicPartitions are specified, this needs to be left unset.
	//
	// If there is only one aggregation, this can be left unset; in that case
	// the entire budget reserved for partition selection in the PrivacySpec
	// is consumed.
	//
	// Optional.
	PartitionSelectionParams PartitionSelectionParams
	// You can input the list of partitions present in the output if you know
	// them in advance. When you specify partitions, partition selection /
	// thresholding will be disabled and partitions will appear in the output
	// if and only if they appear in the set of public partitions.
	//
	// You should not derive the list of partitions non-privately from private
	// data. You should only use this in either of the following cases:
	// 	1. The list of partitions is data-independent. For example, if you are
	// 	aggregating a metric by hour, you could provide a list of all possible
	// 	hourly periods.
	// 	2. You use a differentially private operation to come up with the list of
	// 	partitions. For example, you could use the output of a SelectPartitions
	// 	operation or the keys of a DistinctPrivacyID operation as the list of
	// 	public partitions.
	//
	// PublicPartitions needs to be a beam.PCollection, slice, or array. The
	// underlying type needs to match the partition type of the PrivatePCollection.
	//
	// Prefer slices or arrays if the list of public partitions is small and
	// can fit into memory (e.g., up to a million). Prefer beam.PCollection
	// otherwise.
	//
	// If PartitionSelectionParams are specified, this needs to be left unset.
	//
	// Optional.
	PublicPartitions any
	// The maximum number of distinct values that a given privacy identifier
	// can influence. If a privacy identifier is associated with more values,
	// random values will be dropped. There is an inherent trade-off when
	// choosing this parameter: a larger MaxPartitionsContributed leads to less
	// data loss due to contribution bounding, but since the noise added in
	// aggregations is scaled according to MaxPartitionsContributed, it also
	// means that more noise is added to each count.
	//
	// Required.
	MaxPartitionsContributed int64
	// The maximum number of times that a privacy identifier can contribute to
	// a single count (or, equivalently, the maximum value that a privacy
	// identifier can add to a single count in total). If MaxValue=10 and a
	// privacy identifier is associated with the same value in 15 records, Count
	// ignores 5 of these records and only adds 10 to the count for this value.
	// There is an inherent trade-off when choosing MaxValue: a larger
	// parameter means that fewer records are lost, but a larger noise is added.
	//
	// Required.
	MaxValue int64
	// Allow negative counts in the output. Most users would expect a count
	// aggregation to return non-negative values. However, to get better
	// statistical properties, especially for subsequent post-processing
	// steps, users can choose to allow negative outputs via this option.
	//
	// The default is to clamp the anonymized values and only return
	// non-negative counts.
	//
	// Optional.
	AllowNegativeOutputs bool
}

CountParams specifies the parameters associated with a Count aggregation.
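The MaxValue example from the comments above (15 records for the same value with MaxValue=10) can be reproduced with a small sketch. It bounds per-partition contributions only and keeps the first records it sees, whereas the library drops records at random and also applies MaxPartitionsContributed and noise; record and boundedCounts are hypothetical names.

```go
package main

import "fmt"

// record is a hypothetical (privacy identifier, value) pair.
type record struct {
	privacyID string
	value     string
}

// boundedCounts counts records per value, letting each privacy identifier
// contribute at most maxValue records to any single value.
func boundedCounts(records []record, maxValue int64) map[string]int64 {
	contributed := make(map[record]int64) // per (privacyID, value) pair
	counts := make(map[string]int64)
	for _, r := range records {
		if contributed[r] >= maxValue {
			continue // this identifier already reached its cap for this value
		}
		contributed[r]++
		counts[r.value]++
	}
	return counts
}

func main() {
	var records []record
	// user-1 is associated with "pizza" in 15 records, user-2 in one record.
	for i := 0; i < 15; i++ {
		records = append(records, record{"user-1", "pizza"})
	}
	records = append(records, record{"user-2", "pizza"})

	// With a cap of 10, five of user-1's records are ignored.
	fmt.Println(boundedCounts(records, 10)["pizza"])
}
```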

type DistinctPerKeyParams

type DistinctPerKeyParams struct {
	// Noise type (which is either LaplaceNoise{} or GaussianNoise{}).
	//
	// Defaults to LaplaceNoise{}.
	NoiseKind NoiseKind
	// Differential privacy budget consumed by this aggregation. If there is
	// only one aggregation, both epsilon and delta can be left 0; in that case
	// the entire budget reserved for aggregation in the PrivacySpec is consumed.
	AggregationEpsilon, AggregationDelta float64
	// Differential privacy budget consumed by partition selection of this
	// aggregation.
	//
	// If PublicPartitions are specified, this needs to be left unset.
	//
	// If there is only one aggregation, this can be left unset; in that case
	// the entire budget reserved for partition selection in the PrivacySpec
	// is consumed.
	//
	// Optional.
	PartitionSelectionParams PartitionSelectionParams
	// You can input the list of partitions present in the output if you know
	// them in advance. When you specify partitions, partition selection /
	// thresholding will be disabled and partitions will appear in the output
	// if and only if they appear in the set of public partitions.
	//
	// You should not derive the list of partitions non-privately from private
	// data. You should only use this in either of the following cases:
	// 	1. The list of partitions is data-independent. For example, if you are
	// 	aggregating a metric by hour, you could provide a list of all possible
	// 	hourly periods.
	// 	2. You use a differentially private operation to come up with the list of
	// 	partitions. For example, you could use the output of a SelectPartitions
	// 	operation or the keys of a DistinctPrivacyID operation as the list of
	// 	public partitions.
	//
	// PublicPartitions needs to be a beam.PCollection, slice, or array. The
	// underlying type needs to match the partition type of the PrivatePCollection.
	//
	// Prefer slices or arrays if the list of public partitions is small and
	// can fit into memory (e.g., up to a million). Prefer beam.PCollection
	// otherwise.
	//
	// If PartitionSelectionParams are specified, this needs to be left unset.
	//
	// Optional.
	PublicPartitions any
	// The maximum number of distinct keys that a given privacy identifier
	// can influence. If a privacy identifier is associated with more keys,
	// random keys will be dropped. There is an inherent trade-off when
	// choosing this parameter: a larger MaxPartitionsContributed leads to less
	// data loss due to contribution bounding, but since the noise added in
	// aggregations is scaled according to MaxPartitionsContributed, it also
	// means that more noise is added to each count.
	//
	// Required.
	MaxPartitionsContributed int64
	// The maximum number of distinct values a given privacy identifier can
	// contribute to for each key. There is an inherent trade-off when choosing this
	// parameter: a larger MaxContributionsPerPartition leads to less data loss due
	// to contribution bounding, but since the noise added in aggregations is
	// scaled according to MaxContributionsPerPartition, it also means that more
	// noise is added to each count.
	//
	// Required.
	MaxContributionsPerPartition int64
}

DistinctPerKeyParams specifies the parameters associated with a DistinctPerKey aggregation.

type DistinctPrivacyIDParams

type DistinctPrivacyIDParams struct {
	// Noise type (which is either LaplaceNoise{} or GaussianNoise{}).
	//
	// Defaults to LaplaceNoise{}.
	NoiseKind NoiseKind
	// Differential privacy budget consumed by this aggregation. If there is
	// only one aggregation, both epsilon and delta can be left 0; in that case
	// the entire budget reserved for aggregation in the PrivacySpec is consumed.
	AggregationEpsilon, AggregationDelta float64
	// Differential privacy budget consumed by partition selection of this
	// aggregation. Note that DistinctPrivacyID doesn't consume epsilon for
	// partition selection, so you can only specify a delta.
	//
	// If PublicPartitions are specified, this needs to be left unset.
	//
	// If there is only one aggregation, this can be left unset; in that case
	// the entire budget reserved for partition selection in the PrivacySpec
	// is consumed.
	//
	// Optional.
	PartitionSelectionDelta float64
	// You can input the list of partitions present in the output if you know
	// them in advance. When you specify partitions, partition selection /
	// thresholding will be disabled and partitions will appear in the output
	// if and only if they appear in the set of public partitions.
	//
	// You should not derive the list of partitions non-privately from private
	// data. You should only use this in either of the following cases:
	// 	1. The list of partitions is data-independent. For example, if you are
	// 	aggregating a metric by hour, you could provide a list of all possible
	// 	hourly periods.
	// 	2. You use a differentially private operation to come up with the list of
	// 	partitions. For example, you could use the output of a SelectPartitions
	// 	operation or the keys of a DistinctPrivacyID operation as the list of
	// 	public partitions.
	//
	// PublicPartitions needs to be a beam.PCollection, slice, or array. The
	// underlying type needs to match the partition type of the PrivatePCollection.
	//
	// Prefer slices or arrays if the list of public partitions is small and
	// can fit into memory (e.g., up to a million). Prefer beam.PCollection
	// otherwise.
	//
	// If PartitionSelectionDelta is specified, this needs to be left unset.
	//
	// Optional.
	PublicPartitions any
	// The maximum number of distinct values that a given privacy identifier
	// can influence. If a privacy identifier is associated with more values,
	// random values will be dropped. There is an inherent trade-off when
	// choosing this parameter: a larger MaxPartitionsContributed leads to less
	// data loss due to contribution bounding, but since the noise added in
	// aggregations is scaled according to MaxPartitionsContributed, it also
	// means that more noise is added to each count.
	//
	// Required.
	MaxPartitionsContributed int64
}

DistinctPrivacyIDParams specifies the parameters associated with a DistinctPrivacyID aggregation.
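The budget rule repeated in these parameter structs (leave epsilon at 0 when there is a single aggregation, set it explicitly when there are several) can be pictured with a small bookkeeping sketch. The budget type and its consume method are purely hypothetical and only model the documented behaviour under simplifying assumptions; they are not the library's internal accounting, and delta is left out for brevity.

```go
package main

import (
	"errors"
	"fmt"
)

// budget is a hypothetical tracker for the epsilon reserved in a privacy spec.
type budget struct {
	total    float64
	consumed float64
}

// consume reserves epsilon for one aggregation. In this sketch, passing 0
// means "use the entire budget" and is only allowed for the first aggregation.
func (b *budget) consume(epsilon float64) (float64, error) {
	if epsilon == 0 {
		if b.consumed > 0 {
			return 0, errors.New("epsilon must be set explicitly when there are several aggregations")
		}
		b.consumed = b.total
		return b.total, nil
	}
	// The small tolerance absorbs floating-point rounding in the running sum.
	if b.consumed+epsilon > b.total+1e-9 {
		return 0, fmt.Errorf("requested ε=%g but only %g remains", epsilon, b.total-b.consumed)
	}
	b.consumed += epsilon
	return epsilon, nil
}

func main() {
	// Single aggregation: leaving epsilon at 0 consumes everything.
	single := &budget{total: 0.5}
	got, err := single.consume(0)
	fmt.Println(got, err)

	// Two aggregations: each states its share; overspending is rejected.
	shared := &budget{total: 0.5}
	fmt.Println(shared.consume(0.3))
	fmt.Println(shared.consume(0.3))
}
```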

type GaussianNoise

type GaussianNoise struct{}

GaussianNoise is an aggregation parameter that makes the aggregation use Gaussian noise.

type LaplaceNoise

type LaplaceNoise struct{}

LaplaceNoise is an aggregation parameter that makes the aggregation use Laplace noise.

type MeanParams

type MeanParams struct {
	// Noise type (which is either LaplaceNoise{} or GaussianNoise{}).
	//
	// Defaults to LaplaceNoise{}.
	NoiseKind NoiseKind
	// Differential privacy budget consumed by this aggregation. If there is
	// only one aggregation, both epsilon and delta can be left 0; in that case
	// the entire budget reserved for aggregation in the PrivacySpec is consumed.
	AggregationEpsilon, AggregationDelta float64
	// Differential privacy budget consumed by partition selection of this
	// aggregation.
	//
	// If PublicPartitions are specified, this needs to be left unset.
	//
	// If there is only one aggregation, this can be left unset; in that case
	// the entire budget reserved for partition selection in the PrivacySpec
	// is consumed.
	//
	// Optional.
	PartitionSelectionParams PartitionSelectionParams
	// You can input the list of partitions present in the output if you know
	// them in advance. When you specify partitions, partition selection /
	// thresholding will be disabled and partitions will appear in the output
	// if and only if they appear in the set of public partitions.
	//
	// You should not derive the list of partitions non-privately from private
	// data. You should only use this in either of the following cases:
	// 	1. The list of partitions is data-independent. For example, if you are
	// 	aggregating a metric by hour, you could provide a list of all possible
	// 	hourly periods.
	// 	2. You use a differentially private operation to come up with the list of
	// 	partitions. For example, you could use the output of a SelectPartitions
	// 	operation or the keys of a DistinctPrivacyID operation as the list of
	// 	public partitions.
	//
	// PublicPartitions needs to be a beam.PCollection, slice, or array. The
	// underlying type needs to match the partition type of the PrivatePCollection.
	//
	// Prefer slices or arrays if the list of public partitions is small and
	// can fit into memory (e.g., up to a million). Prefer beam.PCollection
	// otherwise.
	//
	// If PartitionSelectionParams are specified, this needs to be left unset.
	//
	// Optional.
	PublicPartitions any
	// The maximum number of distinct keys (partitions) that a given privacy
	// identifier can influence. There is an inherent trade-off when choosing this
	// parameter: a larger MaxPartitionsContributed leads to less data loss due
	// to contribution bounding, but since the noise added in aggregations is
	// scaled according to MaxPartitionsContributed, it also means that more
	// noise is added to each mean.
	//
	// Required.
	MaxPartitionsContributed int64
	// The maximum number of contributions from a given privacy identifier
	// for each key. There is an inherent trade-off when choosing this
	// parameter: a larger MaxContributionsPerPartition leads to less data loss due
	// to contribution bounding, but since the noise added in aggregations is
	// scaled according to MaxContributionsPerPartition, it also means that more
	// noise is added to each mean.
	//
	// Required.
	MaxContributionsPerPartition int64
	// A single contribution of a given privacy identifier to the numerator of
	// the mean of a partition can be at least MinValue, and at most MaxValue;
	// otherwise it will be clamped to these bounds. There is an inherent trade-off
	// when choosing MinValue and MaxValue: a small MinValue and a large MaxValue
	// mean that fewer records will be clamped, but that more noise will be added.
	// For example, if a privacy identifier is associated with the key-value
	// pairs [("a", -5), ("a", 2), ("b", 7), ("c", 3)] and the (MinValue, MaxValue)
	// bounds are (0, 5), the first contribution for "a" (-5) will be clamped up
	// to 0, the second contribution for "a" (2) will be untouched, the
	// contribution for "b" will be clamped down to 5, and the contribution for
	// "c" will be untouched.
	//
	// Required.
	MinValue, MaxValue float64
}

MeanParams specifies the parameters associated with a Mean aggregation.
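
The per-contribution clamping described for MinValue and MaxValue can be sketched in plain Go. The type contribution and the helpers clamp and clampEach are hypothetical names introduced for illustration; they are not part of pbeam, and the sketch covers only the clamping step, not noise addition or the other contribution bounds.

```go
package main

import "fmt"

// contribution is one key-value pair belonging to a single privacy identifier.
type contribution struct {
	Key   string
	Value float64
}

// clamp restricts v to the closed interval [lo, hi].
func clamp(v, lo, hi float64) float64 {
	if v < lo {
		return lo
	}
	if v > hi {
		return hi
	}
	return v
}

// clampEach clamps every contribution individually, as described for Mean.
func clampEach(in []contribution, lo, hi float64) []contribution {
	out := make([]contribution, len(in))
	for i, c := range in {
		out[i] = contribution{Key: c.Key, Value: clamp(c.Value, lo, hi)}
	}
	return out
}

func main() {
	in := []contribution{{"a", -5}, {"a", 2}, {"b", 7}, {"c", 3}}
	// With bounds (0, 5): -5 becomes 0, 2 stays, 7 becomes 5, 3 stays.
	fmt.Println(clampEach(in, 0, 5))
}
```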

type NoiseKind

type NoiseKind interface {
	// contains filtered or unexported methods
}

NoiseKind represents the kind of noise to be used in an aggregation.

type PartitionSelectionParams

type PartitionSelectionParams struct {
	// Differential privacy budget consumed by private partition selection.
	//
	// If this is the only private partition selection operation in the pipeline (e.g. the only
	// aggregation in the pipeline, the only aggregation in the pipeline where public partitions are
	// not specified, or the only SelectPartitions aggregation in a pipeline where the other
	// aggregations use public partitions), both Epsilon and Delta can be left 0; in that case, the
	// entire budget reserved for partition selection in the PrivacySpec is consumed.
	Epsilon, Delta float64
	// Warning: This parameter can currently only be set for a SelectPartitions aggregation.
	//
	// The maximum number of distinct keys that a given privacy identifier can influence. If a privacy
	// identifier is associated with more keys, random keys will be dropped. There is an inherent
	// trade-off when choosing this parameter: a larger MaxPartitionsContributed leads to less data
	// loss due to contribution bounding, but since the noise added in aggregations is scaled
	// according to MaxPartitionsContributed, it also means that the probability of keeping a partition
	// with a given privacy ID count is lowered.
	//
	// Required.
	MaxPartitionsContributed int64
}

PartitionSelectionParams holds the ε & δ budget to be used for private partition selection of an aggregation. It is also used to specify parameters of a SelectPartitions aggregation.
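
The statement that random keys are dropped when a privacy identifier is associated with more than MaxPartitionsContributed keys can be sketched as follows. The function boundPartitions and its seed parameter are hypothetical and exist only for this illustration; the library performs this bounding inside the Beam pipeline and its sampling strategy may differ.

```go
package main

import (
	"fmt"
	"math/rand"
)

// boundPartitions keeps at most maxPartitions keys for a single privacy
// identifier, choosing uniformly at random which keys survive.
// It assumes maxPartitions >= 0. Hypothetical helper; not part of pbeam.
func boundPartitions(keys []string, maxPartitions int, seed int64) []string {
	kept := append([]string(nil), keys...) // copy so the input is not modified
	if len(kept) <= maxPartitions {
		return kept
	}
	rng := rand.New(rand.NewSource(seed))
	rng.Shuffle(len(kept), func(i, j int) { kept[i], kept[j] = kept[j], kept[i] })
	return kept[:maxPartitions]
}

func main() {
	keys := []string{"a", "b", "c", "d"}
	// Two of the four keys are kept; which two depends on the seed.
	fmt.Println(boundPartitions(keys, 2, 42))
}
```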

type PrivacySpec

type PrivacySpec struct {
	// contains filtered or unexported fields
}

PrivacySpec contains information about the privacy parameters used in a PrivatePCollection. It encapsulates a privacy budget that must be shared between all aggregations on PrivatePCollections using this PrivacySpec. If you have multiple pipelines in the same binary, and want them to use different privacy budgets, call NewPrivacySpec multiple times and give a different PrivacySpec to each PrivatePCollection.

func NewPrivacySpec

func NewPrivacySpec(params PrivacySpecParams) (*PrivacySpec, error)

NewPrivacySpec creates a new PrivacySpec with the specified privacy budget and parameters.

Aggregation(Epsilon|Delta) and PartitionSelection(Epsilon|Delta) are the total (ε,δ)-differential privacy budget for the pipeline. If there is only one aggregation or partition selection, the entire budget will be used for this operation. Otherwise, the user must specify how the privacy budget is split across aggregations.
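
The idea of a total budget that is split across several aggregations can be illustrated with a small bookkeeping sketch. The budget type and its consume method are hypothetical and are not how PrivacySpec tracks its budget internally; the sketch only shows that the shares requested by individual aggregations must not add up to more than the total.

```go
package main

import (
	"errors"
	"fmt"
)

// budget tracks the epsilon that is still available.
// Hypothetical type for illustration; not part of pbeam.
type budget struct {
	remaining float64
}

// consume reserves eps from the remaining budget or returns an error if the
// request is invalid or exceeds what is left.
func (b *budget) consume(eps float64) error {
	if eps <= 0 {
		return errors.New("epsilon must be positive")
	}
	if eps > b.remaining+1e-9 { // small tolerance for floating-point rounding
		return errors.New("privacy budget exceeded")
	}
	b.remaining -= eps
	return nil
}

func main() {
	b := &budget{remaining: 1.0}
	fmt.Println(b.consume(0.5)) // first aggregation
	fmt.Println(b.consume(0.5)) // second aggregation
	fmt.Println(b.consume(0.1)) // a third aggregation no longer fits
}
```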

type PrivacySpecParams

type PrivacySpecParams struct {
	// Epsilon (ε) budget available for aggregations performed on this PrivatePCollection. Required unless
	// the only aggregation in the pipeline is pbeam.SelectPartitions.
	AggregationEpsilon float64
	// Delta (δ) budget available for aggregations performed on this PrivatePCollection. Only set it if you
	// use Gaussian Noise.
	AggregationDelta float64
	// Epsilon (ε) budget available for partition selections performed on this PrivatePCollection. Required unless
	// you use public partitions.
	PartitionSelectionEpsilon float64
	// Delta (δ) budget available for partition selections performed on this PrivatePCollection. Required unless
	// you use public partitions.
	PartitionSelectionDelta float64
	// PreThreshold contains an optional additional threshold. Pre-thresholding is
	// performed in combination with private partition selection to ensure that
	// each partition has at least K unique contributions.
	//
	// See https://github.com/google/differential-privacy/blob/main/common_docs/pre_thresholding.md
	// for more information.
	//
	// Pre-thresholding is currently only available for partition selection.
	PreThreshold int64
	// Test mode for test pipelines, disabled by default. Set it to TestModeWithContributionBounding or
	// TestModeWithoutContributionBounding if you want to enable test mode.
	TestMode TestMode
}

PrivacySpecParams contains parameters to construct a PrivacySpec.

Uses the new privacy budget API where clients specify aggregation budget and partition selection budget separately.

type PrivatePCollection

type PrivatePCollection struct {
	// contains filtered or unexported fields
}

A PrivatePCollection embeds a PCollection, associating each element to a privacy identifier, and ensures that its content can only be written to a sink after being anonymized using differentially private aggregations.

We call "privacy identifier" the value of the identifier associated with a record (e.g. 62934947), and "privacy unit" the semantic type of this identifier (e.g. "user ID"). Typical choices for privacy units include user IDs or session IDs. This choice determines the privacy unit protected by differential privacy. For example, if the privacy unit is user ID, then the output of aggregations will be (ε,δ)-indistinguishable from the output obtained from a PrivatePCollection in which all records associated with a single user ID have been removed or modified.

Some operations on PCollections are also available on PrivatePCollection, for example a limited subset of ParDo operations. They transparently propagate privacy identifiers, preserving the privacy guarantees of the PrivatePCollection.

func DropKey

DropKey drops the key for an input PrivatePCollection<K,V>. It returns a PrivatePCollection<V>.

func DropValue

DropValue drops the value for an input PrivatePCollection<K,V>. It returns a PrivatePCollection<K>.

func MakePrivate

func MakePrivate(_ beam.Scope, col beam.PCollection, spec *PrivacySpec) PrivatePCollection

MakePrivate transforms a PCollection<K,V> into a PrivatePCollection<V>, where <K> is the privacy unit.

func MakePrivateFromProto

func MakePrivateFromProto(s beam.Scope, col beam.PCollection, spec *PrivacySpec, idFieldPath string) PrivatePCollection

MakePrivateFromProto creates a PrivatePCollection from a PCollection of proto messages and the qualified name of the field to use as a privacy key. The field and all its parents must be non-repeated, and the field itself cannot be a submessage.

func MakePrivateFromStruct

func MakePrivateFromStruct(s beam.Scope, col beam.PCollection, spec *PrivacySpec, idFieldPath string) PrivatePCollection

MakePrivateFromStruct creates a PrivatePCollection from a PCollection of structs and the qualified path (separated by ".") of the struct field to use as a privacy key. For example:

  type exampleStruct1 struct {
    IntField    int
    StructField exampleStruct2
  }

  type exampleStruct2 struct {
    StringField string
  }

If col is a PCollection of exampleStruct1, you could use "IntField" or "StructField.StringField" as idFieldPath.

Caution

The privacy key field must be of a simple type (e.g. int or string) or a pointer to a simple type, and all its parents must be structs or pointers to structs.

If the privacy key field is not set, all elements without a set field will be attributed to the same (default) privacy unit, likely degrading the utility of future DP aggregations. Similarly, if the field at idFieldPath or any of its parents is nil, those elements will be attributed to the same (default) privacy unit as well.
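
To make the notion of a "."-separated field path concrete, here is a minimal sketch that resolves such a path with the reflect package. The function lookupField is hypothetical and is not the mechanism the library uses; it only shows how "IntField" and "StructField.StringField" address fields of the example structs, and how a nil pointer along the path leaves the lookup without a value.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

type exampleStruct2 struct {
	StringField string
}

type exampleStruct1 struct {
	IntField    int
	StructField exampleStruct2
}

// deref follows pointers until a non-pointer value is reached.
// It reports false if a nil pointer is encountered.
func deref(v reflect.Value) (reflect.Value, bool) {
	for v.Kind() == reflect.Ptr {
		if v.IsNil() {
			return v, false
		}
		v = v.Elem()
	}
	return v, true
}

// lookupField walks a "."-separated path of exported struct field names.
// Hypothetical helper for illustration; not part of pbeam.
func lookupField(v any, path string) (any, bool) {
	cur := reflect.ValueOf(v)
	for _, name := range strings.Split(path, ".") {
		var ok bool
		if cur, ok = deref(cur); !ok || cur.Kind() != reflect.Struct {
			return nil, false
		}
		if cur = cur.FieldByName(name); !cur.IsValid() {
			return nil, false
		}
	}
	cur, ok := deref(cur)
	if !ok || !cur.CanInterface() {
		return nil, false
	}
	return cur.Interface(), true
}

func main() {
	e := exampleStruct1{IntField: 7, StructField: exampleStruct2{StringField: "u1"}}
	fmt.Println(lookupField(e, "IntField"))
	fmt.Println(lookupField(e, "StructField.StringField"))
	fmt.Println(lookupField(e, "NoSuchField"))
}
```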

func ParDo

func ParDo(s beam.Scope, doFn any, pcol PrivatePCollection) PrivatePCollection

ParDo applies the given function to all records, propagating privacy identifiers. For now, it only works if doFn is a function that has one of the following types.

Transforms a PrivatePCollection<X> into a PrivatePCollection<Y>:
	- func(X) Y
	- func(context.Context, X) Y
	- func(X) (Y, error)
	- func(context.Context, X) (Y, error)
	- func(X, emit), where emit has type func(Y)
	- func(context.Context, X, emit), where emit has type func(Y)
	- func(X, emit) error, where emit has type func(Y)
	- func(context.Context, X, emit) error, where emit has type func(Y)

Transforms a PrivatePCollection<X> into a PrivatePCollection<Y,Z>:
	- func(X) (Y, Z)
	- func(context.Context, X) (Y, Z)
	- func(X) (Y, Z, error)
	- func(context.Context, X) (Y, Z, error)
	- func(X, emit), where emit has type func(Y, Z)
	- func(context.Context, X, emit), where emit has type func(Y, Z)
	- func(X, emit) error, where emit has type func(Y, Z)
	- func(context.Context, X, emit) error, where emit has type func(Y, Z)

Transforms a PrivatePCollection<W,X> into a PrivatePCollection<Y>:
	- func(W, X) Y
	- func(context.Context, W, X) Y
	- func(W, X) (Y, error)
	- func(context.Context, W, X) (Y, error)
	- func(W, X, emit), where emit has type func(Y)
	- func(context.Context, W, X, emit), where emit has type func(Y)
	- func(W, X, emit) error, where emit has type func(Y)
	- func(context.Context, W, X, emit) error, where emit has type func(Y)

Transforms a PrivatePCollection<W,X> into a PrivatePCollection<Y,Z>:
	- func(W, X) (Y, Z)
	- func(context.Context, W, X) (Y, Z)
	- func(W, X) (Y, Z, error)
	- func(context.Context, W, X) (Y, Z, error)
	- func(W, X, emit), where emit has type func(Y, Z)
	- func(context.Context, W, X, emit), where emit has type func(Y, Z)
	- func(W, X, emit) error, where emit has type func(Y, Z)
	- func(context.Context, W, X, emit) error, where emit has type func(Y, Z)

Note that Beam universal types (e.g., beam.V, beam.T, etc.) are not supported: each of the X, Y, Z, W above needs to be a concrete type.
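
Conceptually, propagating privacy identifiers means that the user-supplied function only ever sees the record, while the identifier attached to it is carried along unchanged. The sketch below models this on an in-memory slice; the tagged type and the mapTagged function are hypothetical and stand in for what the library does inside a Beam pipeline.

```go
package main

import (
	"fmt"
	"strings"
)

// tagged pairs a record with the privacy identifier it belongs to.
// Hypothetical type for illustration; not part of pbeam.
type tagged[T any] struct {
	ID    string
	Value T
}

// mapTagged applies fn to every record and re-attaches the original privacy
// identifier to the result. fn never sees the identifier.
func mapTagged[X, Y any](in []tagged[X], fn func(X) Y) []tagged[Y] {
	out := make([]tagged[Y], 0, len(in))
	for _, e := range in {
		out = append(out, tagged[Y]{ID: e.ID, Value: fn(e.Value)})
	}
	return out
}

func main() {
	in := []tagged[string]{{ID: "u1", Value: "Apple"}, {ID: "u2", Value: "Pear"}}
	// Corresponds to the func(X) Y shape with X = Y = string.
	fmt.Println(mapTagged(in, strings.ToLower))
}
```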

type QuantilesParams

type QuantilesParams struct {
	// Noise type (which is either LaplaceNoise{} or GaussianNoise{}).
	//
	// Defaults to LaplaceNoise{}.
	NoiseKind NoiseKind
	// Differential privacy budget consumed by this aggregation. If there is
	// only one aggregation, both epsilon and delta can be left 0; in that case
	// the entire budget reserved for aggregation in the PrivacySpec is consumed.
	AggregationEpsilon, AggregationDelta float64
	// Differential privacy budget consumed by partition selection of this
	// aggregation.
	//
	// If PublicPartitions are specified, this needs to be left unset.
	//
	// If there is only one aggregation, this can be left unset; in that case
	// the entire budget reserved for partition selection in the PrivacySpec
	// is consumed.
	//
	// Optional.
	PartitionSelectionParams PartitionSelectionParams
	// You can input the list of partitions present in the output if you know
	// them in advance. When you specify partitions, partition selection /
	// thresholding will be disabled and partitions will appear in the output
	// if and only if they appear in the set of public partitions.
	//
	// You should not derive the list of partitions non-privately from private
	// data. You should only use this in either of the following cases:
	// 	1. The list of partitions is data-independent. For example, if you are
	// 	aggregating a metric by hour, you could provide a list of all possible
	// 	hourly periods.
	// 	2. You use a differentially private operation to come up with the list of
	// 	partitions. For example, you could use the output of a SelectPartitions
	// 	operation or the keys of a DistinctPrivacyID operation as the list of
	// 	public partitions.
	//
	// PublicPartitions needs to be a beam.PCollection, slice, or array. The
	// underlying type needs to match the partition type of the PrivatePCollection.
	//
	// Prefer slices or arrays if the list of public partitions is small and
	// can fit into memory (e.g., up to a million). Prefer beam.PCollection
	// otherwise.
	//
	// If PartitionSelectionParams are specified, this needs to be left unset.
	//
	// Optional.
	PublicPartitions any
	// The maximum number of distinct keys (partitions) that a given privacy
	// identifier can influence. There is an inherent trade-off when choosing this
	// parameter: a larger MaxPartitionsContributed leads to less data loss due
	// to contribution bounding, but since the noise added in aggregations is
	// scaled according to MaxPartitionsContributed, it also means that more
	// noise is added to each quantile.
	//
	// Required.
	MaxPartitionsContributed int64
	// The maximum number of contributions from a given privacy identifier
	// for each key. There is an inherent trade-off when choosing this
	// parameter: a larger MaxContributionsPerPartition leads to less data loss due
	// to contribution bounding, but since the noise added in aggregations is
	// scaled according to MaxContributionsPerPartition, it also means that more
	// noise is added to each quantile.
	//
	// Required.
	MaxContributionsPerPartition int64
	// A single contribution of a given privacy identifier to a partition can be
	// at least MinValue, and at most MaxValue; otherwise it will be clamped
	// to these bounds. There is an inherent trade-off when choosing MinValue and
	// MaxValue: a small MinValue and a large MaxValue mean that fewer records
	// will be clamped, but that more noise will be added.
	// For example, if a privacy identifier is associated with the key-value
	// pairs [("a", -5), ("a", 2), ("b", 7), ("c", 3)] and the (MinValue, MaxValue)
	// bounds are (0, 5), the first contribution for "a" (-5) will be clamped up
	// to 0, the second contribution for "a" (2) will be untouched, the
	// contribution for "b" will be clamped down to 5, and the contribution for
	// "c" will be untouched.
	//
	// Required.
	MinValue, MaxValue float64
	// Percentile ranks that the quantiles should be computed for. Each rank must
	// be between zero and one. The DP quantile operation returns a list of
	// quantile values corresponding to the respective ranks. E.g., a percentile
	// rank of 0.2 yields a quantile value that is greater than 20% and less than
	// 80% of the values in the data set.
	//
	// Note that computing multiple quantiles does not consume extra privacy budget,
	// i.e. computing multiple quantiles does not make each quantile less accurate
	// for a fixed privacy budget.
	Ranks []float64
}

QuantilesParams specifies the parameters associated with a Quantiles aggregation.
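
To show what a rank in Ranks refers to, the following sketch computes a quantile without any noise. The function nonPrivateQuantile is hypothetical and deliberately not differentially private; the Quantiles aggregation returns noisy values and its algorithm is not reproduced here. The nearest-position rule used to pick an element is also an assumption of this sketch.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// nonPrivateQuantile clamps values to [lo, hi], sorts them, and returns the
// element at the position nearest to rank*(n-1). rank must be in [0, 1].
// For an empty input it returns lo. It adds no noise, so it is NOT
// differentially private. Hypothetical helper; not part of pbeam.
func nonPrivateQuantile(values []float64, lo, hi, rank float64) float64 {
	if len(values) == 0 {
		return lo
	}
	clamped := make([]float64, len(values))
	for i, v := range values {
		clamped[i] = math.Min(math.Max(v, lo), hi)
	}
	sort.Float64s(clamped)
	idx := int(math.Round(rank * float64(len(clamped)-1)))
	return clamped[idx]
}

func main() {
	values := []float64{7, -5, 2, 3, 10}
	// After clamping to (0, 5) and sorting, the values are 0, 2, 3, 5, 5.
	for _, rank := range []float64{0, 0.5, 1} {
		fmt.Println(rank, nonPrivateQuantile(values, 0, 5, rank))
	}
}
```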

type SelectPartitionsParams

type SelectPartitionsParams = PartitionSelectionParams

SelectPartitionsParams specifies the parameters associated with a SelectPartitions aggregation.

TODO: Remove this alias.

type SumParams

type SumParams struct {
	// Noise type (which is either LaplaceNoise{} or GaussianNoise{}).
	//
	// Defaults to LaplaceNoise{}.
	NoiseKind NoiseKind
	// Differential privacy budget consumed by this aggregation. If there is
	// only one aggregation, both epsilon and delta can be left 0; in that case
	// the entire budget reserved for aggregation in the PrivacySpec is consumed.
	AggregationEpsilon, AggregationDelta float64
	// Differential privacy budget consumed by partition selection of this
	// aggregation.
	//
	// If PublicPartitions are specified, this needs to be left unset.
	//
	// If there is only one aggregation, this can be left unset; in that case
	// the entire budget reserved for partition selection in the PrivacySpec
	// is consumed.
	//
	// Optional.
	PartitionSelectionParams PartitionSelectionParams
	// You can input the list of partitions present in the output if you know
	// them in advance. When you specify partitions, partition selection /
	// thresholding will be disabled and partitions will appear in the output
	// if and only if they appear in the set of public partitions.
	//
	// You should not derive the list of partitions non-privately from private
	// data. You should only use this in either of the following cases:
	// 	1. The list of partitions is data-independent. For example, if you are
	// 	aggregating a metric by hour, you could provide a list of all possible
	// 	hourly periods.
	// 	2. You use a differentially private operation to come up with the list of
	// 	partitions. For example, you could use the output of a SelectPartitions
	// 	operation or the keys of a DistinctPrivacyID operation as the list of
	// 	public partitions.
	//
	// PublicPartitions needs to be a beam.PCollection, slice, or array. The
	// underlying type needs to match the partition type of the PrivatePCollection.
	//
	// Prefer slices or arrays if the list of public partitions is small and
	// can fit into memory (e.g., up to a million). Prefer beam.PCollection
	// otherwise.
	//
	// If PartitionSelectionParams are specified, this needs to be left unset.
	//
	// Optional.
	PublicPartitions any
	// The maximum number of distinct keys (partitions) that a given privacy
	// identifier can influence. There is an inherent trade-off when choosing this
	// parameter: a larger MaxPartitionsContributed leads to less data loss due
	// to contribution bounding, but since the noise added in aggregations is
	// scaled according to MaxPartitionsContributed, it also means that more
	// noise is added to each sum.
	//
	// Required.
	MaxPartitionsContributed int64
	// The total contribution of a given privacy identifier to a partition can be
	// at least MinValue, and at most MaxValue; otherwise it will be clamped
	// to these bounds. For example, if a privacy identifier is associated with
	// the key-value pairs [("a", -5), ("a", 2), ("b", 7), ("c", 3)] and the
	// (MinValue, MaxValue) bounds are (0, 5), the total contribution for "a"
	// (-5 + 2 = -3) will be clamped up to 0, the contribution for "b" will be
	// clamped down to 5, and the contribution for "c" will be untouched. There
	// is an inherent trade-off when choosing MinValue and MaxValue: a small
	// MinValue and a large MaxValue mean that fewer records will be clamped,
	// but that more noise will be added.
	//
	// Required.
	MinValue, MaxValue float64
}

SumParams specifies the parameters associated with a Sum aggregation.
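
Unlike Mean and Quantiles, the bounds for Sum apply to the total contribution of a privacy identifier to a partition. A minimal sketch of that behavior, using the example from the MinValue and MaxValue comment, is shown below. The names contribution, clamp, and clampTotals are hypothetical and not part of pbeam, and the sketch leaves out noise addition.

```go
package main

import "fmt"

// contribution is one key-value pair belonging to a single privacy identifier.
type contribution struct {
	Key   string
	Value float64
}

// clamp restricts v to the closed interval [lo, hi].
func clamp(v, lo, hi float64) float64 {
	if v < lo {
		return lo
	}
	if v > hi {
		return hi
	}
	return v
}

// clampTotals first adds up the contributions of one privacy identifier per
// key and then clamps each per-key total to [lo, hi].
func clampTotals(in []contribution, lo, hi float64) map[string]float64 {
	totals := make(map[string]float64)
	for _, c := range in {
		totals[c.Key] += c.Value
	}
	for k, v := range totals {
		totals[k] = clamp(v, lo, hi)
	}
	return totals
}

func main() {
	in := []contribution{{"a", -5}, {"a", 2}, {"b", 7}, {"c", 3}}
	// Totals are a: -3, b: 7, c: 3; with bounds (0, 5) they become 0, 5, 3.
	fmt.Println(clampTotals(in, 0, 5))
}
```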

type TestMode

type TestMode int

TestMode is an enum representing different test modes for test pipelines available in Privacy on Beam.

const (
	// TestModeDisabled indicates that test mode is disabled. Default.
	TestModeDisabled TestMode = iota
	// TestModeWithContributionBounding is the test mode where no noise is added, but contribution bounding is done.
	TestModeWithContributionBounding
	// TestModeWithoutContributionBounding is the test mode where no noise is added and no contribution bounding is done.
	TestModeWithoutContributionBounding
)

Directories

Path Synopsis
Package pbeamtest provides PrivacySpecs for testing Privacy on Beam pipelines without noise.
Package testutils provides helper functions, structs, etc.
