statecache

package
v2.35.0-RC8
Warning

This package is not in the latest version of its module.

Published: Dec 24, 2021 License: Apache-2.0, BSD-3-Clause, MIT Imports: 5 Imported by: 0

Documentation

Overview

Package statecache implements the state caching feature described by the Beam Fn API.

The Beam State API and the intended caching behavior are described here: https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m


Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CacheMetrics

type CacheMetrics struct {
	Hits, Misses, Evictions, InUseEvictions, ReStreamErrors int64
}

CacheMetrics stores metrics for the cache across a pipeline run.
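As an illustration of how these counters might be read, the sketch below derives a hit ratio from Hits and Misses. The local CacheMetrics type only mirrors the exported fields shown above so the example compiles without the Beam module, and hitRatio is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// CacheMetrics mirrors the exported fields of statecache.CacheMetrics.
// It is redeclared here only to keep the sketch self-contained.
type CacheMetrics struct {
	Hits, Misses, Evictions, InUseEvictions, ReStreamErrors int64
}

// hitRatio is a hypothetical helper: the fraction of lookups served from
// the cache. It returns 0 when no lookups have been recorded.
func hitRatio(m CacheMetrics) float64 {
	total := m.Hits + m.Misses
	if total == 0 {
		return 0
	}
	return float64(m.Hits) / float64(total)
}

func main() {
	m := CacheMetrics{Hits: 30, Misses: 10, Evictions: 2}
	fmt.Printf("hit ratio: %.2f\n", hitRatio(m)) // prints "hit ratio: 0.75"
}
```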

type SideInputCache

type SideInputCache struct {
	// contains filtered or unexported fields
}

SideInputCache stores reusable inputs in order to eliminate redundant calls to the runner while executing ParDos that use side inputs.

A SideInputCache should be initialized when the SDK harness is initialized, creating storage for side input caching. On each ProcessBundleRequest, the cache processes the list of tokens for cacheable side inputs and is queried whenever side inputs are requested during bundle execution. When a new bundle request arrives, the valid tokens are updated and the cache is reused. If the cache reaches capacity, a random, currently invalid cached object is evicted.
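The lifecycle described above can be sketched with a toy model. Everything here (toyCache, its methods, string tokens and values) is hypothetical and deliberately simplified; it is not the package's implementation. In particular, declining to cache when no invalid entry can be evicted is an assumption of this sketch only.

```go
package main

import (
	"errors"
	"fmt"
)

// toyCache is a hypothetical, simplified model of a token-validated cache.
type toyCache struct {
	capacity int
	valid    map[string]bool   // tokens valid for the current bundle
	entries  map[string]string // token -> cached value
}

// newToyCache rejects non-positive capacities, as Init is documented to do.
func newToyCache(capacity int) (*toyCache, error) {
	if capacity <= 0 {
		return nil, errors.New("capacity must be positive")
	}
	return &toyCache{
		capacity: capacity,
		valid:    map[string]bool{},
		entries:  map[string]string{},
	}, nil
}

// setValidTokens replaces the valid-token set at the start of a bundle.
func (c *toyCache) setValidTokens(tokens ...string) {
	c.valid = map[string]bool{}
	for _, t := range tokens {
		c.valid[t] = true
	}
}

// set stores value under token if the token is currently valid. When the
// cache is full it first evicts one entry whose token is no longer valid.
// It reports whether the value was stored.
func (c *toyCache) set(token, value string) bool {
	if !c.valid[token] {
		return false
	}
	if _, present := c.entries[token]; !present && len(c.entries) >= c.capacity {
		evicted := false
		for t := range c.entries { // Go map iteration order is unspecified
			if !c.valid[t] {
				delete(c.entries, t)
				evicted = true
				break
			}
		}
		if !evicted {
			return false // assumption of this sketch: give up if nothing is invalid
		}
	}
	c.entries[token] = value
	return true
}

// get treats an unknown or currently invalid token as a miss.
func (c *toyCache) get(token string) (string, bool) {
	if !c.valid[token] {
		return "", false
	}
	v, ok := c.entries[token]
	return v, ok
}

func main() {
	c, err := newToyCache(1)
	if err != nil {
		panic(err)
	}
	c.setValidTokens("a")
	c.set("a", "side input A")
	c.setValidTokens("b")      // next bundle: "a" is no longer valid
	c.set("b", "side input B") // cache is full, so the invalid "a" entry is evicted
	_, okA := c.get("a")
	vB, okB := c.get("b")
	fmt.Println(okA, vB, okB)
}
```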

func (*SideInputCache) CompleteBundle

func (c *SideInputCache) CompleteBundle(cacheTokens ...*fnpb.ProcessBundleRequest_CacheToken)

CompleteBundle takes the cache tokens that were passed to SetValidTokens and decrements their usage counts, so that the cache keeps an accurate record of whether each value is still in use. Should be called once ProcessBundle has completed.
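The usage-count bookkeeping can be pictured with a small reference counter. This is a sketch under stated assumptions: the usage type and its methods are hypothetical, and it assumes the count is incremented when a bundle's tokens are set as valid, which the documentation above does not state explicitly.

```go
package main

import "fmt"

// usage is a hypothetical counter of in-flight bundles per token.
type usage map[string]int

// begin records that a bundle has started using the given tokens
// (assumed here to happen when the valid tokens are set).
func (u usage) begin(tokens ...string) {
	for _, t := range tokens {
		u[t]++
	}
}

// complete records that a bundle has finished with the given tokens.
// Counts never go below zero.
func (u usage) complete(tokens ...string) {
	for _, t := range tokens {
		if u[t] > 0 {
			u[t]--
		}
	}
}

// inUse reports whether any in-flight bundle still references the token.
func (u usage) inUse(token string) bool {
	return u[token] > 0
}

func main() {
	u := usage{}
	u.begin("a") // first bundle
	u.begin("a") // second, concurrent bundle with the same token
	u.complete("a")
	fmt.Println(u.inUse("a")) // prints "true": one bundle is still running
	u.complete("a")
	fmt.Println(u.inUse("a")) // prints "false"
}
```

Pairing every start with exactly one completion is what keeps such a count meaningful, which is presumably why CompleteBundle should be called once per finished ProcessBundle.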

func (*SideInputCache) Init

func (c *SideInputCache) Init(cap int) error

Init makes the cache map and the map of IDs to cache tokens for the SideInputCache. Should only be called once. Returns an error for non-positive capacities.

func (*SideInputCache) QueryCache

func (c *SideInputCache) QueryCache(ctx context.Context, transformID, sideInputID string, win, key []byte) exec.ReStream

QueryCache takes a transform ID and side input ID and checks whether a corresponding side input has been cached. A query with a bad token (e.g. one whose IDs do not correspond to a known token, or correspond to a known but currently invalid token) is treated the same as a cache miss.

func (*SideInputCache) SetCache

func (c *SideInputCache) SetCache(ctx context.Context, transformID, sideInputID string, win, key []byte, input exec.ReStream) exec.ReStream

SetCache allows a user to place a ReusableInput materialized from the reader into the SideInputCache with its corresponding transform ID and side input ID. If the IDs do not pair with a known, valid token, the input is silently not cached, as this indicates that the runner is treating that input as uncacheable.
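One way a caller might combine QueryCache and SetCache is a query-then-set pattern: try the cache first, and on a miss materialize the input and offer it to the cache. The sketch below uses hypothetical stand-ins (stream, sideInputStore, mapStore, getOrLoad) rather than the real types, and it assumes a miss is signalled by a nil result, which the documentation above does not confirm.

```go
package main

import "fmt"

// stream is a hypothetical stand-in for exec.ReStream.
type stream []string

// sideInputStore is a hypothetical interface covering the two cache calls.
type sideInputStore interface {
	query(transformID, sideInputID string) stream // nil on a miss (assumption)
	set(transformID, sideInputID string, s stream) stream
}

// mapStore is a toy store that caches every input it is given.
type mapStore map[[2]string]stream

func (m mapStore) query(transformID, sideInputID string) stream {
	return m[[2]string{transformID, sideInputID}]
}

func (m mapStore) set(transformID, sideInputID string, s stream) stream {
	m[[2]string{transformID, sideInputID}] = s
	return s
}

// getOrLoad returns the cached stream if present; otherwise it calls load
// to materialize the input and passes the result through set.
func getOrLoad(c sideInputStore, transformID, sideInputID string, load func() stream) stream {
	if s := c.query(transformID, sideInputID); s != nil {
		return s
	}
	return c.set(transformID, sideInputID, load())
}

func main() {
	loads := 0
	load := func() stream {
		loads++
		return stream{"x", "y"}
	}
	store := mapStore{}
	getOrLoad(store, "t1", "i0", load)
	getOrLoad(store, "t1", "i0", load) // served from the toy store
	fmt.Println("loads:", loads)       // prints "loads: 1"
}
```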

func (*SideInputCache) SetValidTokens

func (c *SideInputCache) SetValidTokens(cacheTokens ...*fnpb.ProcessBundleRequest_CacheToken)

SetValidTokens clears the list of valid tokens and then sets new ones, updating the mapping of transform and side input IDs to cache tokens in the process. Should be called at the start of every new ProcessBundleRequest. If the runner does not support caching, the passed cache token values should be empty and all get/set requests will silently be no-ops.
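The clear-then-set behavior and the ID-to-token mapping can be sketched as follows. The sideInputToken, idKey, and tokenIndex types are hypothetical stand-ins for the protobuf cache token and the cache's internal bookkeeping, not the package's actual types.

```go
package main

import "fmt"

// sideInputToken is a hypothetical stand-in for a side input cache token.
type sideInputToken struct {
	TransformID string
	SideInputID string
	Token       string
}

// idKey pairs a transform ID with a side input ID.
type idKey struct {
	transformID, sideInputID string
}

// tokenIndex maps ID pairs to the token valid for the current bundle.
type tokenIndex map[idKey]string

// reset discards any previous mapping and builds a fresh one from tokens.
// Calling it with no tokens yields an empty index, so every lookup misses.
func reset(tokens ...sideInputToken) tokenIndex {
	idx := tokenIndex{}
	for _, t := range tokens {
		idx[idKey{t.TransformID, t.SideInputID}] = t.Token
	}
	return idx
}

// lookup returns the valid token for an ID pair, if there is one.
func (idx tokenIndex) lookup(transformID, sideInputID string) (string, bool) {
	tok, ok := idx[idKey{transformID, sideInputID}]
	return tok, ok
}

func main() {
	idx := reset(sideInputToken{TransformID: "t1", SideInputID: "i0", Token: "tokA"})
	tok, ok := idx.lookup("t1", "i0")
	fmt.Println(tok, ok)

	idx = reset() // runner sent no cache tokens
	_, ok = idx.lookup("t1", "i0")
	fmt.Println(ok)
}
```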
