base

package
v0.0.0-...-8aeb8a1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 29, 2023 License: MIT Imports: 18 Imported by: 1

Documentation

Index

Constants

View Source
// Well-known file names.
// NOTE(review): presumably the default names of the state, config, catalog and
// properties files handled by CLI drivers (see AbstractCLIDriver paths) — confirm.
const (
	StateFileName      = "state.json"
	ConfigFileName     = "config.json"
	CatalogFileName    = "catalog.json"
	PropertiesFileName = "properties.json"
)
View Source
// String identifiers of source/driver types plus related defaults.
const (
	// Identifiers of individual source types.
	AmplitudeType       = "amplitude"
	FbMarketingType     = "facebook_marketing"
	FirebaseType        = "firebase"
	GoogleAnalyticsType = "google_analytics"
	GooglePlayType      = "google_play"
	GoogleAdsType       = "google_ads"
	RedisType           = "redis"

	// Identifiers of connector families.
	// NOTE(review): grouping inferred from the names only — confirm.
	SingerType          = "singer"
	AirbyteType         = "airbyte"
	SdkSourceType       = "sdk_source"
	NativeConnectorType = "native"

	// Authorization type value "OAuth".
	// NOTE(review): presumably compared against GoogleAuthConfig.Type — confirm.
	GoogleOAuthAuthorizationType = "OAuth"

	// NOTE(review): presumably the default for Collection.DaysBackToLoad — confirm.
	DefaultDaysBackToLoad = 365
)
View Source
// ConfigSignatureSuffix is the suffix "_JITSU_config".
// NOTE(review): presumably appended to a key when storing a config signature — confirm against usage.
const ConfigSignatureSuffix = "_JITSU_config"
View Source
// SignatureLayout is a Go time layout with millisecond precision and a literal "Z" suffix.
// NOTE(review): presumably used when formatting interval signatures
// (see TimeInterval.CalculateSignatureFrom) — confirm.
const SignatureLayout = "2006-01-02T15:04:05.000Z"

Variables

View Source
// Package-level registries keyed by a string.
// NOTE(review): presumably keyed by driver type and populated through
// RegisterDriver and RegisterTestConnectionFunc respectively — confirm.
var (
	// DriverConstructors holds functions that create a Driver from a context, a source config and a collection.
	DriverConstructors        = make(map[string]func(ctx context.Context, config *SourceConfig, collection *Collection) (Driver, error))
	// DriverTestConnectionFuncs holds functions that take a source config and return an error.
	DriverTestConnectionFuncs = make(map[string]func(config *SourceConfig) error)
)

Functions

func FillPreconfiguredOauth

func FillPreconfiguredOauth(sourceType string, config interface{})

func ParseProperties

func ParseProperties(system, prefix string, properties map[string]*Property, resultFields schema.Fields)

ParseProperties recursively parses singer/airbyte catalog properties and enriches resultFields

func RegisterDriver

func RegisterDriver(driverType string,
	createDriverFunc func(ctx context.Context, config *SourceConfig, collection *Collection) (Driver, error))

RegisterDriver registers function to create new driver instance

func RegisterTestConnectionFunc

func RegisterTestConnectionFunc(driverType string, testConnectionFunc func(config *SourceConfig) error)

RegisterTestConnectionFunc registers function to test driver connection

func StreamIdentifier

func StreamIdentifier(namespace, name string) string

func WaitReadiness

func WaitReadiness(driver CLIDriver, taskLogger logging.TaskLogger) (bool, error)

WaitReadiness waits up to 90 seconds for the driver to become ready; otherwise it returns false and notReadyError

Types

type AbstractCLIDriver

// AbstractCLIDriver is an abstract implementation of CLI drivers such as Singer or Airbyte.
// Create instances with NewAbstractCLIDriver.
type AbstractCLIDriver struct {
	// contains filtered or unexported fields
}

AbstractCLIDriver is an abstract implementation of CLI drivers such as Singer or Airbyte

func NewAbstractCLIDriver

func NewAbstractCLIDriver(sourceID, tap, configPath, catalogPath, propertiesPath, initialStatePath, prefix, pathToConfigs string,
	tableNameMappings map[string]string) *AbstractCLIDriver

NewAbstractCLIDriver returns configured AbstractCLIDriver

