Documentation

Index

Constants

View Source
// SignatureLayout is a Go time layout (written against the reference time
// 2006-01-02 15:04:05) that renders a timestamp with fixed millisecond precision.
// The trailing "Z" is a literal character in this layout, not a zone directive,
// so formatting with it does not by itself convert the time to UTC.
// NOTE(review): presumably the layout used by TimeInterval.CalculateSignatureFrom — confirm.
const SignatureLayout = "2006-01-02T15:04:05.000Z"
View Source
// Driver type identifiers.
const (
	// SingerType is the string identifier "singer".
	// NOTE(review): presumably the value returned by (*Singer).Type and the key
	// passed to RegisterDriverConstructor for the Singer driver — confirm.
	SingerType = "singer"
)

Variables

This section is empty.

Functions

func Create

func Create(ctx context.Context, name string, sourceConfig *SourceConfig) (map[string]Driver, error)

Create creates source drivers, one per collection. It enriches incoming configs with default values if needed.

func RegisterDriverConstructor

func RegisterDriverConstructor(driverType string,
	createDriverFunc func(ctx context.Context, config *SourceConfig, collection *Collection) (Driver, error)) error

RegisterDriverConstructor registers a function that creates a new driver instance for the given driver type.

Types

type Collection

// Collection describes one collection of a source. Every field is bound to the
// same key for mapstructure, JSON and YAML, and is omitted from JSON/YAML output
// when empty.
type Collection struct {
	// Name is bound to the "name" key.
	Name       string                 `mapstructure:"name" json:"name,omitempty" yaml:"name,omitempty"`
	// Type is bound to the "type" key.
	Type       string                 `mapstructure:"type" json:"type,omitempty" yaml:"type,omitempty"`
	// TableName is bound to the "table_name" key.
	// NOTE(review): see GetTableName for how an empty value is handled — its body is not visible here.
	TableName  string                 `mapstructure:"table_name" json:"table_name,omitempty" yaml:"table_name,omitempty"`
	// Parameters is a free-form key/value map bound to the "parameters" key;
	// its schema is not defined by this type.
	Parameters map[string]interface{} `mapstructure:"parameters" json:"parameters,omitempty" yaml:"parameters,omitempty"`
}

func (Collection) GetTableName

func (c Collection) GetTableName() string

type Driver

// Driver is the contract for a source driver. It embeds io.Closer, so every
// implementation must also provide Close() error.
type Driver interface {
	io.Closer
	// GetAllAvailableIntervals returns all the available time intervals for data loading. This means that if you want
	// your driver to load the last year in monthly chunks, you need to return 12 time intervals, each covering one
	// month. There is drivers/granularity.ALL for data sources that store data which may not be split by date.
	GetAllAvailableIntervals() ([]*TimeInterval, error)
	// GetObjectsFor returns a slice of objects for the given time interval. Each slice element is one object from
	// the data source.
	GetObjectsFor(interval *TimeInterval) ([]map[string]interface{}, error)
	// Type returns the string type of the driver. It should be unique among drivers.
	Type() string
	// GetCollectionTable returns the table name for the collection.
	// NOTE(review): the earlier comment also promised primary keys, but the signature returns a single
	// string — confirm whether primary keys are exposed elsewhere.
	GetCollectionTable() string
}

The Driver interface must be implemented by every source type.

func NewFacebookMarketing

func NewFacebookMarketing(ctx context.Context, sourceConfig *SourceConfig, collection *Collection) (Driver, error)

func NewFirebase

func NewFirebase(ctx context.Context, sourceConfig *SourceConfig, collection *Collection) (Driver, error)

func NewGoogleAnalytics

func NewGoogleAnalytics(ctx context.Context, sourceConfig *SourceConfig, collection *Collection) (Driver, error)

func NewGooglePlay

func NewGooglePlay(ctx context.Context, sourceConfig *SourceConfig, collection *Collection) (Driver, error)

func NewRedis

