README

Baker

Baker is a high performance, composable and extendable data-processing pipeline for the big data era. It shines at converting, processing, extracting or storing records (structured data), applying whatever transformation between input and output through easy-to-write filters.
Baker is fully parallel and maximizes resource usage in both CPU-bound and I/O-bound pipelines.

Pipelines

A pipeline is the configured set of operations that Baker performs during its execution.

It is defined by:

  • One input component, defining where to fetch records from
  • Zero or more filters, which are functions that can process records (reading/writing fields, clearing them or even splitting them into multiple records)
  • One output component, defining where to send the filtered records to (and which columns)
  • One optional upload component, defining where to send files produced by the output component (if any)

Notice that there are two main usage scenarios for Baker:

  1. Baker as a batch processor. In this case, Baker will go through all the records that are fed by the input component, process them as quickly as possible, and exit.
  2. Baker as a daemon. In this case, Baker will never exit; it will keep waiting for incoming records from the input component (e.g.: Kinesis), process them and send them to the output.

Selecting between scenario 1 or scenario 2 is just a matter of configuring the pipeline; in fact, it is the input component that drives the scenario. If the input component exits at some point, Baker will flush the pipeline and exit as well; if instead the input component is endless, Baker will never exit and thus behave like a daemon.
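
The way the input drives the scenario can be sketched with plain Go channels. This is only an illustration under stated assumptions, not Baker's actual implementation: `runPipeline`, the `input` function type and the string records are hypothetical names chosen for the example.

```go
package main

import (
	"fmt"
	"strings"
)

// input is a hypothetical stand-in for an input component: it pushes
// records to out and returns when (if ever) it has nothing left to read.
type input func(out chan<- string)

// runPipeline runs the input, applies one transformation to every record
// and collects the results. It returns only after the input has returned
// and all in-flight records have been flushed, so a finite input gives a
// batch run while an endless input would make it behave like a daemon.
func runPipeline(in input, transform func(string) string) []string {
	records := make(chan string)
	go func() {
		in(records)
		close(records) // the input exited: let the pipeline drain
	}()

	var written []string
	for r := range records {
		written = append(written, transform(r))
	}
	return written
}

func main() {
	batch := func(out chan<- string) {
		for _, r := range []string{"a,1", "b,2"} {
			out <- r
		}
	}
	fmt.Println(runPipeline(batch, strings.ToUpper)) // prints "[A,1 B,2]"
}
```

An input function that never returns (for example, one blocked on a network stream) would keep `runPipeline` running indefinitely, which corresponds to the daemon scenario.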

Usage

Baker uses a baker.Config struct to know what to do. The configuration can be created by parsing a TOML file with baker.NewConfigFromToml(). This function requires a baker.Components object including all available/required components.
This is an example of this struct:

comp := baker.Components{
	Inputs:  input.AllInputs(),
	Filters: MyCustomFilters(),
	// merge available outputs with user custom outputs
	Outputs: append(output.AllOutputs(), MyCustomOutputs()...),
	Uploads: MyCustomUploads(),
	// optional: custom extra config
	User: MyCustomConfigs(),
	// optional: used if sharding is enabled
	ShardingFuncs: MyRecordShardingFuncs,
	// optional: used if records must be validated. Not used if [general.dont_validate_fields] is used in TOML
	Validate: MyRecordValidationFunc,
	// optional: functions to get fields indexes by name and vice-versa
	FieldByName: MyFieldByNameFunc,
	FieldNames:  []string{"field0", "field1", "field2"},
}

Components (inputs, filters, outputs and uploads) are either generic ones provided with Baker or user-defined components, or a mix of both.

Performance

Read from S3 and write to local disk

On a c5.2xlarge instance, Baker read zstandard-compressed records from S3, decompressed them, applied basic filtering logic and compressed them back to local files with zstandard (compression level 3, long range mode at 27). The run used ~90% of the capacity of each vCPU (8 in total) and ~3.5GB of RAM.
It read and wrote a total of 94 million records in 8'51" (~178k r/w records per second).
On a c5.12xlarge instance (48vCPUs) the same test took 2'2" (~775k r/w records per second).

For this test we used 711 zstd compressed files for a total of 17 GB of compressed size and 374 GB of uncompressed size. The average size of each record was ~4.5 KB.

Read from S3 and write to DynamoDB (in the same region)

On a c5.4xlarge instance, Baker read zstd compressed files from S3 writing to DynamoDB (configured with 20k write capacity units) at an average speed of 60k records/s (the average size of each record is 4.3 KB) using less than 1 GB of memory and ~300% of the total CPU capacity (less than 20% for each core). The bottleneck here was the DynamoDB write capacity, so Baker can easily cope with an increased load simply by increasing the write capacity units in DynamoDB (up to 400k).

Read from Kinesis and write to DynamoDB (in the same region)

On a c5.4xlarge instance, we performed a test reading from a Kinesis stream with 130 shards and writing to a DynamoDB table with 20k write capacity units. Baker was able to read and write more than 10k records per second (the avg size of each record was 4.5 KB) using less than 1 GB of RAM and ~400% of the total CPU capacity (less than 25% for each core).

Baker and AWS Kinesis Data Firehose

In many respects Baker can be compared with Firehose, and so we did when we used Baker in one of the NextRoll projects.
As mentioned in the NextRoll Tech Blog the price of that service, OMFG, if served using Amazon Firehose, would have been around $30k/month (not including S3 and data transfer costs). That monthly cost is more than the whole yearly cost of the service using Baker.

How to build a Baker executable

The examples/ folder contains several main() examples:

  • basic: a simple example with minimal support
  • filtering: shows how to code your own filter
  • sharding: shows how to use an output that supports sharding (see below for details about sharding)
  • help: shows components' help messages
  • metrics: shows how to implement and plug a new metrics client to Baker
  • advanced: an advanced example with most of the features supported by Baker

TOML Configuration files

This is a minimalist Baker pipeline TOML configuration that reads a record from the disk, updates its timestamp field with a "Timestamp" filter and pushes it to DynamoDB:

[input]
name="List"

    [input.config]
    files=["records.csv.gz"]

[[filter]]
name="Timestamp"

[output]
name="DynamoDB"
fields=["source","timestamp","user"]


    [output.config]
    regions=["us-west-2","us-east-1"]
    table="TestTableName"
    columns=["s:Source", "n:Timestamp", "s:User"]

[input] selects the input component, or where to read the records from. In this case, the List component is selected, which is a component that fetches CSV files from a list of local or remote paths/URLs. [input.config] is where component-specific configurations can be specified, and in this case we simply provide the files option to List. Notice that List would accept http:// or even s3:// URLs there in addition to local paths, and some more (run ./baker-bin -help List in the help example for more details).

[[filter]] In TOML syntax, double brackets indicate an array of sections. This is where you declare the list of filters (i.e. the filter chain) to apply sequentially to your records. Like other components, each filter may be followed by a [filter.config] section. This is an example:

[[filter]]
name="filterA"

    [filter.config]
    foo = "bar"

[[filter]]
name="filterB"

[output] selects the output component; the output is where records that made it to the end of the filter chain without being discarded end up. In this case, the DynamoDB output is selected, and its configuration is specified in [output.config].

The fields option in the [output] section selects which fields of the record will be sent to the output. In fact, most pipelines don't want to send the full records to the output, but they will select a few important columns out of the many available columns. Notice that this is just a selection: it is up to the output component to decide how to physically serialize those columns. For instance, the DynamoDB component requires the user to specify an option called columns that specifies the name and the type of the column where the fields will be written.

Baker supports environment variable replacement in the configuration file. Use ${ENV_VAR_NAME} or $ENV_VAR_NAME and the value in the file will be replaced at runtime. Note that if the variable doesn't exist, an empty string is used as the replacement.
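
The replacement rules described above match the behaviour of `os.Expand` from the Go standard library, so they can be illustrated with it. Whether Baker uses `os.Expand` internally is an assumption; `expandConfig` and the `BAKER_TABLE` / `BAKER_REGION_UNSET` variable names are hypothetical and exist only for this sketch.

```go
package main

import (
	"fmt"
	"os"
)

// expandConfig replaces ${NAME} and $NAME in a configuration text using
// lookup. A lookup that returns "" for unknown names reproduces the
// "missing variable becomes an empty string" rule.
func expandConfig(toml string, lookup func(string) string) string {
	return os.Expand(toml, lookup)
}

func main() {
	os.Setenv("BAKER_TABLE", "TestTableName")
	cfg := "table=\"${BAKER_TABLE}\"\nregion=\"$BAKER_REGION_UNSET\"\n"
	// os.Getenv returns "" for variables that are not set.
	fmt.Print(expandConfig(cfg, os.Getenv))
}
```

Since an unset variable silently becomes an empty string, it can be worth checking the environment before starting a pipeline whose configuration depends on it.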

How to create components

To register a new component within Baker and make it available for your pipelines, you must create and fill a description structure and provide it to baker.Components. The structure to fill up is either an InputDesc, FilterDesc, OutputDesc, UploadDesc or MetricsDesc, depending on the component type.

At runtime, component configurations (i.e. [input.config], [output.config] and so on) are decoded from TOML and each of them is forwarded to the component constructor function.

Configuration fields may contain some struct tags. Let's see their use with an example:

type MyConfig struct {
  Name string      `help:"Name is ..." required:"true"`
  Value int        `help:"Value is ..."`
  Strings []string `help:"Strings ..." default:"[a, b, c]"`
}

Supported struct tags:

  • help: shown on the terminal when requesting this component's help
  • default: also shown in the component help
  • required: also shown in help. Configuration fails if the field is not set in TOML (or left at its zero value).
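
To illustrate how such tags can be read at runtime, here is a small sketch based on the standard `reflect` package. It is not Baker's implementation: `firstMissingRequired` is a hypothetical helper written for this example, and it only handles flat structs.

```go
package main

import (
	"fmt"
	"reflect"
)

// MyConfig mirrors the example configuration struct above.
type MyConfig struct {
	Name    string   `help:"Name is ..." required:"true"`
	Value   int      `help:"Value is ..."`
	Strings []string `help:"Strings ..." default:"[a, b, c]"`
}

// firstMissingRequired returns the name of the first field tagged
// required:"true" that still holds its zero value, or "" if there is none.
// cfg must be a struct or a non-nil pointer to a struct.
func firstMissingRequired(cfg interface{}) string {
	v := reflect.Indirect(reflect.ValueOf(cfg))
	t := v.Type()
	for i := 0; i < t.NumField(); i++ {
		if t.Field(i).Tag.Get("required") != "true" {
			continue
		}
		if v.Field(i).IsZero() {
			return t.Field(i).Name
		}
	}
	return ""
}

func main() {
	fmt.Printf("%q\n", firstMissingRequired(&MyConfig{Value: 3}))      // prints "Name"
	fmt.Printf("%q\n", firstMissingRequired(&MyConfig{Name: "baker"})) // prints ""
}
```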

Filters

Example code can be found at ./examples/filtering/filter.go

A filter must implement a baker.Filter interface:

type Filter interface {
    Process(r Record, next func(Record))
    Stats() FilterStats
}

While Stats() returns a struct used to collect metrics (see the Metrics chapter), the Process() function is where the filter logic is actually implemented.
Filters receive a Record and the next() function, which represents the next filtering function in the filter chain.
The filter can do whatever it likes with the Record, such as adding or changing a value, dropping it (by not calling the next() function) or even splitting a Record by calling next() multiple times.
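
The dropping and splitting behaviours can be sketched without importing Baker. In the example below `Record` is a simplified string stand-in for baker.Record, and `processFunc`, `chain`, `dropEmpty` and `splitOnPipe` are hypothetical names; only the `Process(r, next)` shape comes from the interface above.

```go
package main

import (
	"fmt"
	"strings"
)

// Record is a simplified stand-in for baker.Record (here, one CSV line).
type Record string

// processFunc has the same shape as the Process method described above.
type processFunc func(r Record, next func(Record))

// chain links filters so that each one receives the following filter as
// next(), and the last one hands the record to sink.
func chain(sink func(Record), filters ...processFunc) func(Record) {
	next := sink
	for i := len(filters) - 1; i >= 0; i-- {
		f, n := filters[i], next
		next = func(r Record) { f(r, n) }
	}
	return next
}

// dropEmpty discards empty records by not calling next().
func dropEmpty(r Record, next func(Record)) {
	if r != "" {
		next(r)
	}
}

// splitOnPipe turns one record into several by calling next() repeatedly.
func splitOnPipe(r Record, next func(Record)) {
	for _, part := range strings.Split(string(r), "|") {
		next(Record(part))
	}
}

func main() {
	var out []Record
	run := chain(func(r Record) { out = append(out, r) }, dropEmpty, splitOnPipe)
	run("a,1|b,2")
	run("")
	fmt.Println(out) // prints "[a,1 b,2]"
}
```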

baker.FilterDesc

In case you plan to use a TOML configuration to build the Baker topology, the filter should also be described using a baker.FilterDesc struct. In fact a list of baker.FilterDesc will be used to populate baker.Components, which is an argument of baker.NewConfigFromToml.

type FilterDesc struct {
    Name   string
    New    func(FilterParams) (Filter, error)
    Config interface{}
    Help   string
}

Name

The Name of the filter must be unique, as it must match the name used in the TOML [[filter]] configuration:

[[filter]]
name = "FilterName"
    [filter.config]
    filterConfKey1 = "somevalue"
    filterConfKey2 = "someothervalue"

New

This is the constructor and returns the baker.Filter interface as well as a possible error.

Config

The filter can have its own configuration (as the [filter.config] fields above). The Config field will be populated with a pointer to the configuration struct provided.

The New function will receive a baker.FilterParams. Its DecodedConfig will host the filter configuration. It requires a type assertion to the filter configuration struct type to be used:

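func NewMyCustomFilter(cfg baker.FilterParams) (baker.Filter, error) {
    if cfg.DecodedConfig == nil {
        cfg.DecodedConfig = &MyCustomFilterConfig{}
    }
    dcfg := cfg.DecodedConfig.(*MyCustomFilterConfig)
    // MyCustomFilter is a placeholder for your own baker.Filter implementation.
    return &MyCustomFilter{cfg: dcfg}, nil
}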

Help

The help string can be used to build a help output (see the help example).

Inputs

An input is described to Baker by filling up a baker.InputDesc struct.
The New function must return a baker.Input component whose Run function represents the heart of the input.
That function receives a channel where the data produced by the input must be pushed in the form of a baker.Data.
The actual input data is a slice of bytes that will be parsed with Record.Parse() by Baker before sending it to the filter chain. The input can also add metadata to baker.Data. Metadata can be user-defined and filters must know how to read and use metadata defined by the input.

Outputs

An output must implement the Output interface:

type Output interface {
    Run(in <-chan OutputRecord, upch chan<- string)
    Stats() OutputStats
    CanShard() bool
}

The nop output is a simple implementation of an output and can be used as a reference.

The fields configuration of the output (like fields=["field", "field", ...]) tells which record fields will populate the OutputRecord.Fields slice that is sent to the output as its input. The values of the slice will have the same order as the fields configuration.

The procs setting in the [output] TOML defines how many instances of the output are going to be created (i.e. the New constructor function is going to be called procs times).
Each instance receives its index over the total in OutputParams.Index, which is passed to the output New function.
Which output instance a record is sent to depends on the sharding function (see the sharding section).

NOTE: If procs>1 but sharding is not set up, procs is set back to 1.

Sharding (which is explained below) is strictly connected to the output component but it's also transparent to it. An output will never know how the sharding is calculated, but records with the same value on the field used to calculate sharding will always be sent to the output process with the same index (unless a broken sharding function is used).

The output also receives an upload channel where it can send strings to the uploader. Those strings will likely be paths to something produced by the output (like files) that the uploader must upload somewhere.
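
The interplay between the record channel and the upload channel can be sketched as follows. The types here are simplified, locally defined stand-ins (only the Fields slice of OutputRecord is modelled), and `fileOutput` is a hypothetical output that does not actually write any file.

```go
package main

import "fmt"

// OutputRecord is a simplified stand-in for baker.OutputRecord; only the
// Fields slice mentioned above is modelled.
type OutputRecord struct {
	Fields []string
}

// fileOutput is a hypothetical output that pretends to write every record
// to one file and then asks the uploader to ship that file.
type fileOutput struct {
	path    string
	written int
}

// Run consumes records until the input channel is closed, then sends the
// path of the produced file on the upload channel.
func (o *fileOutput) Run(in <-chan OutputRecord, upch chan<- string) {
	for rec := range in {
		_ = rec.Fields // a real output would serialize the fields here
		o.written++
	}
	upch <- o.path
}

func main() {
	in := make(chan OutputRecord, 2)
	upch := make(chan string, 1)
	in <- OutputRecord{Fields: []string{"src", "1600000000", "user1"}}
	in <- OutputRecord{Fields: []string{"src", "1600000001", "user2"}}
	close(in)

	out := &fileOutput{path: "/tmp/out-0.csv"}
	out.Run(in, upch)
	fmt.Println(out.written, <-upch) // prints "2 /tmp/out-0.csv"
}
```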

Raw outputs

An output can declare itself as "raw". A raw output will receive, in addition to optional selected fields, also the whole serialized record as a []byte buffer.
Serializing a record has a cost, which is why each output must opt in to receive it; by default the whole record is not serialized.

Uploads

Outputs can, if applicable, send paths to local files to a chan string. Uploads read from this channel and can do whatever they desire with those paths. As an example, upload.S3 uploads them to S3.

The uploader component is optional; if it is missing, the string channel is simply ignored by Baker.

How to create a '-help' command line option

The ./examples/help/ folder contains a working example of a command that shows a generic help/usage message and also specific component help messages when used with -help <ComponentName>.

Provided Baker components

Inputs

KCL

input.KCL fetches records from AWS Kinesis using vmware-go-kcl, an implementation of the KCL (Kinesis Client Library). KCL provides a way to process a single Kinesis stream from multiple Baker instances, each instance consuming a number of shards.

The KCL takes care of balancing the shards between workers. At the time of writing, vmware-go-kcl doesn't implement shard stealing yet, so it's advised to set MaxShards to a reasonable value. Since the number of shards doesn't change often, dividing the number of total shards by the expected number of baker instances and rounding up to the next integer has given us good results.
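
The rule of thumb above is a ceiling division. A minimal sketch, where `maxShards` is a hypothetical helper name and not part of Baker:

```go
package main

import "fmt"

// maxShards returns how many shards each instance should be allowed to
// take so that all shards are covered: ceil(totalShards / instances).
func maxShards(totalShards, instances int) int {
	if instances <= 0 {
		return totalShards
	}
	return (totalShards + instances - 1) / instances
}

func main() {
	fmt.Println(maxShards(130, 4)) // prints "33"
}
```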

The associations between shards and Baker instances (or workers) are called leases. Lease synchronization is taken care of by KCL; to do so it requires access to a DynamoDB table, whose name depends on the configured AppName.

Leases are updated at a regular interval defined by ShardSync.

The DynamoDB table also serves the purpose of checkpointing, that is keeping track of the per-shard advancement by writing the ID of the last read record (checkpoint).

InitialPosition defines the initial checkpoint position for consuming new shards. This parameter is only effective the first time a shard ID is encountered, since after that the lease will associate the shard and a record checkpoint. It can either be set to LATEST or TRIM_HORIZON.

Note that when new shards are created in the event of a resharding, KCL may not immediately be aware of their creation. Setting TRIM_HORIZON is thus a safer choice here since eventually all the records from the newly created shards will be consumed, as opposed to LATEST, which can lead to some missing records.

Implementation and throttling prevention

Within a Baker instance, the KCL input creates as many record processors as there are shards to read from. A record processor pulls records by way of the GetRecords AWS Kinesis API call.

AWS imposes limits on GetRecords: each shard supports a maximum total read rate of 2 MiB per second via GetRecords. If a call to GetRecords returns 10 MiB, the maximum size GetRecords is allowed to return, subsequent calls made within the next 5 seconds will meet a ProvisionedThroughputExceededException. Limiting the number of records per call would work, but it would increase the number of I/O syscalls and the risk of either hitting the limits imposed by AWS on API calls or not processing records as fast as possible.

The strategy we're using is to not limit MaxRecords but to sleep for 6s between calls. Doing so, we're guaranteed to never exceed the per-shard read throughput limit of 2 MiB/s, while staying close to it during data peaks. This has the added advantage of reducing the number of I/O syscalls.
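
The arithmetic behind the 6s pause can be checked with a few lines of Go. This is only a back-of-the-envelope sketch using the two limits quoted above; `worstCaseRate` and the constant names are hypothetical.

```go
package main

import "fmt"

const (
	maxResponseBytes = 10 << 20 // largest GetRecords response: 10 MiB
	shardLimitPerSec = 2 << 20  // per-shard read limit: 2 MiB/s
)

// worstCaseRate returns the highest per-shard read rate, in bytes per
// second, reachable when every call returns a full response and calls
// are spaced by pauseSeconds.
func worstCaseRate(pauseSeconds float64) float64 {
	return maxResponseBytes / pauseSeconds
}

func main() {
	for _, pause := range []float64{4, 5, 6} {
		rate := worstCaseRate(pause)
		fmt.Printf("pause=%vs rate=%.2f MiB/s within limit=%v\n",
			pause, rate/(1<<20), rate <= shardLimitPerSec)
	}
}
```

A 5s pause lands exactly on the limit, while 6s caps the worst case at about 1.67 MiB/s, which leaves some margin.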

Working with baker.Record

baker.Record is an interface which provides an abstraction over a record of flattened data, where fields (columns) are indexed through integers.

At the moment, Baker provides a single Record implementation, baker.LogLine.

baker.LogLine CSV record

baker.LogLine is a highly optimized CSV-compatible Record implementation. It supports any single-byte field separator and doesn't handle quotes (neither single nor double). The maximum number of fields is hard-coded by the LogLineNumFields constant, which is 3000. 100 extra fields can be stored at runtime in a LogLine (also hard-coded, with NumFieldsBaker). These extra fields are a fast way to exchange data between filters and/or outputs, but they are handled neither during parsing (i.e. LogLine.Parse) nor during serialization (LogLine.ToText).
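
What "doesn't handle quotes" means in practice can be shown with a plain split on the separator byte. This sketch uses `bytes.Split` from the standard library and is not how LogLine is implemented; `splitFields` is a hypothetical helper.

```go
package main

import (
	"bytes"
	"fmt"
)

// splitFields splits a line on a single-byte separator. Quotes get no
// special treatment, so a separator inside quotes still splits the field.
func splitFields(line []byte, sep byte) [][]byte {
	return bytes.Split(line, []byte{sep})
}

func main() {
	fields := splitFields([]byte(`a,"b,c",d`), ',')
	// The quoted value is cut in two, giving 4 fields instead of 3.
	fmt.Println(len(fields))
	for _, f := range fields {
		fmt.Printf("%q\n", f)
	}
}
```

Data containing the separator inside a field therefore needs to be escaped or cleaned upstream, or a different separator byte has to be chosen.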

If the hard-coded values for LogLineNumFields and NumFieldsBaker do not suit your needs, it's advised that you copy logline.go into your project and modify the constants declared at the top of the file. Your specialized LogLine will still implement baker.Record and thus can be used in lieu of baker.LogLine. To do so, you need to provide a CreateRecord function to baker.Components when calling baker.NewConfigFromToml.

For example:

comp := baker.Components{}

comp.CreateRecord = func() baker.Record {
  return &LogLine{FieldSeparator: ','}
}

Tuning parallelism

When testing Baker in a staging environment, you may want to experiment with parallelism options to try and obtain the best performance. Keep an eye on htop as the pipeline runs: if CPUs aren't saturated, it means that Baker is running I/O-bound, so depending on the input/output configuration you may want to increase the parallelism and squeeze out more performance.

These are the options you can tune:

  • Section [filterchain]:
    • procs: number of parallel goroutines running the filter chain (default: 16)
  • Section [output]:
    • procs: number of parallel goroutines sending data to the output (default: 32)
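
As an illustration of what a procs value controls, the sketch below runs a configurable number of goroutines over a shared channel. It is a generic worker-pool pattern, not Baker's code; `runParallel` is a hypothetical name.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// runParallel starts procs goroutines that all read from in, apply fn and
// write to the returned channel, which is closed once every worker is done.
// Record order is not preserved.
func runParallel(procs int, in <-chan string, fn func(string) string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	for i := 0; i < procs; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for r := range in {
				out <- fn(r)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	in := make(chan string)
	go func() {
		defer close(in)
		for i := 0; i < 100; i++ {
			in <- fmt.Sprintf("record-%d", i)
		}
	}()

	n := 0
	for range runParallel(16, in, strings.ToUpper) {
		n++
	}
	fmt.Println(n) // prints "100"
}
```

With I/O-bound work, raising the number of goroutines lets more operations wait concurrently; with CPU-bound work, going beyond the number of cores usually brings little.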

Sharding

Baker supports sharding of output data, depending on the value of specific fields in each record. Sharding makes sense only for some specific output components, so check out their documentation. A component that supports sharding must return true from the function CanShard().

To configure sharding, it's sufficient to create a sharding key in the [output] section, specifying the column on which the sharding must be executed. For a field to be shardable, a ShardingFunc must exist for that field (see baker.Components.ShardingFuncs).
Since the sharding functions provide the capability to spread the records across different output procs (parallel goroutines), it's clear that the [output] configuration must include a procs value greater than 1 (or must avoid including it as the default value is 32).
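
The idea of mapping a field value to one of the output procs can be sketched with a hash and a modulo. The function below is hypothetical: it does not use Baker's ShardingFunc type, and FNV-1a is an arbitrary choice made for the example.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardOf maps a field value to an output instance in [0, procs).
// procs must be greater than zero.
func shardOf(fieldValue string, procs int) int {
	h := fnv.New64a()
	h.Write([]byte(fieldValue))
	return int(h.Sum64() % uint64(procs))
}

func main() {
	const procs = 4
	for _, user := range []string{"alice", "bob", "alice"} {
		fmt.Printf("%s -> output %d\n", user, shardOf(user, procs))
	}
}
```

Because the mapping is deterministic, both "alice" records end up on the same output index, which is the property sharding relies on.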

How to implement a sharding function

The ./examples/sharding/ folder contains a working example of an output that supports sharding and a main() configuration to use it together with simple sharding functions.

Stats

While running, Baker dumps stats on stdout every second. This is an example line:

Stats: 1s[w:29425 r:29638] total[w:411300 r:454498] speed[w:27420 r:30299] errors[p:0 i:0 f:0 o:0 u:0]

The first bracket shows the number of records that were read (i.e. entered the pipeline) and written (i.e. successfully exited the pipeline) in the last 1-second window. The second bracket is the total since the process was launched (the u: key is the number of files successfully uploaded). The third bracket shows the average read/write speed (records per second).

The fourth bracket shows the records that were discarded at some point during processing because of errors:

  • p: is the number of records that were discarded for a parsing error
  • i: is the number of records that were discarded because an error occurred within the input component. Most of the time, this refers to validation issues.
  • f: is the number of records that were discarded by the filters in the pipeline. Each filter can potentially discard records, and if that happens, it will be reported here.
  • o: is the number of records that were discarded because of an error in the output component. Notice that output components should be resilient to transient network failures, and they abort the process in case of permanent configuration errors, so the number here reflects records that could not be written at all because of, e.g., validation issues. Think of an output that expects a column to be in a specific format, and rejects records where that field is not in the expected format. A real-world example is empty columns that are not accepted by DynamoDB.
  • u: is the number of records whose upload has failed
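
If the stats line has to be consumed by a script or a test, it can be parsed with a small regular expression. The sketch below assumes the exact layout of the example line shown above; `parseStats` is a hypothetical helper and not part of Baker.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
	"strings"
)

// groupRe matches one "name[k:v k:v ...]" group of the stats line.
var groupRe = regexp.MustCompile(`(\w+)\[([^\]]*)\]`)

// parseStats turns a stats line into group -> key -> value, for example
// stats["total"]["w"]. Malformed pairs are skipped.
func parseStats(line string) map[string]map[string]int {
	stats := make(map[string]map[string]int)
	for _, m := range groupRe.FindAllStringSubmatch(line, -1) {
		group := make(map[string]int)
		for _, pair := range strings.Fields(m[2]) {
			kv := strings.SplitN(pair, ":", 2)
			if len(kv) != 2 {
				continue
			}
			if n, err := strconv.Atoi(kv[1]); err == nil {
				group[kv[0]] = n
			}
		}
		stats[m[1]] = group
	}
	return stats
}

func main() {
	line := "Stats: 1s[w:29425 r:29638] total[w:411300 r:454498] speed[w:27420 r:30299] errors[p:0 i:0 f:0 o:0 u:0]"
	s := parseStats(line)
	fmt.Println(s["1s"]["r"], s["total"]["w"], s["errors"]["f"]) // prints "29638 411300 0"
}
```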

Metrics

During execution, Baker gathers some general metrics from the components present in the topology. More specific metrics can also be generated by components.

Metrics are then exported via an implementation of the baker.MetricsClient interface. At the moment, the only implementation is datadog.Client.

Configuration of the metrics client happens in the baker TOML configuration file:

[metrics]
name="datadog"

    [metrics.config]
    host="localhost:8125"            # host of the dogstatsd client to which metrics are sent (over UDP)
    prefix="myapp.baker."            # prefix to prepend to the name of all exported metrics
    send_logs=true                   # whether log messages should be sent (as DogStatsD events)
    tags=["tag1:foo", "tag2:bar"]    # tags to associate with all exported metrics

The fields available in the [metrics.config] section depend on the baker.MetricsClient implementation, chosen with the name value in the [metrics] parent section.

The metrics example shows an example implementation of baker.MetricsClient and how to plug it into Baker so that it can be chosen in the [metrics] TOML section and used to export Baker metrics.

Aborting (CTRL+C)

By design, Baker attempts a clean shutdown on CTRL+C (SIGINT). This means that it will try to flush all records that it's currently processing and correctly flush/close any output.

Depending on the configured pipeline and the requested parallelism, this could take anything from a few seconds to a minute or so (modulo bugs...). Notice that improving this requires active development, so feel free to open an issue if it's not working fast enough for you.

If you need to abort right away, you can use CTRL+\ (SIGQUIT).

Baker test suite

Run the Baker test suite with: go test -v -race ./...
The code also includes several benchmarks.

Package structure

./pkg contains reusable packages providing various utilities that are not specifically Baker-related, though of course they may come in handy while developing new Baker components.

Documentation

Overview

    Package baker provides types and functions to build a pipeline for the processing of structured data.

    Structured data is represented by the Record interface. LogLine implements that interface and represents a csv record.

    Using the functions in the package one can build and run a Topology, reading its configuration from a TOML file.

    The package doesn't include any component. They can be found in their respective packages (baker/input, baker/filter, baker/output and baker/upload).

    The README file in the project repository provides additional information and examples: https://github.com/AdRoll/baker/blob/main/README.md

    Constants

    const (
    	// LogLineNumFields is the maximum number of standard fields in a log line.
    	LogLineNumFields FieldIndex = 3000
    	// NumFieldsBaker is an additional list of custom fields, not present
    	// in the input logline nor in the output, that can be set during processing.
    	// Its main purpose is to quickly exchange values between filters (and possibly
    	// outputs) on a per-record basis.
    	NumFieldsBaker FieldIndex = 100
    
    	// DefaultLogLineFieldSeparator defines the default field separator, which is the comma
    	DefaultLogLineFieldSeparator byte = 44
    )

    Variables

    This section is empty.

    Functions

    func CheckRequiredFields

    func CheckRequiredFields(cfg interface{}) string

      CheckRequiredFields checks that all fields that are tagged as required in cfg's type have actually been set to a value other than the field type's zero value. If not, CheckRequiredFields returns the name of the first required field that is not set; it returns an empty string if all required fields are set or the struct doesn't have any required fields (or any fields at all).

      CheckRequiredFields doesn't support struct embedding other structs.

    func GenerateMarkdownHelp

    func GenerateMarkdownHelp(w io.Writer, desc interface{}) error

      GenerateMarkdownHelp generates markdown-formatted textual help for a Baker component from its description structure. Markdown is written into w.

    func GenerateTextHelp

    func GenerateTextHelp(w io.Writer, desc interface{}) error

      GenerateTextHelp generates non-formatted textual help for a Baker component from its description structure, into w.

    func Main

    func Main(cfg *Config) error

      Main runs the topology corresponding to the provided configuration. Depending on the input, it either blocks forever (daemon) or terminates when all the records have been processed (batch).

    func MainCLI

    func MainCLI(components Components) error

      MainCLI provides a handy way to quickly create a command-line interface to Baker by providing the list of components available to build and run a topology.

      The function includes many utilities that can be configured by command line arguments:

      -help: Prints available options and components
      -v: verbose logging (not compatible with -q)
      -q: quiet logging (not compatible with -v)
      -pretty: logs in textual format instead of JSON format
      -pprof: run a pprof server on the provided host:port address

      The function also expects the first positional (non-flag) argument to be the path to the Baker Topology file.

    func PrintHelp

    func PrintHelp(w io.Writer, name string, comp Components, format HelpFormat) error

      PrintHelp prints the help message for the given component, identified by its name. When name is '*' it shows the help messages for all components.

      The help message includes the component's description as well as the help messages for all component's configuration keys.

      An example of usage is:

      var flagPrintHelp = flag.String("help", "", "show help for a `component` ('*' for all)")
      flag.Parse()
      comp := baker.Components{ /* add all your baker components here */ }
      PrintHelp(os.Stderr, *flagPrintHelp, comp)

      Help output example:

      $ ./baker-bin -help TCP

      =============================================
      Input: TCP
      =============================================
      This input relies on a TCP connection to receive records in the usual format
      Configure it with a host and port that you want to accept connection from.
      By default it listens on port 6000 for any connection
      It never exits.

      Keys available in the [input.config] section:

      Name               | Type               | Default                    | Help
      ----------------------------------------------------------------------------------------------------
      Listener           | string             |                            | Host:Port to bind to
      ----------------------------------------------------------------------------------------------------

    func RecordConformanceTest

    func RecordConformanceTest(t *testing.T, create func() Record)

      RecordConformanceTest is a test helper that verifies the conformance of a Record implementation with a set of requirements.

    func RenderHelpMarkdown

    func RenderHelpMarkdown(w io.Writer, name string, comp Components) error

      RenderHelpMarkdown prints markdown formatted help for a single component or for all of them (with name = '*'), and renders it so that it can be printed on a terminal.

    func RequiredFields

    func RequiredFields(cfg interface{}) []string

      RequiredFields returns the names of the underlying configuration structure fields which are tagged as required. To tag a field as being required, a "required" struct tag must be present and set to true.

      RequiredFields doesn't support struct embedding other structs.

    Types

    type Cache

    type Cache map[string]interface{}

      Cache is a per-record cache.

    func (*Cache) Clear

    func (c *Cache) Clear()

      Clear clears all the entries in the cache.

    func (*Cache) Del

    func (c *Cache) Del(key string)

      Del removes the given cache entry.

    func (*Cache) Get

    func (c *Cache) Get(key string) (val interface{}, ok bool)

      Get fetches the value with the given key. If the key is not present in the cache Get returns (nil, false).

    func (*Cache) Set

    func (c *Cache) Set(key string, val interface{})

      Set assigns the given value to a specific key.

    type ComponentParams

    type ComponentParams struct {
    	DecodedConfig  interface{}                     // decoded component-specific struct (from configuration file)
    	CreateRecord   func() Record                   // factory function to create new empty records
    	FieldByName    func(string) (FieldIndex, bool) // translates field names to Record indexes
    	FieldNames     []string                        // FieldNames holds field names, indexed by their FieldIndex
    	ValidateRecord ValidationFunc                  // function to validate a record
    	Metrics        MetricsClient                   // Metrics allows components to add code instrumentation and have metrics exported to the configured backend, if any
    }

      ComponentParams holds the common configuration parameters passed to components of all kinds.

    type Components

    type Components struct {
    	Inputs  []InputDesc  // Inputs represents the list of available inputs
    	Filters []FilterDesc // Filters represents the list of available filters
    	Outputs []OutputDesc // Outputs represents the list of available outputs
    	Uploads []UploadDesc // Uploads represents the list of available uploads

    	Metrics []MetricsDesc // Metrics represents the list of available metrics clients
    	User    []UserDesc    // User represents the list of user-defined configurations

    	ShardingFuncs map[FieldIndex]ShardingFunc // ShardingFuncs are functions to calculate sharding based on field index
    	Validate      ValidationFunc              // Validate is the function used to validate a Record
    	CreateRecord  func() Record               // CreateRecord creates a new record

    	FieldByName func(string) (FieldIndex, bool) // FieldByName gets a field index by its name
    	FieldNames  []string                        // FieldNames holds field names, indexed by their FieldIndex
    }

      Components holds the descriptions of all components one can use to build a topology.

                                    type Config

                                    type Config struct {
                                    	Input       ConfigInput
                                    	FilterChain ConfigFilterChain
                                    	Filter      []ConfigFilter
                                    	Output      ConfigOutput
                                    	Upload      ConfigUpload
                                    
                                    	General    ConfigGeneral
                                    	Fields     ConfigFields
                                    	Validation ConfigValidation
                                    	Metrics    ConfigMetrics
                                    	CSV        ConfigCSV
                                    	User       []ConfigUser
                                    	// contains filtered or unexported fields
                                    }

                                      A Config specifies the configuration for a topology.

                                      func NewConfigFromToml

                                      func NewConfigFromToml(f io.Reader, comp Components) (*Config, error)

                                        NewConfigFromToml creates a Config from a reader reading from a TOML configuration. comp describes all the existing components.

                                        func (*Config) String

                                        func (c *Config) String() string

                                          String returns a string representation of the exported fields of c.

                                          type ConfigCSV

                                          type ConfigCSV struct {
                                          	// FieldSeparator defines the fields separator used in the records
                                          	FieldSeparator string `toml:"field_separator"`
                                          }

                                            ConfigCSV defines configuration for CSV records.

                                            type ConfigFields

                                            type ConfigFields struct {
                                            	Names []string
                                            }

                                              ConfigFields specifies names for record fields. In addition to being a list of names, the position of each name in the slice also indicates the FieldIndex for that name. In other words, if Names[0] = "address", then a FieldIndex of 0 is that field, and "address" is the name of that field.
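The name/index relationship can be sketched as follows. `buildFieldByName` is a hypothetical helper written for this example (it is not part of Baker); it derives a lookup function with the same shape as `FieldByName` from a list of names, while the slice itself plays the role of `FieldNames`.

```go
package main

import "fmt"

// FieldIndex mirrors the FieldIndex type documented in this package.
type FieldIndex int

// buildFieldByName is a hypothetical helper: it returns a function that
// maps a field name to its position in names, which is its FieldIndex.
func buildFieldByName(names []string) func(string) (FieldIndex, bool) {
	idx := make(map[string]FieldIndex, len(names))
	for i, n := range names {
		idx[n] = FieldIndex(i)
	}
	return func(name string) (FieldIndex, bool) {
		f, ok := idx[name]
		return f, ok
	}
}

func main() {
	// The example field names are made up.
	names := []string{"address", "city", "zip"}
	fieldByName := buildFieldByName(names)

	// Name to index, then index back to name through the slice.
	if f, ok := fieldByName("city"); ok {
		fmt.Println(f, names[f])
	}

	// Unknown names are reported through the boolean result.
	_, ok := fieldByName("country")
	fmt.Println("country known:", ok)
}
```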

                                              type ConfigFilter

                                              type ConfigFilter struct {
                                              	Name          string
                                              	DecodedConfig interface{}
                                              
                                              	Config *toml.Primitive
                                              	// contains filtered or unexported fields
                                              }

                                                ConfigFilter specifies the configuration for a single filter component.

                                                type ConfigFilterChain

                                                type ConfigFilterChain struct {
                                                	// Procs specifies the number of baker filters running concurrently.
                                                	// When set to a value greater than 1, filtering may be faster but
                                                	// record ordering is not guaranteed anymore.
                                                	// The default value is 16
                                                	Procs int
                                                }

                                                  ConfigFilterChain specifies the configuration for the whole filter chain.

                                                  type ConfigGeneral

                                                  type ConfigGeneral struct {
                                                  	// DontValidateFields reports whether records validation is skipped (by not calling Components.Validate)
                                                  	DontValidateFields bool `toml:"dont_validate_fields"`
                                                  }

                                                    A ConfigGeneral specifies general configuration for the whole topology.

                                                    type ConfigInput

                                                    type ConfigInput struct {
                                                    	Name          string
                                                    	ChanSize      int // ChanSize represents the size of the channel to send records from the input to the filters, the default value is 1024
                                                    	DecodedConfig interface{}
                                                    
                                                    	Config *toml.Primitive
                                                    	// contains filtered or unexported fields
                                                    }

                                                      ConfigInput specifies the configuration for the input component.

                                                      type ConfigMetrics

                                                      type ConfigMetrics struct {
                                                      	Name          string
                                                      	DecodedConfig interface{}
                                                      
                                                      	Config *toml.Primitive
                                                      	// contains filtered or unexported fields
                                                      }

                                                        ConfigMetrics holds metrics configuration.

                                                        type ConfigOutput

                                                        type ConfigOutput struct {
                                                        	Name string
                                                        	// Procs defines the number of baker outputs running concurrently.
                                                        	// Only set Procs to a value greater than 1 if the output is concurrent safe.
                                                        	Procs         int
                                                        	ChanSize      int      // ChanSize represents the size of the channel to send records to the output component(s), the default value is 16384
                                                        	Sharding      string   // Sharding is the name of the field used for sharding
                                                        	Fields        []string // Fields holds the name of the record fields the output receives
                                                        	DecodedConfig interface{}
                                                        
                                                        	Config *toml.Primitive
                                                        	// contains filtered or unexported fields
                                                        }

                                                          ConfigOutput specifies the configuration for the output component.

                                                          type ConfigUpload

                                                          type ConfigUpload struct {
                                                          	Name          string
                                                          	DecodedConfig interface{}
                                                          
                                                          	Config *toml.Primitive
                                                          	// contains filtered or unexported fields
                                                          }

                                                            ConfigUpload specifies the configuration for the upload component.

                                                            type ConfigUser

                                                            type ConfigUser struct {
                                                            	Name   string
                                                            	Config *toml.Primitive
                                                            }

                                                              A ConfigUser defines a user-specific configuration entry.

                                                              type ConfigValidation

                                                              type ConfigValidation map[string]string

                                                                ConfigValidation specifies regular expressions for field names, used to generate the Record validation function.
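One way to picture how a map of field names to regular expressions can become a validation function is sketched below. Everything here is an assumption made for illustration: `buildValidator` is a hypothetical helper, records are modeled as a plain `map[string][]byte`, and the returned function does not have the signature of Baker's `ValidationFunc`.

```go
package main

import (
	"fmt"
	"regexp"
)

// buildValidator is a hypothetical helper: it compiles one regular
// expression per field name and returns a function that reports whether
// a record satisfies all of them, plus the name of a failing field.
func buildValidator(rules map[string]string) (func(map[string][]byte) (bool, string), error) {
	compiled := make(map[string]*regexp.Regexp, len(rules))
	for field, expr := range rules {
		re, err := regexp.Compile(expr)
		if err != nil {
			return nil, fmt.Errorf("field %q: %v", field, err)
		}
		compiled[field] = re
	}
	return func(rec map[string][]byte) (bool, string) {
		for field, re := range compiled {
			// A missing field yields a nil slice, which is matched
			// like an empty value.
			if !re.Match(rec[field]) {
				return false, field
			}
		}
		return true, ""
	}, nil
}

func main() {
	// The field name and pattern are made up for the example.
	validate, err := buildValidator(map[string]string{"zip": `^[0-9]{5}$`})
	if err != nil {
		fmt.Println("invalid rule:", err)
		return
	}
	ok, _ := validate(map[string][]byte{"zip": []byte("90210")})
	fmt.Println("valid record accepted:", ok)

	ok, field := validate(map[string][]byte{"zip": []byte("n/a")})
	fmt.Println("invalid record accepted:", ok, "failing field:", field)
}
```

Compiling the expressions once, up front, keeps the per-record cost down to the matching itself.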

                                                                type Data

                                                                type Data struct {
                                                                	Bytes []byte   // Bytes is the slice of raw bytes read by an input
                                                                	Meta  Metadata // Meta is filled by the input and holds metadata that will be associated to the records parsed from Bytes
                                                                }

                                                                  Data represents raw data consumed by a baker input, possibly containing multiple records before they're parsed.

                                                                  type ErrorRequiredField

                                                                  type ErrorRequiredField struct {
                                                                  	Field string // Field is the name of the missing field
                                                                  }

                                                                    ErrorRequiredField describes the absence of a required field in a component configuration.

                                                                    func (ErrorRequiredField) Error

                                                                    func (e ErrorRequiredField) Error() string

                                                                    type FieldIndex

                                                                    type FieldIndex int

                                                                      FieldIndex is the index uniquely representing a field in a Record.

                                                                      type Filter

                                                                      type Filter interface {
                                                                      	// Process processes a single Record, and then optionally sends it to
                                                                      	// next filter in the chain.
                                                                      	// Process might mutate the Record, adding/modifying/removing fields,
                                                                      	// and might decide to throw it away, or pass it to next filter in chain
                                                                      	// by calling the next() function. In some cases, a filter might generate
                                                                      	// multiple Record in output, by calling next() multiple times.
                                                                      	// next() is guaranteed to be non-nil; for the last filter of the chain,
                                                                      	// it points to a function that wraps up the filtering chain and sends
                                                                      	// the Record to the output.
                                                                      	Process(l Record, next func(Record))
                                                                      
                                                                      	// Stats returns stats about the filter
                                                                      	Stats() FilterStats
                                                                      }

                                                                        Filter represents a data filter; a filter is a function that processes records. A filter can discard, transform, forward and even create records.
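The contract of Process (mutate, discard by not calling next, forward by calling it) can be sketched with a toy filter. The types below are reduced, hypothetical stand-ins so the example compiles on its own: `Record` only has Get and Set, `FilterStats` omits the Metrics field, and `mapRecord` and `dropEmpty` are invented for this sketch.

```go
package main

import (
	"bytes"
	"fmt"
	"sync/atomic"
)

// FieldIndex mirrors the FieldIndex type documented in this package.
type FieldIndex int

// Record is a reduced stand-in for baker's Record interface.
type Record interface {
	Get(FieldIndex) []byte
	Set(FieldIndex, []byte)
}

// FilterStats is a reduced stand-in for the FilterStats struct.
type FilterStats struct {
	NumProcessedLines int64
	NumFilteredLines  int64
}

// mapRecord is a toy Record used only to exercise the filter.
type mapRecord map[FieldIndex][]byte

func (r mapRecord) Get(f FieldIndex) []byte    { return r[f] }
func (r mapRecord) Set(f FieldIndex, b []byte) { r[f] = b }

// dropEmpty discards records whose field is empty and upper-cases the
// field otherwise. Counters are updated atomically on the assumption
// that Process may be called from several goroutines.
type dropEmpty struct {
	processed, filtered int64
	field               FieldIndex
}

func (f *dropEmpty) Process(r Record, next func(Record)) {
	atomic.AddInt64(&f.processed, 1)
	v := r.Get(f.field)
	if len(v) == 0 {
		atomic.AddInt64(&f.filtered, 1)
		return // not calling next discards the record
	}
	r.Set(f.field, bytes.ToUpper(v))
	next(r) // forward the mutated record down the chain
}

func (f *dropEmpty) Stats() FilterStats {
	return FilterStats{
		NumProcessedLines: atomic.LoadInt64(&f.processed),
		NumFilteredLines:  atomic.LoadInt64(&f.filtered),
	}
}

func main() {
	f := &dropEmpty{field: 0}
	next := func(r Record) { fmt.Printf("forwarded: %s\n", r.Get(0)) }

	f.Process(mapRecord{0: []byte("hello")}, next)
	f.Process(mapRecord{0: nil}, next) // dropped, next is never called

	fmt.Printf("%+v\n", f.Stats())
}
```

A filter that splits one record into several would call next once per generated record instead of once overall.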

                                                                        type FilterDesc

                                                                        type FilterDesc struct {
                                                                        	Name   string                             // Name of the filter
                                                                        	New    func(FilterParams) (Filter, error) // New is the constructor-like function called by the topology to create a new filter
                                                                        	Config interface{}                        // Config is the component configuration
                                                                        	Help   string                             // Help string
                                                                        }

                                                                          FilterDesc describes a Filter component to the topology.

                                                                          type FilterParams

                                                                          type FilterParams struct {
                                                                          	ComponentParams
                                                                          }

                                                                            FilterParams holds the parameters passed to Filter constructor.

                                                                            type FilterStats

                                                                            type FilterStats struct {
                                                                            	NumProcessedLines int64
                                                                            	NumFilteredLines  int64
                                                                            	Metrics           MetricsBag
                                                                            }

                                                                              FilterStats contains statistics about the filter components, ready for export to the metric client and to print debug info.

                                                                              type HelpFormat

                                                                              type HelpFormat int

                                                                                HelpFormat represents the possible formats for baker help.

                                                                                const (
                                                                                	// HelpFormatRaw is for raw-formatted help.
                                                                                	HelpFormatRaw HelpFormat = iota
                                                                                
                                                                                	// HelpFormatMarkdown is for markdown formatted help.
                                                                                	HelpFormatMarkdown
                                                                                )

                                                                                type Input

                                                                                type Input interface {
                                                                                	// Start fetching data and pushing it into the channel.
                                                                                	// If this call blocks forever, the topology is permanent and
                                                                                	// acts like a long-running daemon; if this call exits after
                                                                                	// it has finished, the topology is meant to be run as a task
                                                                                	// to process a fixed-size input, and baker will cleanly shutdown
                                                                                	// after all inputs have been fully processed.
                                                                                	Run(output chan<- *Data) error
                                                                                
                                                                                	// Force the input to stop as cleanly as possible, at a good boundary.
                                                                                	// This is usually issued at the user's request of exiting the process.
                                                                                	// For instance, it might make sense to finish processing the current
                                                                                	// batch of data or the current file, and then save in stable storage
                                                                                	// the checkpoint to resume it later.
                                                                                	Stop()
                                                                                
                                                                                	// Return stats about the input
                                                                                	Stats() InputStats
                                                                                
                                                                                	// This function is called when the filter is finished with
                                                                                	// the memory received through the input channel. Since the
                                                                                	// memory was allocated by Input, it is returned to it
                                                                                	// so that it might be recycled.
                                                                                	FreeMem(data *Data)
                                                                                }

                                                                                  Input is an interface representing an object that produces (fetches) data for the filter.
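The four methods can be sketched with an in-memory input. This is an illustration under assumptions, not one of Baker's inputs: `sliceInput` and `newSliceInput` are hypothetical, `Data`, `Metadata` and `InputStats` are reduced local copies of the types in this package, and the `source` metadata key is made up. Since Run returns once the slice is exhausted, this input would drive a batch-style topology.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Metadata and Data mirror the types documented in this package.
type Metadata map[string]interface{}

type Data struct {
	Bytes []byte
	Meta  Metadata
}

// InputStats is a reduced stand-in for the InputStats struct.
type InputStats struct{ NumProcessedLines int64 }

// sliceInput is a hypothetical input that serves pre-loaded chunks,
// each holding a single line.
type sliceInput struct {
	sent   int64
	chunks [][]byte
	stop   chan struct{}
	once   sync.Once
}

func newSliceInput(chunks ...[]byte) *sliceInput {
	return &sliceInput{chunks: chunks, stop: make(chan struct{})}
}

// Run pushes every chunk to output, then returns.
func (in *sliceInput) Run(output chan<- *Data) error {
	for _, c := range in.chunks {
		select {
		case <-in.stop:
			return nil // stop at a chunk boundary
		case output <- &Data{Bytes: c, Meta: Metadata{"source": "memory"}}:
			atomic.AddInt64(&in.sent, 1)
		}
	}
	return nil
}

// Stop asks Run to return early; it is safe to call more than once.
func (in *sliceInput) Stop() { in.once.Do(func() { close(in.stop) }) }

func (in *sliceInput) Stats() InputStats {
	return InputStats{NumProcessedLines: atomic.LoadInt64(&in.sent)}
}

// FreeMem has nothing to recycle here; a pooling input would put the
// buffer back into its pool at this point.
func (in *sliceInput) FreeMem(*Data) {}

func main() {
	in := newSliceInput([]byte("first\n"), []byte("second\n"))
	out := make(chan *Data)
	go func() {
		if err := in.Run(out); err != nil {
			fmt.Println("input error:", err)
		}
		close(out) // in this sketch the caller owns the channel
	}()
	for d := range out {
		fmt.Printf("%q from %v\n", d.Bytes, d.Meta["source"])
		in.FreeMem(d)
	}
}
```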

                                                                                  type InputDesc

                                                                                  type InputDesc struct {
                                                                                  	Name   string                           // Name of the input
                                                                                  	New    func(InputParams) (Input, error) // New is the constructor-like function called by the topology to create a new input
                                                                                  	Config interface{}                      // Config is the component configuration
                                                                                  	Help   string                           // Help string
                                                                                  }

                                                                                    InputDesc describes an Input component to the topology.

                                                                                    type InputParams

                                                                                    type InputParams struct {
                                                                                    	ComponentParams
                                                                                    }

                                                                                      InputParams holds the parameters passed to Input constructor.

                                                                                      type InputStats

                                                                                      type InputStats struct {
                                                                                      	NumProcessedLines int64
                                                                                      	CustomStats       map[string]string
                                                                                      	Metrics           MetricsBag
                                                                                      }

                                                                                        InputStats contains statistics about the input component, ready for export to the metric client and to print debug info.

                                                                                        type LogLine

                                                                                        type LogLine struct {
                                                                                        
                                                                                        	// FieldSeparator is the byte used to separate fields value.
                                                                                        	FieldSeparator byte
                                                                                        	// contains filtered or unexported fields
                                                                                        }

                                                                                          LogLine represents a CSV text line using ASCII 30 as field separator. It implements Record.

                                                                                          In memory, it is kept in a format optimized for very fast parsing and low memory-consumption. The vast majority of fields are never accessed during the lifetime of an object, as a filter usually reads or writes just a handful of fields; it thus makes sense to do the quickest possible initial parsing, deferring as much as possible to when a field is actually accessed.

                                                                                          It is also possible to modify a LogLine in memory, as it gets processed. Modifications can be done through the Set() method, and can be done to any field, both those that had a parsed value, and those that were empty.

                                                                                          func (*LogLine) Cache

                                                                                          func (l *LogLine) Cache() *Cache

                                                                                            Cache returns the cache that is local to the current log line.

                                                                                            func (*LogLine) Clear

                                                                                            func (l *LogLine) Clear()

                                                                                              Clear clears the log line.

                                                                                              func (*LogLine) Copy

                                                                                              func (l *LogLine) Copy() Record

                                                                                                Copy creates and returns a copy of the current log line.

                                                                                                func (*LogLine) Get

                                                                                                func (l *LogLine) Get(f FieldIndex) []byte

                                                                                                  Get the value of a field (either standard or custom)

                                                                                                  func (*LogLine) Meta

                                                                                                  func (l *LogLine) Meta(key string) (interface{}, bool)

                                                                                                    Meta returns the metadata having the given specific key, if any.

                                                                                                    func (*LogLine) Parse

                                                                                                    func (l *LogLine) Parse(text []byte, meta Metadata) error

                                                                                                      Parse finds the next newline in data and parses log line fields from it into the current LogLine.

                                                                                                      This is the moral equivalent of bytes.Split(), but without memory allocations.

                                                                                                      NOTE: this function is meant to be called onto a just-constructed LogLine instance. For performance reasons, it doesn't reset all the writable fields of the line. If you want to use Parse over an already parsed LogLine, use Clear before.
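The idea of splitting without allocating can be sketched by recording field offsets into fixed-size arrays and slicing the original buffer only when a field is read. This is not LogLine's actual implementation: the `line` type, the `maxFields` bound and the method names are all hypothetical, chosen only to illustrate the technique.

```go
package main

import "fmt"

const (
	sep       = 30 // ASCII 30, the field separator mentioned above
	maxFields = 8  // hypothetical upper bound, chosen for the sketch
)

// line stores where each field starts and stops inside data, so that
// parsing never copies bytes or allocates memory.
type line struct {
	data        []byte
	start, stop [maxFields]int
	n           int // number of fields found
}

// parse scans text up to the first newline (or the end of text) and
// records the boundaries of each field.
func (l *line) parse(text []byte) {
	l.n = 0
	begin, end := 0, len(text)
	for i := 0; i < len(text); i++ {
		c := text[i]
		if c == '\n' {
			end = i
			break
		}
		// Once the bound is reached, the rest stays in the last field.
		if c == sep && l.n < maxFields-1 {
			l.start[l.n], l.stop[l.n] = begin, i
			l.n++
			begin = i + 1
		}
	}
	l.start[l.n], l.stop[l.n] = begin, end
	l.n++
	l.data = text[:end]
}

// get returns field i as a sub-slice of the parsed buffer, or nil if
// the line has no such field.
func (l *line) get(i int) []byte {
	if i < 0 || i >= l.n {
		return nil
	}
	return l.data[l.start[i]:l.stop[i]]
}

func main() {
	var l line
	l.parse([]byte("x\x1eyz\x1e\x1ew\nnext line"))
	for i := 0; i < l.n; i++ {
		fmt.Printf("field %d = %q\n", i, l.get(i))
	}
}
```

The returned slices alias the input buffer, so in this sketch the buffer must stay untouched for as long as the fields are in use.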

                                                                                                      func (*LogLine) Set

                                                                                                      func (l *LogLine) Set(f FieldIndex, data []byte)

                                                                                                        Set changes the value of a field (either standard or custom) to a new value

                                                                                                        func (*LogLine) ToText

                                                                                                        func (l *LogLine) ToText(buf []byte) []byte

                                                                                                          ToText converts the LogLine back to textual format and appends it to the specified buffer. If called on a default-constructed LogLine (zero value), ToText returns nil, which is a useless but syntactically valid buffer.

                                                                                                          type Metadata

                                                                                                          type Metadata map[string]interface{}

                                                                                                            Metadata about the input data; each Input will directly populate this map as appropriate. Consumers (filters) will access it via Get().

                                                                                                            type MetricsBag

                                                                                                            type MetricsBag map[string]interface{}

                                                                                                              A MetricsBag is a collection of metrics. These metrics are reported by every Baker component through its Stats method. Stats() is called once per second, and the stats it returns contain a MetricsBag filled with values relative to that last second.

                                                                                                              func (MetricsBag) AddDeltaCounter

                                                                                                              func (bag MetricsBag) AddDeltaCounter(name string, delta int64)

                                                                                                                AddDeltaCounter adds a metric of type counter to the bag and increments its value. A counter is a cumulative metric that can only increase. To be meaningful, delta must be positive.

                                                                                                                func (MetricsBag) AddGauge

                                                                                                                func (bag MetricsBag) AddGauge(name string, value float64)

                                                                                                                  AddGauge adds a metric of type gauge to the bag and updates its value. A gauge represents a single numerical data point that can arbitrarily go up and down.

                                                                                                                  func (MetricsBag) AddHistogram

                                                                                                                  func (bag MetricsBag) AddHistogram(name string, values []float64)

                                                                                                                    AddHistogram adds a metric of type histogram to the bag and tracks its value. A histogram samples observations and counts them in different 'buckets' in order to track and show the statistical distribution of a set of values.

                                                                                                                    func (MetricsBag) AddRawCounter

                                                                                                                    func (bag MetricsBag) AddRawCounter(name string, value int64)

                                                                                                                      AddRawCounter adds a metric of type counter to the bag and sets its current value. A counter is a cumulative metric that can only increase. To be meaningful, value must be positive.

                                                                                                                      func (MetricsBag) AddTimings

                                                                                                                      func (bag MetricsBag) AddTimings(name string, values []time.Duration)

                                                                                                                        AddTimings adds a metric of type histogram to the bag and tracks its value. A timing is basically a histogram that samples values of type time.Duration. A histogram samples observations and counts them in different 'buckets'.

                                                                                                                        func (MetricsBag) Merge

                                                                                                                        func (bag MetricsBag) Merge(other MetricsBag)

                                                                                                                          Merge merges metrics with the same name and types from another MetricsBag into this bag. Counters are summed up, gauges are averaged, and histograms are concatenated.
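The three merge rules can be illustrated with a small stand-alone sketch. It does not use the real MetricsBag: the `bag` type, its fields, and `merge` are hypothetical stand-ins that keep each metric kind in its own map, and the pairwise average used for gauges is an assumption about what "averaged" means here.

```go
package main

import "fmt"

// bag is a hypothetical stand-in for MetricsBag. It keeps each metric kind
// in its own map so the merge rules are easy to read; the real type is a
// map[string]interface{} whose internal layout is not shown in this document.
type bag struct {
	counters   map[string]int64
	gauges     map[string]float64
	histograms map[string][]float64
}

func newBag() *bag {
	return &bag{
		counters:   make(map[string]int64),
		gauges:     make(map[string]float64),
		histograms: make(map[string][]float64),
	}
}

// merge folds other into b following the rules stated for Merge.
func (b *bag) merge(other *bag) {
	for name, v := range other.counters {
		b.counters[name] += v // counters are summed up
	}
	for name, v := range other.gauges {
		if cur, ok := b.gauges[name]; ok {
			b.gauges[name] = (cur + v) / 2 // gauges are averaged (assumed pairwise)
		} else {
			b.gauges[name] = v
		}
	}
	for name, v := range other.histograms {
		// histogram samples are concatenated
		b.histograms[name] = append(b.histograms[name], v...)
	}
}

func main() {
	a, b := newBag(), newBag()
	a.counters["records"] = 2
	b.counters["records"] = 3
	a.gauges["queue_len"] = 1
	b.gauges["queue_len"] = 3
	a.histograms["size"] = []float64{10}
	b.histograms["size"] = []float64{20, 30}

	a.merge(b)
	fmt.Println(a.counters["records"], a.gauges["queue_len"], a.histograms["size"])
}
```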

                                                                                                                          type MetricsClient

                                                                                                                          type MetricsClient interface {
                                                                                                                          
                                                                                                                          	// Gauge sets the value of a metric of type gauge. A Gauge represents a
                                                                                                                          	// single numerical data point that can arbitrarily go up and down.
                                                                                                                          	Gauge(name string, value float64)
                                                                                                                          
                                                                                                                          	// GaugeWithTags sets the value of a metric of type gauge and associates
                                                                                                                          	// that value with a set of tags.
                                                                                                                          	GaugeWithTags(name string, value float64, tags []string)
                                                                                                                          
                                                                                                                          	// RawCount sets the value of a metric of type counter. A counter is a
                                                                                                                          	// cumulative metric that can only increase. RawCount sets the current
                                                                                                                          	// value of the counter.
                                                                                                                          	RawCount(name string, value int64)
                                                                                                                          
                                                                                                                          	// RawCountWithTags sets the value of a metric of type counter and associates
                                                                                                                          	// that value with a set of tags.
                                                                                                                          	RawCountWithTags(name string, value int64, tags []string)
                                                                                                                          
                                                                                                                          	// DeltaCount increments the value of a metric of type counter by delta.
                                                                                                                          	// delta must be positive.
                                                                                                                          	DeltaCount(name string, delta int64)
                                                                                                                          
                                                                                                                          	// DeltaCountWithTags increments the value of a metric of type counter and
                                                                                                                          	// associates that value with a set of tags.
                                                                                                                          	DeltaCountWithTags(name string, delta int64, tags []string)
                                                                                                                          
                                                                                                                          	// Histogram adds a sample to a metric of type histogram. A histogram
                                                                                                                          	// samples observations and counts them in different 'buckets' in order
                                                                                                                          	// to track and show the statistical distribution of a set of values.
                                                                                                                          	Histogram(name string, value float64)
                                                                                                                          
                                                                                                                          	// HistogramWithTags adds a sample to a histogram and associates that
                                                                                                                          	// sample with a set of tags.
                                                                                                                          	HistogramWithTags(name string, value float64, tags []string)
                                                                                                                          
                                                                                                                          	// Duration adds a duration to a metric of type histogram. A histogram
                                                                                                                          	// samples observations and counts them in different 'buckets'. Duration
                                                                                                                          	// is basically a histogram that samples values of type time.Duration.
                                                                                                                          	Duration(name string, value time.Duration)
                                                                                                                          
                                                                                                                          	// DurationWithTags adds a duration to a histogram and associates that
                                                                                                                          	// duration with a set of tags.
                                                                                                                          	DurationWithTags(name string, value time.Duration, tags []string)
                                                                                                                          }

                                                                                                                            A MetricsClient lets components instrument their code and send the resulting metrics to the metrics backend configured in Baker.

                                                                                                                            New metrics backends must implement this interface and register their description as a MetricsDesc in Components.Metrics. See ./examples/metrics.
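As a rough illustration of what implementing this interface involves, the sketch below defines an in-memory backend. The interface is copied locally so the example compiles on its own; `memClient` and `newMemClient` are hypothetical names, tags are simply ignored, and durations are stored as seconds, none of which is prescribed by Baker.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// MetricsClient mirrors the interface documented above so that this sketch
// is self-contained; real code would use baker.MetricsClient instead.
type MetricsClient interface {
	Gauge(name string, value float64)
	GaugeWithTags(name string, value float64, tags []string)
	RawCount(name string, value int64)
	RawCountWithTags(name string, value int64, tags []string)
	DeltaCount(name string, delta int64)
	DeltaCountWithTags(name string, delta int64, tags []string)
	Histogram(name string, value float64)
	HistogramWithTags(name string, value float64, tags []string)
	Duration(name string, value time.Duration)
	DurationWithTags(name string, value time.Duration, tags []string)
}

// memClient is a hypothetical backend that keeps everything in memory.
// The mutex makes it safe to share between goroutines.
type memClient struct {
	mu       sync.Mutex
	gauges   map[string]float64
	counters map[string]int64
	samples  map[string][]float64
}

func newMemClient() *memClient {
	return &memClient{
		gauges:   make(map[string]float64),
		counters: make(map[string]int64),
		samples:  make(map[string][]float64),
	}
}

func (c *memClient) Gauge(name string, value float64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.gauges[name] = value
}

func (c *memClient) RawCount(name string, value int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.counters[name] = value // sets the current value
}

func (c *memClient) DeltaCount(name string, delta int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.counters[name] += delta // increments by delta
}

func (c *memClient) Histogram(name string, value float64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.samples[name] = append(c.samples[name], value)
}

// Duration records the duration as a histogram sample, in seconds.
func (c *memClient) Duration(name string, value time.Duration) {
	c.Histogram(name, value.Seconds())
}

// The tagged variants drop the tags and delegate to the untagged methods.
func (c *memClient) GaugeWithTags(n string, v float64, _ []string)        { c.Gauge(n, v) }
func (c *memClient) RawCountWithTags(n string, v int64, _ []string)       { c.RawCount(n, v) }
func (c *memClient) DeltaCountWithTags(n string, d int64, _ []string)     { c.DeltaCount(n, d) }
func (c *memClient) HistogramWithTags(n string, v float64, _ []string)    { c.Histogram(n, v) }
func (c *memClient) DurationWithTags(n string, v time.Duration, _ []string) { c.Duration(n, v) }

// Compile-time check that memClient satisfies the interface.
var _ MetricsClient = (*memClient)(nil)

func main() {
	var m MetricsClient = newMemClient()
	m.DeltaCount("records", 2)
	m.DeltaCountWithTags("records", 3, []string{"source:test"})
	m.Duration("latency", 1500*time.Millisecond)

	c := m.(*memClient)
	fmt.Println(c.counters["records"], c.samples["latency"])
}
```

A real backend would forward these calls to its metrics system and would normally honour the tags rather than discard them.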

                                                                                                                            type MetricsDesc

                                                                                                                            type MetricsDesc struct {
                                                                                                                            	Name   string                                   // Name of the metrics interface
                                                                                                                            	Config interface{}                              // Config is the metrics client specific configuration
                                                                                                                            	New    func(interface{}) (MetricsClient, error) // New is the constructor-like function called by the topology to create a new metrics client
                                                                                                                            }

                                                                                                                              MetricsDesc describes a Metrics interface to the topology.
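A description for a custom backend might look like the sketch below. The types are local stand-ins (MetricsClient is trimmed to a single method to keep the example short), and `logConfig`, `logClient`, `newLogClient` and `logDesc` are hypothetical names. It also assumes that the topology passes the populated Config value back to New, which is why the constructor starts with a type assertion.

```go
package main

import (
	"errors"
	"fmt"
)

// MetricsClient is a trimmed stand-in for the full interface shown earlier.
type MetricsClient interface {
	DeltaCount(name string, delta int64)
}

// MetricsDesc mirrors the struct documented above.
type MetricsDesc struct {
	Name   string
	Config interface{}
	New    func(interface{}) (MetricsClient, error)
}

// logConfig is a hypothetical backend-specific configuration.
type logConfig struct {
	Prefix string
}

// logClient is a hypothetical client that records formatted lines.
type logClient struct {
	prefix string
	lines  []string
}

func (c *logClient) DeltaCount(name string, delta int64) {
	c.lines = append(c.lines, fmt.Sprintf("%s%s +%d", c.prefix, name, delta))
}

// newLogClient is the constructor-like function stored in MetricsDesc.New.
// It rejects any configuration that is not a *logConfig.
func newLogClient(icfg interface{}) (MetricsClient, error) {
	cfg, ok := icfg.(*logConfig)
	if !ok {
		return nil, errors.New("logmetrics: unexpected configuration type")
	}
	return &logClient{prefix: cfg.Prefix}, nil
}

// logDesc is what would be listed among the available metrics backends.
var logDesc = MetricsDesc{
	Name:   "LogMetrics",
	Config: &logConfig{},
	New:    newLogClient,
}

func main() {
	client, err := logDesc.New(&logConfig{Prefix: "baker."})
	if err != nil {
		panic(err)
	}
	client.DeltaCount("records", 1)
	fmt.Println(client.(*logClient).lines[0])
}
```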

                                                                                                                              type NopMetrics

                                                                                                                              type NopMetrics struct{}

                                                                                                                                NopMetrics implements a MetricsClient that does nothing.

                                                                                                                                func (NopMetrics) DeltaCount

                                                                                                                                func (NopMetrics) DeltaCount(name string, delta int64)

                                                                                                                                func (NopMetrics) DeltaCountWithTags

                                                                                                                                func (NopMetrics) DeltaCountWithTags(name string, delta int64, tags []string)

                                                                                                                                func (NopMetrics) Duration

                                                                                                                                func (NopMetrics) Duration(name string, value time.Duration)

                                                                                                                                func (NopMetrics) DurationWithTags

                                                                                                                                func (NopMetrics) DurationWithTags(name string, value time.Duration, tags []string)

                                                                                                                                func (NopMetrics) Gauge

                                                                                                                                func (NopMetrics) Gauge(name string, value float64)

                                                                                                                                func (NopMetrics) GaugeWithTags

                                                                                                                                func (NopMetrics) GaugeWithTags(name string, value float64, tags []string)

                                                                                                                                func (NopMetrics) Histogram

                                                                                                                                func (NopMetrics) Histogram(name string, value float64)

                                                                                                                                func (NopMetrics) HistogramWithTags

                                                                                                                                func (NopMetrics) HistogramWithTags(name string, value float64, tags []string)

                                                                                                                                func (NopMetrics) RawCount

                                                                                                                                func (NopMetrics) RawCount(name string, value int64)

                                                                                                                                func (NopMetrics) RawCountWithTags

                                                                                                                                func (NopMetrics) RawCountWithTags(name string, value int64, tags []string)

                                                                                                                                type Output

                                                                                                                                type Output interface {
                                                                                                                                	// Run processes the OutputRecord data coming through a channel.
                                                                                                                                	// Run must block until in channel has been closed and it has processed
                                                                                                                                	// all records.
                                                                                                                                	// It can send filenames via upch, they will be handled by an Upload if one
                                                                                                                                	// is present in the topology.
                                                                                                                                	// TODO: since Run must be blocking, it could return an error, useful
                                                                                                                                	// for the topology to acknowledge the correct processing if nil, or
                                                                                                                                	// end the whole topology in case non-nil.
                                                                                                                                	Run(in <-chan OutputRecord, upch chan<- string) error
                                                                                                                                
                                                                                                                                	// Stats returns stats about the output.
                                                                                                                                	Stats() OutputStats
                                                                                                                                
                                                                                                                                	// CanShard returns true if this output supports sharding.
                                                                                                                                	CanShard() bool
                                                                                                                                }

                                                                                                                                  Output is the final end of a topology, it process the records that have reached the end of the filter chain and performs the final action (storing, sending through the wire, counting, etc.)
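The contract above (Run blocks until the input channel is closed, Stats reports counters) can be sketched with a minimal counting output. The types are local stand-ins so the example compiles on its own, OutputStats is trimmed to two fields, and `countOutput` is a hypothetical name. Treating a record without fields as an error is only an illustrative choice.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Local stand-ins for the Baker types documented in this section.
type OutputRecord struct {
	Fields []string
	Record []byte
}

type OutputStats struct {
	NumProcessedLines int64
	NumErrorLines     int64
}

type Output interface {
	Run(in <-chan OutputRecord, upch chan<- string) error
	Stats() OutputStats
	CanShard() bool
}

// countOutput is a hypothetical output that only counts what it receives.
type countOutput struct {
	processed int64
	errored   int64
}

// Run blocks until in is closed and every record has been consumed.
// It never produces files, so upch is left unused.
func (o *countOutput) Run(in <-chan OutputRecord, upch chan<- string) error {
	for rec := range in {
		if len(rec.Fields) == 0 {
			atomic.AddInt64(&o.errored, 1)
			continue
		}
		atomic.AddInt64(&o.processed, 1)
	}
	return nil
}

// Stats may be called from another goroutine while Run is active,
// hence the atomic loads.
func (o *countOutput) Stats() OutputStats {
	return OutputStats{
		NumProcessedLines: atomic.LoadInt64(&o.processed),
		NumErrorLines:     atomic.LoadInt64(&o.errored),
	}
}

func (o *countOutput) CanShard() bool { return false }

// Compile-time check that countOutput satisfies the interface.
var _ Output = (*countOutput)(nil)

func main() {
	in := make(chan OutputRecord, 2)
	in <- OutputRecord{Fields: []string{"a"}}
	in <- OutputRecord{} // no fields: counted as an error in this sketch
	close(in)

	out := &countOutput{}
	if err := out.Run(in, nil); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", out.Stats())
}
```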

                                                                                                                                  type OutputDesc

                                                                                                                                  type OutputDesc struct {
                                                                                                                                  	Name   string                             // Name of the output
                                                                                                                                  	New    func(OutputParams) (Output, error) // New is the constructor-like function called by the topology to create a new output
                                                                                                                                  	Config interface{}                        // Config is the component configuration
                                                                                                                                  	Raw    bool                               // Raw reports whether the output accepts a raw record
                                                                                                                                  	Help   string                             // Help string
                                                                                                                                  }

                                                                                                                                    OutputDesc describes an Output component to the topology.

                                                                                                                                    type OutputParams

                                                                                                                                    type OutputParams struct {
                                                                                                                                    	ComponentParams
                                                                                                                                    	Index  int          // tells the index of the output, in case multiple parallel output procs are used
                                                                                                                                    	Fields []FieldIndex // fields of the record that will be sent to the output
                                                                                                                                    }

                                                                                                                                      OutputParams holds the parameters passed to Output constructor.

                                                                                                                                      type OutputRecord

                                                                                                                                      type OutputRecord struct {
                                                                                                                                      	Fields []string // Fields are the fields sent to a Baker output.
                                                                                                                                      	Record []byte   // Record is the data representation of a Record (obtained with Record.ToText())
                                                                                                                                      }

                                                                                                                                        OutputRecord is the data structure sent to baker output components.

                                                                                                                                        It represents a Record in two possible formats:

                                                                                                                                        * a list of pre-parsed fields, extracted from the record (as string).
                                                                                                                                          This is useful when the output only cares about specific fields and does
                                                                                                                                          not need the full record.
                                                                                                                                        * the whole record, as processed and possibly modified by baker filters (as []byte).
                                                                                                                                        

                                                                                                                                        Fields sent to the output are described in the topology. This design lets an output work in different modes, processing different fields under the control of the user. Some fields might be required, and this validation should be performed by the Output itself. The topology can also declare no fields, in which case the Fields slice will be empty.

                                                                                                                                        Record is non-nil only if the output declares itself as a raw output (see OutputDesc.Raw). This is done for performance reasons, as recreating the whole record requires allocations and memory copies, and is not always required.
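Since field validation is left to the output, one possible shape for it is sketched below: the constructor checks the configured field list, and the per-record handler checks what actually arrives. All types are local stand-ins (OutputParams omits the embedded ComponentParams), and `kvOutput`, `newKVOutput` and `handle` are hypothetical names, not part of Baker.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the Baker types documented in this section.
type FieldIndex int

type OutputParams struct {
	Index  int
	Fields []FieldIndex
}

type OutputRecord struct {
	Fields []string
	Record []byte
}

// kvOutput is a hypothetical output that needs exactly two fields:
// a key and a value.
type kvOutput struct {
	seen map[string]string
}

// newKVOutput validates the fields declared in the topology before any
// record is processed.
func newKVOutput(cfg OutputParams) (*kvOutput, error) {
	if len(cfg.Fields) != 2 {
		return nil, fmt.Errorf("kv output: want exactly 2 fields, got %d", len(cfg.Fields))
	}
	return &kvOutput{seen: make(map[string]string)}, nil
}

// handle consumes the pre-parsed fields. Record is ignored because this
// sketch does not declare itself as a raw output.
func (o *kvOutput) handle(rec OutputRecord) error {
	if len(rec.Fields) != 2 {
		return errors.New("kv output: record does not carry a key and a value")
	}
	o.seen[rec.Fields[0]] = rec.Fields[1]
	return nil
}

func main() {
	if _, err := newKVOutput(OutputParams{Fields: []FieldIndex{3}}); err != nil {
		fmt.Println("configuration rejected:", err)
	}

	out, err := newKVOutput(OutputParams{Fields: []FieldIndex{3, 7}})
	if err != nil {
		panic(err)
	}
	if err := out.handle(OutputRecord{Fields: []string{"user42", "click"}}); err != nil {
		panic(err)
	}
	fmt.Println(out.seen["user42"])
}
```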

                                                                                                                                        type OutputStats

                                                                                                                                        type OutputStats struct {
                                                                                                                                        	NumProcessedLines int64
                                                                                                                                        	NumErrorLines     int64
                                                                                                                                        	CustomStats       map[string]string
                                                                                                                                        	Metrics           MetricsBag
                                                                                                                                        }

                                                                                                                                          OutputStats contains statistics about the output component, ready to be exported to the metrics client and printed as debug information.

                                                                                                                                          type Record

                                                                                                                                          type Record interface {
                                                                                                                                          	// Parse decodes a buffer representing a record in its data format into
                                                                                                                                          	// the current record instance.
                                                                                                                                          	//
                                                                                                                                          	// The given Metadata will be attached to that record. Record
                                                                                                                                          	// implementations should also accept a nil in case the record has no
                                                                                                                                          	// Metadata attached.
                                                                                                                                          	Parse([]byte, Metadata) error
                                                                                                                                          
                                                                                                                                          	// ToText returns the reconstructed data format of a record.
                                                                                                                                          	//
                                                                                                                                          	// In case a big enough buf is passed, it will be used to serialize the
                                                                                                                                          	// record.
                                                                                                                                          	ToText(buf []byte) []byte
                                                                                                                                          
                                                                                                                                          	// Copy creates and returns a copy of the current record.
                                                                                                                                          	//
                                                                                                                                          	// The copied record could have been obtained by:
                                                                                                                                          	//  var dst Record
                                                                                                                                          	//  dst.Parse(src.ToText(nil), nil)
                                                                                                                                          	//
                                                                                                                                          	// but Record implementations should provide a more efficient way.
                                                                                                                                          	Copy() Record
                                                                                                                                          
                                                                                                                                          	// Clear clears the record internal state, making it empty.
                                                                                                                                          	Clear()
                                                                                                                                          
                                                                                                                                          	// Get the value of a field.
                                                                                                                                          	Get(FieldIndex) []byte
                                                                                                                                          
                                                                                                                                          	// Set the value of a field.
                                                                                                                                          	Set(FieldIndex, []byte)
                                                                                                                                          
                                                                                                                                          	// Meta returns the value of the attached metadata for the given key, if any.
                                                                                                                                          	//
                                                                                                                                           	// Record implementers may implement this method by declaring:
                                                                                                                                           	//  type MyRecord struct {
                                                                                                                                           	//  	meta baker.Metadata
                                                                                                                                           	//  }
                                                                                                                                           	//
                                                                                                                                           	//  func (r *MyRecord) Meta(key string) (interface{}, bool) {
                                                                                                                                           	//  	return r.meta.get(key)
                                                                                                                                           	//  }
                                                                                                                                          	Meta(key string) (v interface{}, ok bool)
                                                                                                                                          
                                                                                                                                           	// Cache holds a cache which is local to the record. It may be used to
                                                                                                                                           	// speed up parsing of specific fields by caching the result. When
                                                                                                                                           	// accessing a field and parsing its value, cache as much of the parsing
                                                                                                                                           	// work as possible, to avoid redoing it later when the same record is
                                                                                                                                           	// processed by different code.
                                                                                                                                           	// Since cached values are interfaces, it is up to the code fetching a
                                                                                                                                           	// value to know its underlying type and perform a type assertion.
                                                                                                                                          	//
                                                                                                                                          	//  var ll Record
                                                                                                                                           	//  val, ok := ll.Cache().Get("mykey")
                                                                                                                                           	//  if !ok {
                                                                                                                                           	//  	// long computation/parsing...
                                                                                                                                           	//  	val = "14/07/1789"
                                                                                                                                           	//  	ll.Cache().Set("mykey", val)
                                                                                                                                          	//  }
                                                                                                                                          	//
                                                                                                                                          	//  // do something with the result
                                                                                                                                          	//  result := val.(string)
                                                                                                                                          	Cache() *Cache
                                                                                                                                          }

                                                                                                                                            Record is the basic object being processed by baker components. Types implementing Record hold the memory representation of a single record.
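The record-local cache described in the Cache comment above can be sketched as follows. This is a minimal illustration, not baker's implementation: the `cache` type is a hypothetical map-based stand-in that only models the Get/Set shape shown in the documentation, and the `timestamp` helper, the `"timestamp"` key and the `parseCount` counter are invented for the example.

```go
package main

import (
	"fmt"
	"strconv"
)

// cache is a hypothetical stand-in for baker's record-local Cache.
// Only the Get/Set shape used in the documentation is modelled.
type cache map[string]interface{}

func (c cache) Get(key string) (interface{}, bool) {
	v, ok := c[key]
	return v, ok
}

func (c cache) Set(key string, v interface{}) {
	c[key] = v
}

// parseCount counts how many times the expensive parse actually runs.
var parseCount int

// timestamp parses raw as a base-10 integer at most once per cache.
// Later calls return the cached value after a type assertion.
func timestamp(c cache, raw []byte) (int64, error) {
	if v, ok := c.Get("timestamp"); ok {
		return v.(int64), nil // the caller knows the cached type
	}
	parseCount++
	ts, err := strconv.ParseInt(string(raw), 10, 64)
	if err != nil {
		return 0, err
	}
	c.Set("timestamp", ts)
	return ts, nil
}

func main() {
	c := cache{}
	raw := []byte("1600000000")
	first, _ := timestamp(c, raw)
	second, _ := timestamp(c, raw)
	fmt.Println(first, second, parseCount) // prints "1600000000 1600000000 1"
}
```

With a real Record, the same pattern would go through the value returned by the record's Cache() method, so that two filters reading the same field share one parse.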

                                                                                                                                            type ShardingFunc

                                                                                                                                            type ShardingFunc func(Record) uint64

                                                                                                                                              A ShardingFunc calculates a sharding value for a record.

                                                                                                                                              Sharding functions are silent about errors in the specified fields. If a field is corrupt, they will probably ignore it and still compute the best possible sharding value. A badly corrupted field (e.g. an empty one) could therefore result in uneven sharding.
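As an illustration of the behaviour described above, here is a minimal sketch of a sharding function that hashes one field with FNV-1a. `FieldIndex`, `Record` and `ShardingFunc` are redeclared locally (with `Record` reduced to `Get`) so that the example compiles on its own; `mapRecord`, `hashField` and the `userID` field are hypothetical names, and FNV-1a is just one possible choice of hash, not necessarily what baker's outputs use.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// Local stand-ins mirroring the declarations in this reference, so the
// sketch compiles without importing baker.
type FieldIndex int

type Record interface {
	Get(FieldIndex) []byte
}

type ShardingFunc func(Record) uint64

// mapRecord is a hypothetical in-memory record used only for this example.
type mapRecord map[FieldIndex][]byte

func (m mapRecord) Get(f FieldIndex) []byte {
	return m[f]
}

// hashField returns a ShardingFunc hashing the raw bytes of field f with
// FNV-1a. It never reports an error: an empty or corrupt field still
// hashes to some value.
func hashField(f FieldIndex) ShardingFunc {
	return func(r Record) uint64 {
		h := fnv.New64a()
		h.Write(r.Get(f)) // hash.Hash writes never return an error
		return h.Sum64()
	}
}

func main() {
	const userID FieldIndex = 0 // hypothetical field
	const procs = 4             // number of parallel outputs, assumed

	shard := hashField(userID)
	rec := mapRecord{userID: []byte("user-123")}
	empty := mapRecord{}

	fmt.Println("shard for rec:  ", shard(rec)%procs)
	fmt.Println("shard for empty:", shard(empty)%procs)
}
```

Every record with an empty field hashes to the same value and so lands on the same shard, which is the uneven sharding the paragraph above warns about.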

                                                                                                                                              type StatsDumper

                                                                                                                                              type StatsDumper struct {
                                                                                                                                              	// contains filtered or unexported fields
                                                                                                                                              }

                                                                                                                                                A StatsDumper gathers statistics about all baker components of a topology.

                                                                                                                                                func NewStatsDumper

                                                                                                                                                func NewStatsDumper(t *Topology) (sd *StatsDumper)

                                                                                                                                                  NewStatsDumper creates and initializes a StatsDumper using the given topology and writing stats on standard output. If also exports metrics via the Metrics interface configured with the Topology, if any.

                                                                                                                                                  func (*StatsDumper) Run

                                                                                                                                                  func (sd *StatsDumper) Run() (stop func())

                                                                                                                                                    Run starts dumping stats every second on standard output. Call stop() to stop the periodic dump; stats are then printed one last time.

                                                                                                                                                    func (*StatsDumper) SetWriter

                                                                                                                                                    func (sd *StatsDumper) SetWriter(w io.Writer)

                                                                                                                                                      SetWriter sets the writer into which stats are written. SetWriter must be called before Run().

                                                                                                                                                      type Topology

                                                                                                                                                      type Topology struct {
                                                                                                                                                      	Input   Input
                                                                                                                                                      	Filters []Filter
                                                                                                                                                      	Output  []Output
                                                                                                                                                      	Upload  Upload
                                                                                                                                                      	// contains filtered or unexported fields
                                                                                                                                                      }

                                                                                                                                                        Topology defines the baker topology, that is how to retrieve records (input), how to process them (filter), and where to output the results (output+upload)

                                                                                                                                                        func NewTopologyFromConfig

                                                                                                                                                        func NewTopologyFromConfig(cfg *Config) (*Topology, error)

                                                                                                                                                          NewTopologyFromConfig takes a baker configuration and returns a Topology.

                                                                                                                                                          func (*Topology) Error

                                                                                                                                                          func (t *Topology) Error() error

                                                                                                                                                            Error returns the global (sticky) error state of the topology. Calling this function makes sense only after Wait() has returned (before that, it is potentially subject to races). Errors from the input components are returned here, because they are considered fatal for the topology; all other errors (such as transient network failures during output) are not considered fatal, and are supposed to be handled within the components themselves.

                                                                                                                                                            func (*Topology) Start

                                                                                                                                                            func (t *Topology) Start()

                                                                                                                                                              Start starts the Topology, that is start all components. This function also intercepts the interrupt signal (ctrl+c) starting the graceful shutdown (calling Topology.Stop())

                                                                                                                                                              func (*Topology) Stop

                                                                                                                                                              func (t *Topology) Stop()

                                                                                                                                                                Stop requires the currently running topology stop safely, but ASAP. The stop request is forwarded to the input that triggers the chain of stops from the components (managed into Topology.Wait)

                                                                                                                                                                func (*Topology) Wait

                                                                                                                                                                func (t *Topology) Wait()

                                                                                                                                                                  Wait until the topology shuts itself down. This can happen because the input component exits (in a batch topology), or in response to a SIGINT signal, that is handled as a clean shutdown request.
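The Start, Wait and Error methods above suggest a fixed calling order. The sketch below shows that order with a test double instead of a real topology, so that it runs without baker: `runner`, `run`, `fakeTopology` and `errInput` are hypothetical names, and a `*baker.Topology` is only assumed to satisfy the `runner` interface because it exposes the three methods documented above.

```go
package main

import (
	"errors"
	"fmt"
)

// runner lists the Topology methods documented above. It is a local,
// hypothetical interface so the sketch compiles without importing baker.
type runner interface {
	Start()
	Wait()
	Error() error
}

// run starts the topology, blocks until it has shut down, and only then
// reads the sticky error, since reading it before Wait returns may race.
func run(t runner) error {
	t.Start()
	t.Wait()
	return t.Error()
}

// errInput simulates a fatal error reported by the input component.
var errInput = errors.New("input failed")

// fakeTopology is a test double recording the order of the calls.
type fakeTopology struct {
	calls []string
	err   error
}

func (f *fakeTopology) Start() {
	f.calls = append(f.calls, "start")
}

func (f *fakeTopology) Wait() {
	f.calls = append(f.calls, "wait")
}

func (f *fakeTopology) Error() error {
	f.calls = append(f.calls, "error")
	return f.err
}

func main() {
	t := &fakeTopology{err: errInput}
	if err := run(t); err != nil {
		fmt.Println("topology error:", err)
	}
	fmt.Println(t.calls) // prints "[start wait error]"
}
```

In a real program the value passed to `run` would presumably come from NewTopologyFromConfig, and a non-nil result would typically be turned into a non-zero exit status.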

                                                                                                                                                                  type Upload

                                                                                                                                                                  type Upload interface {
                                                                                                                                                                  	// Run processes the output result as it comes through the channel.
                                                                                                                                                                  	// Run must block forever.
                                                                                                                                                                  	// upch receives the names of the files that the Output wants uploaded.
                                                                                                                                                                  	Run(upch <-chan string) error
                                                                                                                                                                  
                                                                                                                                                                  	// Stop forces the upload to stop as cleanly as possible, which usually
                                                                                                                                                                  	// means finishing all the uploads in progress.
                                                                                                                                                                  	Stop()
                                                                                                                                                                  
                                                                                                                                                                  	// Stats returns stats about the upload process
                                                                                                                                                                  	Stats() UploadStats
                                                                                                                                                                  }

                                                                                                                                                                    Upload uploads the files created by the topology output to a configured location.
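To make the shape of the Upload interface concrete, here is a minimal sketch of a component with the same three methods. It does not import baker: `UploadStats` is redeclared locally with only its two counters, and `countingUpload`, its `send` callback, `rejectEmpty` and the file names are hypothetical. The sketch also assumes that the channel is closed once the output has no more files to announce, at which point Run returns.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// UploadStats mirrors the two counters of the UploadStats type in this
// reference; CustomStats and Metrics are left out to avoid baker imports.
type UploadStats struct {
	NumProcessedFiles int64
	NumErrorFiles     int64
}

// countingUpload is a hypothetical upload that hands each file name to
// send and counts successes and failures.
type countingUpload struct {
	processed int64 // accessed atomically
	failed    int64 // accessed atomically
	send      func(path string) error
}

// Run consumes file names until upch is closed (an assumption of this
// sketch). A failed file is counted, not treated as fatal.
func (u *countingUpload) Run(upch <-chan string) error {
	for path := range upch {
		if err := u.send(path); err != nil {
			atomic.AddInt64(&u.failed, 1)
			continue
		}
		atomic.AddInt64(&u.processed, 1)
	}
	return nil
}

// Stop is a no-op here because send is synchronous and nothing is
// left in flight once Run returns.
func (u *countingUpload) Stop() {}

// Stats may be called while Run is active, hence the atomic loads.
func (u *countingUpload) Stats() UploadStats {
	return UploadStats{
		NumProcessedFiles: atomic.LoadInt64(&u.processed),
		NumErrorFiles:     atomic.LoadInt64(&u.failed),
	}
}

// rejectEmpty is a placeholder for a real transfer; it only fails on "".
func rejectEmpty(path string) error {
	if path == "" {
		return errors.New("empty path")
	}
	return nil
}

func main() {
	u := &countingUpload{send: rejectEmpty}
	upch := make(chan string, 3)
	upch <- "out-0001.csv.gz"
	upch <- ""
	upch <- "out-0002.csv.gz"
	close(upch)

	if err := u.Run(upch); err != nil {
		fmt.Println("upload failed:", err)
	}
	fmt.Printf("%+v\n", u.Stats()) // prints "{NumProcessedFiles:2 NumErrorFiles:1}"
}
```

Counting failures instead of returning them is a design choice of this sketch; it follows the idea, stated for Topology.Error, that non-input errors are handled inside the component.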

                                                                                                                                                                    type UploadDesc

                                                                                                                                                                    type UploadDesc struct {
                                                                                                                                                                    	Name   string                             // Name of the upload component
                                                                                                                                                                    	New    func(UploadParams) (Upload, error) // New is the constructor-like function called by the topology to create a new upload
                                                                                                                                                                    	Config interface{}                        // Config is the component configuration
                                                                                                                                                                    	Help   string                             // Help string
                                                                                                                                                                    }

                                                                                                                                                                      UploadDesc describes an Upload component to the topology.

                                                                                                                                                                      type UploadParams

                                                                                                                                                                      type UploadParams struct {
                                                                                                                                                                      	ComponentParams
                                                                                                                                                                      }

                                                                                                                                                                        UploadParams is the struct passed to the Upload constructor.

                                                                                                                                                                        type UploadStats

                                                                                                                                                                        type UploadStats struct {
                                                                                                                                                                        	NumProcessedFiles int64
                                                                                                                                                                        	NumErrorFiles     int64
                                                                                                                                                                        	CustomStats       map[string]string
                                                                                                                                                                        	Metrics           MetricsBag
                                                                                                                                                                        }

                                                                                                                                                                          UploadStats contains statistics about the upload component, ready to be exported to the metrics client and printed as debug info.

                                                                                                                                                                          type UserDesc

                                                                                                                                                                          type UserDesc struct {
                                                                                                                                                                          	Name   string
                                                                                                                                                                          	Config interface{}
                                                                                                                                                                          }

                                                                                                                                                                            UserDesc describes user-specific configuration sections.

                                                                                                                                                                            type ValidationFunc

                                                                                                                                                                            type ValidationFunc func(Record) (bool, FieldIndex)

                                                                                                                                                                              ValidationFunc checks the validity of a record, returning true if it's valid. If a validation error is found it returns false and the index of the field that failed validation.
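A validation function following this contract could look like the sketch below. The baker types are redeclared locally (with `Record` reduced to `Get`) so that it compiles on its own; `mapRecord`, `requireNonEmpty` and the field names are hypothetical. The documentation does not say which index is returned for a valid record, so returning 0 in that case is an assumption and the value should be ignored.

```go
package main

import "fmt"

// Local stand-ins mirroring the declarations in this reference, so the
// sketch compiles without importing baker.
type FieldIndex int

type Record interface {
	Get(FieldIndex) []byte
}

type ValidationFunc func(Record) (bool, FieldIndex)

// mapRecord is a hypothetical in-memory record used only for this example.
type mapRecord map[FieldIndex][]byte

func (m mapRecord) Get(f FieldIndex) []byte {
	return m[f]
}

// requireNonEmpty returns a ValidationFunc that rejects a record as soon
// as one of the given fields is empty, reporting the offending index.
func requireNonEmpty(fields ...FieldIndex) ValidationFunc {
	return func(r Record) (bool, FieldIndex) {
		for _, f := range fields {
			if len(r.Get(f)) == 0 {
				return false, f
			}
		}
		// The index is meaningless for a valid record (assumption).
		return true, 0
	}
}

func main() {
	const (
		userID    FieldIndex = 0 // hypothetical fields
		timestamp FieldIndex = 1
	)
	validate := requireNonEmpty(userID, timestamp)

	good := mapRecord{userID: []byte("u1"), timestamp: []byte("1600000000")}
	bad := mapRecord{userID: []byte("u1")}

	fmt.Println(validate(good)) // prints "true 0"
	fmt.Println(validate(bad))  // prints "false 1"
}
```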

                                                                                                                                                                              Directories

                                                                                                                                                                              Path Synopsis
                                                                                                                                                                              examples
                                                                                                                                                                              advanced
                                                                                                                                                                              advanced example shows an advanced setup of baker.Components
                                                                                                                                                                              advanced example shows an advanced setup of baker.Components
                                                                                                                                                                              basic
                                                                                                                                                                              basic example illustrates how to build a very simple baker-based program with just input and output components
                                                                                                                                                                              basic example illustrates how to build a very simple baker-based program with just input and output components
                                                                                                                                                                              cli
                                                                                                                                                                              CLI example illustrates how to build a Baker CLI program using baker.MainCLI Run without arguments to see the help.
                                                                                                                                                                              CLI example illustrates how to build a Baker CLI program using baker.MainCLI Run without arguments to see the help.
                                                                                                                                                                              filtering
                                                                                                                                                                              filtering example illustrates how to create a filter component.
                                                                                                                                                                              filtering example illustrates how to create a filter component.
                                                                                                                                                                              help
                                                                                                                                                                              help is a simple example illustrating how to build a baker command with the ability to show an help output for common usage and component-specific instructions.
                                                                                                                                                                              help is a simple example illustrating how to build a baker command with the ability to show an help output for common usage and component-specific instructions.
                                                                                                                                                                              metrics
                                                                                                                                                                              metrics example illustrates how to implement and plug a metrics interface to Baker.
                                                                                                                                                                              metrics example illustrates how to implement and plug a metrics interface to Baker.
                                                                                                                                                                              sharding
                                                                                                                                                                              sharding example shows how to use an output that supports sharding When a shardable output is used, the parallel outputs identified by the "procs" configuration value in the toml, receive a subset of the processed records.
                                                                                                                                                                              sharding example shows how to use an output that supports sharding When a shardable output is used, the parallel outputs identified by the "procs" configuration value in the toml, receive a subset of the processed records.
                                                                                                                                                                              validation
                                                                                                                                                                              validation example illustrates how to specifies the record validation properties in the Baker TOML.
                                                                                                                                                                              validation example illustrates how to specifies the record validation properties in the Baker TOML.
                                                                                                                                                                              Package filter provides filter components.
                                                                                                                                                                              Package filter provides filter components.
                                                                                                                                                                              metadata
                                                                                                                                                                              Package metadata provides a metadata-specific (concurrent and size-limited) cache.
                                                                                                                                                                              Package input provides input components
                                                                                                                                                                              Package metrics provides the available metrics implementations
                                                                                                                                                                              datadog
Package datadog provides types and functions to export metrics and logs to Datadog via a statsd client.
                                                                                                                                                                              Package output provides output components
                                                                                                                                                                              pkg
                                                                                                                                                                              awsutils
                                                                                                                                                                              Package awsutils provides aws-specific types and functions.
                                                                                                                                                                              buffercache
                                                                                                                                                                              Package buffercache provides the BufferCache type, a kind of map[string][]byte that is optimized for appending new bytes to the cache values.
                                                                                                                                                                              splitwriter
                                                                                                                                                                              Package splitwriter provides a WriteCloser that writes to a file and splits it into smaller files when it's closed.
                                                                                                                                                                              tutorials
                                                                                                                                                                              Package upload provides upload components