Documentation
¶
Index ¶
- Constants
- Variables
- func CheckCurrentTimeExistsInExcludeWindow(currentTime time.Time, windowStartTime, windowEndTime string) bool
- func CheckForWarehouseEnvVars() bool
- func CheckPGHealth(dbHandle *sql.DB) bool
- func GetExludeWindowStartEndTimes(excludeWindow map[string]interface{}) (string, string)
- func GetPrevScheduledTime(syncFrequency, syncStartAt string, currTime time.Time) time.Time
- func HandleSchemaChange(existingDataType, columnType string, columnVal interface{}) (newColumnVal interface{}, ok bool)
- func Init()
- func Init2()
- func Init3()
- func Init4()
- func Init5()
- func Init6()
- func InitWarehouseAPI(dbHandle *sql.DB, log logger.LoggerI)
- func PickupStagingConfiguration(job *PayloadT) bool
- func ScheduledTimes(syncFrequency, syncStartAt string) []int
- func Start(ctx context.Context, app app.Interface) error
- func TriggerUploadHandler(sourceID, destID string) error
- type BatchRouterEventT
- type ColumnInfoT
- type ConfigurationTestInput
- type ConfigurationTestOutput
- type ConstraintsI
- type ConstraintsViolationT
- type DB
- type DataT
- type ErrorResponseT
- type HandleT
- type IndexConstraintT
- type JobIDT
- type JobRunT
- type LoadFileJobT
- type MetadataT
- type PayloadT
- type ProcessStagingFilesJobT
- type QueryInput
- type RetryRequest
- type RetryResponse
- type SchemaHandleT
- type StagingFileT
- type TableSkipError
- type TableUploadIDInfoT
- type TableUploadReqT
- type TableUploadResT
- type TableUploadStatusInfoT
- type TableUploadStatusT
- type TableUploadT
- type TablesResT
- type UploadAPIT
- type UploadColumnT
- type UploadColumnsOpts
- type UploadJobT
- func (job *UploadJobT) Aborted(attempts int, startTime time.Time) bool
- func (job *UploadJobT) GetFirstLastEvent() (time.Time, time.Time)
- func (job *UploadJobT) GetLoadFileGenStartTIme() time.Time
- func (job *UploadJobT) GetLoadFileType() string
- func (job *UploadJobT) GetLoadFilesMetadata(options warehouseutils.GetLoadFilesOptionsT) (loadFiles []warehouseutils.LoadFileT)
- func (job *UploadJobT) GetLocalSchema() warehouseutils.SchemaT
- func (job *UploadJobT) GetSampleLoadFileLocation(tableName string) (location string, err error)
- func (job *UploadJobT) GetSchemaInWarehouse() (schema warehouseutils.SchemaT)
- func (job *UploadJobT) GetSingleLoadFile(tableName string) (warehouseutils.LoadFileT, error)
- func (job *UploadJobT) GetTableSchemaInUpload(tableName string) warehouseutils.TableSchemaT
- func (job *UploadJobT) GetTableSchemaInWarehouse(tableName string) warehouseutils.TableSchemaT
- func (job *UploadJobT) ShouldOnDedupUseNewRecord() bool
- func (job *UploadJobT) UpdateLocalSchema(schema warehouseutils.SchemaT) error
- func (job *UploadJobT) UseRudderStorage() bool
- type UploadPagination
- type UploadReqT
- type UploadResT
- type UploadStatusOpts
- type UploadT
- type UploadsReqT
- type UploadsResT
- type WarehouseAdmin
- type WorkerIdentifierT
Constants ¶
const ( STATS_WORKER_IDLE_TIME = "worker_idle_time" STATS_WORKER_CLAIM_PROCESSING_TIME = "worker_claim_processing_time" STATS_WORKER_CLAIM_PROCESSING_FAILED = "worker_claim_processing_failed" STATS_WORKER_CLAIM_PROCESSING_SUCCEEDED = "worker_claim_processing_succeeded" TAG_WORKERID = "workerId" )
const ( Waiting = "waiting" GeneratedUploadSchema = "generated_upload_schema" CreatedTableUploads = "created_table_uploads" GeneratedLoadFiles = "generated_load_files" UpdatedTableUploadsCounts = "updated_table_uploads_counts" CreatedRemoteSchema = "created_remote_schema" ExportedUserTables = "exported_user_tables" ExportedData = "exported_data" ExportedIdentities = "exported_identities" Aborted = "aborted" )
Upload Status
const ( GeneratingStagingFileFailedState = "generating_staging_file_failed" GeneratedStagingFileState = "generated_staging_file" PopulatingHistoricIdentitiesState = "populating_historic_identities" PopulatingHistoricIdentitiesStateFailed = "populating_historic_identities_failed" FetchingRemoteSchemaFailed = "fetching_remote_schema_failed" InternalProcessingFailed = "internal_processing_failed" )
const ( TableUploadExecuting = "executing" TableUploadUpdatingSchema = "updating_schema" TableUploadUpdatingSchemaFailed = "updating_schema_failed" TableUploadUpdatedSchema = "updated_schema" TableUploadExporting = "exporting_data" TableUploadExportingFailed = "exporting_data_failed" UserTableUploadExportingFailed = "exporting_user_tables_failed" IdentityTableUploadExportingFailed = "exporting_identities_failed" TableUploadExported = "exported_data" )
Table Upload status
const ( UploadStatusField = "status" UploadStartLoadFileIDField = "start_load_file_id" UploadEndLoadFileIDField = "end_load_file_id" UploadUpdatedAtField = "updated_at" UploadTimingsField = "timings" UploadSchemaField = "schema" MergedSchemaField = "mergedschema" UploadLastExecAtField = "last_exec_at" UploadInProgress = "in_progress" )
const ( MasterMode = "master" SlaveMode = "slave" MasterSlaveMode = "master_and_slave" EmbeddedMode = "embedded" PooledWHSlaveMode = "embedded_master" )
Warehouse worker modes
const (
CloudSourceCateogry = "cloud"
)
const (
DegradedMode = "degraded"
)
const (
WorkerProcessingDownloadStagingFileFailed = "worker_processing_download_staging_file_failed"
)
Variables ¶
var (
ShouldForceSetLowerVersion bool
)
Functions ¶
func CheckForWarehouseEnvVars ¶
func CheckForWarehouseEnvVars() bool
CheckForWarehouseEnvVars checks whether all the required environment variables for the warehouse are present.
func CheckPGHealth ¶
func GetPrevScheduledTime ¶
GetPrevScheduledTime returns the closest previous scheduled time. E.g., when syncing every 3 hours starting at 13:00 (scheduled times: 13:00, 16:00, 19:00, 22:00, 01:00, 04:00, 07:00, 10:00), the previous scheduled time for a current time of 18:00 is 16:00 on the same day, and for a current time of 00:30 it is 22:00 on the previous day.
func HandleSchemaChange ¶
func ScheduledTimes ¶
ScheduledTimes returns all possible start times (in minutes from the start of the day) as per the schedule. E.g., syncing every 3 hours starting at 13:00 gives the scheduled times 13:00, 16:00, 19:00, 22:00, 01:00, 04:00, 07:00, 10:00.
func TriggerUploadHandler ¶
Types ¶
type BatchRouterEventT ¶
func (*BatchRouterEventT) GetColumnInfo ¶
func (event *BatchRouterEventT) GetColumnInfo(columnName string) (columnInfo ColumnInfoT, ok bool)
type ColumnInfoT ¶
type ColumnInfoT struct {
ColumnVal interface{}
ColumnType string
}
type ConfigurationTestInput ¶
type ConfigurationTestInput struct {
DestID string
}
type ConfigurationTestOutput ¶
type ConstraintsI ¶
type ConstraintsI interface {
// contains filtered or unexported methods
}
type ConstraintsViolationT ¶
type ConstraintsViolationT struct {
// contains filtered or unexported fields
}
func ViolatedConstraints ¶
func ViolatedConstraints(destinationType string, brEvent *BatchRouterEventT, columnName string) (cv *ConstraintsViolationT)
type DB ¶
type DB struct {
// contains filtered or unexported fields
}
DB encapsulates interactions of warehouse operations with the database.
func NewWarehouseDB ¶
type ErrorResponseT ¶
type ErrorResponseT struct {
Error string
}
type IndexConstraintT ¶
type JobRunT ¶
type JobRunT struct {
// contains filtered or unexported fields
}
JobRunT is a temporary store used while processing a staging file into a load file.
func (*JobRunT) GetWriter ¶
func (jobRun *JobRunT) GetWriter(tableName string) (warehouseutils.LoadFileWriterI, error)
type LoadFileJobT ¶
type LoadFileJobT struct {
Upload UploadT
StagingFile *StagingFileT
Schema map[string]map[string]string
Warehouse warehouseutils.WarehouseT
Wg *misc.WaitGroup
LoadFileIDsChan chan []int64
TableToBucketFolderMap map[string]string
TableToBucketFolderMapLock *sync.RWMutex
}
type PayloadT ¶
type PayloadT struct {
BatchID string
UploadID int64
StagingFileID int64
StagingFileLocation string
UploadSchema map[string]map[string]string
SourceID string
SourceName string
DestinationID string
DestinationName string
DestinationType string
DestinationNamespace string
DestinationRevisionID string
StagingDestinationRevisionID string
DestinationConfig interface{}
StagingDestinationConfig interface{}
UseRudderStorage bool
StagingUseRudderStorage bool
UniqueLoadGenID string
RudderStoragePrefix string
Output []loadFileUploadOutputT
LoadFilePrefix string // prefix for the load file name
LoadFileType string
}
type ProcessStagingFilesJobT ¶
type ProcessStagingFilesJobT struct {
Upload UploadT
List []*StagingFileT
Warehouse warehouseutils.WarehouseT
}
type QueryInput ¶
type RetryRequest ¶
type RetryRequest struct {
WorkspaceID string
SourceID string
DestinationID string
DestinationType string
IntervalInHours int64 // Optional, if provided we will retry based on the interval provided
UploadIds []int64 // Optional, if provided we will retry the upload ids provided
ForceRetry bool
API UploadAPIT
}
func (*RetryRequest) RetryWHUploads ¶
func (retryReq *RetryRequest) RetryWHUploads() (response RetryResponse, err error)
type RetryResponse ¶
type SchemaHandleT ¶
type SchemaHandleT struct {
// contains filtered or unexported fields
}
type StagingFileT ¶
type StagingFileT struct {
ID int64
Location string
SourceID string
Schema json.RawMessage
Status string // enum
CreatedAt time.Time
FirstEventAt time.Time
LastEventAt time.Time
UseRudderStorage bool
DestinationRevisionID string
// cloud sources specific info
SourceBatchID string
SourceTaskID string
SourceTaskRunID string
SourceJobID string
SourceJobRunID string
TimeWindow time.Time
}
type TableSkipError ¶
type TableSkipError struct {
// contains filtered or unexported fields
}
TableSkipError is a custom error type to capture if a table load is skipped because of a previously failed table load
func (*TableSkipError) Error ¶
func (tse *TableSkipError) Error() string
type TableUploadIDInfoT ¶
type TableUploadIDInfoT struct {
// contains filtered or unexported fields
}
TableUploadIDInfoT captures the uploadID and error for [uploadID][tableName]
type TableUploadReqT ¶
type TableUploadReqT struct {
UploadID int64
Name string
API UploadAPIT
}
func (TableUploadReqT) GetWhTableUploads ¶
func (tableUploadReq TableUploadReqT) GetWhTableUploads() ([]*proto.WHTable, error)
type TableUploadResT ¶
type TableUploadStatusInfoT ¶
type TableUploadStatusInfoT struct {
// contains filtered or unexported fields
}
TableUploadStatusInfoT captures the status and error for [uploadID][tableName]
type TableUploadStatusT ¶
type TableUploadStatusT struct {
// contains filtered or unexported fields
}
TableUploadStatusT captures the status of each table upload along with its parent upload_job's info, such as destination_id and namespace.
type TableUploadT ¶
type TableUploadT struct {
// contains filtered or unexported fields
}
func NewTableUpload ¶
func NewTableUpload(uploadID int64, tableName string) *TableUploadT
type TablesResT ¶
type TablesResT struct {
Tables []TableUploadResT `json:"tables,omitempty"`
}
type UploadAPIT ¶
type UploadAPIT struct {
// contains filtered or unexported fields
}
var UploadAPI UploadAPIT
type UploadColumnT ¶
type UploadColumnT struct {
Column string
Value interface{}
}
type UploadColumnsOpts ¶
type UploadColumnsOpts struct {
Fields []UploadColumnT
Txn *sql.Tx
}
type UploadJobT ¶
type UploadJobT struct {
// contains filtered or unexported fields
}
func (*UploadJobT) Aborted ¶
func (job *UploadJobT) Aborted(attempts int, startTime time.Time) bool
Aborted checks whether the state of the job should be aborted.
func (*UploadJobT) GetFirstLastEvent ¶
func (job *UploadJobT) GetFirstLastEvent() (time.Time, time.Time)
func (*UploadJobT) GetLoadFileGenStartTIme ¶
func (job *UploadJobT) GetLoadFileGenStartTIme() time.Time
func (*UploadJobT) GetLoadFileType ¶
func (job *UploadJobT) GetLoadFileType() string
func (*UploadJobT) GetLoadFilesMetadata ¶
func (job *UploadJobT) GetLoadFilesMetadata(options warehouseutils.GetLoadFilesOptionsT) (loadFiles []warehouseutils.LoadFileT)
func (*UploadJobT) GetLocalSchema ¶
func (job *UploadJobT) GetLocalSchema() warehouseutils.SchemaT
func (*UploadJobT) GetSampleLoadFileLocation ¶
func (job *UploadJobT) GetSampleLoadFileLocation(tableName string) (location string, err error)
func (*UploadJobT) GetSchemaInWarehouse ¶
func (job *UploadJobT) GetSchemaInWarehouse() (schema warehouseutils.SchemaT)
func (*UploadJobT) GetSingleLoadFile ¶
func (job *UploadJobT) GetSingleLoadFile(tableName string) (warehouseutils.LoadFileT, error)
func (*UploadJobT) GetTableSchemaInUpload ¶
func (job *UploadJobT) GetTableSchemaInUpload(tableName string) warehouseutils.TableSchemaT
func (*UploadJobT) GetTableSchemaInWarehouse ¶
func (job *UploadJobT) GetTableSchemaInWarehouse(tableName string) warehouseutils.TableSchemaT
func (*UploadJobT) ShouldOnDedupUseNewRecord ¶
func (job *UploadJobT) ShouldOnDedupUseNewRecord() bool
func (*UploadJobT) UpdateLocalSchema ¶
func (job *UploadJobT) UpdateLocalSchema(schema warehouseutils.SchemaT) error
func (*UploadJobT) UseRudderStorage ¶
func (job *UploadJobT) UseRudderStorage() bool
type UploadPagination ¶
type UploadReqT ¶
type UploadReqT struct {
WorkspaceID string
UploadId int64
API UploadAPIT
}
func (UploadReqT) GetWHUpload ¶
func (uploadReq UploadReqT) GetWHUpload() (*proto.WHUploadResponse, error)
func (UploadReqT) TriggerWHUpload ¶
func (uploadReq UploadReqT) TriggerWHUpload() (response *proto.TriggerWhUploadsResponse, err error)
type UploadResT ¶
type UploadResT struct {
ID int64 `json:"id"`
Namespace string `json:"namespace"`
SourceID string `json:"source_id"`
DestinationID string `json:"destination_id"`
DestinationType string `json:"destination_type"`
Status string `json:"status"`
Error string `json:"error"`
Attempt int32 `json:"attempt"`
Duration int32 `json:"duration"`
NextRetryTime string `json:"nextRetryTime"`
FirstEventAt time.Time `json:"first_event_at"`
LastEventAt time.Time `json:"last_event_at"`
Tables []TableUploadResT `json:"tables,omitempty"`
}
type UploadStatusOpts ¶
type UploadStatusOpts struct {
Status string
AdditionalFields []UploadColumnT
ReportingMetric types.PUReportedMetric
}
type UploadT ¶
type UploadT struct {
ID int64
Namespace string
SourceID string
SourceType string
SourceCategory string
DestinationID string
DestinationType string
StartStagingFileID int64
EndStagingFileID int64
StartLoadFileID int64
EndLoadFileID int64
Status string
UploadSchema warehouseutils.SchemaT
MergedSchema warehouseutils.SchemaT
Error json.RawMessage
Timings []map[string]string
FirstAttemptAt time.Time
LastAttemptAt time.Time
Attempts int64
Metadata json.RawMessage
FirstEventAt time.Time
LastEventAt time.Time
UseRudderStorage bool
LoadFileGenStartTime time.Time
TimingsObj sql.NullString
Priority int
// cloud sources specific info
SourceBatchID string
SourceTaskID string
SourceTaskRunID string
SourceJobID string
SourceJobRunID string
LoadFileType string
}
type UploadsReqT ¶
type UploadsReqT struct {
WorkspaceID string
SourceID string
DestinationID string
DestinationType string
Status string
Limit int32
Offset int32
API UploadAPIT
}
func (*UploadsReqT) GetWhUploads ¶
func (uploadsReq *UploadsReqT) GetWhUploads() (uploadsRes *proto.WHUploadsResponse, err error)
func (*UploadsReqT) TriggerWhUploads ¶
func (uploadsReq *UploadsReqT) TriggerWhUploads() (response *proto.TriggerWhUploadsResponse, err error)
type UploadsResT ¶
type UploadsResT struct {
Uploads []UploadResT `json:"uploads"`
Pagination UploadPagination `json:"pagination"`
}
type WarehouseAdmin ¶
type WarehouseAdmin struct{}
func (*WarehouseAdmin) ConfigurationTest ¶
func (wh *WarehouseAdmin) ConfigurationTest(s ConfigurationTestInput, reply *ConfigurationTestOutput) error
ConfigurationTest tests the underlying warehouse destination.
func (*WarehouseAdmin) Query ¶
func (wh *WarehouseAdmin) Query(s QueryInput, reply *warehouseutils.QueryResult) error
Query the underlying warehouse
func (*WarehouseAdmin) TriggerUpload ¶
func (wh *WarehouseAdmin) TriggerUpload(off bool, reply *string) error
TriggerUpload sets uploads to start without delay
type WorkerIdentifierT ¶
type WorkerIdentifierT string