README
¶
Postgres output
It sends event batches to a PostgreSQL database using pgx.
Config params
strict
bool
default=false
Deprecated. Use strict_fields
flag instead.
strict_fields
bool
default=false
In strict mode file.d will crash on events without required fields. Otherwise, such events will be discarded.
conn_string
string
required
PostgreSQL connection string in URL or DSN format.
Example DSN:
user=user password=secret host=pg.example.com port=5432 dbname=mydb sslmode=disable pool_max_conns=10
table
string
required
Pg target table.
columns
[]ConfigColumn
required
Array of DB columns. Each column has: name, type (int, string, or timestamp — an int that will be converted to an RFC 3339 timestamptz), and nullable options.
retry
int
default=10
Number of insertion retries. If File.d cannot insert after this number of attempts, File.d will exit with a non-zero exit code or skip the message (see fatal_on_failed_insert).
fatal_on_failed_insert
bool
default=false
Whether to exit with a non-zero exit code after an insert error. Experimental feature.
retention
cfg.Duration
default=50ms
Retention (delay) in milliseconds before retrying a DB request.
retention_exponentially_multiplier
int
default=2
Multiplier for exponential increase of retention between retries.
db_request_timeout
cfg.Duration
default=3000ms
Timeout for DB requests in milliseconds.
db_health_check_period
cfg.Duration
default=60s
Timeout for DB health check.
workers_count
cfg.Expression
default=gomaxprocs*4
How many workers will be instantiated to send batches.
batch_size
cfg.Expression
default=capacity/4
Maximum number of events to pack into one batch.
batch_size_bytes
cfg.Expression
default=0
A minimum size of events in a batch to send. If both batch_size and batch_size_bytes are set, they will work together.
batch_flush_timeout
cfg.Duration
default=200ms
After this timeout the batch will be sent even if it isn't complete.
Example
Postgres output example:
pipelines:
  example_pipeline:
    input:
      type: file
      persistence_mode: async
      watching_dir: ./
      filename_pattern: input_example.json
      offsets_file: ./offsets.yaml
      offsets_op: reset
    output:
      type: postgres
      conn_string: "user=postgres host=localhost port=5432 dbname=postgres sslmode=disable pool_max_conns=10"
      table: events
      columns:
        - name: id
          type: int
        - name: name
          type: string
      retry: 10
      retention: 1s
      retention_exponentially_multiplier: 2
input_example.json
{"id":1,"name":"name1"}
{"id":2,"name":"name2"}
Generated using insane-doc
Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ( ErrEventDoesntHaveField = errors.New("event doesn't have field") ErrEventFieldHasWrongType = errors.New("event field has wrong type") ErrTimestampFromDistantPastOrFuture = errors.New("event field contains timestamp < 1970 or > 9000 year") )
var ErrEmptyTableName = errors.New("table name can't be empty string")
var ErrNoColumns = errors.New("no pg columns in config")
Functions ¶
Types ¶
type Config ¶
type Config struct { // > @3@4@5@6 // > // > Deprecated. Use `strict_fields` flag instead. Strict bool `json:"strict" default:"false"` // * // > @3@4@5@6 // > // > In strict mode file.d will crash on events without required fields. // Otherwise, events will be discarded. StrictFields bool `json:"strict_fields" default:"false"` // * // > @3@4@5@6 // > // > PostgreSQL connection string in URL or DSN format. // > // > Example DSN: // > // > `user=user password=secret host=pg.example.com port=5432 dbname=mydb sslmode=disable pool_max_conns=10` ConnString string `json:"conn_string" required:"true"` // * // > @3@4@5@6 // > // > Pg target table. Table string `json:"table" required:"true"` // * // > @3@4@5@6 // > // > Array of DB columns. Each column have: // > name, type (int, string, timestamp - which int that will be converted to timestamptz of rfc3339) // > and nullable options. Columns []ConfigColumn `json:"columns" required:"true" slice:"true"` // * // > @3@4@5@6 // > // > Retries of insertion. If File.d cannot insert for this number of attempts, // > File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert). Retry int `json:"retry" default:"10"` // * // > @3@4@5@6 // > // > After an insert error, fall with a non-zero exit code or not // > **Experimental feature** FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // * // > @3@4@5@6 // > // > Retention milliseconds for retry to DB. Retention cfg.Duration `json:"retention" default:"50ms" parse:"duration"` // * Retention_ time.Duration // > @3@4@5@6 // > // > Multiplier for exponential increase of retention between retries RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // > @3@4@5@6 // > // > Timeout for DB requests in milliseconds. DBRequestTimeout cfg.Duration `json:"db_request_timeout" default:"3000ms" parse:"duration"` // * DBRequestTimeout_ time.Duration // > @3@4@5@6 // > // > Timeout for DB health check. 
DBHealthCheckPeriod cfg.Duration `json:"db_health_check_period" default:"60s" parse:"duration"` // * DBHealthCheckPeriod_ time.Duration // > @3@4@5@6 // > // > How much workers will be instantiated to send batches. WorkersCount cfg.Expression `json:"workers_count" default:"gomaxprocs*4" parse:"expression"` // * WorkersCount_ int // > @3@4@5@6 // > // > Maximum quantity of events to pack into one batch. BatchSize cfg.Expression `json:"batch_size" default:"capacity/4" parse:"expression"` // * // BatchSize cfg.Expression `json:"batch_size" default:"capacity/4" parse:"expression"` // * BatchSize_ int // > @3@4@5@6 // > // > A minimum size of events in a batch to send. // > If both batch_size and batch_size_bytes are set, they will work together. BatchSizeBytes cfg.Expression `json:"batch_size_bytes" default:"0" parse:"expression"` // * BatchSizeBytes_ int // > @3@4@5@6 // > // > After this timeout batch will be sent even if batch isn't completed. BatchFlushTimeout cfg.Duration `json:"batch_flush_timeout" default:"200ms" parse:"duration"` // * BatchFlushTimeout_ time.Duration }
! config-params ^ config-params
type ConfigColumn ¶
type PgQueryBuilder ¶
type PgQueryBuilder interface { GetPgFields() []column GetUniqueFields() map[string]pgType GetInsertBuilder() sq.InsertBuilder GetPostfix() string }
func NewQueryBuilder ¶
func NewQueryBuilder(cfgColumns []ConfigColumn, table string) (PgQueryBuilder, error)
NewQueryBuilder returns a new instance of the query builder.