func NewRedis(ctx context.Context, sourceConfig *SourceConfig, collection *Collection) (Driver, error)

func NewSinger

func NewSinger(ctx context.Context, sourceConfig *SourceConfig, collection *Collection) (Driver, error)

NewSinger returns a Singer driver and: 1. writes JSON files (config, catalog, properties, state) if a string/raw JSON was provided; 2. creates a venv; 3. in another goroutine, updates pip and installs the singer tap.

type FacebookInsightsResponse

// FacebookInsightsResponse is the decoding target for a response whose "data"
// key holds a list of objects.
// NOTE(review): presumably the Facebook insights API response decoded by the
// Facebook client library via the `facebook` struct tag — confirm.
type FacebookInsightsResponse struct {
	// Data holds one generic map per element of the "data" list.
	Data []map[string]interface{} `facebook:"data"`
}

type FacebookMarketing

// FacebookMarketing is the source driver returned (as a Driver) by
// NewFacebookMarketing. Its pointer-receiver methods Close,
// GetAllAvailableIntervals, GetObjectsFor, Type and GetCollectionTable match
// the Driver interface.
type FacebookMarketing struct {
	// contains filtered or unexported fields
}

func (*FacebookMarketing) Close

func (fm *FacebookMarketing) Close() error

func (*FacebookMarketing) GetAllAvailableIntervals

func (fm *FacebookMarketing) GetAllAvailableIntervals() ([]*TimeInterval, error)

GetAllAvailableIntervals returns half a year of intervals by default.

func (*FacebookMarketing) GetCollectionTable

func (fm *FacebookMarketing) GetCollectionTable() string

func (*FacebookMarketing) GetObjectsFor

func (fm *FacebookMarketing) GetObjectsFor(interval *TimeInterval) ([]map[string]interface{}, error)

func (*FacebookMarketing) Type

func (fm *FacebookMarketing) Type() string

type FacebookMarketingConfig

// FacebookMarketingConfig holds the connection settings for the Facebook
// Marketing source. Both fields are omitted from JSON/YAML output when empty.
// Validate (declared on the pointer receiver) checks the config.
type FacebookMarketingConfig struct {
	// AccountId is bound to the "account_id" key.
	AccountId string `mapstructure:"account_id" json:"account_id,omitempty" yaml:"account_id,omitempty"`
	// Token is bound to the "token" key.
	// NOTE(review): presumably an API access token — treat as a secret; confirm.
	Token     string `mapstructure:"token" json:"token,omitempty" yaml:"token,omitempty"`
}

func (*FacebookMarketingConfig) Validate

func (fmc *FacebookMarketingConfig) Validate() error

type FacebookReportConfig

// FacebookReportConfig describes a Facebook report request: which keys and
// metrics to include and at what level. All fields are omitted from JSON/YAML
// output when empty.
// NOTE(review): presumably decoded from Collection.Parameters — confirm against the driver.
type FacebookReportConfig struct {
	// Keys is bound to the "keys" key.
	Keys    []string `mapstructure:"keys" json:"keys,omitempty" yaml:"keys,omitempty"`
	// Metrics is bound to the "metrics" key.
	Metrics []string `mapstructure:"metrics" json:"metrics,omitempty" yaml:"metrics,omitempty"`
	// Level is bound to the "level" key; the set of accepted values is not visible here.
	Level   string   `mapstructure:"level" json:"level,omitempty" yaml:"level,omitempty"`
}

type Firebase

// Firebase is the source driver returned (as a Driver) by NewFirebase. Its
// pointer-receiver methods Close, GetAllAvailableIntervals, GetObjectsFor,
// Type and GetCollectionTable match the Driver interface.
type Firebase struct {
	// contains filtered or unexported fields
}

func (*Firebase) Close

func (f *Firebase) Close() error

func (*Firebase) GetAllAvailableIntervals

func (f *Firebase) GetAllAvailableIntervals() ([]*TimeInterval, error)

