Documentation ¶
Overview ¶
Package engine handles the operational components of a runner: tracking elements, watermarks, timers, triggers, etc.
Index ¶
- Variables
- type Block
- type BlockKind
- type Config
- type ElementManager
- func (em *ElementManager) AddStage(ID string, inputIDs, outputIDs []string, sides []LinkID)
- func (em *ElementManager) AddTestStream(id string, tagToPCol map[string]string) TestStreamBuilder
- func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string) <-chan RunBundle
- func (em *ElementManager) DataAndTimerInputForBundle(rb RunBundle, info PColInfo) ([]*Block, int)
- func (em *ElementManager) FailBundle(rb RunBundle)
- func (em *ElementManager) GetSideData(sID, tID, inputID string, watermark mtime.Time) map[typex.Window][][]byte
- func (em *ElementManager) Impulse(stageID string)
- func (em *ElementManager) InputForBundle(rb RunBundle, info PColInfo) [][]byte
- func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PColInfo, d TentativeData, ...)
- func (em *ElementManager) ReturnResiduals(rb RunBundle, firstRsIndex int, inputInfo PColInfo, residuals [][]byte)
- func (em *ElementManager) StageAggregates(ID string)
- func (em *ElementManager) StageStateful(ID string)
- func (em *ElementManager) StateForBundle(rb RunBundle) TentativeData
- type LinkID
- type PColInfo
- type RunBundle
- type StateData
- type TentativeData
- func (d *TentativeData) AppendBagState(stateID LinkID, wKey, uKey, data []byte)
- func (d *TentativeData) AppendMultimapState(stateID LinkID, wKey, uKey, mapKey, data []byte)
- func (d *TentativeData) ClearBagState(stateID LinkID, wKey, uKey []byte)
- func (d *TentativeData) ClearMultimapKeysState(stateID LinkID, wKey, uKey []byte)
- func (d *TentativeData) ClearMultimapState(stateID LinkID, wKey, uKey, mapKey []byte)
- func (d *TentativeData) GetBagState(stateID LinkID, wKey, uKey []byte) [][]byte
- func (d *TentativeData) GetMultimapKeysState(stateID LinkID, wKey, uKey []byte) [][]byte
- func (d *TentativeData) GetMultimapState(stateID LinkID, wKey, uKey, mapKey []byte) [][]byte
- func (d *TentativeData) WriteData(colID string, data []byte)
- func (d *TentativeData) WriteTimers(transformID, familyID string, timers []byte)
- type TestStreamBuilder
- type TestStreamElement
- type TimerKey
Constants ¶
This section is empty.
Variables ¶
var (
	OneKeyPerBundle  bool // OneKeyPerBundle sets if a bundle is restricted to a single key.
	OneElementPerKey bool // OneElementPerKey sets if a key in a bundle is restricted to one element.
)
TODO: Move to better place for configuration
Functions ¶
This section is empty.
Types ¶
type Block ¶ added in v2.54.0
Block represents a contiguous set of data or timers for the same destination.
type BlockKind ¶ added in v2.54.0
type BlockKind int32
BlockKind indicates how the block is to be handled.
const (
	BlockData  BlockKind // BlockData represents data for the bundle.
	BlockTimer           // BlockTimer represents timers for the bundle.
)
type Config ¶
type Config struct {
	// MaxBundleSize caps the number of elements permitted in a bundle.
	// 0 or less means this is ignored.
	MaxBundleSize int
}
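To illustrate the MaxBundleSize semantics described in the field comment, here is a minimal sketch of how a cap might be applied to a queue of pending elements. The helper takeBundle is hypothetical and is not part of this package; the engine's actual bundling logic is more involved.

```go
package main

import "fmt"

// takeBundle is a hypothetical helper, not part of the engine package.
// It splits pending into the elements for the next bundle and the remainder,
// treating a maxBundleSize of 0 or less as "no cap".
func takeBundle(pending [][]byte, maxBundleSize int) (bundle, rest [][]byte) {
	if maxBundleSize <= 0 || len(pending) <= maxBundleSize {
		return pending, nil
	}
	return pending[:maxBundleSize], pending[maxBundleSize:]
}

func main() {
	pending := [][]byte{[]byte("a"), []byte("b"), []byte("c")}

	// With a cap of 2, one element is left over for a later bundle.
	bundle, rest := takeBundle(pending, 2)
	fmt.Println(len(bundle), len(rest))

	// With a cap of 0, the cap is ignored and everything goes in one bundle.
	bundle, rest = takeBundle(pending, 0)
	fmt.Println(len(bundle), len(rest))
}
```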
type ElementManager ¶
type ElementManager struct {
// contains filtered or unexported fields
}
ElementManager handles elements, watermarks, and related bookkeeping to determine whether a stage is able to be executed. It is the core execution engine of Prism.
Essentially, it needs to track the current watermarks for each PCollection and transform/stage. But it's tricky, since the watermarks for the PCollections are always relative to transforms/stages.
Key parts:
- The parallel input PCollection's watermark is relative to committed consumed elements. That is, the input elements consumed by the transform after a successful bundle can advance the watermark, based on the minimum of their elements.
- An output PCollection's watermark is relative to its producing transform, which relates to *all of its outputs*.
This means that a PCollection's watermark is the minimum across all its consuming transforms.
So, the watermark manager needs to track:
- Pending elements for each stage, along with their windows and timestamps.
- Each transform's view of the watermarks for the PCollections.
Watermarks are advanced based on consumed input, except if the stage produces residuals.
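The "minimum across consuming transforms" relationship described above can be sketched as follows. This is an illustration only: int64 stands in for mtime.Time, the function name and the transform IDs are hypothetical, and returning the maximum value when there are no consumers is an assumption made to keep the example total.

```go
package main

import (
	"fmt"
	"math"
)

// pcolWatermark returns the lowest watermark among the consuming transforms'
// views of a PCollection. The int64 values are a stand-in for mtime.Time.
// Assumption: with no consumers, nothing holds the watermark back, so the
// maximum representable value is returned.
func pcolWatermark(consumerViews map[string]int64) int64 {
	lowest := int64(math.MaxInt64)
	for _, wm := range consumerViews {
		if wm < lowest {
			lowest = wm
		}
	}
	return lowest
}

func main() {
	// Two hypothetical consumers; the slower one determines the result.
	views := map[string]int64{"stageA": 120, "stageB": 75}
	fmt.Println(pcolWatermark(views))
}
```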
func NewElementManager ¶
func NewElementManager(config Config) *ElementManager
func (*ElementManager) AddStage ¶
func (em *ElementManager) AddStage(ID string, inputIDs, outputIDs []string, sides []LinkID)
AddStage adds a stage to this element manager, connecting its PCollections and nodes to the watermark propagation graph.
func (*ElementManager) AddTestStream ¶ added in v2.55.0
func (em *ElementManager) AddTestStream(id string, tagToPCol map[string]string) TestStreamBuilder
AddTestStream provides a builder interface for the execution layer to build the test stream from the protos.
func (*ElementManager) Bundles ¶
func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string) <-chan RunBundle
Bundles is the core execution loop. It produces a sequence of bundles able to be executed. The returned channel is closed when the context is canceled, or there are no pending elements remaining.
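The channel contract described above (closed on cancellation or when work runs out) is a common Go pattern. The sketch below shows it with a hypothetical producer, bundleIDs, that emits string IDs rather than RunBundle values; it is not the engine's implementation, only an illustration of how a caller can range over such a channel until it closes.

```go
package main

import (
	"context"
	"fmt"
)

// bundleIDs is a hypothetical stand-in for a Bundles-style producer. It emits
// one ID per pending item and closes the channel when the work runs out or
// ctx is canceled, whichever comes first.
func bundleIDs(ctx context.Context, pending int, nextBundID func() string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for i := 0; i < pending; i++ {
			select {
			case <-ctx.Done():
				return
			case out <- nextBundID():
			}
		}
	}()
	return out
}

// drain runs the producer to completion and collects every ID it emits.
func drain(pending int) []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	n := 0
	nextID := func() string { n++; return fmt.Sprintf("bundle-%d", n) }

	var got []string
	// The range loop ends when the producer closes the channel.
	for id := range bundleIDs(ctx, pending, nextID) {
		got = append(got, id)
	}
	return got
}

func main() {
	fmt.Println(drain(3))
}
```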
func (*ElementManager) DataAndTimerInputForBundle ¶ added in v2.54.0
func (em *ElementManager) DataAndTimerInputForBundle(rb RunBundle, info PColInfo) ([]*Block, int)
DataAndTimerInputForBundle returns pre-allocated data for the given bundle and the estimated number of elements. Elements are encoded with the PCollection's coders.
func (*ElementManager) FailBundle ¶ added in v2.51.0
func (em *ElementManager) FailBundle(rb RunBundle)
FailBundle clears the extant data allowing the execution to shut down.
func (*ElementManager) GetSideData ¶ added in v2.53.0
func (em *ElementManager) GetSideData(sID, tID, inputID string, watermark mtime.Time) map[typex.Window][][]byte
GetSideData returns side input data for the provided stage+transform+input tuple, valid to the watermark.
func (*ElementManager) Impulse ¶
func (em *ElementManager) Impulse(stageID string)
Impulse marks and initializes the given stage as an impulse, which is a root transform that starts processing.
func (*ElementManager) InputForBundle ¶
func (em *ElementManager) InputForBundle(rb RunBundle, info PColInfo) [][]byte
InputForBundle returns pre-allocated data for the given bundle, encoding the elements using the PCollection's coders.
func (*ElementManager) PersistBundle ¶
func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PColInfo, d TentativeData, inputInfo PColInfo, residuals [][]byte, estimatedOWM map[string]mtime.Time)
PersistBundle uses the tentative bundle output to update the watermarks for the stage. Each stage has two monotonically increasing watermarks: the input watermark and the output watermark.
Input watermark: MAX(CurrentInputWatermark, MIN(PendingElements, InputPCollectionWatermarks))
Output watermark: MAX(CurrentOutputWatermark, MIN(InputWatermark, WatermarkHolds))
PersistBundle takes in the stage ID, ID of the bundle associated with the pending input elements, and the committed output elements.
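As a worked example of the two watermark formulas above, the following sketch computes both updates with plain int64 values standing in for mtime.Time. The function names are hypothetical, and the inputs are assumed to be already reduced to their minimums (the earliest pending element, the lowest upstream PCollection watermark, and the earliest hold).

```go
package main

import "fmt"

func minInt64(a, b int64) int64 {
	if a < b {
		return a
	}
	return b
}

func maxInt64(a, b int64) int64 {
	if a > b {
		return a
	}
	return b
}

// advanceInput applies MAX(current, MIN(pending, upstream)). The outer MAX
// keeps the input watermark from ever moving backwards.
func advanceInput(current, minPending, minUpstream int64) int64 {
	return maxInt64(current, minInt64(minPending, minUpstream))
}

// advanceOutput applies MAX(current, MIN(input, holds)). A watermark hold
// earlier than the input watermark keeps the output watermark back.
func advanceOutput(current, input, minHold int64) int64 {
	return maxInt64(current, minInt64(input, minHold))
}

func main() {
	in := advanceInput(10, 25, 20)  // MAX(10, MIN(25, 20))
	out := advanceOutput(8, in, 15) // MAX(8, MIN(in, 15))
	fmt.Println(in, out)
}
```

Here the upstream watermark of 20 limits the input watermark, and the hold at 15 limits the output watermark even though the input watermark has moved further.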
func (*ElementManager) ReturnResiduals ¶ added in v2.48.0
func (em *ElementManager) ReturnResiduals(rb RunBundle, firstRsIndex int, inputInfo PColInfo, residuals [][]byte)
ReturnResiduals is called after a successful split, so the remaining work can be re-assigned to a new bundle.
func (*ElementManager) StageAggregates ¶
func (em *ElementManager) StageAggregates(ID string)
StageAggregates marks the given stage as an aggregation, which means elements will only be processed based on windowing strategies.
func (*ElementManager) StageStateful ¶ added in v2.54.0
func (em *ElementManager) StageStateful(ID string)
StageStateful marks the given stage as stateful, which means elements are processed by key.
func (*ElementManager) StateForBundle ¶ added in v2.54.0
func (em *ElementManager) StateForBundle(rb RunBundle) TentativeData
StateForBundle retrieves relevant state for the given bundle, with respect to the data in the bundle.
TODO(lostluck): Consider unifying with InputForBundle, to reduce lock contention.
type LinkID ¶ added in v2.53.0
type LinkID struct {
Transform, Local, Global string
}
LinkID represents a fully qualified input or output.
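Because every field of LinkID is a string, values of the type are comparable and can be used directly as map keys. The sketch below redeclares the struct locally so that it is self-contained; countLinks and the ID values are hypothetical and only illustrate the keying behavior.

```go
package main

import "fmt"

// LinkID mirrors the struct shown above so this example compiles on its own.
type LinkID struct {
	Transform, Local, Global string
}

// countLinks is a hypothetical helper that tallies how often each fully
// qualified link appears. Two LinkIDs are the same key only if all three
// fields match.
func countLinks(ids []LinkID) map[LinkID]int {
	counts := make(map[LinkID]int)
	for _, id := range ids {
		counts[id]++
	}
	return counts
}

func main() {
	a := LinkID{Transform: "t1", Local: "i0", Global: "pcol_a"}
	b := LinkID{Transform: "t1", Local: "i1", Global: "pcol_a"}
	counts := countLinks([]LinkID{a, b, a})
	fmt.Println(counts[a], counts[b])
}
```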
type StateData ¶ added in v2.54.0
StateData is a "union" between Bag state and MultiMap state to increase common code.
type TentativeData ¶
TentativeData is where data for in progress bundles is put until the bundle executes successfully.
func (*TentativeData) AppendBagState ¶ added in v2.54.0
func (d *TentativeData) AppendBagState(stateID LinkID, wKey, uKey, data []byte)
AppendBagState appends the incoming data to the existing tentative data bundle.
The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.
func (*TentativeData) AppendMultimapState ¶ added in v2.54.0
func (d *TentativeData) AppendMultimapState(stateID LinkID, wKey, uKey, mapKey, data []byte)
AppendMultimapState appends the incoming data to the existing tentative data bundle.
The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.
func (*TentativeData) ClearBagState ¶ added in v2.54.0
func (d *TentativeData) ClearBagState(stateID LinkID, wKey, uKey []byte)
ClearBagState clears any tentative data for the state. Since state data is only initialized if any exists, Clear avoids creating state that doesn't already exist. Existing state is zeroed so that the cleared value can be committed after bundle completion.
The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.
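The append and clear behavior described above can be sketched with a toy structure. The bagState type below is a hypothetical stand-in keyed by window key and user key; it is not how TentativeData is laid out internally, and it omits the stateID level entirely. It only shows the "zero what exists, never create" rule for clearing.

```go
package main

import "fmt"

// bagState is a hypothetical stand-in: window key -> user key -> values.
type bagState map[string]map[string][][]byte

// add appends data for the given window and user key, creating entries as needed.
func (s bagState) add(wKey, uKey, data []byte) {
	byUser, ok := s[string(wKey)]
	if !ok {
		byUser = make(map[string][][]byte)
		s[string(wKey)] = byUser
	}
	byUser[string(uKey)] = append(byUser[string(uKey)], data)
}

// reset zeroes existing state so the cleared value can be committed later,
// but it never creates an entry that was not already present.
func (s bagState) reset(wKey, uKey []byte) {
	byUser, ok := s[string(wKey)]
	if !ok {
		return
	}
	if _, ok := byUser[string(uKey)]; !ok {
		return
	}
	byUser[string(uKey)] = nil // the key stays present with no values
}

func main() {
	s := bagState{}
	s.reset([]byte("w"), []byte("k")) // no-op: nothing exists yet
	fmt.Println(len(s))

	s.add([]byte("w"), []byte("k"), []byte("v1"))
	s.reset([]byte("w"), []byte("k"))
	vals, present := s["w"]["k"]
	fmt.Println(len(vals), present)
}
```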
func (*TentativeData) ClearMultimapKeysState ¶ added in v2.54.0
func (d *TentativeData) ClearMultimapKeysState(stateID LinkID, wKey, uKey []byte)
ClearMultimapKeysState clears tentative data for all user map keys. Since state data is only initialized if any exists, Clear avoids creating state that doesn't already exist. Existing state is zeroed so that the cleared value can be committed after bundle completion.
The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.
func (*TentativeData) ClearMultimapState ¶ added in v2.54.0
func (d *TentativeData) ClearMultimapState(stateID LinkID, wKey, uKey, mapKey []byte)
ClearMultimapState clears any tentative data for the state. Since state data is only initialized if any exists, Clear avoids creating state that doesn't already exist. Existing state is zeroed so that the cleared value can be committed after bundle completion.
The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.
func (*TentativeData) GetBagState ¶ added in v2.54.0
func (d *TentativeData) GetBagState(stateID LinkID, wKey, uKey []byte) [][]byte
GetBagState retrieves available state from the tentative bundle data. The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.
func (*TentativeData) GetMultimapKeysState ¶ added in v2.54.0
func (d *TentativeData) GetMultimapKeysState(stateID LinkID, wKey, uKey []byte) [][]byte
GetMultimapKeysState retrieves all available user map keys.
The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.
func (*TentativeData) GetMultimapState ¶ added in v2.54.0
func (d *TentativeData) GetMultimapState(stateID LinkID, wKey, uKey, mapKey []byte) [][]byte
GetMultimapState retrieves available state from the tentative bundle data. The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.
func (*TentativeData) WriteData ¶
func (d *TentativeData) WriteData(colID string, data []byte)
WriteData adds data to a given global collectionID.
func (*TentativeData) WriteTimers ¶ added in v2.54.0
func (d *TentativeData) WriteTimers(transformID, familyID string, timers []byte)
WriteTimers adds timers to the associated transform handler.
type TestStreamBuilder ¶ added in v2.55.0
type TestStreamBuilder interface {
	AddElementEvent(tag string, elements []TestStreamElement)
	AddWatermarkEvent(tag string, newWatermark mtime.Time)
	AddProcessingTimeEvent(d time.Duration)
}
TestStreamBuilder builds a synthetic sequence of events for the engine to execute. A pipeline may only have a single TestStream; the engine may panic otherwise.
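To show how a caller might drive a builder with this shape, the sketch below defines a local interface with the same three methods and a recorder that logs the event order. Everything here is a stand-in: int64 replaces mtime.Time, the element struct and its fields are hypothetical (the fields of TestStreamElement are not shown in this documentation), and the tag "out" is illustrative.

```go
package main

import (
	"fmt"
	"time"
)

// element is a hypothetical stand-in for TestStreamElement.
type element struct {
	Encoded   []byte
	EventTime int64
}

// builder mirrors the method set of TestStreamBuilder with stand-in types.
type builder interface {
	AddElementEvent(tag string, elements []element)
	AddWatermarkEvent(tag string, newWatermark int64)
	AddProcessingTimeEvent(d time.Duration)
}

// recorder implements builder by logging each event in the order received.
type recorder struct{ events []string }

func (r *recorder) AddElementEvent(tag string, elements []element) {
	r.events = append(r.events, fmt.Sprintf("elements[%s]: %d", tag, len(elements)))
}

func (r *recorder) AddWatermarkEvent(tag string, newWatermark int64) {
	r.events = append(r.events, fmt.Sprintf("watermark[%s]: %d", tag, newWatermark))
}

func (r *recorder) AddProcessingTimeEvent(d time.Duration) {
	r.events = append(r.events, fmt.Sprintf("processing time: +%s", d))
}

// buildExample emits elements, then advances the watermark, then advances
// processing time, which is one plausible ordering for a synthetic stream.
func buildExample(b builder) {
	b.AddElementEvent("out", []element{{Encoded: []byte("a"), EventTime: 1}})
	b.AddWatermarkEvent("out", 10)
	b.AddProcessingTimeEvent(5 * time.Second)
}

func main() {
	r := &recorder{}
	buildExample(r)
	for _, e := range r.events {
		fmt.Println(e)
	}
}
```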
type TestStreamElement ¶ added in v2.55.0
TestStreamElement wraps the provided bytes and timestamp for ingestion and use.