engine

package
v2.55.1
Published: Apr 2, 2024 License: Apache-2.0, BSD-3-Clause, MIT Imports: 20 Imported by: 0

Documentation

Overview

Package engine handles the operational components of a runner: tracking elements, watermarks, timers, triggers, etc.

Index

Constants

This section is empty.

Variables

var (
	OneKeyPerBundle  bool // OneKeyPerBundle sets if a bundle is restricted to a single key.
	OneElementPerKey bool // OneElementPerKey sets if a key in a bundle is restricted to one element.
)

TODO: Move to a better place for configuration.

Functions

This section is empty.

Types

type Block added in v2.54.0

type Block struct {
	Kind              BlockKind
	Bytes             [][]byte
	Transform, Family string
}

Block represents a contiguous set of data or timers for the same destination.

type BlockKind added in v2.54.0

type BlockKind int32

BlockKind indicates how the block is to be handled.

const (
	BlockData  BlockKind // BlockData represents data for the bundle.
	BlockTimer           // BlockTimer represents timers for the bundle.
)
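
As a rough illustration of how a consumer might branch on BlockKind, here is a minimal sketch. The types are redeclared locally so the example compiles without the engine package. The `blockUnset` zero value and the `describe` helper are assumptions made for illustration and are not part of the documented API.

```go
package main

import "fmt"

// BlockKind and Block mirror the declarations documented above so this
// sketch is self-contained; real code would import the engine package.
type BlockKind int32

const (
	blockUnset BlockKind = iota // assumed zero value, for illustration only
	BlockData                   // data for the bundle
	BlockTimer                  // timers for the bundle
)

type Block struct {
	Kind              BlockKind
	Bytes             [][]byte
	Transform, Family string
}

// describe is a hypothetical helper that reports how a block would be routed.
func describe(b Block) string {
	switch b.Kind {
	case BlockData:
		return fmt.Sprintf("data: %d payload(s)", len(b.Bytes))
	case BlockTimer:
		return fmt.Sprintf("timers: %d payload(s) for %s/%s", len(b.Bytes), b.Transform, b.Family)
	default:
		return "unknown block kind"
	}
}

func main() {
	blocks := []Block{
		{Kind: BlockData, Bytes: [][]byte{[]byte("a"), []byte("b")}},
		{Kind: BlockTimer, Bytes: [][]byte{[]byte("t")}, Transform: "t1", Family: "f1"},
	}
	for _, b := range blocks {
		fmt.Println(describe(b))
	}
}
```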

type Config

type Config struct {
	// MaxBundleSize caps the number of elements permitted in a bundle.
	// 0 or less means this is ignored.
	MaxBundleSize int
}
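
The "0 or less means this is ignored" rule can be sketched as follows. `capBundle` is a hypothetical helper, not a function of this package, and Config is redeclared locally so the example runs on its own.

```go
package main

import "fmt"

// Config mirrors the documented struct so the sketch is self-contained.
type Config struct {
	MaxBundleSize int
}

// capBundle is a hypothetical helper: given the number of pending elements,
// it returns how many would go into the next bundle under the documented
// rule that a MaxBundleSize of 0 or less disables the cap.
func capBundle(cfg Config, pending int) int {
	if cfg.MaxBundleSize <= 0 || pending <= cfg.MaxBundleSize {
		return pending
	}
	return cfg.MaxBundleSize
}

func main() {
	fmt.Println(capBundle(Config{}, 10))                 // cap disabled
	fmt.Println(capBundle(Config{MaxBundleSize: 4}, 10)) // cap applied
}
```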

type ElementManager

type ElementManager struct {
	// contains filtered or unexported fields
}

ElementManager handles elements, watermarks, and related details to determine if a stage is able to be executed. It is the core execution engine of Prism.

Essentially, it needs to track the current watermarks for each PCollection and transform/stage. But it's tricky, since the watermarks for the PCollections are always relative to transforms/stages.

Key parts:

  • The parallel input's PCollection's watermark is relative to committed consumed elements. That is, the input elements consumed by the transform after a successful bundle can advance the watermark, based on the minimum of those elements' timestamps.
  • An output PCollection's watermark is relative to its producing transform, which relates to *all of its outputs*.

This means that a PCollection's watermark is the minimum across all of its consuming transforms.

So, the watermark manager needs to track:

  • Pending elements for each stage, along with their windows and timestamps.
  • Each transform's view of the watermarks for the PCollections.

Watermarks are advanced based on consumed input, except if the stage produces residuals.
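
To make the "minimum of all consuming transforms" rule concrete, here is a minimal sketch. It assumes timestamps can be modeled as plain integers (`Time` stands in for mtime.Time), and `pcollectionWatermark` and `maxTime` are hypothetical names. It is not the ElementManager's actual bookkeeping.

```go
package main

import (
	"fmt"
	"math"
)

// Time is a stand-in for mtime.Time, assumed here to be an integer
// timestamp so the sketch has no external dependencies.
type Time int64

// maxTime is a hypothetical "end of time" value used to seed the minimum.
const maxTime Time = math.MaxInt64

// pcollectionWatermark returns the minimum of each consuming transform's
// view of the PCollection. With no consumers it returns maxTime, which is
// a choice made for this sketch only.
func pcollectionWatermark(consumerViews map[string]Time) Time {
	wm := maxTime
	for _, view := range consumerViews {
		if view < wm {
			wm = view
		}
	}
	return wm
}

func main() {
	views := map[string]Time{"stageA": 120, "stageB": 95, "stageC": 300}
	// The slowest consumer holds the PCollection's watermark back.
	fmt.Println(pcollectionWatermark(views))
}
```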

func NewElementManager

func NewElementManager(config Config) *ElementManager

func (*ElementManager) AddStage

func (em *ElementManager) AddStage(ID string, inputIDs, outputIDs []string, sides []LinkID)

AddStage adds a stage to this element manager, connecting its PCollections and nodes to the watermark propagation graph.

func (*ElementManager) AddTestStream added in v2.55.0

func (em *ElementManager) AddTestStream(id string, tagToPCol map[string]string) TestStreamBuilder

AddTestStream provides a builder interface for the execution layer to build the test stream from the protos.

func (*ElementManager) Bundles

func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string) <-chan RunBundle

Bundles is the core execution loop. It produces a sequence of bundles that are ready to be executed. The returned channel is closed when the context is canceled, or when there are no pending elements remaining.
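
A caller would typically range over the returned channel until it is closed. The sketch below shows that consumption pattern against a stand-in producer. `produce` and `drainAll` are hypothetical and only imitate the documented channel contract; they do not reproduce how ElementManager decides which bundles are ready.

```go
package main

import (
	"context"
	"fmt"
)

// RunBundle mirrors the identifying fields of the documented struct.
type RunBundle struct {
	StageID  string
	BundleID string
}

// produce is a hypothetical stand-in for ElementManager.Bundles. It emits n
// bundles, then closes the channel; it also stops early if ctx is canceled.
func produce(ctx context.Context, n int, nextBundID func() string) <-chan RunBundle {
	ch := make(chan RunBundle)
	go func() {
		defer close(ch)
		for i := 0; i < n; i++ {
			rb := RunBundle{StageID: "stage", BundleID: nextBundID()}
			select {
			case ch <- rb:
			case <-ctx.Done():
				return
			}
		}
	}()
	return ch
}

// drainAll consumes every bundle until the channel closes and returns the
// bundle IDs in the order they were received.
func drainAll(n int) []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	next := 0
	nextBundID := func() string {
		next++
		return fmt.Sprintf("bundle-%d", next)
	}

	var ids []string
	for rb := range produce(ctx, n, nextBundID) {
		ids = append(ids, rb.BundleID)
	}
	return ids
}

func main() {
	fmt.Println(drainAll(3))
}
```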

func (*ElementManager) DataAndTimerInputForBundle added in v2.54.0

func (em *ElementManager) DataAndTimerInputForBundle(rb RunBundle, info PColInfo) ([]*Block, int)

DataAndTimerInputForBundle returns pre-allocated data for the given bundle and the estimated number of elements. Elements are encoded with the PCollection's coders.

func (*ElementManager) FailBundle added in v2.51.0

func (em *ElementManager) FailBundle(rb RunBundle)

FailBundle clears the extant data allowing the execution to shut down.

func (*ElementManager) GetSideData added in v2.53.0

func (em *ElementManager) GetSideData(sID, tID, inputID string, watermark mtime.Time) map[typex.Window][][]byte

GetSideData returns side input data for the provided stage+transform+input tuple, valid to the watermark.

func (*ElementManager) Impulse

func (em *ElementManager) Impulse(stageID string)

Impulse marks and initializes the given stage as an impulse, which is a root transform that starts processing.

func (*ElementManager) InputForBundle

func (em *ElementManager) InputForBundle(rb RunBundle, info PColInfo) [][]byte

InputForBundle returns pre-allocated data for the given bundle, encoding the elements using the PCollection's coders.

func (*ElementManager) PersistBundle

func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PColInfo, d TentativeData, inputInfo PColInfo, residuals [][]byte, estimatedOWM map[string]mtime.Time)

PersistBundle uses the tentative bundle output to update the watermarks for the stage. Each stage has two monotonically increasing watermarks, the input watermark, and the output watermark.

	Input watermark:  MAX(CurrentInputWatermark, MIN(PendingElements, InputPCollectionWatermarks))
	Output watermark: MAX(CurrentOutputWatermark, MIN(InputWatermark, WatermarkHolds))

PersistBundle takes in the stage ID, ID of the bundle associated with the pending input elements, and the committed output elements.
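
The two watermark formulas above can be read as a pair of monotonic updates. The following is a minimal sketch under the assumption that every quantity has already been reduced to a single timestamp. `stageWatermarks`, `advance`, and the integer `Time` type are hypothetical names, not the package's internals.

```go
package main

import "fmt"

// Time is a stand-in for mtime.Time, modeled as an integer for this sketch.
type Time int64

func minT(a, b Time) Time {
	if a < b {
		return a
	}
	return b
}

func maxT(a, b Time) Time {
	if a > b {
		return a
	}
	return b
}

// stageWatermarks holds the two per-stage watermarks described above.
type stageWatermarks struct {
	input, output Time
}

// advance applies both formulas. minPending is the minimum timestamp of
// pending elements, upstream is the minimum input PCollection watermark,
// and minHold is the minimum watermark hold. Taking MAX with the current
// value keeps each watermark from moving backwards.
func (s *stageWatermarks) advance(minPending, upstream, minHold Time) {
	s.input = maxT(s.input, minT(minPending, upstream))
	s.output = maxT(s.output, minT(s.input, minHold))
}

func main() {
	s := stageWatermarks{input: 5, output: 3}
	s.advance(10, 8, 6)
	fmt.Println(s.input, s.output) // 8 6
	s.advance(4, 20, 2)            // older values cannot regress the watermarks
	fmt.Println(s.input, s.output) // 8 6
}
```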

func (*ElementManager) ReturnResiduals added in v2.48.0

func (em *ElementManager) ReturnResiduals(rb RunBundle, firstRsIndex int, inputInfo PColInfo, residuals [][]byte)

ReturnResiduals is called after a successful split, so the remaining work can be re-assigned to a new bundle.

func (*ElementManager) StageAggregates

func (em *ElementManager) StageAggregates(ID string)

StageAggregates marks the given stage as an aggregation, which means elements will only be processed based on windowing strategies.

func (*ElementManager) StageStateful added in v2.54.0

func (em *ElementManager) StageStateful(ID string)

StageStateful marks the given stage as stateful, which means elements are processed by key.

func (*ElementManager) StateForBundle added in v2.54.0

func (em *ElementManager) StateForBundle(rb RunBundle) TentativeData

StateForBundle retrieves relevant state for the given bundle, with respect to the data in the bundle.

TODO(lostluck): Consider unifying with InputForBundle, to reduce lock contention.

type LinkID added in v2.53.0

type LinkID struct {
	Transform, Local, Global string
}

LinkID represents a fully qualified input or output.

type PColInfo

type PColInfo struct {
	GlobalID string
	WDec     exec.WindowDecoder
	WEnc     exec.WindowEncoder
	EDec     func(io.Reader) []byte
	KeyDec   func(io.Reader) []byte
}

type RunBundle

type RunBundle struct {
	StageID   string
	BundleID  string
	Watermark mtime.Time
}

func (RunBundle) LogValue

func (rb RunBundle) LogValue() slog.Value

type StateData added in v2.54.0

type StateData struct {
	Bag      [][]byte
	Multimap map[string][][]byte
}

StateData is a "union" between Bag state and MultiMap state to increase common code.

type TentativeData

type TentativeData struct {
	Raw map[string][][]byte
	// contains filtered or unexported fields
}

TentativeData is where data for in-progress bundles is held until the bundle executes successfully.

func (*TentativeData) AppendBagState added in v2.54.0

func (d *TentativeData) AppendBagState(stateID LinkID, wKey, uKey, data []byte)

AppendBagState appends the incoming data to the existing tentative data bundle.

The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.

func (*TentativeData) AppendMultimapState added in v2.54.0

func (d *TentativeData) AppendMultimapState(stateID LinkID, wKey, uKey, mapKey, data []byte)

AppendMultimapState appends the incoming data to the existing tentative data bundle.

The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.

func (*TentativeData) ClearBagState added in v2.54.0

func (d *TentativeData) ClearBagState(stateID LinkID, wKey, uKey []byte)

ClearBagState clears any tentative data for the state. Since state data is only initialized if any exists, Clear does not create state that doesn't already exist. Existing state is zeroed so that the clear can be committed after bundle completion.

The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.

func (*TentativeData) ClearMultimapKeysState added in v2.54.0

func (d *TentativeData) ClearMultimapKeysState(stateID LinkID, wKey, uKey []byte)

ClearMultimapKeysState clears tentative data for all user map keys. Since state data is only initialized if any exists, Clear does not create state that doesn't already exist. Existing state is zeroed so that the clear can be committed after bundle completion.

The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.

func (*TentativeData) ClearMultimapState added in v2.54.0

func (d *TentativeData) ClearMultimapState(stateID LinkID, wKey, uKey, mapKey []byte)

ClearMultimapState clears any tentative data for the state. Since state data is only initialized if any exists, Clear does not create state that doesn't already exist. Existing state is zeroed so that the clear can be committed after bundle completion.

The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.

func (*TentativeData) GetBagState added in v2.54.0

func (d *TentativeData) GetBagState(stateID LinkID, wKey, uKey []byte) [][]byte

GetBagState retrieves available state from the tentative bundle data. The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.

func (*TentativeData) GetMultimapKeysState added in v2.54.0

func (d *TentativeData) GetMultimapKeysState(stateID LinkID, wKey, uKey []byte) [][]byte

GetMultimapKeysState retrieves all available user map keys.

The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.

func (*TentativeData) GetMultimapState added in v2.54.0

func (d *TentativeData) GetMultimapState(stateID LinkID, wKey, uKey, mapKey []byte) [][]byte

GetMultimapState retrieves available state from the tentative bundle data. The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.

func (*TentativeData) WriteData

func (d *TentativeData) WriteData(colID string, data []byte)

WriteData adds data to a given global collectionID.

func (*TentativeData) WriteTimers added in v2.54.0

func (d *TentativeData) WriteTimers(transformID, familyID string, timers []byte)

WriteTimers adds timers to the associated transform handler.

type TestStreamBuilder added in v2.55.0

type TestStreamBuilder interface {
	AddElementEvent(tag string, elements []TestStreamElement)
	AddWatermarkEvent(tag string, newWatermark mtime.Time)
	AddProcessingTimeEvent(d time.Duration)
}

TestStreamBuilder builds a synthetic sequence of events for the engine to execute. A pipeline may have only a single TestStream; the builder may panic if this is violated.
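
To show the order in which a caller might feed events to a TestStreamBuilder, here is a small sketch with a recording implementation. The interface and element type are redeclared locally with `Time` standing in for mtime.Time. `recorder` and `buildExample` are hypothetical, and the engine's own builder is obtained from AddTestStream rather than constructed like this.

```go
package main

import (
	"fmt"
	"time"
)

// Time is a stand-in for mtime.Time, modeled as an integer for this sketch.
type Time int64

// TestStreamElement and TestStreamBuilder mirror the documented declarations.
type TestStreamElement struct {
	Encoded   []byte
	EventTime Time
}

type TestStreamBuilder interface {
	AddElementEvent(tag string, elements []TestStreamElement)
	AddWatermarkEvent(tag string, newWatermark Time)
	AddProcessingTimeEvent(d time.Duration)
}

// recorder is a hypothetical implementation that logs each event in order.
type recorder struct {
	events []string
}

func (r *recorder) AddElementEvent(tag string, elements []TestStreamElement) {
	r.events = append(r.events, fmt.Sprintf("elements[%s]=%d", tag, len(elements)))
}

func (r *recorder) AddWatermarkEvent(tag string, newWatermark Time) {
	r.events = append(r.events, fmt.Sprintf("watermark[%s]=%d", tag, newWatermark))
}

func (r *recorder) AddProcessingTimeEvent(d time.Duration) {
	r.events = append(r.events, fmt.Sprintf("advance=%s", d))
}

// buildExample emits one element, advances the watermark past it, then
// advances processing time.
func buildExample(b TestStreamBuilder) {
	b.AddElementEvent("out", []TestStreamElement{{Encoded: []byte{1}, EventTime: 10}})
	b.AddWatermarkEvent("out", 20)
	b.AddProcessingTimeEvent(5 * time.Second)
}

func main() {
	r := &recorder{}
	buildExample(r)
	for _, e := range r.events {
		fmt.Println(e)
	}
}
```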

type TestStreamElement added in v2.55.0

type TestStreamElement struct {
	Encoded   []byte
	EventTime mtime.Time
}

TestStreamElement wraps the provided bytes and timestamp for ingestion and use.

type TimerKey added in v2.54.0

type TimerKey struct {
	Transform, Family string
}

TimerKey is for use as a key for timers.