func (*Firebase) GetCollectionTable

func (f *Firebase) GetCollectionTable() string

func (*Firebase) GetObjectsFor

func (f *Firebase) GetObjectsFor(interval *TimeInterval) ([]map[string]interface{}, error)

func (*Firebase) Type

func (f *Firebase) Type() string

type FirebaseConfig

// FirebaseConfig holds the connection settings for the Firebase source. Both
// fields are omitted from JSON/YAML output when empty. Validate (declared on
// the pointer receiver) checks the config.
type FirebaseConfig struct {
	// ProjectId is bound to the "project_id" key.
	ProjectId   string `mapstructure:"project_id" json:"project_id,omitempty" yaml:"project_id,omitempty"`
	// Credentials is bound to the "key" key — note that the config key differs
	// from the Go field name.
	Credentials string `mapstructure:"key" json:"key,omitempty" yaml:"key,omitempty"`
}

func (*FirebaseConfig) Validate

func (fc *FirebaseConfig) Validate() error

type GAReportFieldsConfig

// GAReportFieldsConfig lists the dimensions and metrics of a report. Both
// fields are omitted from JSON/YAML output when empty.
// NOTE(review): presumably the Google Analytics report definition decoded from
// Collection.Parameters — confirm against the driver.
type GAReportFieldsConfig struct {
	// Dimensions is bound to the "dimensions" key.
	Dimensions []string `mapstructure:"dimensions" json:"dimensions,omitempty" yaml:"dimensions,omitempty"`
	// Metrics is bound to the "metrics" key.
	Metrics    []string `mapstructure:"metrics" json:"metrics,omitempty" yaml:"metrics,omitempty"`
}

type GoogleAnalytics

// GoogleAnalytics is the source driver returned (as a Driver) by
// NewGoogleAnalytics. Its pointer-receiver methods Close,
// GetAllAvailableIntervals, GetObjectsFor, Type and GetCollectionTable match
// the Driver interface.
type GoogleAnalytics struct {
	// contains filtered or unexported fields
}

func (*GoogleAnalytics) Close

func (g *GoogleAnalytics) Close() error

func (*GoogleAnalytics) GetAllAvailableIntervals

func (g *GoogleAnalytics) GetAllAvailableIntervals() ([]*TimeInterval, error)

func (*GoogleAnalytics) GetCollectionTable

func (g *GoogleAnalytics) GetCollectionTable() string

func (*GoogleAnalytics) GetObjectsFor

func (g *GoogleAnalytics) GetObjectsFor(interval *TimeInterval) ([]map[string]interface{}, error)

func (*GoogleAnalytics) Type

func (g *GoogleAnalytics) Type() string

type GoogleAnalyticsConfig

// GoogleAnalyticsConfig holds the connection settings for the Google Analytics
// source. Both fields are omitted from JSON/YAML output when empty. Validate
// (declared on the pointer receiver) checks the config.
type GoogleAnalyticsConfig struct {
	// AuthConfig is bound to the "auth" key; it is a pointer and may be nil
	// when the key is absent.
	AuthConfig *GoogleAuthConfig `mapstructure:"auth" json:"auth,omitempty" yaml:"auth,omitempty"`
	// ViewId is bound to the "view_id" key.
	ViewId     string            `mapstructure:"view_id" json:"view_id,omitempty" yaml:"view_id,omitempty"`
}

func (*GoogleAnalyticsConfig) Validate

func (gac *GoogleAnalyticsConfig) Validate() error

type GoogleAuthConfig