func (*AbstractCLIDriver) GetAllAvailableIntervals

func (acd *AbstractCLIDriver) GetAllAvailableIntervals() ([]*TimeInterval, error)

GetAllAvailableIntervals unsupported

func (*AbstractCLIDriver) GetCatalogPath

func (acd *AbstractCLIDriver) GetCatalogPath() string

GetCatalogPath returns catalog path

func (*AbstractCLIDriver) GetCollectionMetaKey

func (acd *AbstractCLIDriver) GetCollectionMetaKey() string

func (*AbstractCLIDriver) GetCollectionTable

func (acd *AbstractCLIDriver) GetCollectionTable() string

GetCollectionTable unsupported

func (*AbstractCLIDriver) GetConfigPath

func (acd *AbstractCLIDriver) GetConfigPath() string

GetConfigPath returns config path

func (*AbstractCLIDriver) GetObjectsFor

func (acd *AbstractCLIDriver) GetObjectsFor(interval *TimeInterval, objectsLoader ObjectsLoader) error

GetObjectsFor unsupported

func (*AbstractCLIDriver) GetPropertiesPath

func (acd *AbstractCLIDriver) GetPropertiesPath() string

GetPropertiesPath returns properties path

func (*AbstractCLIDriver) GetRefreshWindow

func (acd *AbstractCLIDriver) GetRefreshWindow() (time.Duration, error)

GetRefreshWindow unsupported

func (*AbstractCLIDriver) GetStateFilePath

func (acd *AbstractCLIDriver) GetStateFilePath(state string) (string, error)

GetStateFilePath returns input state as a filepath or returns initial state

func (*AbstractCLIDriver) GetStreamTableNameMapping

func (acd *AbstractCLIDriver) GetStreamTableNameMapping() map[string]string

GetStreamTableNameMapping returns stream - table names mapping

func (*AbstractCLIDriver) GetTableNamePrefix

func (acd *AbstractCLIDriver) GetTableNamePrefix() string

GetTableNamePrefix returns stream table name prefix or sourceID_

func (*AbstractCLIDriver) GetTap

func (acd *AbstractCLIDriver) GetTap() string

func (*AbstractCLIDriver) ID

func (acd *AbstractCLIDriver) ID() string

ID returns sourceID

func (*AbstractCLIDriver) Ready

func (acd *AbstractCLIDriver) Ready() (bool, error)

Ready returns CLI Driver ready flag. Should be overridden in every implementation

func (*AbstractCLIDriver) SetCatalogPath

func (acd *AbstractCLIDriver) SetCatalogPath(catalogPath string)

SetCatalogPath sets catalog path

func (*AbstractCLIDriver) SetPropertiesPath

func (acd *AbstractCLIDriver) SetPropertiesPath(propertiesPath string)

SetPropertiesPath sets properties path

func (*AbstractCLIDriver) SetStreamTableNameMappingIfNotExists

func (acd *AbstractCLIDriver) SetStreamTableNameMappingIfNotExists(streamTableNameMappings map[string]string)

SetStreamTableNameMappingIfNotExists sets stream table name mapping if not exists

func (*AbstractCLIDriver) Type

func (acd *AbstractCLIDriver) Type() string

Type returns CLI Driver type. Should be overridden in every implementation

type CLIDataConsumer

// CLIDataConsumer is used for consuming CLI drivers output.
type CLIDataConsumer interface {
	// Consume receives a CLI output representation and returns an error on failure.
	Consume(representation *CLIOutputRepresentation) error
	// CleanupAfterError receives the CLI output representation for cleanup.
	// NOTE(review): presumably called after a failed Consume/load — confirm against callers.
	CleanupAfterError(representation *CLIOutputRepresentation)
}

CLIDataConsumer is used for consuming CLI drivers output

type CLIDriver

