Documentation ¶
Overview ¶
Package harness implements the SDK side of the Beam FnAPI.
Index ¶
- Constants
- func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) errordeprecated
- func MainWithOptions(ctx context.Context, loggingEndpoint, controlEndpoint string, opts Options) error
- type DataChannel
- func (c *DataChannel) OpenElementChan(ctx context.Context, ptransformID string, instID instructionID, ...) (<-chan exec.Elements, error)
- func (c *DataChannel) OpenTimerWrite(ctx context.Context, ptransformID string, instID instructionID, family string) io.WriteCloser
- func (c *DataChannel) OpenWrite(ctx context.Context, ptransformID string, instID instructionID) io.WriteCloser
- type DataChannelManager
- type Options
- type ScopedDataManager
- func (s *ScopedDataManager) Close() error
- func (s *ScopedDataManager) OpenElementChan(ctx context.Context, id exec.StreamID, expectedTimerTransforms []string) (<-chan exec.Elements, error)
- func (s *ScopedDataManager) OpenTimerWrite(ctx context.Context, id exec.StreamID, family string) (io.WriteCloser, error)
- func (s *ScopedDataManager) OpenWrite(ctx context.Context, id exec.StreamID) (io.WriteCloser, error)
- type ScopedStateReader
- func (s *ScopedStateReader) Close() error
- func (s *ScopedStateReader) GetSideInputCache() exec.SideCache
- func (s *ScopedStateReader) OpenBagUserStateAppender(ctx context.Context, id exec.StreamID, userStateID string, key []byte, ...) (io.Writer, error)
- func (s *ScopedStateReader) OpenBagUserStateClearer(ctx context.Context, id exec.StreamID, userStateID string, key []byte, ...) (io.Writer, error)
- func (s *ScopedStateReader) OpenBagUserStateReader(ctx context.Context, id exec.StreamID, userStateID string, key []byte, ...) (io.ReadCloser, error)
- func (s *ScopedStateReader) OpenIterable(ctx context.Context, id exec.StreamID, key []byte) (io.ReadCloser, error)
- func (s *ScopedStateReader) OpenIterableSideInput(ctx context.Context, id exec.StreamID, sideInputID string, w []byte) (io.ReadCloser, error)
- func (s *ScopedStateReader) OpenMultiMapSideInput(ctx context.Context, id exec.StreamID, sideInputID string, key, w []byte) (io.ReadCloser, error)
- func (s *ScopedStateReader) OpenMultimapKeysUserStateClearer(ctx context.Context, id exec.StreamID, userStateID string, key []byte, ...) (io.Writer, error)
- func (s *ScopedStateReader) OpenMultimapKeysUserStateReader(ctx context.Context, id exec.StreamID, userStateID string, key []byte, ...) (io.ReadCloser, error)
- func (s *ScopedStateReader) OpenMultimapUserStateAppender(ctx context.Context, id exec.StreamID, userStateID string, key []byte, ...) (io.Writer, error)
- func (s *ScopedStateReader) OpenMultimapUserStateClearer(ctx context.Context, id exec.StreamID, userStateID string, key []byte, ...) (io.Writer, error)
- func (s *ScopedStateReader) OpenMultimapUserStateReader(ctx context.Context, id exec.StreamID, userStateID string, key []byte, ...) (io.ReadCloser, error)
- type StateChannel
- type StateChannelManager
Constants ¶
const DefaultRemoteLoggingHook = "default_remote_logging"
DefaultRemoteLoggingHook is the key used for the default remote logging hook. If a runner wants to use an alternative logging solution, it can be disabled with hooks.DisableHook(harness.DefaultRemoteLoggingHook).
const URNMonitoringInfoShortID = "beam:protocol:monitoring_info_short_ids:v1"
URNMonitoringInfoShortID is a URN indicating support for short monitoring info IDs.
Variables ¶
This section is empty.
Functions ¶
func Main
deprecated
Main is the main entrypoint for the Go harness. It runs at "runtime" -- not "pipeline-construction time" -- on each worker. It is a FnAPI client and ultimately responsible for correctly executing user code.
Deprecated: Prefer MainWithOptions instead.
func MainWithOptions ¶
func MainWithOptions(ctx context.Context, loggingEndpoint, controlEndpoint string, opts Options) error
MainWithOptions is the main entrypoint for the Go harness. It runs at "runtime" -- not "pipeline-construction time" -- on each worker. It is a FnAPI client and ultimately responsible for correctly executing user code.
Options are optional configurations for interfacing with the runner or similar.
Types ¶
type DataChannel ¶
type DataChannel struct {
// contains filtered or unexported fields
}
DataChannel manages a single gRPC stream over the Data API. Data from multiple bundles can be multiplexed over this stream. Data is pushed over the channel, so data for a reader may arrive before the reader connects. Thread-safe.
func (*DataChannel) OpenElementChan ¶
func (c *DataChannel) OpenElementChan(ctx context.Context, ptransformID string, instID instructionID, expectedTimerTransforms []string) (<-chan exec.Elements, error)
OpenElementChan returns a channel of typex.Elements for the given instruction and ptransform.
func (*DataChannel) OpenTimerWrite ¶
func (c *DataChannel) OpenTimerWrite(ctx context.Context, ptransformID string, instID instructionID, family string) io.WriteCloser
OpenTimerWrite returns io.WriteCloser for the given timerFamilyID, instruction and ptransform.
func (*DataChannel) OpenWrite ¶
func (c *DataChannel) OpenWrite(ctx context.Context, ptransformID string, instID instructionID) io.WriteCloser
OpenWrite returns an io.WriteCloser of the data elements for the given instruction and ptransform.
type DataChannelManager ¶
type DataChannelManager struct {
// contains filtered or unexported fields
}
DataChannelManager manages data channels over the Data API. A fixed number of channels are generally used, each managing multiple logical byte streams. Thread-safe.
func (*DataChannelManager) Open ¶
func (m *DataChannelManager) Open(ctx context.Context, port exec.Port) (*DataChannel, error)
Open opens a R/W DataChannel over the given port.
type Options ¶
type Options struct { RunnerCapabilities []string // URNs for what runners are able to understand over the FnAPI. StatusEndpoint string // Endpoint for worker status reporting. }
Options for harness.Main that affect execution of the harness, such as runner capabilities.
type ScopedDataManager ¶
type ScopedDataManager struct {
// contains filtered or unexported fields
}
ScopedDataManager scopes the global gRPC data manager to a single instruction. The indirection makes it easier to control access.
func NewScopedDataManager ¶
func NewScopedDataManager(mgr *DataChannelManager, instID instructionID) *ScopedDataManager
NewScopedDataManager returns a ScopedDataManager for the given instruction.
func (*ScopedDataManager) Close ¶
func (s *ScopedDataManager) Close() error
Close prevents new IO for this instruction.
func (*ScopedDataManager) OpenElementChan ¶
func (s *ScopedDataManager) OpenElementChan(ctx context.Context, id exec.StreamID, expectedTimerTransforms []string) (<-chan exec.Elements, error)
OpenElementChan returns a channel of exec.Elements on the given stream.
func (*ScopedDataManager) OpenTimerWrite ¶
func (s *ScopedDataManager) OpenTimerWrite(ctx context.Context, id exec.StreamID, family string) (io.WriteCloser, error)
OpenTimerWrite opens an io.WriteCloser on the given stream to write timers
func (*ScopedDataManager) OpenWrite ¶
func (s *ScopedDataManager) OpenWrite(ctx context.Context, id exec.StreamID) (io.WriteCloser, error)
OpenWrite opens an io.WriteCloser on the given stream.
type ScopedStateReader ¶
type ScopedStateReader struct {
// contains filtered or unexported fields
}
ScopedStateReader scopes the global gRPC state manager to a single instruction for side input use. The indirection makes it easier to control access.
func NewScopedStateReader ¶
func NewScopedStateReader(mgr *StateChannelManager, instID instructionID) *ScopedStateReader
NewScopedStateReader returns a ScopedStateReader for the given instruction.
func NewScopedStateReaderWithCache ¶
func NewScopedStateReaderWithCache(mgr *StateChannelManager, instID instructionID, cache *statecache.SideInputCache) *ScopedStateReader
NewScopedStateReaderWithCache returns a ScopedState reader for the given instruction with a pointer to a SideInputCache.
func (*ScopedStateReader) Close ¶
func (s *ScopedStateReader) Close() error
Close closes all open readers.
func (*ScopedStateReader) GetSideInputCache ¶
func (s *ScopedStateReader) GetSideInputCache() exec.SideCache
GetSideInputCache returns a pointer to the SideInputCache being used by the SDK harness.
func (*ScopedStateReader) OpenBagUserStateAppender ¶
func (s *ScopedStateReader) OpenBagUserStateAppender(ctx context.Context, id exec.StreamID, userStateID string, key []byte, w []byte) (io.Writer, error)
OpenBagUserStateAppender opens a byte stream for appending user bag state.
func (*ScopedStateReader) OpenBagUserStateClearer ¶
func (s *ScopedStateReader) OpenBagUserStateClearer(ctx context.Context, id exec.StreamID, userStateID string, key []byte, w []byte) (io.Writer, error)
OpenBagUserStateClearer opens a byte stream for clearing user bag state.
func (*ScopedStateReader) OpenBagUserStateReader ¶
func (s *ScopedStateReader) OpenBagUserStateReader(ctx context.Context, id exec.StreamID, userStateID string, key []byte, w []byte) (io.ReadCloser, error)
OpenBagUserStateReader opens a byte stream for reading user bag state.
func (*ScopedStateReader) OpenIterable ¶
func (s *ScopedStateReader) OpenIterable(ctx context.Context, id exec.StreamID, key []byte) (io.ReadCloser, error)
OpenIterable opens a byte stream for reading unwindowed iterables from the runner.
func (*ScopedStateReader) OpenIterableSideInput ¶
func (s *ScopedStateReader) OpenIterableSideInput(ctx context.Context, id exec.StreamID, sideInputID string, w []byte) (io.ReadCloser, error)
OpenIterableSideInput opens a byte stream for reading iterable side input
func (*ScopedStateReader) OpenMultiMapSideInput ¶
func (s *ScopedStateReader) OpenMultiMapSideInput(ctx context.Context, id exec.StreamID, sideInputID string, key, w []byte) (io.ReadCloser, error)
OpenMultiMapSideInput opens a byte stream for reading multimap side input.
func (*ScopedStateReader) OpenMultimapKeysUserStateClearer ¶
func (s *ScopedStateReader) OpenMultimapKeysUserStateClearer(ctx context.Context, id exec.StreamID, userStateID string, key []byte, w []byte) (io.Writer, error)
OpenMultimapKeysUserStateClearer opens a byte stream for clearing all keys of user multimap state.
func (*ScopedStateReader) OpenMultimapKeysUserStateReader ¶
func (s *ScopedStateReader) OpenMultimapKeysUserStateReader(ctx context.Context, id exec.StreamID, userStateID string, key []byte, w []byte) (io.ReadCloser, error)
OpenMultimapKeysUserStateReader opens a byte stream for reading the keys of user multimap state.
func (*ScopedStateReader) OpenMultimapUserStateAppender ¶
func (s *ScopedStateReader) OpenMultimapUserStateAppender(ctx context.Context, id exec.StreamID, userStateID string, key []byte, w []byte, mk []byte) (io.Writer, error)
OpenMultimapUserStateAppender opens a byte stream for appending user multimap state.
func (*ScopedStateReader) OpenMultimapUserStateClearer ¶
func (s *ScopedStateReader) OpenMultimapUserStateClearer(ctx context.Context, id exec.StreamID, userStateID string, key []byte, w []byte, mk []byte) (io.Writer, error)
OpenMultimapUserStateClearer opens a byte stream for clearing user multimap state by key.
func (*ScopedStateReader) OpenMultimapUserStateReader ¶
func (s *ScopedStateReader) OpenMultimapUserStateReader(ctx context.Context, id exec.StreamID, userStateID string, key []byte, w []byte, mk []byte) (io.ReadCloser, error)
OpenMultimapUserStateReader opens a byte stream for reading user multimap state.
type StateChannel ¶
type StateChannel struct { DoneCh <-chan struct{} // contains filtered or unexported fields }
StateChannel manages state transactions over a single gRPC connection. It does not need to track readers and writers as carefully as the DataChannel, because the state protocol is request-based.
func (*StateChannel) Send ¶
func (c *StateChannel) Send(req *fnpb.StateRequest) (*fnpb.StateResponse, error)
Send sends a state request and returns the response.
type StateChannelManager ¶
type StateChannelManager struct {
// contains filtered or unexported fields
}
StateChannelManager manages data channels over the State API. A fixed number of channels are generally used, each managing multiple logical byte streams. Thread-safe.
func (*StateChannelManager) Open ¶
func (m *StateChannelManager) Open(ctx context.Context, port exec.Port) (*StateChannel, error)
Open opens a R/W StateChannel over the given port.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package init contains the harness initialization code defined by the FnAPI.
|
Package init contains the harness initialization code defined by the FnAPI. |
Package statecache implements the state caching feature described by the Beam Fn API
|
Package statecache implements the state caching feature described by the Beam Fn API |