gallon

package
v0.4.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 10, 2024 License: MIT Imports: 27 Imported by: 0

Documentation

Overview

This package is the core library of gallon. It provides the InputPlugin and OutputPlugin interfaces and the Gallon struct.

The package also contains input and output plugins:

  • input: DynamoDB, MySQL, PostgreSQL, and a gofakeit-based random generator (see InputPluginRandom)
  • output: BigQuery, Stdout, File (JSONL, CSV)

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrTooManyErrors = errors.New("too many errors")

Functions

This section is empty.

Types

type Gallon

type Gallon struct {
	// Logger is used for logging. gallon-cli uses a zap logger, exposed through the `logr.Logger` interface.
	Logger logr.Logger
	Input  InputPlugin
	Output OutputPlugin
}

Gallon is a struct that runs a migration.

func (*Gallon) Run

func (g *Gallon) Run(ctx context.Context) error

Run starts goroutines for extract and load, and waits for them to finish.

If too many errors occur, it cancels the context and returns ErrTooManyErrors.

Example
// If you don't want to write a YAML config, you can use NewInputPluginDynamoDb to instantiate the input plugin directly.
input, err := NewInputPluginDynamoDbFromConfig([]byte(`
type: dynamodb
table: users
schema:
	id:
	  type: string
	name:
	  type: string
	age:
	  type: number
	created_at:
	  type: number
`))
if err != nil {
	panic(err)
}

output, err := NewOutputPluginFileFromConfig([]byte(`
type: file
filepath: ./output.jsonl
format: jsonl
`))
if err != nil {
	panic(err)
}

g := Gallon{
	Logger: zapr.NewLogger(zap.L()),
	Input:  input,
	Output: output,
}
if err := g.Run(context.Background()); err != nil {
	panic(err)
}
Output:

type InputPlugin

type InputPlugin interface {
	// ReplaceLogger replaces the logger of the plugin.
	// It is called in Gallon.Run() at the beginning.
	ReplaceLogger(logr.Logger)
	// Extract extracts data from the source and sends it to the messages channel.
	// If an error occurs, send it to the errs channel.
	Extract(ctx context.Context, messages chan interface{}, errs chan error) error
}

type InputPluginDynamoDb

type InputPluginDynamoDb struct {
	// contains filtered or unexported fields
}

func NewInputPluginDynamoDb

func NewInputPluginDynamoDb(
	client *dynamodb.Client,
	tableName string,
	serialize func(map[string]types.AttributeValue) (interface{}, error),
) *InputPluginDynamoDb

func NewInputPluginDynamoDbFromConfig

func NewInputPluginDynamoDbFromConfig(configYml []byte) (*InputPluginDynamoDb, error)

func (*InputPluginDynamoDb) Extract

func (p *InputPluginDynamoDb) Extract(
	ctx context.Context,
	messages chan interface{},
	errs chan error,
) error

func (*InputPluginDynamoDb) ReplaceLogger

func (p *InputPluginDynamoDb) ReplaceLogger(logger logr.Logger)

type InputPluginDynamoDbConfig

type InputPluginDynamoDbConfig struct {
	Table    string                                           `yaml:"table"`
	Schema   map[string]InputPluginDynamoDbConfigSchemaColumn `yaml:"schema"`
	Region   string                                           `yaml:"region"`
	Endpoint *string                                          `yaml:"endpoint"`
}

type InputPluginDynamoDbConfigSchemaColumn

type InputPluginDynamoDbConfigSchemaColumn struct {
	Type string `yaml:"type"`
}

type InputPluginRandom added in v0.2.0

type InputPluginRandom struct {
	// contains filtered or unexported fields
}

func NewInputPluginRandom added in v0.2.0

func NewInputPluginRandom(
	pageSize int,
	pageLimit int,
	generate func(int) (interface{}, error),
) *InputPluginRandom

func NewInputPluginRandomFromConfig added in v0.2.0

func NewInputPluginRandomFromConfig(configYml []byte) (*InputPluginRandom, error)

func (*InputPluginRandom) Extract added in v0.2.0

func (p *InputPluginRandom) Extract(
	ctx context.Context,
	messages chan interface{},
	errs chan error,
) error

func (*InputPluginRandom) ReplaceLogger added in v0.2.0

func (p *InputPluginRandom) ReplaceLogger(logger logr.Logger)

type InputPluginRandomConfig added in v0.2.0

type InputPluginRandomConfig struct {
	PageSize  int                                            `yaml:"pageSize"`
	PageLimit int                                            `yaml:"pageLimit"`
	Schema    map[string]InputPluginRandomConfigSchemaColumn `yaml:"schema"`
}

type InputPluginRandomConfigSchemaColumn added in v0.2.0

type InputPluginRandomConfigSchemaColumn struct {
	Type   string  `yaml:"type"`
	Min    *int    `yaml:"min"`
	Max    *int    `yaml:"max"`
	Format *string `yaml:"format"`
}

type InputPluginSql

type InputPluginSql struct {
	// contains filtered or unexported fields
}