// GoogleAuthConfig holds Google authentication settings shared by the Google
// sources (it is referenced by GoogleAnalyticsConfig and GooglePlayConfig).
// It carries both OAuth-style fields (client id/secret, refresh token) and a
// service account key. All fields are omitted from JSON/YAML output when empty.
// NOTE(review): which combination of fields is required is decided by
// Validate, whose body is not visible here.
type GoogleAuthConfig struct {
	// ClientId is bound to the "client_id" key.
	ClientId          string      `mapstructure:"client_id" json:"client_id,omitempty" yaml:"client_id,omitempty"`
	// ClientSecret is bound to the "client_secret" key.
	ClientSecret      string      `mapstructure:"client_secret" json:"client_secret,omitempty" yaml:"client_secret,omitempty"`
	// RefreshToken is bound to the "refresh_token" key.
	RefreshToken      string      `mapstructure:"refresh_token" json:"refresh_token,omitempty" yaml:"refresh_token,omitempty"`
	// ServiceAccountKey is bound to the "service_account_key" key. It is
	// untyped, so its concrete form is not fixed by this struct.
	// NOTE(review): presumably accepts either a JSON string or an already-decoded object — confirm.
	ServiceAccountKey interface{} `mapstructure:"service_account_key" json:"service_account_key,omitempty" yaml:"service_account_key,omitempty"`
}

func (*GoogleAuthConfig) Marshal

func (gac *GoogleAuthConfig) Marshal() ([]byte, error)

func (*GoogleAuthConfig) ToGoogleAuthJSON

func (gac *GoogleAuthConfig) ToGoogleAuthJSON() GoogleAuthorizedUserJSON

func (*GoogleAuthConfig) Validate

func (gac *GoogleAuthConfig) Validate() error

type GoogleAuthorizedUserJSON

// GoogleAuthorizedUserJSON is the value returned by
// (*GoogleAuthConfig).ToGoogleAuthJSON. It repeats the client id, client secret
// and refresh token fields of GoogleAuthConfig and adds a "type" field. All
// fields are omitted from JSON/YAML output when empty.
type GoogleAuthorizedUserJSON struct {
	// ClientId is bound to the "client_id" key.
	ClientId     string `mapstructure:"client_id" json:"client_id,omitempty" yaml:"client_id,omitempty"`
	// ClientSecret is bound to the "client_secret" key.
	ClientSecret string `mapstructure:"client_secret" json:"client_secret,omitempty" yaml:"client_secret,omitempty"`
	// RefreshToken is bound to the "refresh_token" key.
	RefreshToken string `mapstructure:"refresh_token" json:"refresh_token,omitempty" yaml:"refresh_token,omitempty"`
	// AuthType is bound to the "type" key — note that the config key differs
	// from the Go field name.
	AuthType     string `mapstructure:"type" json:"type,omitempty" yaml:"type,omitempty"`
}

type GooglePlay

// GooglePlay is the source driver returned (as a Driver) by NewGooglePlay. Its
// pointer-receiver methods Close, GetAllAvailableIntervals, GetObjectsFor,
// Type and GetCollectionTable match the Driver interface.
type GooglePlay struct {
	// contains filtered or unexported fields
}

func (*GooglePlay) Close

func (gp *GooglePlay) Close() error

func (*GooglePlay) GetAllAvailableIntervals

func (gp *GooglePlay) GetAllAvailableIntervals() ([]*TimeInterval, error)

func (*GooglePlay) GetCollectionTable

func (gp *GooglePlay) GetCollectionTable() string

func (*GooglePlay) GetObjectsFor

func (gp *GooglePlay) GetObjectsFor(interval *TimeInterval) ([]map[string]interface{}, error)

func (*GooglePlay) Type

func (gp *GooglePlay) Type() string

type GooglePlayConfig

// GooglePlayConfig holds the connection settings for the Google Play source.
// Both fields are omitted from JSON/YAML output when empty. Validate (declared
// on the pointer receiver) checks the config.
type GooglePlayConfig struct {
	// AccountId is bound to the "account_id" key.
	AccountId  string            `mapstructure:"account_id" json:"account_id,omitempty" yaml:"account_id,omitempty"`
	// AccountKey is bound to the "auth" key — note that the config key differs
	// from the Go field name. It is a pointer and may be nil when the key is absent.
	AccountKey *GoogleAuthConfig `mapstructure:"auth" json:"auth,omitempty" yaml:"auth,omitempty"`
}

func (*GooglePlayConfig) Validate

func (gpc *GooglePlayConfig) Validate() error

type Granularity

// Granularity is a string-typed enumeration of time-chunk sizes. It is the
// first argument of NewTimeInterval and has value-receiver methods Format,
// Lower, Upper and String.
type Granularity string
// Supported granularities. Each constant's string value equals its name.
const (
	// DAY is the "DAY" granularity.
	DAY   Granularity = "DAY"
	// MONTH is the "MONTH" granularity.
	MONTH Granularity = "MONTH"
	// YEAR is the "YEAR" granularity.
	YEAR  Granularity = "YEAR"
	// ALL is the "ALL" granularity.
	// NOTE(review): presumably meant for sources whose data cannot be split by
	// date (see TimeInterval.IsAll) — confirm.
	ALL   Granularity = "ALL"
)

func (Granularity) Format

func (g Granularity) Format(t time.Time) string

func (Granularity) Lower

func (g Granularity) Lower(t time.Time) time.Time

func (Granularity) String

func (g Granularity) String() string

func (Granularity) Upper

func (g Granularity) Upper(t time.Time) time.Time

type Redis

// Redis is the source driver returned (as a Driver) by NewRedis. Its
// pointer-receiver methods Close, GetAllAvailableIntervals, GetObjectsFor,
// Type and GetCollectionTable match the Driver interface.
type Redis struct {
	// contains filtered or unexported fields
}

func (*Redis) Close

func (r *Redis) Close() error

func (*Redis) GetAllAvailableIntervals

func (r *Redis) GetAllAvailableIntervals() ([]*TimeInterval, error)

func (*Redis) GetCollectionTable

func (r *Redis) GetCollectionTable() string

func (*Redis) GetObjectsFor

func (r *Redis) GetObjectsFor(interval *TimeInterval) ([]map[string]interface{}, error)

func (*Redis) Type

func (r *Redis) Type() string

type RedisConfig

// RedisConfig holds the connection settings for the Redis source. All fields
// are omitted from JSON/YAML output when empty (for Port, when it is 0).
// Validate (declared on the pointer receiver) checks the config.
type RedisConfig struct {
	// Host is bound to the "host" key.
	Host     string `mapstructure:"host" json:"host,omitempty" yaml:"host,omitempty"`
	// Port is bound to the "port" key.
	// NOTE(review): whether a default is applied when this is 0 is not visible here.
	Port     int    `mapstructure:"port" json:"port,omitempty" yaml:"port,omitempty"`
	// Password is bound to the "password" key.
	Password string `mapstructure:"password" json:"password,omitempty" yaml:"password,omitempty"`
}

func (*RedisConfig) Validate

func (rc *RedisConfig) Validate() error

type Singer

// Singer is the source driver returned (as a Driver) by NewSinger. Besides the
// Driver methods it exposes GetTap, Load and Ready.
//
// The struct embeds sync.RWMutex, so Lock/Unlock/RLock/RUnlock are promoted
// into Singer's exported method set. Because it contains a mutex, a Singer must
// not be copied after first use; all of its methods use pointer receivers.
type Singer struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func (*Singer) Close

func (s *Singer) Close() (multiErr error)

func (*Singer) GetAllAvailableIntervals

func (s *Singer) GetAllAvailableIntervals() ([]*TimeInterval, error)

GetAllAvailableIntervals is unsupported.

func (*Singer) GetCollectionTable

func (s *Singer) GetCollectionTable() string

GetCollectionTable is unsupported.

func (*Singer) GetObjectsFor

func (s *Singer) GetObjectsFor(interval *TimeInterval) ([]map[string]interface{}, error)

GetObjectsFor is unsupported.

func (*Singer) GetTap

func (s *Singer) GetTap() string

func (*Singer) Load

func (s *Singer) Load(state string, strLogger *logging.SyncLogger, portionConsumer singer.PortionConsumer) error

