Documentation ¶
Overview ¶
This package is a core library of gallon. It provides the interface of InputPlugin and OutputPlugin, and the struct of Gallon.
The package also contains input and output plugins:
- input: DynamoDB, MySQL, PostgreSQL and gofakeit generator (See InputPluginRandom)
- output: BigQuery, Stdout, File (JSONL, CSV)
Index ¶
- Variables
- type Gallon
- type InputPlugin
- type InputPluginDynamoDb
- type InputPluginDynamoDbConfig
- type InputPluginDynamoDbConfigSchemaColumn
- type InputPluginRandom
- type InputPluginRandomConfig
- type InputPluginRandomConfigSchemaColumn
- type InputPluginSql
- type InputPluginSqlConfig
- type InputPluginSqlConfigSchemaColumn
- type OutputPlugin
- type OutputPluginBigQuery
- type OutputPluginBigQueryConfig
- type OutputPluginBigQueryConfigSchemaColumn
- type OutputPluginFile
- type OutputPluginFileConfig
- type OutputPluginStdout
- type OutputPluginStdoutConfig
Examples ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var ErrTooManyErrors = errors.New("too many errors")
Functions ¶
This section is empty.
Types ¶
type Gallon ¶
type Gallon struct { // Logger will be used for logging. For gallon-cli, zap logger (and the `logr.Logger` interface of it) is used. Logger logr.Logger Input InputPlugin Output OutputPlugin }
Gallon is a struct that runs a migration.
func (*Gallon) Run ¶
Run starts goroutines for extract and load, and waits for them to finish.
If too many errors are occurred, it will cancel the context and return ErrTooManyErrors.
Example ¶
// If you don't want to write a config yaml, you can use NewInputPluginDynamoDb to instantiate an input plugin. input, err := NewInputPluginDynamoDbFromConfig([]byte(` type: dynamodb table: users schema: id: type: string name: type: string age: type: number created_at: type: number `)) if err != nil { panic(err) } output, err := NewOutputPluginFileFromConfig([]byte(` type: file filepath: ./output.jsonl format: jsonl `)) if err != nil { panic(err) } g := Gallon{ Logger: zapr.NewLogger(zap.L()), Input: input, Output: output, } if err := g.Run(context.Background()); err != nil { panic(err) }
Output:
type InputPlugin ¶
type InputPlugin interface { // ReplaceLogger replaces the logger of the plugin. // It is called in Gallon.Run() at the beginning. ReplaceLogger(logr.Logger) // Extract extracts data from the source and sends it to the messages channel. // If an error occurs, send it to the errs channel. Extract(ctx context.Context, messages chan interface{}, errs chan error) error }
type InputPluginDynamoDb ¶
type InputPluginDynamoDb struct {
// contains filtered or unexported fields
}
func NewInputPluginDynamoDb ¶
func NewInputPluginDynamoDb( client *dynamodb.Client, tableName string, serialize func(map[string]types.AttributeValue) (interface{}, error), ) *InputPluginDynamoDb
func NewInputPluginDynamoDbFromConfig ¶
func NewInputPluginDynamoDbFromConfig(configYml []byte) (*InputPluginDynamoDb, error)
func (*InputPluginDynamoDb) Extract ¶
func (p *InputPluginDynamoDb) Extract( ctx context.Context, messages chan interface{}, errs chan error, ) error
func (*InputPluginDynamoDb) ReplaceLogger ¶
func (p *InputPluginDynamoDb) ReplaceLogger(logger logr.Logger)
type InputPluginDynamoDbConfig ¶
type InputPluginDynamoDbConfig struct { Table string `yaml:"table"` Schema map[string]InputPluginDynamoDbConfigSchemaColumn `yaml:"schema"` Region string `yaml:"region"` Endpoint *string `yaml:"endpoint"` }
type InputPluginDynamoDbConfigSchemaColumn ¶
type InputPluginDynamoDbConfigSchemaColumn struct {
Type string `yaml:"type"`
}
type InputPluginRandom ¶ added in v0.2.0
type InputPluginRandom struct {
// contains filtered or unexported fields
}
func NewInputPluginRandom ¶ added in v0.2.0
func NewInputPluginRandom( pageSize int, pageLimit int, generate func(int) (interface{}, error), ) *InputPluginRandom
func NewInputPluginRandomFromConfig ¶ added in v0.2.0
func NewInputPluginRandomFromConfig(configYml []byte) (*InputPluginRandom, error)
func (*InputPluginRandom) Extract ¶ added in v0.2.0
func (p *InputPluginRandom) Extract( ctx context.Context, messages chan interface{}, errs chan error, ) error
func (*InputPluginRandom) ReplaceLogger ¶ added in v0.2.0
func (p *InputPluginRandom) ReplaceLogger(logger logr.Logger)
type InputPluginRandomConfig ¶ added in v0.2.0
type InputPluginRandomConfig struct { PageSize int `yaml:"pageSize"` PageLimit int `yaml:"pageLimit"` Schema map[string]InputPluginRandomConfigSchemaColumn `yaml:"schema"` }
type InputPluginRandomConfigSchemaColumn ¶ added in v0.2.0
type InputPluginSql ¶
type InputPluginSql struct {
// contains filtered or unexported fields
}
func NewInputPluginSql ¶
func NewInputPluginSqlFromConfig ¶
func NewInputPluginSqlFromConfig(configYml []byte) (*InputPluginSql, error)
func (*InputPluginSql) Extract ¶
func (p *InputPluginSql) Extract( ctx context.Context, messages chan interface{}, errs chan error, ) error
func (*InputPluginSql) ReplaceLogger ¶
func (p *InputPluginSql) ReplaceLogger(logger logr.Logger)
type InputPluginSqlConfig ¶
type InputPluginSqlConfig struct { Table string `yaml:"table"` DatabaseUrl string `yaml:"database_url"` Driver string `yaml:"driver"` Schema map[string]InputPluginSqlConfigSchemaColumn `yaml:"schema"` }
type InputPluginSqlConfigSchemaColumn ¶
type InputPluginSqlConfigSchemaColumn struct {
Type string `yaml:"type"`
}
type OutputPlugin ¶
type OutputPlugin interface { // ReplaceLogger replaces the logger of the plugin. // It is called in Gallon.Run() at the beginning. ReplaceLogger(logr.Logger) // Load loads data from the messages channel and sends it to the destination. // If an error occurs, send it to the errs channel. Load(ctx context.Context, messages chan interface{}, errs chan error) error }
type OutputPluginBigQuery ¶
type OutputPluginBigQuery struct {
// contains filtered or unexported fields
}
func NewOutputPluginBigQuery ¶
func NewOutputPluginBigQueryFromConfig ¶
func NewOutputPluginBigQueryFromConfig(configYml []byte) (*OutputPluginBigQuery, error)
func (*OutputPluginBigQuery) Load ¶
func (p *OutputPluginBigQuery) Load( ctx context.Context, messages chan interface{}, errs chan error, ) error
func (*OutputPluginBigQuery) ReplaceLogger ¶
func (p *OutputPluginBigQuery) ReplaceLogger(logger logr.Logger)
type OutputPluginBigQueryConfig ¶
type OutputPluginBigQueryConfig struct { ProjectId string `yaml:"projectId"` DatasetId string `yaml:"datasetId"` TableId string `yaml:"tableId"` Endpoint *string `yaml:"endpoint"` Schema map[string]OutputPluginBigQueryConfigSchemaColumn `yaml:"schema"` DeleteTemporaryTable *bool `yaml:"deleteTemporaryTable"` }
type OutputPluginBigQueryConfigSchemaColumn ¶
type OutputPluginBigQueryConfigSchemaColumn struct {
Type string `yaml:"type"`
}
type OutputPluginFile ¶
type OutputPluginFile struct {
// contains filtered or unexported fields
}
func NewOutputPluginFile ¶
func NewOutputPluginFile( deserialize func(interface{}) ([]byte, error), newWriter func() (io.WriteCloser, error), ) *OutputPluginFile
func NewOutputPluginFileFromConfig ¶
func NewOutputPluginFileFromConfig(configYml []byte) (*OutputPluginFile, error)
func (*OutputPluginFile) Load ¶
func (p *OutputPluginFile) Load( ctx context.Context, messages chan interface{}, errs chan error, ) error
func (*OutputPluginFile) ReplaceLogger ¶
func (p *OutputPluginFile) ReplaceLogger(logger logr.Logger)
type OutputPluginFileConfig ¶
type OutputPluginStdout ¶ added in v0.2.0
type OutputPluginStdout struct {
// contains filtered or unexported fields
}
func NewOutputPluginStdout ¶ added in v0.2.0
func NewOutputPluginStdout( deserialize func(interface{}) ([]byte, error), ) *OutputPluginStdout
func NewOutputPluginStdoutFromConfig ¶ added in v0.2.0
func NewOutputPluginStdoutFromConfig(configYml []byte) (*OutputPluginStdout, error)
func (*OutputPluginStdout) Load ¶ added in v0.2.0
func (p *OutputPluginStdout) Load( ctx context.Context, messages chan interface{}, errs chan error, ) error
func (*OutputPluginStdout) ReplaceLogger ¶ added in v0.2.0
func (p *OutputPluginStdout) ReplaceLogger(logger logr.Logger)
type OutputPluginStdoutConfig ¶ added in v0.2.0
type OutputPluginStdoutConfig struct {
Format string `yaml:"format"`
}
Click to show internal directories.
Click to hide internal directories.