Documentation

Overview

    Package processors contains built-in DataProcessor implementations that are generic and potentially useful across any ETL project.

    Types

    type BigQueryConfig

    type BigQueryConfig struct {
    	JsonPemPath string
    	ProjectID   string
    	DatasetID   string
    }

      BigQueryConfig is used when initializing new BigQueryReader and BigQueryWriter instances.

      type BigQueryReader

      type BigQueryReader struct {
      	PageSize         int    // defaults to 5000
      	AggregateResults bool   // determines whether to send data as soon as available or to aggregate and send all query results, defaults to false
      	UnflattenResults bool   // defaults to false
      	TmpTableName     string // Used when UnflattenResults is true. Defaults to "_ratchet_tmp"
      	ConcurrencyLevel int    // See ConcurrentDataProcessor
      	// contains filtered or unexported fields
      }

        BigQueryReader is used to query data from Google's BigQuery, and it behaves similarly to SQLReader. See the SQLReader docs for an explanation of static vs. dynamic querying.

        Note: If your data set contains nested or repeated fields, you will likely want to get results back "unflattened". By default BigQuery returns results in a flattened format, which duplicates rows for each repeated value. This can be awkward to deal with, so BigQueryReader provides an UnflattenResults flag that runs the query in a way that returns unflattened results. This involves using a temporary table setting and a couple of other special query settings; read the BigQuery docs on flattening and repeated fields for more info.
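To make the flattened format concrete, here is a minimal stdlib-only sketch of how one record with a repeated field turns into several rows. It does not call BigQuery; the `record` and `flatRow` types and the `flatten` helper are hypothetical names used only for illustration.

```go
package main

import "fmt"

// record is a hypothetical nested row: one user with a repeated Tags field.
type record struct {
	User string
	Tags []string
}

// flatRow is the flattened shape: one row per repeated value.
type flatRow struct {
	User string
	Tag  string
}

// flatten duplicates the parent column for every repeated value,
// which mimics what flattened query results look like.
func flatten(recs []record) []flatRow {
	var rows []flatRow
	for _, r := range recs {
		for _, t := range r.Tags {
			rows = append(rows, flatRow{User: r.User, Tag: t})
		}
	}
	return rows
}

func main() {
	recs := []record{{User: "ann", Tags: []string{"a", "b", "c"}}}
	for _, row := range flatten(recs) {
		fmt.Printf("%s %s\n", row.User, row.Tag)
	}
}
```

One input record becomes three output rows here, which is the duplication that UnflattenResults is meant to avoid.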

        func NewBigQueryReader

        func NewBigQueryReader(config *BigQueryConfig, query string) *BigQueryReader

          NewBigQueryReader returns an instance of a BigQueryReader ready to run a static query.

          func NewDynamicBigQueryReader

          func NewDynamicBigQueryReader(config *BigQueryConfig, sqlGenerator func(data.JSON) (string, error)) *BigQueryReader

            NewDynamicBigQueryReader returns an instance of a BigQueryReader ready to run a dynamic query based on the sqlGenerator function.
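As an illustration of what a sqlGenerator might look like, the sketch below builds a query from a field in the incoming payload. It is stdlib-only: `JSON` is a local stand-in for data.JSON (assumed here to be a byte slice), and the `userIDQuery` name, the `events` table, and the `user_id` column are all hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// JSON is a local stand-in for data.JSON (assumed to be a byte slice).
type JSON []byte

// userIDQuery is a hypothetical generator: it reads an "id" field from the
// incoming payload and builds a query for that user.
func userIDQuery(d JSON) (string, error) {
	var in struct {
		ID int `json:"id"`
	}
	if err := json.Unmarshal(d, &in); err != nil {
		return "", fmt.Errorf("parsing payload: %w", err)
	}
	// in.ID is decoded as an int, so no free-form text reaches the SQL string.
	return fmt.Sprintf("SELECT * FROM events WHERE user_id = %d", in.ID), nil
}

func main() {
	sql, err := userIDQuery(JSON(`{"id": 42}`))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(sql)
}
```

A function of this shape, written against the real data.JSON type, is what the sqlGenerator parameter expects; returning an error gives the reader a way to stop on a malformed payload.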

            func (*BigQueryReader) Concurrency

            func (r *BigQueryReader) Concurrency() int

              Concurrency defers to ConcurrentDataProcessor

              func (*BigQueryReader) Finish

              func (r *BigQueryReader) Finish(outputChan chan data.JSON, killChan chan error)

                Finish - see interface for documentation.

                func (*BigQueryReader) ForEachQueryData

                func (r *BigQueryReader) ForEachQueryData(d data.JSON, killChan chan error, forEach func(d data.JSON))

                  ForEachQueryData handles generating the SQL (in the case of dynamic mode), running the query, and retrieving the data in data.JSON format, then passes the results back through the function call to forEach.

                  func (*BigQueryReader) ProcessData

                  func (r *BigQueryReader) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

                    ProcessData defers to ForEachQueryData

                    func (*BigQueryReader) String

                    func (r *BigQueryReader) String() string

                    type BigQueryWriter

                    type BigQueryWriter struct {
                    	ConcurrencyLevel int // See ConcurrentDataProcessor
                    	// contains filtered or unexported fields
                    }

                      BigQueryWriter is used to write data to Google's BigQuery. If the table you want to write to already exists, use NewBigQueryWriter; otherwise use NewBigQueryWriterForNewTable, and the desired table structure will be created when the client is initialized.

                      func NewBigQueryWriter

                      func NewBigQueryWriter(config *BigQueryConfig, tableName string) *BigQueryWriter

                        NewBigQueryWriter instantiates a new instance of BigQueryWriter

                        func NewBigQueryWriterForNewTable

                        func NewBigQueryWriterForNewTable(config *BigQueryConfig, tableName string, fields map[string]string) *BigQueryWriter

                          NewBigQueryWriterForNewTable instantiates a new instance of BigQueryWriter and prepares to write results to a new table

                          func (*BigQueryWriter) Concurrency

                          func (w *BigQueryWriter) Concurrency() int

                            Concurrency delegates to ConcurrentDataProcessor

                            func (*BigQueryWriter) Finish

                            func (w *BigQueryWriter) Finish(outputChan chan data.JSON, killChan chan error)

                              Finish - see interface for documentation.

                              func (*BigQueryWriter) ProcessData

                              func (w *BigQueryWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

                                ProcessData defers to WriteBatch

                                func (*BigQueryWriter) String

                                func (w *BigQueryWriter) String() string

                                func (*BigQueryWriter) WriteBatch

                                func (w *BigQueryWriter) WriteBatch(queuedRows []map[string]interface{}) (err error)

                                  WriteBatch inserts the supplied data into BigQuery

                                  type CSVTransformer

                                  type CSVTransformer struct {
                                  	Parameters util.CSVParameters
                                  }

                                    CSVTransformer converts data.JSON objects into a CSV string object and sends it on to the next stage. In use-cases where you simply want to write to a CSV file, use CSVWriter instead.

                                    CSVTransformer is for more complex use-cases where you need to generate CSV data and perhaps send it to multiple output stages.
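The JSON-to-CSV conversion described above can be sketched with the standard library alone. This is not the package's implementation (which goes through util.CSVProcess); `toCSV` is a hypothetical helper that assumes a JSON array of flat objects and takes its header from the first object.

```go
package main

import (
	"bytes"
	"encoding/csv"
	"encoding/json"
	"fmt"
	"sort"
)

// toCSV converts a JSON array of flat objects into a CSV string.
// Columns are sorted by name so the output is deterministic.
func toCSV(payload []byte) (string, error) {
	var rows []map[string]interface{}
	if err := json.Unmarshal(payload, &rows); err != nil {
		return "", err
	}
	if len(rows) == 0 {
		return "", nil
	}
	// Use the keys of the first object as the header.
	header := make([]string, 0, len(rows[0]))
	for k := range rows[0] {
		header = append(header, k)
	}
	sort.Strings(header)

	var buf bytes.Buffer
	w := csv.NewWriter(&buf)
	if err := w.Write(header); err != nil {
		return "", err
	}
	for _, row := range rows {
		rec := make([]string, len(header))
		for i, k := range header {
			rec[i] = fmt.Sprint(row[k])
		}
		if err := w.Write(rec); err != nil {
			return "", err
		}
	}
	w.Flush()
	return buf.String(), w.Error()
}

func main() {
	out, err := toCSV([]byte(`[{"id":1,"name":"ann"},{"id":2,"name":"bob"}]`))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Print(out)
}
```

Because the result is a plain string, it can be handed to more than one downstream consumer, which is the situation where a transformer is preferable to a writer.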

                                    func NewCSVTransformer

                                    func NewCSVTransformer() *CSVTransformer

                                      NewCSVTransformer returns a new CSVTransformer.

                                      func (*CSVTransformer) Finish

                                      func (w *CSVTransformer) Finish(outputChan chan data.JSON, killChan chan error)

                                        Finish - see interface for documentation.

                                        func (*CSVTransformer) ProcessData

                                        func (w *CSVTransformer) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

                                          ProcessData defers to util.CSVProcess

                                          func (*CSVTransformer) String

                                          func (w *CSVTransformer) String() string

                                          type CSVWriter

                                          type CSVWriter struct {
                                          	Parameters util.CSVParameters
                                          }

                                            CSVWriter handles converting data.JSON objects into CSV format and writing them to the given io.Writer. The data must be a valid JSON object or a slice of valid JSON objects. If you already have data formatted as a CSV string, you can use an IoWriter instead.

                                            func NewCSVWriter

                                            func NewCSVWriter(w io.Writer) *CSVWriter

                                              NewCSVWriter returns a new CSVWriter wrapping the given io.Writer object

                                              func (*CSVWriter) Finish

                                              func (w *CSVWriter) Finish(outputChan chan data.JSON, killChan chan error)

                                                Finish - see interface for documentation.

                                                func (*CSVWriter) ProcessData

                                                func (w *CSVWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

                                                  ProcessData defers to util.CSVProcess

                                                  func (*CSVWriter) String

                                                  func (w *CSVWriter) String() string

                                                  type FileReader

                                                  type FileReader struct {
                                                  	// contains filtered or unexported fields
                                                  }

                                                    FileReader opens and reads the contents of the given filename.

                                                    func NewFileReader

                                                    func NewFileReader(filename string) *FileReader

                                                      NewFileReader returns a new FileReader that will read the entire contents of the given file path and send it at once. For buffered or line-by-line reading try using IoReader.

                                                      func (*FileReader) Finish

                                                      func (r *FileReader) Finish(outputChan chan data.JSON, killChan chan error)

                                                        Finish - see interface for documentation.

                                                        func (*FileReader) ProcessData

                                                        func (r *FileReader) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

                                                          ProcessData reads a file and sends its contents to outputChan

                                                          func (*FileReader) String

                                                          func (r *FileReader) String() string

                                                          type FtpWriter

                                                          type FtpWriter struct {
                                                          	// contains filtered or unexported fields
                                                          }

                                                            FtpWriter represents an FTP writer processor.

                                                            func NewFtpWriter

                                                            func NewFtpWriter(host, username, password, path string) *FtpWriter

                                                              NewFtpWriter instantiates a new FTP writer.

                                                              func (*FtpWriter) Finish

                                                              func (f *FtpWriter) Finish(outputChan chan data.JSON, killChan chan error)

                                                                Finish closes open references to the remote file and server

                                                                func (*FtpWriter) ProcessData

                                                                func (f *FtpWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

                                                                  ProcessData writes data as is directly to the output file

                                                                  func (*FtpWriter) String

                                                                  func (f *FtpWriter) String() string

                                                                  type FuncTransformer

                                                                  type FuncTransformer struct {
                                                                  	Name             string // can be set for more useful log output
                                                                  	ConcurrencyLevel int    // See ConcurrentDataProcessor
                                                                  	// contains filtered or unexported fields
                                                                  }

                                                                    FuncTransformer executes the given function on each data payload, sending the resulting data to the next stage.

                                                                    While FuncTransformer is useful for simple data transformation, more complicated tasks justify building a custom implementation of DataProcessor.

                                                                    func NewFuncTransformer

                                                                    func NewFuncTransformer(transform func(d data.JSON) data.JSON) *FuncTransformer

                                                                      NewFuncTransformer instantiates a new instance of func transformer
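For illustration, a transform function of the shape NewFuncTransformer accepts might look like the sketch below. It is stdlib-only: `JSON` is a local stand-in for data.JSON (assumed to be a byte slice), and `upcaseName` is a hypothetical transform.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// JSON is a local stand-in for data.JSON (assumed to be a byte slice).
type JSON []byte

// upcaseName is a hypothetical transform: it upper-cases the "name" field
// and returns the re-encoded payload. On malformed input it returns the
// payload unchanged, since the signature has no error return.
func upcaseName(d JSON) JSON {
	var obj map[string]interface{}
	if err := json.Unmarshal(d, &obj); err != nil {
		return d
	}
	if name, ok := obj["name"].(string); ok {
		obj["name"] = strings.ToUpper(name)
	}
	out, err := json.Marshal(obj)
	if err != nil {
		return d
	}
	return out
}

func main() {
	fmt.Println(string(upcaseName(JSON(`{"id":1,"name":"ann"}`))))
}
```

The lack of an error return is one reason the text above recommends a custom DataProcessor for anything more involved: a full processor has access to killChan for reporting failures.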

                                                                      func (*FuncTransformer) Concurrency

                                                                      func (t *FuncTransformer) Concurrency() int

                                                                        Concurrency defers to ConcurrentDataProcessor

                                                                        func (*FuncTransformer) Finish

                                                                        func (t *FuncTransformer) Finish(outputChan chan data.JSON, killChan chan error)

                                                                          Finish - see interface for documentation.

                                                                          func (*FuncTransformer) ProcessData

                                                                          func (t *FuncTransformer) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

                                                                            ProcessData runs the supplied func and sends the returned value to outputChan

                                                                            func (*FuncTransformer) String

                                                                            func (t *FuncTransformer) String() string

                                                                            type HTTPRequest

                                                                            type HTTPRequest struct {
                                                                            	Request *http.Request
                                                                            	Client  *http.Client
                                                                            }

                                                                              HTTPRequest executes an HTTP request and passes along the response body. It simply wraps an http.Request and an http.Client object. See the net/http docs for more info: https://golang.org/pkg/net/http

                                                                              func NewHTTPRequest

                                                                              func NewHTTPRequest(method, url string, body io.Reader) (*HTTPRequest, error)

                                                                                NewHTTPRequest creates a new HTTPRequest and essentially wraps net/http's NewRequest function. See https://golang.org/pkg/net/http/#NewRequest

                                                                                func (*HTTPRequest) Finish

                                                                                func (r *HTTPRequest) Finish(outputChan chan data.JSON, killChan chan error)

                                                                                  Finish - see interface for documentation.

                                                                                  func (*HTTPRequest) ProcessData

                                                                                  func (r *HTTPRequest) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

                                                                                    ProcessData sends data to outputChan if the response body is not null

                                                                                    func (*HTTPRequest) String

                                                                                    func (r *HTTPRequest) String() string

                                                                                    type IoReader

                                                                                    type IoReader struct {
                                                                                    	Reader     io.Reader
                                                                                    	LineByLine bool // defaults to true
                                                                                    	BufferSize int
                                                                                    	Gzipped    bool
                                                                                    }

                                                                                      IoReader wraps an io.Reader and reads it.

                                                                                      func NewIoReader

                                                                                      func NewIoReader(reader io.Reader) *IoReader

                                                                                        NewIoReader returns a new IoReader wrapping the given io.Reader object.

                                                                                        func (*IoReader) Finish

                                                                                        func (r *IoReader) Finish(outputChan chan data.JSON, killChan chan error)

                                                                                          Finish - see interface for documentation.

                                                                                          func (*IoReader) ForEachData

                                                                                          func (r *IoReader) ForEachData(killChan chan error, foo func(d data.JSON))

                                                                                            ForEachData either reads by line or by buffered stream, sending the data back to the anonymous func that ultimately shoves it onto the outputChan

                                                                                            func (*IoReader) ProcessData

                                                                                            func (r *IoReader) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

                                                                                              ProcessData overwrites the reader if the content is Gzipped, then defers to ForEachData

                                                                                              func (*IoReader) String

                                                                                              func (r *IoReader) String() string

                                                                                              type IoReaderWriter

                                                                                              type IoReaderWriter struct {
                                                                                              	IoReader
                                                                                              	IoWriter
                                                                                              }

                                                                                                IoReaderWriter performs the job of both an IoReader and an IoWriter. It will read data from the given io.Reader, write the resulting data to the given io.Writer, and (if the write was successful) send the data to the next stage of processing.

                                                                                                IoReaderWriter is composed of both an IoReader and an IoWriter, so it supports all of the same properties and usage options.

                                                                                                func NewIoReaderWriter

                                                                                                func NewIoReaderWriter(reader io.Reader, writer io.Writer) *IoReaderWriter

                                                                                                  NewIoReaderWriter returns a new IoReaderWriter wrapping the given io.Reader and io.Writer objects

                                                                                                  func (*IoReaderWriter) Finish

                                                                                                  func (r *IoReaderWriter) Finish(outputChan chan data.JSON, killChan chan error)

                                                                                                    Finish - see interface for documentation.

                                                                                                    func (*IoReaderWriter) ProcessData

                                                                                                    func (r *IoReaderWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

                                                                                                      ProcessData grabs data from IoReader.ForEachData, then sends it to IoWriter.ProcessData in addition to sending it downstream on the outputChan

                                                                                                      func (*IoReaderWriter) String

                                                                                                      func (r *IoReaderWriter) String() string

                                                                                                      type IoWriter

                                                                                                      type IoWriter struct {
                                                                                                      	Writer     io.Writer
                                                                                                      	AddNewline bool
                                                                                                      }

                                                                                                        IoWriter wraps any io.Writer object. It can be used to write data out to a File, os.Stdout, or any other task that can be supported via io.Writer.

                                                                                                        func NewIoWriter

                                                                                                        func NewIoWriter(writer io.Writer) *IoWriter

                                                                                                          NewIoWriter returns a new IoWriter wrapping the given io.Writer object

                                                                                                          func (*IoWriter) Finish

                                                                                                          func (w *IoWriter) Finish(outputChan chan data.JSON, killChan chan error)

                                                                                                            Finish - see interface for documentation.

                                                                                                            func (*IoWriter) ProcessData

                                                                                                            func (w *IoWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

                                                                                                              ProcessData writes the data

                                                                                                              func (*IoWriter) String

                                                                                                              func (w *IoWriter) String() string

                                                                                                              type Passthrough

                                                                                                              type Passthrough struct {
                                                                                                              	// contains filtered or unexported fields
                                                                                                              }

                                                                                                                Passthrough simply passes the data on to the next stage. The struct needs a placeholder field: if it is left as an empty struct, it gets comparison and memory-addressing properties that are not desirable and cause comparison bugs (see: http://dave.cheney.net/2014/03/25/the-empty-struct)

                                                                                                                func NewPassthrough

                                                                                                                func NewPassthrough() *Passthrough

                                                                                                                  NewPassthrough instantiates a new instance of Passthrough

                                                                                                                  func (*Passthrough) Finish

                                                                                                                  func (r *Passthrough) Finish(outputChan chan data.JSON, killChan chan error)

                                                                                                                    Finish - see interface for documentation.

                                                                                                                    func (*Passthrough) ProcessData

                                                                                                                    func (r *Passthrough) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

                                                                                                                      ProcessData blindly sends whatever it receives to the outputChan

                                                                                                                      func (*Passthrough) String

                                                                                                                      func (r *Passthrough) String() string

                                                                                                                      type RegexpMatcher

                                                                                                                      type RegexpMatcher struct {
                                                                                                                      
                                                                                                                      	// Set to true to log each match attempt (logger must be in debug mode).
                                                                                                                      	DebugLog bool
                                                                                                                      	// contains filtered or unexported fields
                                                                                                                      }

                                                                                                                        RegexpMatcher checks whether incoming data matches the given Regexp, and sends it on to the next stage only if it matches. It uses regexp.Match under the hood: https://golang.org/pkg/regexp/#Match

                                                                                                                        func NewRegexpMatcher

                                                                                                                        func NewRegexpMatcher(pattern string) *RegexpMatcher

                                                                                                                          NewRegexpMatcher returns a new RegexpMatcher initialized with the given pattern to match.
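The filtering behavior can be sketched directly on top of regexp.Match. This is a stdlib-only illustration; `filterMatches` is a hypothetical helper, and the log-style payloads are made up for the example.

```go
package main

import (
	"fmt"
	"regexp"
)

// filterMatches returns only the payloads that match pattern,
// using regexp.Match on the raw bytes.
func filterMatches(pattern string, payloads [][]byte) ([][]byte, error) {
	var kept [][]byte
	for _, p := range payloads {
		ok, err := regexp.Match(pattern, p)
		if err != nil {
			return nil, err // the pattern failed to compile
		}
		if ok {
			kept = append(kept, p)
		}
	}
	return kept, nil
}

func main() {
	payloads := [][]byte{
		[]byte(`{"level":"error","msg":"disk full"}`),
		[]byte(`{"level":"info","msg":"started"}`),
	}
	kept, err := filterMatches(`"level":"error"`, payloads)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, p := range kept {
		fmt.Println(string(p))
	}
}
```

Note that the match runs against the raw payload bytes, so a pattern aimed at JSON data has to account for how the JSON is serialized (key order, whitespace, quoting).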

                                                                                                                          func (*RegexpMatcher) Finish

                                                                                                                          func (r *RegexpMatcher) Finish(outputChan chan data.JSON, killChan chan error)

                                                                                                                            Finish - see interface for documentation.

                                                                                                                            func (*RegexpMatcher) ProcessData

                                                                                                                            func (r *RegexpMatcher) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

                                                                                                                              ProcessData sends the data it receives to the outputChan only if it matches the supplied regex

                                                                                                                              func (*RegexpMatcher) String

                                                                                                                              func (r *RegexpMatcher) String() string

                                                                                                                              type S3Reader

                                                                                                                              type S3Reader struct {
                                                                                                                              	IoReader // embeds IoReader
                                                                                                                              
                                                                                                                              	DeleteObjects bool
                                                                                                                              	// contains filtered or unexported fields
                                                                                                                              }

                                                                                                                                S3Reader handles retrieving objects from S3. Use NewS3ObjectReader to read a single object, or NewS3PrefixReader to read all objects matching the same prefix in your bucket. S3Reader embeds an IoReader, so it supports the same configuration options as IoReader.

                                                                                                                                func NewS3ObjectReader

                                                                                                                                func NewS3ObjectReader(awsID, awsSecret, awsRegion, bucket, object string) *S3Reader

                                                                                                                                  NewS3ObjectReader returns an S3Reader that reads a single object from the given S3 bucket.

                                                                                                                                  func NewS3PrefixReader

                                                                                                                                  func NewS3PrefixReader(awsID, awsSecret, awsRegion, bucket, prefix string) *S3Reader

                                                                                                                                    NewS3PrefixReader returns an S3Reader that reads all objects from the given S3 bucket that match a prefix. See http://docs.aws.amazon.com/AmazonS3/latest/dev/ListingKeysHierarchy.html for details. The S3 delimiter will be "/".

                                                                                                                                    func (*S3Reader) Finish

                                                                                                                                    func (r *S3Reader) Finish(outputChan chan data.JSON, killChan chan error)

                                                                                                                                      Finish - see interface for documentation.

                                                                                                                                      func (*S3Reader) ProcessData

                                                                                                                                      func (r *S3Reader) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

                                                                                                                                        ProcessData reads an entire directory if a prefix is provided (sending each file in that directory to outputChan), or just sends the single file to outputChan if a complete file path is provided (not a prefix/directory).

                                                                                                                                        It optionally deletes all processed objects once the contents have been sent to outputChan

                                                                                                                                        func (*S3Reader) String

                                                                                                                                        func (r *S3Reader) String() string

                                                                                                                                        type S3Writer

                                                                                                                                        type S3Writer struct {
                                                                                                                                        	Compress      bool
                                                                                                                                        	LineSeparator string
                                                                                                                                        	// contains filtered or unexported fields
                                                                                                                                        }

                                                                                                                                          S3Writer sends data upstream to S3. By default, we will not compress data before sending it. Set the `Compress` flag to true to use gzip compression before storing in S3 (if this flag is set to true, ".gz" will automatically be appended to the key name specified).

                                                                                                                                          By default, we will separate each iteration of data sent to `ProcessData` with a new line when we piece back together to send to S3. Change the `LineSeparator` attribute to change this behavior.
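
The effect of `Compress` and `LineSeparator` can be sketched as follows. This is an illustration using only the standard library, not S3Writer's actual code: buildObject and gunzip are hypothetical helpers, and placing the separator between payloads (rather than after each one) is an assumption.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// buildObject joins payloads with sep. When compress is true it gzips the
// result and appends ".gz" to the key. (Hypothetical helper.)
func buildObject(key string, payloads [][]byte, sep string, compress bool) (string, []byte, error) {
	body := bytes.Join(payloads, []byte(sep))
	if !compress {
		return key, body, nil
	}
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(body); err != nil {
		return "", nil, err
	}
	// Close flushes the remaining compressed data and writes the gzip footer.
	if err := zw.Close(); err != nil {
		return "", nil, err
	}
	return key + ".gz", buf.Bytes(), nil
}

// gunzip reverses the compression so the round trip can be checked.
func gunzip(b []byte) ([]byte, error) {
	zr, err := gzip.NewReader(bytes.NewReader(b))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

func main() {
	payloads := [][]byte{[]byte(`{"id":1}`), []byte(`{"id":2}`)}
	key, body, err := buildObject("exports/users.json", payloads, "\n", true)
	if err != nil {
		panic(err)
	}
	plain, err := gunzip(body)
	if err != nil {
		panic(err)
	}
	fmt.Println(key)
	fmt.Println(string(plain))
}
```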

                                                                                                                                          func NewS3Writer

                                                                                                                                          func NewS3Writer(awsID, awsSecret, awsRegion, bucket, key string) *S3Writer

                                                                                                                                            NewS3Writer instantiates a new S3Writer.

                                                                                                                                            func (*S3Writer) Finish

                                                                                                                                            func (w *S3Writer) Finish(outputChan chan data.JSON, killChan chan error)

                                                                                                                                              Finish writes all enqueued data to S3, deferring to util.WriteS3Object.

                                                                                                                                              func (*S3Writer) ProcessData

                                                                                                                                              func (w *S3Writer) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

                                                                                                                                                ProcessData enqueues all received data

                                                                                                                                                func (*S3Writer) String

                                                                                                                                                func (w *S3Writer) String() string

                                                                                                                                                type SCP

                                                                                                                                                type SCP struct {
                                                                                                                                                	Port        string // e.g., "2222" -- only send for non-standard ports
                                                                                                                                                	Object      string // e.g., "/path/to/file.txt"
                                                                                                                                                	Destination string // e.g., "user@host:/path/to/destination/"
                                                                                                                                                
                                                                                                                                                }

                                                                                                                                                  SCP executes the scp command, sending the given file to the given destination.

                                                                                                                                                  func NewSCP

                                                                                                                                                  func NewSCP(obj string, destination string) *SCP

                                                                                                                                                    NewSCP instantiates a new instance of SCP

                                                                                                                                                    func (*SCP) Finish

                                                                                                                                                    func (s *SCP) Finish(outputChan chan data.JSON, killChan chan error)

                                                                                                                                                      Finish defers to Run

                                                                                                                                                      func (*SCP) ProcessData

                                                                                                                                                      func (s *SCP) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

                                                                                                                                                        ProcessData sends all data to outputChan

                                                                                                                                                        func (*SCP) Run

                                                                                                                                                        func (s *SCP) Run(killChan chan error)

                                                                                                                                                          Run executes the scp command from the attributes of the SCP struct
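
As a rough sketch of how the three SCP fields could map onto an scp invocation: scpArgs below is a hypothetical helper, not the package's code. It relies on scp's `-P` flag for selecting a port and omits the flag when Port is empty, which matches the "only send for non-standard ports" comment on the struct.

```go
package main

import (
	"fmt"
	"os/exec"
)

// scpArgs builds the argument list for the scp binary. The -P flag is
// added only when a port is given. (Hypothetical helper.)
func scpArgs(port, object, destination string) []string {
	var args []string
	if port != "" {
		args = append(args, "-P", port)
	}
	return append(args, object, destination)
}

func main() {
	args := scpArgs("2222", "/path/to/file.txt", "user@host:/path/to/destination/")
	cmd := exec.Command("scp", args...)
	// Print the command instead of running it, so the sketch has no side effects.
	fmt.Println(cmd.Args)
}
```

To actually perform the copy, call cmd.Run() and forward any returned error to killChan.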

                                                                                                                                                          type SQLExecutor

                                                                                                                                                          type SQLExecutor struct {
                                                                                                                                                          	// contains filtered or unexported fields
                                                                                                                                                          }

                                                                                                                                                            SQLExecutor runs the given SQL and swallows any returned data.

                                                                                                                                                            It can operate in 2 modes: 1) Static - runs the given SQL query and ignores any received data. 2) Dynamic - generates a SQL query for each data payload it receives.

                                                                                                                                                            The dynamic SQL generation is implemented by passing in a "sqlGenerator" function to NewDynamicSQLExecutor. This allows you to write whatever code is needed to generate SQL based upon data flowing through the pipeline.
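
A generator function of the shape expected by NewDynamicSQLExecutor might look like the sketch below. The JSON type is a stand-in that assumes data.JSON is a byte slice, and deleteSessionsSQL, the `sessions` table, and the `user_id` field are all hypothetical.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// JSON is a stand-in for ratchet's data.JSON (assumed to be a byte slice).
type JSON []byte

// deleteSessionsSQL has the shape func(data.JSON) (string, error): it
// inspects one payload and returns the SQL statement to run for it.
func deleteSessionsSQL(d JSON) (string, error) {
	var payload struct {
		UserID *int64 `json:"user_id"`
	}
	if err := json.Unmarshal(d, &payload); err != nil {
		return "", err
	}
	if payload.UserID == nil {
		return "", errors.New("payload has no user_id")
	}
	// The value was parsed as an integer, so formatting it into the
	// statement cannot inject arbitrary SQL.
	return fmt.Sprintf("DELETE FROM sessions WHERE user_id = %d", *payload.UserID), nil
}

func main() {
	sql, err := deleteSessionsSQL(JSON(`{"user_id": 42}`))
	fmt.Println(sql, err)
}
```

With the real package, a function like this would be passed as the sqlGenerator argument, with its parameter typed as data.JSON.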

                                                                                                                                                            func NewDynamicSQLExecutor

                                                                                                                                                            func NewDynamicSQLExecutor(dbConn *sql.DB, sqlGenerator func(data.JSON) (string, error)) *SQLExecutor

                                                                                                                                                              NewDynamicSQLExecutor returns a new SQLExecutor operating in dynamic mode.

                                                                                                                                                              func NewSQLExecutor

                                                                                                                                                              func NewSQLExecutor(dbConn *sql.DB, sql string) *SQLExecutor

                                                                                                                                                                NewSQLExecutor returns a new SQLExecutor

                                                                                                                                                                func (*SQLExecutor) Finish

                                                                                                                                                                func (s *SQLExecutor) Finish(outputChan chan data.JSON, killChan chan error)

                                                                                                                                                                  Finish - see interface for documentation.

                                                                                                                                                                  func (*SQLExecutor) ProcessData

                                                                                                                                                                  func (s *SQLExecutor) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

                                                                                                                                                                    ProcessData runs the SQL statements, deferring to util.ExecuteSQLQuery

                                                                                                                                                                    func (*SQLExecutor) String

                                                                                                                                                                    func (s *SQLExecutor) String() string

                                                                                                                                                                    type SQLReader

                                                                                                                                                                    type SQLReader struct {
                                                                                                                                                                    	BatchSize         int
                                                                                                                                                                    	StructDestination interface{}
                                                                                                                                                                    	ConcurrencyLevel  int // See ConcurrentDataProcessor
                                                                                                                                                                    	// contains filtered or unexported fields
                                                                                                                                                                    }

                                                                                                                                                                      SQLReader runs the given SQL and passes the resulting data to the next stage of processing.

                                                                                                                                                                      It can operate in 2 modes: 1) Static - runs the given SQL query and ignores any received data. 2) Dynamic - generates a SQL query for each data payload it receives.

                                                                                                                                                                      The dynamic SQL generation is implemented by passing in a "sqlGenerator" function to NewDynamicSQLReader. This allows you to write whatever code is needed to generate SQL based upon data flowing through the pipeline.

                                                                                                                                                                      func NewDynamicSQLReader

                                                                                                                                                                      func NewDynamicSQLReader(dbConn *sql.DB, sqlGenerator func(data.JSON) (string, error)) *SQLReader

                                                                                                                                                                        NewDynamicSQLReader returns a new SQLReader operating in dynamic mode.

                                                                                                                                                                        func NewSQLReader

                                                                                                                                                                        func NewSQLReader(dbConn *sql.DB, sql string) *SQLReader

                                                                                                                                                                          NewSQLReader returns a new SQLReader operating in static mode.

                                                                                                                                                                          func (*SQLReader) Concurrency

                                                                                                                                                                          func (s *SQLReader) Concurrency() int

                                                                                                                                                                            Concurrency defers to ConcurrentDataProcessor

                                                                                                                                                                            func (*SQLReader) Finish

                                                                                                                                                                            func (s *SQLReader) Finish(outputChan chan data.JSON, killChan chan error)

                                                                                                                                                                              Finish - see interface for documentation.

                                                                                                                                                                              func (*SQLReader) ForEachQueryData

                                                                                                                                                                              func (s *SQLReader) ForEachQueryData(d data.JSON, killChan chan error, forEach func(d data.JSON))

                                                                                                                                                                                ForEachQueryData handles generating the SQL (in case of dynamic mode), running the query and retrieving the data in data.JSON format, and then passing the results back witih the function call to forEach.
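
The callback style used by ForEachQueryData can be sketched without a database. In this illustration rows are already in memory, forEachBatch is a hypothetical helper, and the way BatchSize groups rows into payloads is an assumption rather than documented behavior.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// JSON is a stand-in for ratchet's data.JSON (assumed to be a byte slice).
type JSON []byte

// forEachBatch marshals rows in groups of batchSize and hands each group
// to forEach. A non-positive batchSize sends all rows in one payload.
func forEachBatch(rows []map[string]interface{}, batchSize int, forEach func(d JSON)) error {
	if batchSize <= 0 {
		batchSize = len(rows)
	}
	for start := 0; start < len(rows); start += batchSize {
		end := start + batchSize
		if end > len(rows) {
			end = len(rows)
		}
		d, err := json.Marshal(rows[start:end])
		if err != nil {
			return err
		}
		forEach(d)
	}
	return nil
}

func main() {
	rows := []map[string]interface{}{{"id": 1}, {"id": 2}, {"id": 3}}
	err := forEachBatch(rows, 2, func(d JSON) {
		fmt.Println(string(d))
	})
	if err != nil {
		panic(err)
	}
}
```

The caller decides what happens to each payload, for example sending it to outputChan.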

                                                                                                                                                                                func (*SQLReader) ProcessData

                                                                                                                                                                                func (s *SQLReader) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

                                                                                                                                                                                  ProcessData - see interface for documentation.

                                                                                                                                                                                  func (*SQLReader) String

                                                                                                                                                                                  func (s *SQLReader) String() string

                                                                                                                                                                                  type SQLReaderWriter

                                                                                                                                                                                  type SQLReaderWriter struct {
                                                                                                                                                                                  	SQLReader
                                                                                                                                                                                  	SQLWriter
                                                                                                                                                                                  	ConcurrencyLevel int // See ConcurrentDataProcessor
                                                                                                                                                                                  }

                                                                                                                                                                                    SQLReaderWriter performs both the job of a SQLReader and SQLWriter. This means it will run a SQL query, write the resulting data into a SQL database, and (if the write was successful) send the queried data to the next stage of processing.

                                                                                                                                                                                    SQLReaderWriter is composed of both a SQLReader and SQLWriter, so it supports all of the same properties and usage options (such as static versus dynamic SQL querying).
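
The composition described above relies on Go struct embedding. The sketch below uses simplified, hypothetical types (reader, writer, readerWriter) with string payloads. It shows why the outer type declares its own ProcessData: when two embedded types both define a method with the same name, the promoted selector is ambiguous and the outer type must resolve it explicitly.

```go
package main

import "fmt"

// reader and writer are simplified stand-ins for SQLReader and SQLWriter.
type reader struct{ query string }

func (r *reader) ProcessData(d string) string { return "read(" + r.query + ")" }

type writer struct{ table string }

func (w *writer) ProcessData(d string) string { return "write(" + d + " -> " + w.table + ")" }

// readerWriter embeds both types, so it inherits the fields of each.
type readerWriter struct {
	reader
	writer
}

// ProcessData must be defined here: both embedded types provide a method
// of this name, so rw.ProcessData would otherwise be ambiguous.
func (rw *readerWriter) ProcessData(d string) string {
	rows := rw.reader.ProcessData(d)   // run the query first
	return rw.writer.ProcessData(rows) // then write what was read
}

func main() {
	rw := &readerWriter{reader{query: "SELECT 1"}, writer{table: "results"}}
	fmt.Println(rw.ProcessData(""))
}
```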

                                                                                                                                                                                    func NewDynamicSQLReaderWriter

                                                                                                                                                                                    func NewDynamicSQLReaderWriter(readConn *sql.DB, writeConn *sql.DB, sqlGenerator func(data.JSON) (string, error), writeTable string) *SQLReaderWriter

                                                                                                                                                                                      NewDynamicSQLReaderWriter returns a new SQLReaderWriter ready for dynamic querying.

                                                                                                                                                                                      func NewSQLReaderWriter

                                                                                                                                                                                      func NewSQLReaderWriter(readConn *sql.DB, writeConn *sql.DB, readQuery, writeTable string) *SQLReaderWriter

                                                                                                                                                                                        NewSQLReaderWriter returns a new SQLReaderWriter ready for static querying.

                                                                                                                                                                                        func (*SQLReaderWriter) Concurrency

                                                                                                                                                                                        func (s *SQLReaderWriter) Concurrency() int

                                                                                                                                                                                          Concurrency defers to ConcurrentDataProcessor

                                                                                                                                                                                          func (*SQLReaderWriter) Finish

                                                                                                                                                                                          func (s *SQLReaderWriter) Finish(outputChan chan data.JSON, killChan chan error)

                                                                                                                                                                                            Finish - see interface for documentation.

                                                                                                                                                                                            func (*SQLReaderWriter) ProcessData

                                                                                                                                                                                            func (s *SQLReaderWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

                                                                                                                                                                                              ProcessData uses SQLReader methods for processing data - this works via composition

                                                                                                                                                                                              func (*SQLReaderWriter) String

                                                                                                                                                                                              func (s *SQLReaderWriter) String() string

                                                                                                                                                                                              type SQLWriter

                                                                                                                                                                                              type SQLWriter struct {
                                                                                                                                                                                              	TableName        string
                                                                                                                                                                                              	OnDupKeyUpdate   bool
                                                                                                                                                                                              	OnDupKeyFields   []string
                                                                                                                                                                                              	ConcurrencyLevel int // See ConcurrentDataProcessor
                                                                                                                                                                                              	BatchSize        int
                                                                                                                                                                                              	// contains filtered or unexported fields
                                                                                                                                                                                              }

                                                                                                                                                                                                SQLWriter handles INSERTing data.JSON into a specified SQL table. If an error occurs while building or executing the INSERT, the error will be sent to the killChan.

                                                                                                                                                                                                Note that the data.JSON must be a valid JSON object or a slice of valid objects, where the keys are column names and the the values are the SQL values to be inserted into those columns.

                                                                                                                                                                                                For use-cases where a SQLWriter instance needs to write to multiple tables you can pass in SQLWriterData.
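
To make the expected payload shape concrete, the sketch below turns a JSON object (or a slice of objects) into a single parameterized INSERT. It is not SQLWriter's implementation: buildInsert is a hypothetical helper, the `?` placeholder style is an assumption, and column names are taken from the first object and sorted only to keep the output stable.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"sort"
	"strings"
)

// buildInsert converts a JSON object or array of objects into one
// parameterized INSERT statement plus its arguments. (Hypothetical helper.)
func buildInsert(table string, d []byte) (string, []interface{}, error) {
	trimmed := bytes.TrimSpace(d)
	if len(trimmed) > 0 && trimmed[0] == '{' {
		// Wrap a single object so both input shapes share one code path.
		trimmed = append(append([]byte("["), trimmed...), ']')
	}
	var rows []map[string]interface{}
	if err := json.Unmarshal(trimmed, &rows); err != nil {
		return "", nil, err
	}
	if len(rows) == 0 {
		return "", nil, fmt.Errorf("no rows to insert")
	}
	// Keys of the first object are used as the column names.
	cols := make([]string, 0, len(rows[0]))
	for c := range rows[0] {
		cols = append(cols, c)
	}
	sort.Strings(cols)

	placeholders := "(" + strings.TrimSuffix(strings.Repeat("?,", len(cols)), ",") + ")"
	groups := make([]string, 0, len(rows))
	args := make([]interface{}, 0, len(rows)*len(cols))
	for _, row := range rows {
		groups = append(groups, placeholders)
		for _, c := range cols {
			args = append(args, row[c])
		}
	}
	query := fmt.Sprintf("INSERT INTO %s (%s) VALUES %s",
		table, strings.Join(cols, ","), strings.Join(groups, ","))
	return query, args, nil
}

func main() {
	q, args, err := buildInsert("users", []byte(`[{"id":1,"name":"a"},{"id":2,"name":"b"}]`))
	if err != nil {
		panic(err)
	}
	fmt.Println(q)
	fmt.Println(args...)
}
```

The query and args could then be passed to (*sql.DB).Exec. Note that encoding/json decodes JSON numbers into float64 when the target is interface{}.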

                                                                                                                                                                                                func NewSQLWriter

                                                                                                                                                                                                func NewSQLWriter(db *sql.DB, tableName string) *SQLWriter

                                                                                                                                                                                                  NewSQLWriter returns a new SQLWriter

                                                                                                                                                                                                  func (*SQLWriter) Concurrency

                                                                                                                                                                                                  func (s *SQLWriter) Concurrency() int

                                                                                                                                                                                                    Concurrency defers to ConcurrentDataProcessor

                                                                                                                                                                                                    func (*SQLWriter) Finish

                                                                                                                                                                                                    func (s *SQLWriter) Finish(outputChan chan data.JSON, killChan chan error)

                                                                                                                                                                                                      Finish - see interface for documentation.

                                                                                                                                                                                                      func (*SQLWriter) ProcessData

                                                                                                                                                                                                      func (s *SQLWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

                                                                                                                                                                                                        ProcessData defers to util.SQLInsertData

                                                                                                                                                                                                        func (*SQLWriter) String

                                                                                                                                                                                                        func (s *SQLWriter) String() string

                                                                                                                                                                                                        type SQLWriterData

                                                                                                                                                                                                        type SQLWriterData struct {
                                                                                                                                                                                                        	TableName  string      `json:"table_name"`
                                                                                                                                                                                                        	InsertData interface{} `json:"insert_data"`
                                                                                                                                                                                                        }

                                                                                                                                                                                                          SQLWriterData is a custom data structure you can send into a SQLWriter stage if you need to specify TableName on a per-data payload basis. No extra configuration is needed to use SQLWriterData, each data payload received is first checked for this structure before processing.
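As a rough illustration of what such a payload looks like on the wire, the sketch below marshals a local copy of the struct with encoding/json. The sqlWriterData type mirrors the exported struct above so the example compiles without importing this package, and wrapForTable is a hypothetical helper introduced only for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sqlWriterData mirrors the exported SQLWriterData struct shown above so this
// example is self-contained. In a real pipeline you would use the package type.
type sqlWriterData struct {
	TableName  string      `json:"table_name"`
	InsertData interface{} `json:"insert_data"`
}

// wrapForTable is a hypothetical helper: it builds the JSON payload that
// carries rows together with the name of the table they are destined for.
func wrapForTable(table string, rows interface{}) ([]byte, error) {
	return json.Marshal(sqlWriterData{TableName: table, InsertData: rows})
}

func main() {
	rows := []map[string]interface{}{{"id": 1, "name": "Ada"}}
	payload, err := wrapForTable("users", rows)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(payload))
	// prints {"table_name":"users","insert_data":[{"id":1,"name":"Ada"}]}
}
```

A stage placed before the SQLWriter could emit one such payload per destination table, which lets a single writer fan rows out to several tables.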

                                                                                                                                                                                                          type SftpReader

                                                                                                                                                                                                          type SftpReader struct {
                                                                                                                                                                                                          	IoReader // embeds IoReader
                                                                                                                                                                                                          
                                                                                                                                                                                                          	DeleteObjects bool
                                                                                                                                                                                                          	Walk          bool
                                                                                                                                                                                                          	FileNamesOnly bool
                                                                                                                                                                                                          
                                                                                                                                                                                                          	CloseOnFinish bool
                                                                                                                                                                                                          	// contains filtered or unexported fields
                                                                                                                                                                                                          }

                                                                                                                                                                                                            SftpReader reads a single object at a given path, or walks through the directory specified by the path (SftpReader.Walk must be set to true).

                                                                                                                                                                                                            To only send full paths (and not file contents), set FileNamesOnly to true. If FileNamesOnly is set to true, DeleteObjects will be ignored.

                                                                                                                                                                                                            func NewSftpReader

                                                                                                                                                                                                            func NewSftpReader(server string, username string, path string, authMethods ...ssh.AuthMethod) *SftpReader

                                                                                                                                                                                                              NewSftpReader instantiates a new sftp reader, a connection to the remote server is delayed until data is recv'd by the reader By default, the connection to the remote client will be closed in the Finish() func. Set CloseOnFinish to false to manage the connection manually.

                                                                                                                                                                                                              func NewSftpReaderByClient

                                                                                                                                                                                                              func NewSftpReaderByClient(client *sftp.Client, path string) *SftpReader

                                                                                                                                                                                                                NewSftpReaderByClient instantiates a new sftp reader using an existing connection to the remote server. By default, the connection to the remote client will *not* be closed in the Finish() func. Set CloseOnFinish to true to have this processor clean up the connection when it's done.

                                                                                                                                                                                                                func (*SftpReader) CloseClient

                                                                                                                                                                                                                func (r *SftpReader) CloseClient()

                                                                                                                                                                                                                  CloseClient allows you to manually close the connection to the remote client (as the remote client itself is not exported)

                                                                                                                                                                                                                  func (*SftpReader) Finish

                                                                                                                                                                                                                  func (r *SftpReader) Finish(outputChan chan data.JSON, killChan chan error)

                                                                                                                                                                                                                    Finish optionally closes open references to the remote server

                                                                                                                                                                                                                    func (*SftpReader) ProcessData

                                                                                                                                                                                                                    func (r *SftpReader) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

                                                                                                                                                                                                                      ProcessData optionally walks through the tree to send each object separately, or sends the single object upstream

                                                                                                                                                                                                                      func (*SftpReader) String

                                                                                                                                                                                                                      func (r *SftpReader) String() string

                                                                                                                                                                                                                      type SftpWriter

                                                                                                                                                                                                                      type SftpWriter struct {
                                                                                                                                                                                                                      	CloseOnFinish bool
                                                                                                                                                                                                                      	// contains filtered or unexported fields
                                                                                                                                                                                                                      }

                                                                                                                                                                                                                        SftpWriter is an inline writer to a remote SFTP server.

                                                                                                                                                                                                                        func NewSftpWriter

                                                                                                                                                                                                                        func NewSftpWriter(server string, username string, path string, authMethods ...ssh.AuthMethod) *SftpWriter

                                                                                                                                                                                                                          NewSftpWriter instantiates a new sftp writer, a connection to the remote server is delayed until data is recv'd by the writer By default, the connection to the remote client will be closed in the Finish() func. Set CloseOnFinish to false to manage the connection manually.

                                                                                                                                                                                                                          func NewSftpWriterByFile

                                                                                                                                                                                                                          func NewSftpWriterByFile(file *sftp.File) *SftpWriter

                                                                                                                                                                                                                            NewSftpWriterByFile allows you to manually manage the connection to the remote file object. Use this if you want to write to the same file object across multiple pipelines. By default, the connection to the remote client will *not* be closed in the Finish() func. Set CloseOnFinish to true to have this processor clean up the connection when it's done.

                                                                                                                                                                                                                            func (*SftpWriter) Finish

                                                                                                                                                                                                                            func (w *SftpWriter) Finish(outputChan chan data.JSON, killChan chan error)

                                                                                                                                                                                                                              Finish optionally closes open references to the remote file and server

                                                                                                                                                                                                                              func (*SftpWriter) ProcessData

                                                                                                                                                                                                                              func (w *SftpWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

                                                                                                                                                                                                                                ProcessData writes data as is directly to the output file

                                                                                                                                                                                                                                func (*SftpWriter) String

                                                                                                                                                                                                                                func (w *SftpWriter) String() string