func (*Singer) Ready

func (s *Singer) Ready() (bool, error)

func (*Singer) Type

func (s *Singer) Type() string

type SingerConfig

// SingerConfig holds the settings for the Singer source. All fields are omitted
// from JSON/YAML output when empty. Validate (declared on the pointer receiver)
// checks the config.
//
// Config, Catalog, Properties and InitialState are untyped. Per the NewSinger
// documentation, JSON files for config, catalog, properties and state are
// written when a string/raw JSON value was provided.
type SingerConfig struct {
	// Tap is bound to the "tap" key.
	Tap          string      `mapstructure:"tap" json:"tap,omitempty" yaml:"tap,omitempty"`
	// Config is bound to the "config" key.
	Config       interface{} `mapstructure:"config" json:"config,omitempty" yaml:"config,omitempty"`
	// Catalog is bound to the "catalog" key.
	Catalog      interface{} `mapstructure:"catalog" json:"catalog,omitempty" yaml:"catalog,omitempty"`
	// Properties is bound to the "properties" key.
	Properties   interface{} `mapstructure:"properties" json:"properties,omitempty" yaml:"properties,omitempty"`
	// InitialState is bound to the "initial_state" key.
	InitialState interface{} `mapstructure:"initial_state" json:"initial_state,omitempty" yaml:"initial_state,omitempty"`
}

func (*SingerConfig) Validate

func (sc *SingerConfig) Validate() error

type SourceConfig

// SourceConfig is the top-level configuration of a source. It is the argument
// passed to Create and to every driver constructor.
type SourceConfig struct {
	// Name carries no struct tags and is not meant to be serialized.
	// NOTE(review): without a `json:"-"` tag, encoding/json will still emit this
	// field under the key "Name" — confirm that this is intended.
	Name         string        //without serialization
	// Type is bound to the "type" key.
	// NOTE(review): presumably the driver type passed to RegisterDriverConstructor — confirm.
	Type         string        `mapstructure:"type" json:"type,omitempty" yaml:"type,omitempty"`
	// Destinations is bound to the "destinations" key.
	Destinations []string      `mapstructure:"destinations" json:"destinations,omitempty" yaml:"destinations,omitempty"`
	// Collections is bound to the "collections" key. Elements are untyped, so
	// their concrete form is not fixed by this struct.
	// NOTE(review): presumably each element is either a collection name or a Collection-shaped object — confirm against Create.
	Collections  []interface{} `mapstructure:"collections" json:"collections,omitempty" yaml:"collections,omitempty"`

	// Config is a free-form key/value map bound to the "config" key; its schema
	// is not defined by this type.
	Config map[string]interface{} `mapstructure:"config" json:"config,omitempty" yaml:"config,omitempty"`
}

type TimeInterval

// TimeInterval is the unit of loading passed to Driver.GetObjectsFor and
// returned by Driver.GetAllAvailableIntervals. It is built with NewTimeInterval
// from a Granularity and a time, and exposes LowerEndpoint, UpperEndpoint,
// IsAll, CalculateSignatureFrom and String.
type TimeInterval struct {
	// TimeZoneId is the only exported field; it has no struct tags.
	// NOTE(review): presumably an IANA time zone name — confirm how it is used.
	TimeZoneId string
	// contains filtered or unexported fields
}

func NewTimeInterval

func NewTimeInterval(granularity Granularity, t time.Time) *TimeInterval

func (*TimeInterval) CalculateSignatureFrom

func (ti *TimeInterval) CalculateSignatureFrom(t time.Time) string

func (*TimeInterval) IsAll

func (ti *TimeInterval) IsAll() bool

func (*TimeInterval) LowerEndpoint

func (ti *TimeInterval) LowerEndpoint() time.Time

func (*TimeInterval) String

func (ti *TimeInterval) String() string

func (*TimeInterval) UpperEndpoint

func (ti *TimeInterval) UpperEndpoint() time.Time