//CLIDriver interface must be implemented by every CLI source type (Singer or Airbyte).
//It extends Driver with CLI-specific operations.
type CLIDriver interface {
	Driver

	//IsClosed returns true if the driver is already closed
	IsClosed() bool
	//Load runs CLI command and consumes output
	Load(config string, state string, taskLogger logging.TaskLogger, dataConsumer CLIDataConsumer, taskCloser CLITaskCloser) error
	//Ready returns true if the driver is ready otherwise returns ErrNotReady
	Ready() (bool, error)
	//GetTap returns npm package for sdk_source, Singer tap or airbyte docker image (without prefix 'airbyte/': source-mixpanel)
	GetTap() string
	//GetTableNamePrefix returns stream table name prefix or sourceID_
	GetTableNamePrefix() string
	//GetStreamTableNameMapping returns stream - table name mappings from configuration
	GetStreamTableNameMapping() map[string]string
	//GetConfigPath returns path to config file
	GetConfigPath() string
}

CLIDriver interface must be implemented by every CLI source type (Singer or Airbyte)

type CLIOutputRepresentation

// CLIOutputRepresentation is a singer/airbyte output representation.
// Streams are accessed through AddStream, GetStream, GetStreams and CurrentStream.
type CLIOutputRepresentation struct {
	// State is an untyped state value.
	// NOTE(review): presumably the state emitted by the CLI source — confirm.
	State interface{}
	// contains filtered or unexported fields
}

CLIOutputRepresentation is a singer/airbyte output representation

func NewCLIOutputRepresentation

func NewCLIOutputRepresentation() *CLIOutputRepresentation

func (*CLIOutputRepresentation) AddStream

func (c *CLIOutputRepresentation) AddStream(streamName string, stream *StreamRepresentation)

func (*CLIOutputRepresentation) CurrentStream

func (c *CLIOutputRepresentation) CurrentStream() *StreamRepresentation

func (*CLIOutputRepresentation) GetStream

func (c *CLIOutputRepresentation) GetStream(streamName string) (*StreamRepresentation, bool)

func (*CLIOutputRepresentation) GetStreams

func (c *CLIOutputRepresentation) GetStreams() []*StreamRepresentation

type CLIParser

// CLIParser parses a readable stream.
// NOTE(review): the parameter name suggests it is the stdout of a CLI command — confirm.
type CLIParser interface {
	// Parse reads from stdout and returns an error on failure.
	Parse(stdout io.ReadCloser) error
}

type CLITaskCloser

// CLITaskCloser is used for closing tasks.
type CLITaskCloser interface {
	// TaskID returns the task identifier as a string.
	TaskID() string
	// CloseWithError closes the task with the given message; systemErr flags the error kind.
	// NOTE(review): exact meaning of systemErr is not visible here — confirm.
	CloseWithError(msg string, systemErr bool)
	// HandleCanceling returns an error.
	// NOTE(review): presumably non-nil when the task was canceled — confirm.
	HandleCanceling() error
}

CLITaskCloser is used for closing tasks

type Collection

// Collection is a dto for report unit serialization.
// DaysBackToLoad and SourceID carry the "-" tag and are excluded from JSON and YAML.
// Note that StartDateStr is serialized under the key "start_date" and SyncMode under "mode".
type Collection struct {
	DaysBackToLoad int    `json:"-" yaml:"-"` //without serialization
	SourceID       string `json:"-" yaml:"-"` //without serialization

	Name         string                 `mapstructure:"name" json:"name,omitempty" yaml:"name,omitempty"`
	Type         string                 `mapstructure:"type" json:"type,omitempty" yaml:"type,omitempty"`
	TableName    string                 `mapstructure:"table_name" json:"table_name,omitempty" yaml:"table_name,omitempty"`
	StartDateStr string                 `mapstructure:"start_date" json:"start_date,omitempty" yaml:"start_date,omitempty"`
	Schedule     string                 `mapstructure:"schedule" json:"schedule,omitempty" yaml:"schedule,omitempty"`
	SyncMode     string                 `mapstructure:"mode" json:"mode,omitempty" yaml:"mode,omitempty"`
	Parameters   map[string]interface{} `mapstructure:"parameters" json:"parameters,omitempty" yaml:"parameters,omitempty"`
}

Collection is a dto for report unit serialization

func (*Collection) GetTableName

func (c *Collection) GetTableName() string

GetTableName returns TableName if it's set otherwise SourceID_CollectionName

func (*Collection) Init

func (c *Collection) Init() error

func (*Collection) Validate

func (c *Collection) Validate() error

Validate returns an error if the collection is invalid

type DatePartition

// DatePartition groups a field name, a time value and a granularity.
// It is embedded in DeleteConditions as the Partition field.
// NOTE(review): presumably identifies the date partition a delete applies to — confirm.
type DatePartition struct {
	Field       string
	Value       time.Time
	Granularity schema.Granularity
}

type DeleteCondition

// DeleteCondition is a representation of SQL delete condition.
type DeleteCondition struct {
	// Field is the field name the condition applies to.
	Field  string
	// Value is the untyped value used in the condition.
	Value  interface{}
	// Clause is a string.
	// NOTE(review): presumably the comparison operator, e.g. "=" — confirm.
	Clause string
}

DeleteCondition is a representation of SQL delete condition

type DeleteConditions

// DeleteConditions is a dto for multiple DeleteCondition instances together with
// a date partition and a join condition string.
type DeleteConditions struct {
	Conditions    []DeleteCondition
	Partition     DatePartition
	// NOTE(review): presumably the logical joiner between Conditions (e.g. "AND") — confirm.
	JoinCondition string
}

DeleteConditions is a dto for multiple DeleteCondition instances with Joiner

func DeleteByTimeChunkCondition

func DeleteByTimeChunkCondition(timeInterval *TimeInterval) *DeleteConditions

DeleteByTimeChunkCondition returns a delete condition that removes objects based on the eventn_ctx_time_interval value, or an empty condition if timeIntervalValue is empty

func (*DeleteConditions) IsEmpty

func (dc *DeleteConditions) IsEmpty() bool

IsEmpty returns true if there are no conditions

type Driver

//Driver interface must be implemented by every source type
type Driver interface {
	io.Closer
	//GetAllAvailableIntervals returns all the available time intervals for data loading. It means, that if you want
	//your driver to load for the last year by month chunks, you need to return 12 time intervals, each covering one
	//month. There is drivers/granularity.ALL for data sources that store data which may not be split by date.
	GetAllAvailableIntervals() ([]*TimeInterval, error)

	//GetRefreshWindow returns the time duration during which Jitsu will keep reloading stream data.
	//Necessary for Sources where data may change retroactively (analytics, ads)
	GetRefreshWindow() (time.Duration, error)

	/*GetObjectsFor returns slice of objects per time interval. Each slice element is one object from the data source.
	pos - current position (object number)
	total - number of objects available to load. -1 if there is no way to know exact number in advance
	percent - percent of total object processed [0..100]. estimated value when there is no way to know exact number in advance*/
	GetObjectsFor(interval *TimeInterval, objectsLoader ObjectsLoader) error

	//ReplaceTables returns whether, on full syncs, tables are replaced or cleared and appended to
	ReplaceTables() bool

	//Type returns string type of driver. Should be unique among drivers
	Type() string

	//GetCollectionTable returns table name
	GetCollectionTable() string

	//GetCollectionMetaKey returns key for storing signature in meta.Storage
	GetCollectionMetaKey() string

	//GetDriversInfo returns telemetry information about the driver
	GetDriversInfo() *DriversInfo

	//Delete returns an error on failure.
	//NOTE(review): what is deleted is not documented here - confirm against implementations
	Delete() error
}

Driver interface must be implemented by every source type

type DriversInfo

// DriversInfo is a dto for sharing information about the driver into telemetry.
// It is returned by Driver.GetDriversInfo.
type DriversInfo struct {
	SourceType       string
	ConnectorOrigin  string
	ConnectorVersion string
	// Streams is an integer.
	// NOTE(review): presumably the number of streams — confirm.
	Streams          int
}

DriversInfo is a dto for sharing information about the driver into telemetry

type ExecCommand

// ExecCommand is the command abstraction held by SyncCommand.Cmd.
type ExecCommand interface {
	// String returns a string form of the command.
	String() string
	// Close closes the command and returns an error on failure.
	Close() error
}

type GoogleAuthConfig

// GoogleAuthConfig holds Google authorization settings: OAuth fields
// (client_id, client_secret, refresh_token) and/or a service account key.
// Per the Validate documentation, an error is returned if both authorization
// parameters are empty.
type GoogleAuthConfig struct {
	Type              string      `mapstructure:"type" json:"type,omitempty" yaml:"type,omitempty"`
	ClientID          string      `mapstructure:"client_id" json:"client_id,omitempty" yaml:"client_id,omitempty"`
	ClientSecret      string      `mapstructure:"client_secret" json:"client_secret,omitempty" yaml:"client_secret,omitempty"`
	RefreshToken      string      `mapstructure:"refresh_token" json:"refresh_token,omitempty" yaml:"refresh_token,omitempty"`
	// ServiceAccountKey is untyped.
	// NOTE(review): presumably accepts either a JSON string or a decoded object — confirm.
	ServiceAccountKey interface{} `mapstructure:"service_account_key" json:"service_account_key,omitempty" yaml:"service_account_key,omitempty"`
	Subject           string      `mapstructure:"subject" json:"subject,omitempty" yaml:"subject,omitempty"`
}

func (*GoogleAuthConfig) FillPreconfiguredOauth

func (gac *GoogleAuthConfig) FillPreconfiguredOauth(sourceType string)

func (*GoogleAuthConfig) Marshal

func (gac *GoogleAuthConfig) Marshal() ([]byte, error)

func (*GoogleAuthConfig) ToGoogleAuthJSON

func (gac *GoogleAuthConfig) ToGoogleAuthJSON() GoogleAuthorizedUserJSON

ToGoogleAuthJSON returns configured GoogleAuthorizedUserJSON structure for Google authorization

func (*GoogleAuthConfig) Validate

func (gac *GoogleAuthConfig) Validate() error

Validate checks the service account JSON or the OAuth fields and returns an error if both authorization parameters are empty

type GoogleAuthorizedUserJSON

// GoogleAuthorizedUserJSON is a Google dto for authorization.
// It is produced by GoogleAuthConfig.ToGoogleAuthJSON. AuthType is serialized under the key "type".
type GoogleAuthorizedUserJSON struct {
	ClientID     string `mapstructure:"client_id" json:"client_id,omitempty" yaml:"client_id,omitempty"`
	ClientSecret string `mapstructure:"client_secret" json:"client_secret,omitempty" yaml:"client_secret,omitempty"`
	RefreshToken string `mapstructure:"refresh_token" json:"refresh_token,omitempty" yaml:"refresh_token,omitempty"`
	AuthType     string `mapstructure:"type" json:"type,omitempty" yaml:"type,omitempty"`
}

GoogleAuthorizedUserJSON is a Google dto for authorization

type IntervalDriver

// IntervalDriver is a base driver for native drivers.
// It provides the Delete and GetDriversInfo methods.
type IntervalDriver struct {
	// SourceType is the source type string.
	// NOTE(review): presumably one of the *Type constants of this package — confirm.
	SourceType string
}

IntervalDriver is a base driver for native drivers

func (*IntervalDriver) Delete

func (ind *IntervalDriver) Delete() error

func (*IntervalDriver) GetDriversInfo

func (ind *IntervalDriver) GetDriversInfo() *DriversInfo

GetDriversInfo returns telemetry information about the driver

type ObjectsLoader

// ObjectsLoader is the callback type accepted by Driver.GetObjectsFor.
// Per the GetObjectsFor documentation: pos is the current position (object number),
// total is the number of objects available to load (-1 when unknown in advance),
// and percent is the share of objects processed in [0..100].
type ObjectsLoader = func(objects []map[string]interface{}, pos int, total int, percent int) error

type Property

// Property is a dto for catalog properties representation.
// Properties nests further Property values by name, making the structure recursive.
type Property struct {
	//might be string or []string or nil
	Type       interface{}          `json:"type,omitempty"`
	Format     string               `json:"format,omitempty"`
	Properties map[string]*Property `json:"properties,omitempty"`
}

Property is a dto for catalog properties representation

type SourceConfig

// SourceConfig is a dto for api connector source config serialization.
type SourceConfig struct {
	// SourceID is serialized to JSON as "source_id" and excluded from YAML; it has no mapstructure tag.
	SourceID string `json:"source_id" yaml:"-"`

	Type                   string   `mapstructure:"type" json:"type,omitempty" yaml:"type,omitempty"`
	Destinations           []string `mapstructure:"destinations" json:"destinations,omitempty" yaml:"destinations,omitempty"`
	PostHandleDestinations []string `mapstructure:"post_handle_destinations" json:"post_handle_destinations,omitempty" yaml:"post_handle_destinations,omitempty"`

	// Collections holds untyped collection entries.
	// NOTE(review): presumably either collection names or Collection-shaped objects — confirm.
	Collections []interface{} `mapstructure:"collections" json:"collections,omitempty" yaml:"collections,omitempty"`
	Schedule    string        `mapstructure:"schedule" json:"schedule,omitempty" yaml:"schedule,omitempty"`

	Config        map[string]interface{} `mapstructure:"config" json:"config,omitempty" yaml:"config,omitempty"`
	Notifications map[string]interface{} `mapstructure:"notifications" json:"notifications,omitempty" yaml:"notifications,omitempty"`
	ProjectName   string                 `mapstructure:"project_name" json:"project_name,omitempty" yaml:"project_name,omitempty"`
}

SourceConfig is a dto for api connector source config serialization

type StreamConfiguration

// StreamConfiguration is a dto for serializing the configuration of selected streams.
type StreamConfiguration struct {
	Name        string   `mapstructure:"name" json:"name,omitempty" yaml:"name,omitempty"`
	Namespace   string   `mapstructure:"namespace" json:"namespace,omitempty" yaml:"namespace,omitempty"`
	SyncMode    string   `mapstructure:"sync_mode" json:"sync_mode,omitempty" yaml:"sync_mode,omitempty"`
	CursorField []string `mapstructure:"cursor_field" json:"cursor_field,omitempty" yaml:"cursor_field,omitempty"`
}

StreamConfiguration is a dto for serializing the configuration of selected streams

type StreamRepresentation

// StreamRepresentation is a singer/airbyte stream representation.
type StreamRepresentation struct {
	Namespace             string
	ChunkNumber           int
	StreamName            string
	IntermediateTableName string
	BatchHeader           *schema.BatchHeader
	KeyFields             []string
	// Objects holds the stream's records as generic maps.
	Objects               []map[string]interface{}
	KeepKeysUnhashed      bool
	RemoveSourceKeyFields bool
	NeedClean             bool
	//Replace Stream table with IntermediateTableName (swap tables) Set only on final chunk
	SwapWithIntermediateTable bool
	DeleteConditions          *DeleteConditions
}

StreamRepresentation is a singer/airbyte stream representation

type SyncCommand

// SyncCommand is a dto for keeping sync command (airbyte/singer) for graceful closing.
// Per the method documentation, Cancel and Shutdown use Kill under the hood, and
// Kill closes the runner and uses the task closer with an error message.
type SyncCommand struct {
	Cmd        ExecCommand
	TaskCloser CLITaskCloser
}

SyncCommand is a dto for keeping sync command (airbyte/singer) for graceful closing

func (*SyncCommand) Cancel

func (sc *SyncCommand) Cancel() error

Cancel uses Kill() under the hood

func (*SyncCommand) Kill

func (sc *SyncCommand) Kill(msg string) error

Kill closes runner and uses taskCloser with err msg

func (*SyncCommand) Shutdown

func (sc *SyncCommand) Shutdown() error

Shutdown uses Kill() under the hood

type TimeInterval

// TimeInterval is a time interval created by NewTimeInterval from a granularity and a time.
// Its granularity and bounds are exposed through the Granularity, LowerEndpoint and
// UpperEndpoint methods.
type TimeInterval struct {
	// TimeZoneID is a string.
	// NOTE(review): presumably a time zone identifier for the interval — confirm.
	TimeZoneID string
	// contains filtered or unexported fields
}

func NewTimeInterval

func NewTimeInterval(granularity schema.Granularity, t time.Time) *TimeInterval

func (*TimeInterval) CalculateSignatureFrom

func (ti *TimeInterval) CalculateSignatureFrom(t time.Time, window time.Duration) string

func (*TimeInterval) Granularity

func (ti *TimeInterval) Granularity() schema.Granularity

func (*TimeInterval) IsAll

func (ti *TimeInterval) IsAll() bool

func (*TimeInterval) LowerEndpoint

func (ti *TimeInterval) LowerEndpoint() time.Time

func (*TimeInterval) String

func (ti *TimeInterval) String() string

func (*TimeInterval) UpperEndpoint

func (ti *TimeInterval) UpperEndpoint() time.Time

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL