tap

package
v0.0.0-...-7d3b672 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 13, 2023 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Collector

type Collector[Stream any] struct {
	// contains filtered or unexported fields
}

Collector the collector that communicates with singer taps

func NewTapCollector

func NewTapCollector[Stream any](args *CollectorArgs[Stream]) (*Collector[Stream], errors.Error)

NewTapCollector constructor for Collector

func (*Collector[Stream]) Execute

func (c *Collector[Stream]) Execute() (err errors.Error)

Execute executes the collector

type CollectorArgs

// CollectorArgs holds the arguments used to initialize a Collector (see NewTapCollector).
// The Stream type parameter is the stream type understood by the supplied Tap.
type CollectorArgs[Stream any] struct {
	// Embedded helper.RawDataSubTaskArgs.
	helper.RawDataSubTaskArgs
	// The tap client to use. Note that this is a Tap[Stream] value, not a function.
	TapClient Tap[Stream]
	// Optional - This function is called for the selected streams at runtime. Use this if any runtime modification is needed.
	// NOTE(review): its signature matches the propsModifier parameter of Tap.SetProperties; the meaning of the
	// returned bool is not documented here - confirm.
	TapStreamModifier func(stream *Stream) bool
	// The config the tap needs at runtime in order to execute
	TapConfig any
	// The specific tap stream to invoke at runtime
	StreamName   string
	// NOTE(review): presumably the ID of the connection being collected for - confirm.
	ConnectionId uint64
	// NOTE(review): presumably the name of the table the collected data is associated with - confirm.
	Table        string
	// NOTE(review): presumably enables incremental collection - confirm.
	Incremental  bool
}

CollectorArgs args to initialize a Collector

type Output

// Output is the raw data produced by a tap. It can be viewed either as a State or as a Record[R];
// each accessor reports through its bool result whether that conversion was possible.
type Output[R any] interface {
	// AsTapState tries to convert the map object to a State. Returns false if it can't be done.
	AsTapState() (*State, bool)
	// AsTapRecord tries to convert the map object to a Record. Returns false if it can't be done.
	AsTapRecord() (*Record[R], bool)
}

Output raw data from a tap. Only one of these fields can ever be non-nil

func NewSingerTapOutput

func NewSingerTapOutput(src json.RawMessage) (Output[json.RawMessage], errors.Error)

NewSingerTapOutput constructor for Output. The src is the raw data coming from the tap

type RawState

// RawState is the raw-database version of State.
// FromState and ToState convert between the two representations.
type RawState struct {
	// Embedded archived.GenericModel keyed by a string.
	archived.GenericModel[string]
	// Type mirrors State.Type.
	// NOTE(review): presumably copied verbatim by FromState/ToState - confirm.
	Type  string
	// Value holds the state value as JSON (State.Value is a map[string]any).
	Value datatypes.JSON
}

RawState The raw-database version of State

func FromState

func FromState(t *State) *RawState

FromState converts State to RawState

func (*RawState) TableName

func (*RawState) TableName() string

TableName the table name

type Record

// Record holds the fields embedded in a singer-tap record.
// The type parameter R is the tap-implementation-specific payload type.
type Record[R any] struct {
	// Type is read from the "type" JSON key.
	Type          string    `json:"type"`
	// Stream is read from the "stream" JSON key.
	// NOTE(review): presumably the name of the stream that produced the record - confirm.
	Stream        string    `json:"stream"`
	// TimeExtracted is read from the "time_extracted" JSON key.
	TimeExtracted time.Time `json:"time_extracted"`
	// Record is the tap-specific payload, read from the "record" JSON key.
	Record        R         `json:"record"`
}

Record the fields embedded in a singer-tap record. The specifics of the record are tap-implementation specific.

type Response

// Response is the element type of the channel returned by Tap.Run.
// It carries a tap Output together with an error.
// NOTE(review): presumably exactly one of Out and Err is set on any given Response - confirm.
type Response struct {
	// Out is the raw tap output, with the payload kept as json.RawMessage.
	Out Output[json.RawMessage]
	// Err is the error associated with this response, if any.
	Err errors.Error
}

type SingerOutput

type SingerOutput struct {
	// contains filtered or unexported fields
}

SingerOutput raw data from a tap. Only one of these fields can ever be non-nil

func (*SingerOutput) AsTapRecord

func (r *SingerOutput) AsTapRecord() (*Record[json.RawMessage], bool)

AsTapRecord tries to convert the map object to a Record. Returns false if it can't be done.

func (*SingerOutput) AsTapState

func (r *SingerOutput) AsTapState() (*State, bool)

AsTapState tries to convert the map object to a State. Returns false if it can't be done.

type SingerTap

type SingerTap struct {
	*SingerTapConfig
	// contains filtered or unexported fields
}

SingerTap the Singer implementation of Tap

func NewSingerTap

func NewSingerTap(cfg *SingerTapConfig) (*SingerTap, errors.Error)

NewSingerTap the constructor for SingerTap

func (*SingerTap) GetName

func (t *SingerTap) GetName() string

GetName implements Tap.GetName

func (*SingerTap) Run

func (t *SingerTap) Run(ctx context.Context) (<-chan *Response, errors.Error)

Run implements Tap.Run

func (*SingerTap) SetConfig

func (t *SingerTap) SetConfig(cfg any) errors.Error

SetConfig implements Tap.SetConfig

func (*SingerTap) SetProperties

func (t *SingerTap) SetProperties(streamName string, propsModifier func(props *SingerTapStream) bool) (uint64, errors.Error)

SetProperties implements Tap.SetProperties

func (*SingerTap) SetState

func (t *SingerTap) SetState(state any) errors.Error

SetState implements Tap.SetState

type SingerTapConfig

// SingerTapConfig is the set of variables needed to initialize a SingerTap (see NewSingerTap).
type SingerTapConfig struct {
	// NOTE(review): presumably the name or path of the tap executable to run - confirm.
	TapExecutable        string
	// NOTE(review): presumably the path to the tap's catalog/properties JSON file - confirm.
	StreamPropertiesFile string
	// NOTE(review): effect not visible here; presumably selects legacy tap behavior - confirm.
	IsLegacy             bool
}

SingerTapConfig the set of variables needed to initialize a SingerTap

type SingerTapMetadata

// SingerTapMetadata is a free-form map whose structure is determined by the
// catalog/properties JSON of a singer tap. It is the element type of SingerTapStream.Metadata.
type SingerTapMetadata map[string]any

SingerTapMetadata the structure of this is determined by the catalog/properties JSON of a singer tap

type SingerTapProperties

// SingerTapProperties wraps the list of SingerTapStream entries.
type SingerTapProperties struct {
	// Streams is read from the "streams" JSON key.
	Streams []*SingerTapStream `json:"streams"`
}

SingerTapProperties wraps SingerTapStreams

type SingerTapSchema

// SingerTapSchema is a free-form map whose structure is determined by the
// catalog/properties JSON of a singer tap. It is the type of SingerTapStream.Schema.
type SingerTapSchema map[string]any

SingerTapSchema the structure of this is determined by the catalog/properties JSON of a singer tap

type SingerTapStream

// SingerTapStream is the deserialized version of each stream entry in the
// catalog/properties JSON of a singer tap.
type SingerTapStream struct {
	// Stream is read from the "stream" JSON key.
	Stream        string              `json:"stream"`
	// TapStreamId is read from the "tap_stream_id" JSON key.
	TapStreamId   string              `json:"tap_stream_id"`
	// Schema is read from the "schema" JSON key; its structure is tap-defined.
	Schema        SingerTapSchema     `json:"schema"`
	// Metadata is read from the "metadata" JSON key; each entry's structure is tap-defined.
	Metadata      []SingerTapMetadata `json:"metadata"`
	// KeyProperties is read from the "key_properties" JSON key and is left untyped.
	KeyProperties any                 `json:"key_properties"`
}

SingerTapStream the deserialized version of each stream entry in the catalog/properties JSON of a singer tap

type State

// State holds the fields embedded in a singer-tap state.
// The specifics of the value are tap-implementation specific.
type State struct {
	// Type is read from the "type" JSON key.
	Type  string         `json:"type"`
	// Value is the tap-specific state payload, read from the "value" JSON key.
	Value map[string]any `json:"value"`
}

State the fields embedded in a singer-tap state. The specifics of the value are tap-implementation specific.

func ToState

func ToState(raw *RawState) *State

ToState converts RawState to State

type Tap

// Tap is the abstract interface for taps. Consumer code should not use concrete implementations directly.
// The Stream type parameter is the stream type handed to the SetProperties modifier.
type Tap[Stream any] interface {
	// Run runs the tap and returns a stream of results. Expected to be called after all the other Setters. The Stdout of the response should
	// be of type Output[json.RawMessage]
	Run(ctx context.Context) (<-chan *Response, errors.Error)
	// GetName the name of this tap
	GetName() string
	// SetProperties Sets the properties of the tap and allows you to modify the properties at runtime.
	// Returns a unique hash representing the properties object.
	SetProperties(streamName string, propsModifier func(props *Stream) bool) (uint64, errors.Error)
	// SetState sets state on this tap
	SetState(state any) errors.Error
	// SetConfig sets the config of this tap
	SetConfig(config any) errors.Error
}

Tap the abstract interface for Taps. Consumer code should not use concrete implementations directly.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL