Documentation

Overview

    Package input provides input components.

    Index

    Constants

    This section is empty.

    Variables

      All is the list of all baker inputs.

      View Source
      var KCLDesc = baker.InputDesc{
      	Name:   "KCL",
      	New:    NewKCL,
      	Config: &KCLConfig{},
      	Help: "This input fetches records from Kinesis with KCL. It consumes a specified stream, and\n" +
      		"processes all shards in that stream. It never exits.\n" +
      		"Multiple baker instances can consume the same stream, in that case the KCL will take care of\n" +
      		"balancing the shards between workers. Careful (shard stealing is not implemented yet).\n" +
      		"Resharding on the producer side is automatically handled by the KCL that will distribute\n" +
      		"the shards among KCL workers.",
      }

        KCLDesc describes the KCL input.

        View Source
        var KinesisDesc = baker.InputDesc{
        	Name:   "Kinesis",
        	New:    NewKinesis,
        	Config: &KinesisConfig{},
        	Help: "This input fetches log lines from Kinesis. It listens on a specified stream, and\n" +
        		"processes all the shards in that stream. It never exits.\n",
        }
        View Source
        var ListDesc = baker.InputDesc{
        	Name:   "List",
        	New:    NewList,
        	Config: &ListConfig{},
        	Help: "This input fetches logs from a predefined list of local or remote sources. The \"Files\"\n" +
        		"configuration variable is a list of \"file specifiers\". Each \"file specifier\" can be:\n\n" +
        		"  * A local file path on the filesystem: the log file at that path will be processed\n" +
        		"  * A HTTP/HTTPS URL: the log file at that URL will be downloaded and processed\n" +
        		"  * A S3 URL: the log file at that URL that will be downloaded and processed\n" +
        		"  * \"@\" followed by a local path pointing to a file: the file is expected to be a text file\n" +
        		"    and each line will be read and parsed as a \"file specifier\"\n" +
        		"  * \"@\" followed by a HTTP/HTTPS URL: the text file pointed by the URL will be downloaded,\n" +
        		"    and each line will be read and parsed as a \"file specifier\"\n" +
        		"  * \"@\" followed by a S3 URL pointing to a file: the text file pointed by the URL will be\n" +
        		"    downloaded, and each line will be read and parsed as a \"file specifier\"\n" +
        		"  * \"@\" followed by a local path pointing to a directory (must end with a slash): the directory will be recursively\n" +
        		"    walked, and all files matching the \"MatchPath\" option regexp will be processed as logfiles\n" +
        		"  * \"@\" followed by a S3 URL pointing to a directory: the directory on S3 will be recursively\n" +
        		"    walked, and all files matching the \"MatchPath\" option regexp will be processed as logfiles\n" +
        		"  * \"-\": the contents of a log file will be read from stdin and processed\n" +
        		"  * \"@-\": each line read from stdin will be parsed as a \"file specifier\"\n\n" +
        		"All records produced by this input contain 2 metadata values:\n" +
        		"  * url: the files that originally contained the record\n" +
        		"  * last_modified: the last modification datetime of the above file\n",
        }
        View Source
        var SQSDesc = baker.InputDesc{
        	Name:   "SQS",
        	New:    NewSQS,
        	Config: &SQSConfig{},
        	Help: "This input listens on multiple SQS queues for new incoming log files\n" +
        		"on S3; it is meant to be used with SQS queues popoulated by SNS.\n" +
        		"It never exits.\n",
        }
        View Source
        var TCPDesc = baker.InputDesc{
        	Name:   "TCP",
        	New:    NewTCP,
        	Config: &TCPConfig{},
        	Help: "This input relies on a TCP connection to receive records in the usual format\n" +
        		"Configure it with a host and port that you want to accept connection from.\n" +
        		"By default it listens on port 6000 for any connection\n" +
        		"It never exits.\n",
        }

        Functions

        func NewKCL

        func NewKCL(cfg baker.InputParams) (baker.Input, error)

          NewKCL creates a new KCL.

          func NewKinesis

          func NewKinesis(cfg baker.InputParams) (baker.Input, error)

            NewKinesis creates a Kinesis tail and immediately makes a first connection to get the current shard list.

            func NewList

            func NewList(cfg baker.InputParams) (baker.Input, error)

            func NewSQS

            func NewSQS(cfg baker.InputParams) (baker.Input, error)

            func NewTCP

            func NewTCP(cfg baker.InputParams) (baker.Input, error)

            Types

            type KCL

            type KCL struct {
            	// contains filtered or unexported fields
            }

              KCL is a Baker input reading from Kinesis with the KCL (Kinesis Client Library).

              func (*KCL) CreateProcessor

              func (k *KCL) CreateProcessor() interfaces.IRecordProcessor

                CreateProcessor implements interfaces.IRecordProcessorFactory.

                func (*KCL) FreeMem

                func (k *KCL) FreeMem(data *baker.Data)

                  FreeMem implements baker.Input.

                  func (*KCL) Run

                  func (k *KCL) Run(inch chan<- *baker.Data) error

                    Run implements baker.Input.

                    func (*KCL) Stats

                    func (k *KCL) Stats() baker.InputStats

                      Stats implements baker.Input.

                      func (*KCL) Stop

                      func (k *KCL) Stop()

                        Stop implements baker.Input.

                        type KCLConfig

                        type KCLConfig struct {
                        	AwsRegion       string        `help:"AWS region to connect to" default:"us-west-2"`
                        	Stream          string        `help:"Name of Kinesis stream" required:"true"`
                        	AppName         string        `help:"Used by KCL to allow multiple app to consume the same stream." required:"true"`
                        	MaxShards       int           `help:"Max shards this Worker can handle at a time" default:"32767"`
                        	ShardSync       time.Duration `help:"Time between tasks to sync leases and Kinesis shards" default:"60s"`
                        	InitialPosition string        `help:"Position in the stream where a new application should start from. Values: LATEST or TRIM_HORIZON" default:"LATEST"`
                        	// contains filtered or unexported fields
                        }

                          KCLConfig is the configuration for the KCL input.

                          type Kinesis

                          type Kinesis struct {
                          	Cfg  *KinesisConfig
                          	Data chan<- *baker.Data
                          	// contains filtered or unexported fields
                          }

                          func (*Kinesis) FreeMem

                          func (s *Kinesis) FreeMem(data *baker.Data)

                          func (*Kinesis) ProcessRecords

                          func (s *Kinesis) ProcessRecords(shard *kinesis.Shard) error

                          func (*Kinesis) Run

                          func (s *Kinesis) Run(data chan<- *baker.Data) error

                          func (*Kinesis) Stats

                          func (s *Kinesis) Stats() baker.InputStats

                          func (*Kinesis) Stop

                          func (s *Kinesis) Stop()

                          type KinesisConfig

                          type KinesisConfig struct {
                          	AwsRegion string        `help:"AWS region to connect to" default:"us-west-2"`
                          	Stream    string        `help:"Stream name on Kinesis" required:"true"`
                          	IdleTime  time.Duration `help:"Time between polls of each shard" default:"100ms"`
                          }

                          type List

                          type List struct {
                          	Cfg *ListConfig
                          	// contains filtered or unexported fields
                          }

                          func (*List) FreeMem

                          func (s *List) FreeMem(data *baker.Data)

                          func (*List) ProcessDirectory

                          func (s *List) ProcessDirectory(dir string, matchPath *regexp.Regexp) error

                          func (*List) Run

                          func (s *List) Run(inch chan<- *baker.Data) error

                          func (*List) Stats

                          func (s *List) Stats() baker.InputStats

                          func (*List) Stop

                          func (s *List) Stop()

                          type ListConfig

                          type ListConfig struct {
                          	Files     []string `help:"List of log-files, directories and/or list-files to process" default:"[\"-\"]"`
                          	MatchPath string   `help:"regexp to filter files in specified directories" default:".*\\.log\\.gz"`
                          	Region    string   `help:"AWS Region for fetching from S3" default:"us-west-2"`
                          }

                          type SQS

                          type SQS struct {
                          	Cfg            *SQSConfig
                          	FilePathRegexp *regexp.Regexp
                          	// contains filtered or unexported fields
                          }

                          func (*SQS) FreeMem

                          func (s *SQS) FreeMem(data *baker.Data)

                          func (*SQS) Run

                          func (s *SQS) Run(inch chan<- *baker.Data) error

                          func (*SQS) Stats

                          func (s *SQS) Stats() baker.InputStats

                          func (*SQS) Stop

                          func (s *SQS) Stop()

                          type SQSConfig

                          type SQSConfig struct {
                          	AwsRegion      string   `help:"AWS region to connect to" default:"us-west-2"`
                          	Bucket         string   `help:"S3 Bucket to use for processing" default:""`
                          	QueuePrefixes  []string `help:"Prefixes of the names of the SQS queues to monitor" required:"true"`
                          	MessageFormat  string   `` /* 189-byte string literal not displayed */
                          	FilePathFilter string   `help:"If provided, will only use S3 files with the given path."`
                          }

                          type TCP

                          type TCP struct {
                          	Cfg *TCPConfig
                          	// contains filtered or unexported fields
                          }

                          func (*TCP) FreeMem

                          func (s *TCP) FreeMem(data *baker.Data)

                          func (*TCP) Run

                          func (s *TCP) Run(inch chan<- *baker.Data) error

                          func (*TCP) Stats

                          func (s *TCP) Stats() baker.InputStats

                          func (*TCP) Stop

                          func (s *TCP) Stop()

                          type TCPConfig

                          type TCPConfig struct {
                          	Listener string `help:"Host:Port to bind to"`
                          }

                          Directories

                          Path Synopsis