harness

package
v2.0.0-...-3952df4 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 24, 2024 License: Apache-2.0, BSD-3-Clause, MIT Imports: 33 Imported by: 0

Documentation

Overview

Package harness implements the SDK side of the Beam FnAPI.


Constants

const DefaultRemoteLoggingHook = "default_remote_logging"

DefaultRemoteLoggingHook is the key used for the default remote logging hook. A runner that wants to use an alternative logging solution can disable the default hook with hooks.DisableHook(harness.DefaultRemoteLoggingHook).

const URNMonitoringInfoShortID = "beam:protocol:monitoring_info_short_ids:v1"

URNMonitoringInfoShortID is a URN indicating support for short monitoring info IDs.

Variables

This section is empty.

Functions

func Main deprecated

func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error

Main is the main entrypoint for the Go harness. It runs at "runtime" -- not "pipeline-construction time" -- on each worker. It is a FnAPI client and ultimately responsible for correctly executing user code.

Deprecated: Prefer MainWithOptions instead.

func MainWithOptions

func MainWithOptions(ctx context.Context, loggingEndpoint, controlEndpoint string, opts Options) error

MainWithOptions is the main entrypoint for the Go harness. It runs at "runtime" -- not "pipeline-construction time" -- on each worker. It is a FnAPI client and ultimately responsible for correctly executing user code.

The opts argument carries optional configuration for interfacing with the runner, such as the runner's capabilities and the worker status endpoint.

Types

type DataChannel

type DataChannel struct {
	// contains filtered or unexported fields
}

DataChannel manages a single gRPC stream over the Data API. Data from multiple bundles can be multiplexed over this stream. Data is pushed over the channel, so data for a reader may arrive before the reader connects. Thread-safe.
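The multiplexing behavior described above can be illustrated with a minimal, standard-library-only sketch. It is not the package's implementation: the names streamKey, mux, push, and open are hypothetical, and a buffered Go channel stands in for the gRPC stream. The point it shows is that a per-stream buffer, created by whichever side arrives first, lets data pushed early wait for a reader that connects later.

```go
package main

import (
	"fmt"
	"sync"
)

// streamKey identifies one logical stream on the shared connection.
// Both fields are hypothetical stand-ins for an instruction ID and a
// ptransform ID.
type streamKey struct {
	instID       string
	ptransformID string
}

// mux demultiplexes frames from one shared stream into per-key channels.
type mux struct {
	mu      sync.Mutex
	streams map[streamKey]chan []byte
}

func newMux() *mux {
	return &mux{streams: make(map[streamKey]chan []byte)}
}

// stream returns the channel for key, creating it on first use by either
// the pushing side or the reading side.
func (m *mux) stream(key streamKey) chan []byte {
	m.mu.Lock()
	defer m.mu.Unlock()
	ch, ok := m.streams[key]
	if !ok {
		ch = make(chan []byte, 16) // buffer holds data that arrives early
		m.streams[key] = ch
	}
	return ch
}

// push is called by the receive loop for every incoming frame.
func (m *mux) push(key streamKey, data []byte) {
	m.stream(key) <- data
}

// open is called by a reader; it may happen after data has already arrived.
func (m *mux) open(key streamKey) <-chan []byte {
	return m.stream(key)
}

func main() {
	m := newMux()
	a := streamKey{"inst-1", "read"}
	b := streamKey{"inst-2", "read"}

	// Frames for two bundles arrive interleaved, before any reader exists.
	m.push(a, []byte("a1"))
	m.push(b, []byte("b1"))
	m.push(a, []byte("a2"))

	ra := m.open(a)
	fmt.Println(string(<-ra), string(<-ra)) // prints: a1 a2
	fmt.Println(string(<-m.open(b)))        // prints: b1
}
```

The mutex around the map is what makes the sketch safe to call from several goroutines; a real implementation would also need to handle stream completion and unbounded input, which this sketch leaves out.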

func (*DataChannel) OpenElementChan

func (c *DataChannel) OpenElementChan(ctx context.Context, ptransformID string, instID instructionID, expectedTimerTransforms []string) (<-chan exec.Elements, error)

OpenElementChan returns a channel of exec.Elements for the given instruction and ptransform.

func (*DataChannel) OpenTimerWrite

func (c *DataChannel) OpenTimerWrite(ctx context.Context, ptransformID string, instID instructionID, family string) io.WriteCloser

OpenTimerWrite returns an io.WriteCloser for the given timer family, instruction and ptransform.

func (*DataChannel) OpenWrite

func (c *DataChannel) OpenWrite(ctx context.Context, ptransformID string, instID instructionID) io.WriteCloser

OpenWrite returns an io.WriteCloser of the data elements for the given instruction and ptransform.

type DataChannelManager

type DataChannelManager struct {
	// contains filtered or unexported fields
}

DataChannelManager manages data channels over the Data API. A fixed number of channels are generally used, each managing multiple logical byte streams. Thread-safe.
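One plausible way to keep the number of channels fixed is to reuse a single channel per endpoint. The sketch below assumes that design purely for illustration; it is not taken from the package source. The types manager and channel, the open method, and the dials counter are all hypothetical, and a plain string stands in for exec.Port.

```go
package main

import (
	"fmt"
	"sync"
)

// channel stands in for an open connection to one endpoint.
type channel struct {
	url string
}

// manager caches one channel per endpoint URL (an assumed design).
type manager struct {
	mu    sync.Mutex
	ports map[string]*channel
	dials int // counts how many channels were actually created
}

// open returns the cached channel for url, creating it only on first use.
func (m *manager) open(url string) *channel {
	m.mu.Lock()
	defer m.mu.Unlock()
	if ch, ok := m.ports[url]; ok {
		return ch
	}
	if m.ports == nil {
		m.ports = make(map[string]*channel)
	}
	ch := &channel{url: url}
	m.dials++
	m.ports[url] = ch
	return ch
}

func main() {
	var m manager
	first := m.open("localhost:12345")
	second := m.open("localhost:12345")
	other := m.open("localhost:23456")
	fmt.Println(first == second, first == other, m.dials) // prints: true false 2
}
```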

func (*DataChannelManager) Open

func (m *DataChannelManager) Open(ctx context.Context, port exec.Port) (*DataChannel, error)

Open opens a R/W DataChannel over the given port.

type Options

type Options struct {
	RunnerCapabilities []string // URNs for what runners are able to understand over the FnAPI.
	StatusEndpoint     string   // Endpoint for worker status reporting.
}

Options configures harness.MainWithOptions. Its fields affect how the harness executes, such as which capabilities the runner supports.
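As an illustration of how RunnerCapabilities might be consulted, the sketch below checks the list for a given URN. To stay runnable without the Beam module, it declares local copies of the documented Options struct and URNMonitoringInfoShortID constant. The helper hasCapability and the endpoint address are hypothetical and not part of the package.

```go
package main

import "fmt"

// Local copies of the documented constant and struct, so that the sketch
// compiles with the standard library alone.
const URNMonitoringInfoShortID = "beam:protocol:monitoring_info_short_ids:v1"

type Options struct {
	RunnerCapabilities []string // URNs for what runners are able to understand over the FnAPI.
	StatusEndpoint     string   // Endpoint for worker status reporting.
}

// hasCapability is a hypothetical helper: it reports whether the runner
// advertised the given URN.
func hasCapability(opts Options, urn string) bool {
	for _, c := range opts.RunnerCapabilities {
		if c == urn {
			return true
		}
	}
	return false
}

func main() {
	opts := Options{
		RunnerCapabilities: []string{URNMonitoringInfoShortID},
		StatusEndpoint:     "localhost:8081", // placeholder address
	}
	fmt.Println(hasCapability(opts, URNMonitoringInfoShortID))      // prints: true
	fmt.Println(hasCapability(Options{}, URNMonitoringInfoShortID)) // prints: false
}
```

With the real package, a value of this shape would be passed as the last argument to MainWithOptions.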

type ScopedDataManager

type ScopedDataManager struct {
	// contains filtered or unexported fields
}

ScopedDataManager scopes the global gRPC data manager to a single instruction. The indirection makes it easier to control access.

func NewScopedDataManager

func NewScopedDataManager(mgr *DataChannelManager, instID instructionID) *ScopedDataManager

NewScopedDataManager returns a ScopedDataManager for the given instruction.

func (*ScopedDataManager) Close

func (s *ScopedDataManager) Close() error

Close prevents new IO for this instruction.
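The scoping pattern, where a wrapper binds a shared manager to one instruction and refuses new IO after Close, can be sketched as follows. This is a standard-library-only illustration under assumed names (globalManager, scoped, OpenWrite, errClosed), not the package's code.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// globalManager stands in for the shared manager; it records opened streams.
type globalManager struct {
	mu     sync.Mutex
	opened []string
}

func (g *globalManager) open(instID, streamID string) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.opened = append(g.opened, instID+"/"+streamID)
}

// scoped binds the global manager to a single instruction ID.
type scoped struct {
	mu     sync.Mutex
	mgr    *globalManager
	instID string
	closed bool
}

var errClosed = errors.New("instruction closed")

// OpenWrite forwards to the global manager unless the scope is closed.
func (s *scoped) OpenWrite(streamID string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return errClosed
	}
	s.mgr.open(s.instID, streamID)
	return nil
}

// Close prevents any new IO for this instruction.
func (s *scoped) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.closed = true
	return nil
}

func main() {
	s := &scoped{mgr: &globalManager{}, instID: "inst-1"}
	fmt.Println(s.OpenWrite("out")) // prints: <nil>
	s.Close()
	fmt.Println(s.OpenWrite("out")) // prints: instruction closed
}
```

Callers only ever see the scoped value, so they cannot open streams for another instruction or keep opening streams after the instruction has finished.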

func (*ScopedDataManager) OpenElementChan

func (s *ScopedDataManager) OpenElementChan(ctx context.Context, id exec.StreamID, expectedTimerTransforms []string) (<-chan exec.Elements, error)

OpenElementChan returns a channel of exec.Elements on the given stream.

func (*ScopedDataManager) OpenTimerWrite

func (s *ScopedDataManager) OpenTimerWrite(ctx context.Context, id exec.StreamID, family string) (io.WriteCloser, error)

OpenTimerWrite opens an io.WriteCloser on the given stream to write timers.

func (*ScopedDataManager) OpenWrite

func (s *ScopedDataManager) OpenWrite(ctx context.Context, id exec.StreamID) (io.WriteCloser, error)

OpenWrite opens an io.WriteCloser on the given stream.

type ScopedStateReader

type ScopedStateReader struct {
	// contains filtered or unexported fields
}

ScopedStateReader scopes the global gRPC state manager to a single instruction for side input use. The indirection makes it easier to control access.

func NewScopedStateReader

func NewScopedStateReader(mgr *StateChannelManager, instID instructionID) *ScopedStateReader

NewScopedStateReader returns a ScopedStateReader for the given instruction.

func NewScopedStateReaderWithCache

func NewScopedStateReaderWithCache(mgr *StateChannelManager, instID instructionID, cache *statecache.SideInputCache) *ScopedStateReader

NewScopedStateReaderWithCache returns a ScopedStateReader for the given instruction with a pointer to a SideInputCache.

func (*ScopedStateReader) Close

func (s *ScopedStateReader) Close() error

Close closes all open readers.

func (*ScopedStateReader) GetSideInputCache

func (s *ScopedStateReader) GetSideInputCache() exec.SideCache

GetSideInputCache returns a pointer to the SideInputCache being used by the SDK harness.

func (*ScopedStateReader) OpenBagUserStateAppender

func (s *ScopedStateReader) OpenBagUserStateAppender(ctx context.Context, id exec.StreamID, userStateID string, key []byte, w []byte) (io.Writer, error)

OpenBagUserStateAppender opens a byte stream for appending user bag state.

func (*ScopedStateReader) OpenBagUserStateClearer

func (s *ScopedStateReader) OpenBagUserStateClearer(ctx context.Context, id exec.StreamID, userStateID string, key []byte, w []byte) (io.Writer, error)

OpenBagUserStateClearer opens a byte stream for clearing user bag state.

func (*ScopedStateReader) OpenBagUserStateReader

func (s *ScopedStateReader) OpenBagUserStateReader(ctx context.Context, id exec.StreamID, userStateID string, key []byte, w []byte) (io.ReadCloser, error)

OpenBagUserStateReader opens a byte stream for reading user bag state.

func (*ScopedStateReader) OpenIterable

func (s *ScopedStateReader) OpenIterable(ctx context.Context, id exec.StreamID, key []byte) (io.ReadCloser, error)

OpenIterable opens a byte stream for reading unwindowed iterables from the runner.

func (*ScopedStateReader) OpenIterableSideInput

func (s *ScopedStateReader) OpenIterableSideInput(ctx context.Context, id exec.StreamID, sideInputID string, w []byte) (io.ReadCloser, error)

OpenIterableSideInput opens a byte stream for reading iterable side input.

func (*ScopedStateReader) OpenMultiMapSideInput

func (s *ScopedStateReader) OpenMultiMapSideInput(ctx context.Context, id exec.StreamID, sideInputID string, key, w []byte) (io.ReadCloser, error)

OpenMultiMapSideInput opens a byte stream for reading multimap side input.

func (*ScopedStateReader) OpenMultimapKeysUserStateClearer

func (s *ScopedStateReader) OpenMultimapKeysUserStateClearer(ctx context.Context, id exec.StreamID, userStateID string, key []byte, w []byte) (io.Writer, error)

OpenMultimapKeysUserStateClearer opens a byte stream for clearing all keys of user multimap state.

func (*ScopedStateReader) OpenMultimapKeysUserStateReader

func (s *ScopedStateReader) OpenMultimapKeysUserStateReader(ctx context.Context, id exec.StreamID, userStateID string, key []byte, w []byte) (io.ReadCloser, error)

OpenMultimapKeysUserStateReader opens a byte stream for reading the keys of user multimap state.

func (*ScopedStateReader) OpenMultimapUserStateAppender

func (s *ScopedStateReader) OpenMultimapUserStateAppender(ctx context.Context, id exec.StreamID, userStateID string, key []byte, w []byte, mk []byte) (io.Writer, error)

OpenMultimapUserStateAppender opens a byte stream for appending user multimap state.

func (*ScopedStateReader) OpenMultimapUserStateClearer

func (s *ScopedStateReader) OpenMultimapUserStateClearer(ctx context.Context, id exec.StreamID, userStateID string, key []byte, w []byte, mk []byte) (io.Writer, error)

OpenMultimapUserStateClearer opens a byte stream for clearing user multimap state by key.

func (*ScopedStateReader) OpenMultimapUserStateReader

func (s *ScopedStateReader) OpenMultimapUserStateReader(ctx context.Context, id exec.StreamID, userStateID string, key []byte, w []byte, mk []byte) (io.ReadCloser, error)

OpenMultimapUserStateReader opens a byte stream for reading user multimap state.

type StateChannel

type StateChannel struct {
	DoneCh <-chan struct{}
	// contains filtered or unexported fields
}

StateChannel manages state transactions over a single gRPC connection. It does not need to track readers and writers as carefully as the DataChannel, because the state protocol is request-based.
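Because the state protocol is request-based, each response only needs to be matched with its request. The following is a rough sketch of that correlation using only the standard library. The types request, response, and stateChan and the methods send and deliver are hypothetical, a Go channel stands in for the gRPC stream, and a goroutine plays the runner.

```go
package main

import (
	"fmt"
	"sync"
)

type request struct {
	id      int
	payload string
}

type response struct {
	id      int
	payload string
}

// stateChan correlates responses with requests by ID.
type stateChan struct {
	mu      sync.Mutex
	nextID  int
	pending map[int]chan response
	out     chan request
}

func newStateChan(out chan request) *stateChan {
	return &stateChan{pending: make(map[int]chan response), out: out}
}

// send registers a pending entry, writes the request, then blocks for the reply.
func (c *stateChan) send(payload string) response {
	c.mu.Lock()
	c.nextID++
	id := c.nextID
	ch := make(chan response, 1)
	c.pending[id] = ch
	c.mu.Unlock()

	c.out <- request{id: id, payload: payload}
	return <-ch
}

// deliver is called by the receive loop; it routes a response to its waiter.
func (c *stateChan) deliver(r response) {
	c.mu.Lock()
	ch, ok := c.pending[r.id]
	delete(c.pending, r.id)
	c.mu.Unlock()
	if ok {
		ch <- r
	}
}

func main() {
	out := make(chan request)
	c := newStateChan(out)

	// A fake runner that answers every request.
	go func() {
		for req := range out {
			c.deliver(response{id: req.id, payload: "value-for-" + req.payload})
		}
	}()

	resp := c.send("key-a")
	fmt.Println(resp.id, resp.payload) // prints: 1 value-for-key-a
}
```

The only bookkeeping is the map of pending requests, which is why no per-reader or per-writer tracking is needed in this style of protocol.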

func (*StateChannel) Send

Send sends a state request and returns the response.

type StateChannelManager

type StateChannelManager struct {
	// contains filtered or unexported fields
}

StateChannelManager manages state channels over the State API. A fixed number of channels are generally used, each managing multiple logical byte streams. Thread-safe.

func (*StateChannelManager) Open

Open opens a R/W StateChannel over the given port.

Directories

Path Synopsis
Package init contains the harness initialization code defined by the FnAPI.
Package statecache implements the state caching feature described by the Beam Fn API.
