Version: v2.34.0-RC1

This package is not in the latest version of its module.

Published: Oct 26, 2021 | License: Apache-2.0, BSD-3-Clause, MIT | Imports: 4 | Imported by: 0



Package statecache implements the state caching feature described by the Beam Fn API.

The Beam State API and the intended caching behavior are described here:





type CacheMetrics

type CacheMetrics struct {
	Hits           int64
	Misses         int64
	Evictions      int64
	InUseEvictions int64
}

CacheMetrics stores metrics for the cache across a pipeline run.
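As a rough illustration of how these counters might be read, the sketch below derives a hit ratio from them. The struct is redeclared locally with the documented fields so the example compiles without the Beam module, and `hitRatio` is a hypothetical helper, not part of this package.

```go
package main

import "fmt"

// CacheMetrics mirrors the exported fields documented above. It is redeclared
// here only so that the sketch is self-contained.
type CacheMetrics struct {
	Hits           int64
	Misses         int64
	Evictions      int64
	InUseEvictions int64
}

// hitRatio is a hypothetical helper (not part of statecache). It returns the
// fraction of lookups served from the cache, or 0 when nothing was queried.
func hitRatio(m CacheMetrics) float64 {
	total := m.Hits + m.Misses
	if total == 0 {
		return 0
	}
	return float64(m.Hits) / float64(total)
}

func main() {
	m := CacheMetrics{Hits: 30, Misses: 10, Evictions: 2}
	fmt.Printf("hit ratio: %.2f\n", hitRatio(m)) // prints "hit ratio: 0.75"
}
```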

type SideInputCache

type SideInputCache struct {
	// contains filtered or unexported fields
}

SideInputCache stores a cache of reusable inputs for the purposes of eliminating redundant calls to the runner during execution of ParDos using side inputs.

A SideInputCache should be initialized when the SDK harness is initialized, creating storage for side input caching. On each ProcessBundleRequest, the cache processes the list of tokens for cacheable side inputs and is queried whenever a side input is requested during bundle execution. When a new bundle request comes in, the valid tokens are updated and the cache is re-used. If the cache reaches capacity, a random cached object whose token is currently invalid is evicted.
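The eviction rule described above can be pictured with a small toy model. This is a simplified sketch, not the SideInputCache implementation: `toyCache` and its methods are hypothetical, values are plain strings keyed directly by token, and "random" is approximated by Go's unspecified map iteration order. What the real cache does when every cached entry is still valid is not described here, so the sketch simply declines to cache in that case.

```go
package main

import "fmt"

// toyCache is a hypothetical, simplified model of a token-validated cache.
type toyCache struct {
	capacity int
	valid    map[string]bool   // tokens valid for the current bundle
	entries  map[string]string // token -> cached value
}

func newToyCache(capacity int) *toyCache {
	return &toyCache{capacity: capacity, valid: map[string]bool{}, entries: map[string]string{}}
}

// setValid replaces the set of valid tokens, as on a new bundle request.
func (c *toyCache) setValid(tokens ...string) {
	c.valid = map[string]bool{}
	for _, t := range tokens {
		c.valid[t] = true
	}
}

// evictInvalid removes one entry whose token is no longer valid and reports
// whether it found one. Which entry is chosen is arbitrary.
func (c *toyCache) evictInvalid() bool {
	for t := range c.entries {
		if !c.valid[t] {
			delete(c.entries, t)
			return true
		}
	}
	return false
}

// put stores a value under a valid token and reports whether it was cached.
func (c *toyCache) put(token, value string) bool {
	if !c.valid[token] {
		return false
	}
	if _, ok := c.entries[token]; !ok && len(c.entries) >= c.capacity {
		if !c.evictInvalid() {
			return false // assumption: never displace a still-valid entry
		}
	}
	c.entries[token] = value
	return true
}

// get returns the cached value only if the token is currently valid.
func (c *toyCache) get(token string) (string, bool) {
	if !c.valid[token] {
		return "", false
	}
	v, ok := c.entries[token]
	return v, ok
}

func main() {
	c := newToyCache(2)
	c.setValid("a", "b")
	c.put("a", "A")
	c.put("b", "B")
	c.setValid("b", "c") // "a" is now invalid
	c.put("c", "C")      // cache is full, so the invalid "a" is evicted
	_, okA := c.get("a")
	vB, okB := c.get("b")
	vC, okC := c.get("c")
	fmt.Println(okA, vB, okB, vC, okC) // prints "false B true C true"
}
```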

func (*SideInputCache) CompleteBundle

func (c *SideInputCache) CompleteBundle(cacheTokens ...*fnpb.ProcessBundleRequest_CacheToken)

CompleteBundle takes the cache tokens that were passed to SetValidTokens and decrements each token's usage count, so that the cache can tell whether a cached value is still in use. It should be called once ProcessBundle has completed.
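The usage count can be thought of as a per-token reference count. The sketch below is a minimal illustration under assumptions: `tokenUsage`, `begin`, `complete`, and `inUse` are hypothetical names, tokens are plain strings, and it is assumed that starting a bundle increments the count that CompleteBundle later decrements.

```go
package main

import "fmt"

// tokenUsage is a hypothetical reference-count table keyed by cache token.
type tokenUsage map[string]int

// begin records that a bundle using these tokens has started
// (assumed to correspond to the SetValidTokens step).
func (u tokenUsage) begin(tokens ...string) {
	for _, t := range tokens {
		u[t]++
	}
}

// complete records that a bundle using these tokens has finished. A token
// whose count would reach zero is removed from the table.
func (u tokenUsage) complete(tokens ...string) {
	for _, t := range tokens {
		if u[t] <= 1 {
			delete(u, t)
			continue
		}
		u[t]--
	}
}

// inUse reports whether at least one unfinished bundle still uses the token.
func (u tokenUsage) inUse(token string) bool { return u[token] > 0 }

func main() {
	u := tokenUsage{}
	u.begin("tok1", "tok2") // bundle 1
	u.begin("tok1")         // bundle 2 shares tok1
	u.complete("tok1", "tok2")
	fmt.Println(u.inUse("tok1"), u.inUse("tok2")) // prints "true false"
	u.complete("tok1")
	fmt.Println(u.inUse("tok1")) // prints "false"
}
```

Counting per token rather than per bundle lets two overlapping bundles share a token without the first completion invalidating a value the second still needs.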

func (*SideInputCache) Init

func (c *SideInputCache) Init(cap int) error

Init makes the cache map and the map of IDs to cache tokens for the SideInputCache. Should only be called once. Returns an error for non-positive capacities.

func (*SideInputCache) QueryCache

func (c *SideInputCache) QueryCache(transformID, sideInputID string) exec.ReStream

QueryCache takes a transform ID and side input ID and checks whether a corresponding side input has been cached. A query with a bad token (e.g. one whose IDs do not correspond to a known token, or correspond to a known but currently invalid token) is treated the same as a cache miss.
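A lookup of this kind can be sketched as two steps: map the ID pair to a token, then map the token to a cached value, counting any failure along the way as a miss. Everything below is hypothetical and only models the described behavior: `idKey`, `lookup`, and `query` are invented names, a `[]int` stands in for exec.ReStream, and returning nil on a miss is an assumption.

```go
package main

import "fmt"

// idKey pairs a transform ID with a side input ID (hypothetical type).
type idKey struct{ transformID, sideInputID string }

// lookup is a hypothetical model of the two-level mapping.
type lookup struct {
	idsToToken   map[idKey]string
	validTokens  map[string]bool
	values       map[string][]int // token -> cached elements
	hits, misses int
}

// query returns the cached elements for the ID pair, or nil on a miss.
// Unknown IDs, invalid tokens, and absent values all count as misses.
func (l *lookup) query(transformID, sideInputID string) []int {
	tok, known := l.idsToToken[idKey{transformID, sideInputID}]
	if !known || !l.validTokens[tok] {
		l.misses++
		return nil
	}
	v, ok := l.values[tok]
	if !ok {
		l.misses++
		return nil
	}
	l.hits++
	return v
}

func main() {
	l := &lookup{
		idsToToken: map[idKey]string{
			idKey{"t1", "s1"}: "tokA",
			idKey{"t1", "s2"}: "tokStale",
		},
		validTokens: map[string]bool{"tokA": true},
		values:      map[string][]int{"tokA": {1, 2, 3}, "tokStale": {9}},
	}
	fmt.Println(l.query("t1", "s1"))        // prints "[1 2 3]"
	fmt.Println(l.query("t1", "s2") == nil) // prints "true": token is stale
	fmt.Println(l.query("t9", "s9") == nil) // prints "true": IDs are unknown
	fmt.Println(l.hits, l.misses)           // prints "1 2"
}
```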

func (*SideInputCache) SetCache

func (c *SideInputCache) SetCache(transformID, sideInputID string, input exec.ReStream)

SetCache allows a user to place a ReusableInput materialized from the reader into the SideInputCache with its corresponding transform ID and side input ID. If the IDs do not pair with a known, valid token, the input is silently not cached, since this indicates that the runner is treating that input as uncacheable.

func (*SideInputCache) SetValidTokens

func (c *SideInputCache) SetValidTokens(cacheTokens ...*fnpb.ProcessBundleRequest_CacheToken)

SetValidTokens clears the list of valid tokens then sets new ones, also updating the mapping of transform and side input IDs to cache tokens in the process. Should be called at the start of every new ProcessBundleRequest. If the runner does not support caching, the passed cache token values should be empty and all get/set requests will silently be no-ops.