func NewInputPluginSql

func NewInputPluginSql(
	client *sql.DB,
	tableName string,
	driver string,
	serialize func(map[string]interface{}) (interface{}, error),
) *InputPluginSql

func NewInputPluginSqlFromConfig

func NewInputPluginSqlFromConfig(configYml []byte) (*InputPluginSql, error)

func (*InputPluginSql) Extract

func (p *InputPluginSql) Extract(
	ctx context.Context,
	messages chan interface{},
	errs chan error,
) error

func (*InputPluginSql) ReplaceLogger

func (p *InputPluginSql) ReplaceLogger(logger logr.Logger)

type InputPluginSqlConfig

type InputPluginSqlConfig struct {
	Table       string                                      `yaml:"table"`
	DatabaseUrl string                                      `yaml:"database_url"`
	Driver      string                                      `yaml:"driver"`
	Schema      map[string]InputPluginSqlConfigSchemaColumn `yaml:"schema"`
}

type InputPluginSqlConfigSchemaColumn

type InputPluginSqlConfigSchemaColumn struct {
	Type string `yaml:"type"`
}

type OutputPlugin

type OutputPlugin interface {
	// ReplaceLogger replaces the logger of the plugin.
	// It is called in Gallon.Run() at the beginning.
	ReplaceLogger(logr.Logger)
	// Load loads data from the messages channel and sends it to the destination.
	// If an error occurs, send it to the errs channel.
	Load(ctx context.Context, messages chan interface{}, errs chan error) error
}

type OutputPluginBigQuery

type OutputPluginBigQuery struct {
	// contains filtered or unexported fields
}

func NewOutputPluginBigQuery

func NewOutputPluginBigQuery(
	client *bigquery.Client,
	endpoint *string,
	datasetId string,
	tableId string,
	schema bigquery.Schema,
	deserialize func(interface{}) ([]bigquery.Value, error),
	deleteTemporaryTable bool,
) *OutputPluginBigQuery

func NewOutputPluginBigQueryFromConfig

func NewOutputPluginBigQueryFromConfig(configYml []byte) (*OutputPluginBigQuery, error)

func (*OutputPluginBigQuery) Load

func (p *OutputPluginBigQuery) Load(
	ctx context.Context,
	messages chan interface{},
	errs chan error,
) error

func (*OutputPluginBigQuery) ReplaceLogger

func (p *OutputPluginBigQuery) ReplaceLogger(logger logr.Logger)

type OutputPluginBigQueryConfig

type OutputPluginBigQueryConfig struct {
	ProjectId            string                                            `yaml:"projectId"`
	DatasetId            string                                            `yaml:"datasetId"`
	TableId              string                                            `yaml:"tableId"`
	Endpoint             *string                                           `yaml:"endpoint"`
	Schema               map[string]OutputPluginBigQueryConfigSchemaColumn `yaml:"schema"`
	DeleteTemporaryTable *bool                                             `yaml:"deleteTemporaryTable"`
}

type OutputPluginBigQueryConfigSchemaColumn

type OutputPluginBigQueryConfigSchemaColumn struct {
	Type string `yaml:"type"`
}

type OutputPluginFile

type OutputPluginFile struct {
	// contains filtered or unexported fields
}

func NewOutputPluginFile

func NewOutputPluginFile(
	deserialize func(interface{}) ([]byte, error),
	newWriter func() (io.WriteCloser, error),
) *OutputPluginFile

func NewOutputPluginFileFromConfig

func NewOutputPluginFileFromConfig(configYml []byte) (*OutputPluginFile, error)

func (*OutputPluginFile) Load

func (p *OutputPluginFile) Load(
	ctx context.Context,
	messages chan interface{},
	errs chan error,
) error

func (*OutputPluginFile) ReplaceLogger

func (p *OutputPluginFile) ReplaceLogger(logger logr.Logger)

type OutputPluginFileConfig

type OutputPluginFileConfig struct {
	Filepath string `yaml:"filepath"`
	Format   string `yaml:"format"`
	Header   *bool  `yaml:"header"`
}

type OutputPluginStdout added in v0.2.0

type OutputPluginStdout struct {
	// contains filtered or unexported fields
}

func NewOutputPluginStdout added in v0.2.0

func NewOutputPluginStdout(
	deserialize func(interface{}) ([]byte, error),
) *OutputPluginStdout

func NewOutputPluginStdoutFromConfig added in v0.2.0

func NewOutputPluginStdoutFromConfig(configYml []byte) (*OutputPluginStdout, error)

func (*OutputPluginStdout) Load added in v0.2.0

func (p *OutputPluginStdout) Load(
	ctx context.Context,
	messages chan interface{},
	errs chan error,
) error

func (*OutputPluginStdout) ReplaceLogger added in v0.2.0

func (p *OutputPluginStdout) ReplaceLogger(logger logr.Logger)

type OutputPluginStdoutConfig added in v0.2.0

type OutputPluginStdoutConfig struct {
	Format string `yaml:"format"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL