etlutil

package
v1.0.7 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 18, 2024 License: MIT Imports: 24 Imported by: 0

Documentation

Overview

Package etlutil provides various helper functions used by components of goetl.

Constants

const (
	DateLayout = "2006-01-02"
	TimeLayout = "15:04:05"
)
const (
	TypeableColumnName = "goetl_data_type"
)

TypeableColumnName is exposed as a constant so callers do not have to retype (and risk mistyping) the column name.

Variables

This section is empty.

Functions

func BeginningOfDay

func BeginningOfDay(t time.Time) time.Time

BeginningOfDay returns the time (in any location) of the start of that location's day.

func BeginningOfMonth added in v1.0.7

func BeginningOfMonth(t time.Time) time.Time

BeginningOfMonth returns the time (in any location) of the start of that location's month.

func BeginningOfTodayInUTC

func BeginningOfTodayInUTC(loc *time.Location) time.Time

BeginningOfTodayInUTC finds the beginning of the current day in the provided location and then converts that instant to UTC.

func CSVProcess

func CSVProcess(params *CSVParameters, d etldata.Payload, outputChan chan etldata.Payload, killChan chan error)

CSVProcess writes the contents of d to the file and optionally sends the written bytes upstream on outputChan.

func CSVString

func CSVString(v interface{}) string

CSVString returns an empty string for nil values, so that the text "null" is not written to a file.
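A sketch of the nil check, assuming non-nil values are formatted with `fmt`'s `%v` verb (the real function may format some types differently). The name `csvString` is hypothetical. The point is that a nil value becomes an empty field instead of the literal text a JSON encoder would produce.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// csvString is a hypothetical stand-in for etlutil.CSVString.
func csvString(v interface{}) string {
	if v == nil {
		return "" // empty CSV field rather than "null"
	}
	return fmt.Sprintf("%v", v)
}

func main() {
	row := []interface{}{"a", nil, 42, 3.5}
	fields := make([]string, len(row))
	for i, v := range row {
		fields[i] = csvString(v)
	}
	fmt.Println(strings.Join(fields, ",")) // a,,42,3.5

	// For contrast, JSON-encoding the nil value yields the text "null".
	b, _ := json.Marshal(row[1])
	fmt.Println(string(b)) // null
}
```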

func CreateTempTable

func CreateTempTable(tx *sql.Tx, likeTable string) (string, error)

CreateTempTable generates a unique table name and creates schema based on the target table.

func DaysBetween

func DaysBetween(start, finish time.Time) int

DaysBetween calculates how many days pass between two time objects.

func DaysInMonth

func DaysInMonth(month string) int

DaysInMonth takes a month (in monthLayout) and returns how many days are in that month.
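`monthLayout` is unexported and its value is not documented here; the sketch below assumes it is `"2006-01"` (for example `2024-02`). It relies on `time.Date` normalizing day 0 of the next month to the last day of the current one. Unlike the real function, this hypothetical `daysInMonth` also returns the parse error instead of hiding it.

```go
package main

import (
	"fmt"
	"time"
)

// monthLayout is an assumption; etlutil's unexported constant may differ.
const monthLayout = "2006-01"

func daysInMonth(month string) (int, error) {
	t, err := time.Parse(monthLayout, month)
	if err != nil {
		return 0, err
	}
	// Day 0 of the following month normalizes to the last day of this month;
	// December's Month()+1 (13) rolls over into January of the next year.
	last := time.Date(t.Year(), t.Month()+1, 0, 0, 0, 0, 0, time.UTC)
	return last.Day(), nil
}

func main() {
	for _, m := range []string{"2023-02", "2024-02", "2024-12"} {
		n, err := daysInMonth(m)
		if err != nil {
			panic(err)
		}
		fmt.Println(m, n) // 28, 29 and 31 days respectively
	}
}
```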

func Dedupe

func Dedupe(tx *sql.Tx, targetTable string) error

Dedupe writes all unique values into a temp table and then runs TruncateMerge.

func DeleteS3Objects

func DeleteS3Objects(client *s3.S3, bucket string, objKeys []string) (*s3.DeleteObjectsOutput, error)

DeleteS3Objects deletes the objects specified by the given object keys.

func DeltaMerge

func DeltaMerge(tx *sql.Tx, targetTable, tempTable, conditional string) error

DeltaMerge deletes any records in the targetTable that are in the tempTable bound by the conditional. It then inserts all records in the tempTable into the targetTable.

This should be used when you are inserting a subset of records into the tempTable (instead of running a complete snapshot). The conditional would then typically join on the primary key, so that each record written to the tempTable appears only once in the targetTable when the job is complete.

This is effectively a workaround for the lack of primary key constraints in Redshift.
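The two steps can be pictured as the SQL statements below. This is a sketch of the pattern, not the exact SQL etlutil emits: the `DELETE ... USING` form (which Redshift and PostgreSQL accept) and the hypothetical helper `deltaMergeStatements` are assumptions. In practice both statements would run on the same `*sql.Tx`, so other sessions never observe the target table with rows deleted but not yet re-inserted.

```go
package main

import "fmt"

// deltaMergeStatements builds a delete-then-insert merge. Table names and the
// conditional are interpolated directly (identifiers cannot be bound as
// parameters), so they must come from trusted code, never from user input.
func deltaMergeStatements(targetTable, tempTable, conditional string) []string {
	return []string{
		// Remove target rows that are about to be replaced.
		fmt.Sprintf("DELETE FROM %s USING %s WHERE %s", targetTable, tempTable, conditional),
		// Copy every staged row into the target.
		fmt.Sprintf("INSERT INTO %s SELECT * FROM %s", targetTable, tempTable),
	}
}

func main() {
	for _, stmt := range deltaMergeStatements("events", "events_tmp", "events.id = events_tmp.id") {
		fmt.Println(stmt)
	}
}
```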

func ExecuteSQLQuery

func ExecuteSQLQuery(db *sql.DB, query string) error

ExecuteSQLQuery allows you to execute arbitrary SQL statements.

func ExecuteSQLQueryTx

func ExecuteSQLQueryTx(tx *sql.Tx, query string) error

ExecuteSQLQueryTx allows you to execute arbitrary SQL statements within a transaction.

func FirstDayOfWeek

func FirstDayOfWeek(day time.Time, firstDayOfWeek time.Weekday) time.Time

FirstDayOfWeek takes a time object and returns the first day of the week of that time object (scoped to firstDayOfWeek).

func GetDataFromSQLQuery

func GetDataFromSQLQuery(db *sql.DB, query string, batchSize int, structDest interface{}) (chan etldata.Payload, error)

GetDataFromSQLQuery is a util function that, given a properly initialized sql.DB and a valid SQL query, will handle executing the query and getting back etldata.JSON objects. This function is asynchronous, and etldata.JSON should be received on the returned data channel. If there was a problem setting up the query, then an error will also be returned immediately. It is also possible for errors to occur during execution as data is retrieved from the query. If this happens, the object returned will be a JSON object in the form of {"Error": "description"}.

func GetS3Object

func GetS3Object(client *s3.S3, bucket, objKey string) (*s3.GetObjectOutput, error)

GetS3Object returns the object output for the given object key.

func KillPipelineIfErr

func KillPipelineIfErr(err error, killChan chan error)

KillPipelineIfErr is an error-checking helper.

func LastMonth

func LastMonth() string

LastMonth returns the monthLayout of last month.

func ListS3Objects

func ListS3Objects(client *s3.S3, bucket, keyPrefix string) ([]string, error)

ListS3Objects returns all object keys matching the given prefix. Note that delimiter is set to "/". See http://docs.aws.amazon.com/AmazonS3/latest/dev/ListingKeysHierarchy.html

func MonthDateRange

func MonthDateRange(month string) (startDate, endDate string)

MonthDateRange takes a month (in monthLayout) and returns the first and last day of that month (in DateLayout).
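A sketch of the range computation, again assuming `monthLayout` is `"2006-01"`; `DateLayout` is the documented `"2006-01-02"`. Adding one month and subtracting one day from the first of the month gives its last day. The name `monthDateRange` and its extra error result are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

const (
	monthLayout = "2006-01"    // assumed value of the unexported constant
	dateLayout  = "2006-01-02" // matches etlutil.DateLayout
)

func monthDateRange(month string) (startDate, endDate string, err error) {
	first, err := time.Parse(monthLayout, month)
	if err != nil {
		return "", "", err
	}
	// For 2024-02: Feb 1 + 1 month - 1 day is Date(2024, March, 0),
	// which normalizes to Feb 29.
	last := first.AddDate(0, 1, -1)
	return first.Format(dateLayout), last.Format(dateLayout), nil
}

func main() {
	start, end, err := monthDateRange("2024-02")
	if err != nil {
		panic(err)
	}
	fmt.Println(start, end) // 2024-02-01 2024-02-29
}
```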

func MonthToDate

func MonthToDate() (month, startDate, endDate string)

MonthToDate starts from yesterday and returns the monthLayout of yesterday, the DateLayout of the first of yesterday's month, and the DateLayout of yesterday.

func MonthToTime added in v1.0.4

func MonthToTime(month string) time.Time

MonthToTime takes a month (in monthLayout) and returns the time object of its first day.

func MonthToTimeInLocation added in v1.0.5

func MonthToTimeInLocation(month string, loc *time.Location) time.Time

func MonthsAgo

func MonthsAgo(ago int) string

MonthsAgo returns the monthLayout of X months ago.

func MonthsAgoFromYesterday

func MonthsAgoFromYesterday(ago int) string

MonthsAgoFromYesterday defers to MonthsAgo from yesterday's date.

func MySQLInsertData

func MySQLInsertData(db *sql.DB, d etldata.Payload, tableName string, onDupKeyUpdate bool, onDupKeyFields []string, batchSize int) error

MySQLInsertData abstracts building and executing a SQL INSERT statement for the given Data object.

Note that the Data must be a valid JSON object (or an array of valid objects all with the same keys), where the keys are column names and the values are SQL values to be inserted into those columns.

func PostgreSQLInsertData

func PostgreSQLInsertData(db *sql.DB, d etldata.Payload, tableName string, onDupKeyUpdate bool, onDupKeyIndex string, onDupKeyFields []string, batchSize int) error

PostgreSQLInsertData abstracts building and executing a SQL INSERT statement for the given Data object.

Note that the Data must be a valid JSON object (or an array of valid objects all with the same keys), where the keys are column names and the values are SQL values to be inserted into those columns.

If onDupKeyUpdate is true, you must set an onDupKeyIndex. This translates to the conflict_target as specified in https://www.postgresql.org/docs/9.5/static/sql-insert.html
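For a single row, the upsert described above looks roughly like the statement built below. The helper `upsertSQL` is hypothetical, and it assumes `onDupKeyIndex` holds a column list for the `ON CONFLICT (...)` conflict target rather than an `ON CONSTRAINT` name; the real function also batches rows, which this sketch omits. `EXCLUDED` is PostgreSQL's name for the row that was proposed for insertion.

```go
package main

import (
	"fmt"
	"strings"
)

// upsertSQL builds a one-row INSERT with numbered placeholders ($1, $2, ...).
// Identifiers are interpolated, so they must be trusted.
func upsertSQL(table string, columns []string, conflictTarget string, updateFields []string) string {
	placeholders := make([]string, len(columns))
	for i := range columns {
		placeholders[i] = fmt.Sprintf("$%d", i+1)
	}
	q := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
		table, strings.Join(columns, ", "), strings.Join(placeholders, ", "))
	if conflictTarget == "" {
		return q
	}
	if len(updateFields) == 0 {
		// Nothing to update: keep the existing row.
		return q + fmt.Sprintf(" ON CONFLICT (%s) DO NOTHING", conflictTarget)
	}
	sets := make([]string, len(updateFields))
	for i, f := range updateFields {
		sets[i] = fmt.Sprintf("%s = EXCLUDED.%s", f, f)
	}
	return q + fmt.Sprintf(" ON CONFLICT (%s) DO UPDATE SET %s", conflictTarget, strings.Join(sets, ", "))
}

func main() {
	fmt.Println(upsertSQL("users", []string{"id", "email"}, "id", []string{"email"}))
	// INSERT INTO users (id, email) VALUES ($1, $2) ON CONFLICT (id) DO UPDATE SET email = EXCLUDED.email
}
```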

func PurgeMerge

func PurgeMerge(tx *sql.Tx, targetTable, tempTable, conditional string) error

PurgeMerge clears out the targetTable based on the conditional, and then writes all records from the tempTable into targetTable. This method is used when a full snapshot of a specific applicationID table is written into tempTable.

func QuarterToDate

func QuarterToDate() (startDate, endDate string)

QuarterToDate looks at yesterday and returns the start date of yesterday's quarter (in DateLayout) and yesterday's date (in DateLayout).

func S3Prefix

func S3Prefix(table string) string

S3Prefix generates a unique prefix.

func SftpClient

func SftpClient(server string, username string, authMethod []ssh.AuthMethod, opts ...sftp.ClientOption) (*sftp.Client, error)

SftpClient sets up and returns the client.

func SftpKeyAuth

func SftpKeyAuth(privateKeyPath string) (auth ssh.AuthMethod, err error)

SftpKeyAuth generates an ssh.AuthMethod given the path of a private key.

func TruncateMerge

func TruncateMerge(tx *sql.Tx, targetTable, tempTable string) error

TruncateMerge clears out the targetTable and then writes all records from the tempTable into targetTable. This method is used when a full snapshot of the table is written in its entirety into tempTable.

func Typecheck

func Typecheck(d etldata.Payload) (key string, err error)

Typecheck returns the value of the Typeable.Type.
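Because `Typeable` maps its `Type` field to the `goetl_data_type` JSON key, reading the type amounts to a partial JSON decode. This sketch assumes the payload is available as raw JSON bytes and that a missing or empty type should be an error; both are assumptions about etlutil, and `typecheck` is a hypothetical name.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Typeable mirrors the documented struct; other keys are ignored on decode.
type Typeable struct {
	Type string `json:"goetl_data_type"`
}

func typecheck(raw []byte) (string, error) {
	var t Typeable
	if err := json.Unmarshal(raw, &t); err != nil {
		return "", err
	}
	if t.Type == "" {
		return "", errors.New("goetl_data_type is missing or empty")
	}
	return t.Type, nil
}

func main() {
	key, err := typecheck([]byte(`{"goetl_data_type":"order","id":7}`))
	fmt.Println(key, err) // order <nil>
}
```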

func UUID

func UUID() (id string, err error)

UUID returns a new UUID.

func VacuumAll

func VacuumAll(db *sql.DB) error

VacuumAll vacuums all tables.

func VacuumTable

func VacuumTable(db *sql.DB, table string) error

VacuumTable vacuums a specific table.

func WriteS3Object

func WriteS3Object(data []string, config *aws.Config, bucket string, key string, lineSeparator string, compress bool) (string, error)

WriteS3Object writes the data to the given key, optionally compressing it first.

Types

type CSVParameters

type CSVParameters struct {
	Writer        *CSVWriter
	WriteHeader   bool
	HeaderWritten bool
	Header        []string
	SendUpstream  bool
	QuoteEscape   string
	Comma         rune
}

CSVParameters allows you to define all of your CSV writing preferences in a single struct for reuse in multiple processors.

type CSVWriter

type CSVWriter struct {
	Comma   rune
	UseCRLF bool

	AlwaysEncapsulate bool   // If the content should be encapsulated independent of its type
	QuoteEscape       string // String to use to escape a quote character
	// contains filtered or unexported fields
}

CSVWriter reimplements the standard library csv.Writer, adding AlwaysEncapsulate and QuoteEscape.
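The two additions change how a single field is quoted. The sketch below shows one plausible reading: `AlwaysEncapsulate` forces quotes around every field, and `QuoteEscape` is the string placed before each embedded `"` (the standard library always doubles it). The `formatField` helper and its quoting triggers are assumptions loosely modeled on `encoding/csv`, not etlutil's code.

```go
package main

import (
	"fmt"
	"strings"
)

// formatField quotes one CSV field. It is a simplified, hypothetical model.
func formatField(field string, comma rune, alwaysEncapsulate bool, quoteEscape string) string {
	needsQuotes := alwaysEncapsulate ||
		strings.ContainsRune(field, comma) ||
		strings.ContainsAny(field, "\"\r\n") ||
		strings.HasPrefix(field, " ") || strings.HasPrefix(field, "\t")
	if !needsQuotes {
		return field
	}
	// Escape each embedded quote by prefixing it with quoteEscape.
	return `"` + strings.ReplaceAll(field, `"`, quoteEscape+`"`) + `"`
}

func main() {
	fmt.Println(formatField("plain", ',', false, `"`))    // plain
	fmt.Println(formatField("plain", ',', true, `"`))     // "plain"
	fmt.Println(formatField(`say "hi"`, ',', false, `"`)) // "say ""hi"""
	fmt.Println(formatField(`say "hi"`, ',', false, `\`)) // "say \"hi\""
}
```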

func NewCSVWriter

func NewCSVWriter() *CSVWriter

NewCSVWriter instantiates a new CSVWriter.

func (*CSVWriter) Error

func (w *CSVWriter) Error() error

Error reports any error that has occurred during a previous Write or Flush.

func (*CSVWriter) Flush

func (w *CSVWriter) Flush()

Flush writes any buffered data to the underlying io.Writer. To check if an error occurred during the Flush, call Error.

func (*CSVWriter) SetWriter

func (w *CSVWriter) SetWriter(writer io.Writer)

SetWriter allows you to change the underlying writer (which is not directly exposed).

func (*CSVWriter) Write

func (w *CSVWriter) Write(record []string) (err error)

Write writes a single CSV record to w along with any necessary quoting. A record is a slice of strings with each string being one field.

func (*CSVWriter) WriteAll

func (w *CSVWriter) WriteAll(records [][]string) (err error)

WriteAll writes multiple CSV records to w using Write and then calls Flush.

type SftpParameters

type SftpParameters struct {
	Server      string
	Username    string
	Path        string
	AuthMethods []ssh.AuthMethod
}

SftpParameters is used for storing connection parameters for later executing sftp commands.

type SftpPath

type SftpPath struct {
	Path string `json:"path,omitempty"`
}

SftpPath is a simple struct for storing the full path of an object.

func (SftpPath) FileName

func (t SftpPath) FileName() string

FileName defers to filepath.Base.

type Timer

type Timer struct {
	// contains filtered or unexported fields
}

Timer is a basic mechanism for measuring execution time.

func StartTimer

func StartTimer() (t *Timer)

StartTimer returns a new Timer that's already "started".

func (*Timer) Duration

func (t *Timer) Duration() time.Duration

Duration returns either the total execution duration (if the Timer is stopped) or the duration up to time.Now() if the Timer is still running.

func (*Timer) Stop

func (t *Timer) Stop() *Timer

Stop sets the end time for the Timer and returns itself.

func (*Timer) Stopped

func (t *Timer) Stopped() bool

Stopped returns true if Stop() has been called on the timer.

func (*Timer) String

func (t *Timer) String() string

type Typeable

type Typeable struct {
	Type string `json:"goetl_data_type"`
}

Typeable holds the type attribute, which is stored under the goetl_data_type JSON key.
