bqin

package module
v0.3.1
Warning

This package is not in the latest version of its module.

Published: Jun 25, 2020 License: MIT Imports: 27 Imported by: 0

README

BQin


BQin is a BigQuery data importer driven by Amazon S3 event notifications delivered through SQS.
Inspired by http://github.com/fujiwara/Rin

Architecture

  1. (Someone) creates an S3 object.
  2. S3 event notifications send a message to SQS.
  3. BQin fetches messages from SQS.
  4. BQin copies the S3 object to Google Cloud Storage (a temporary bucket) and creates a BigQuery load job.
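Steps 2 and 3 mean that each SQS message body carries an S3 event notification. As a rough sketch of how such a message can be turned into object URLs (Receiver.Receive returns []*url.URL), the program below decodes the standard notification JSON. The names s3Event and objectURLs are hypothetical and not part of bqin's API; only the Records[].s3.bucket.name and Records[].s3.object.key fields of the AWS format are used.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/url"
)

// s3Event keeps only the fields of an S3 event notification needed to
// locate the object that triggered it (hypothetical type).
type s3Event struct {
	Records []struct {
		S3 struct {
			Bucket struct {
				Name string `json:"name"`
			} `json:"bucket"`
			Object struct {
				Key string `json:"key"`
			} `json:"object"`
		} `json:"s3"`
	} `json:"Records"`
}

// objectURLs turns an SQS message body holding an S3 event notification into
// s3://bucket/key URLs. Object keys arrive URL-encoded (a space becomes '+'),
// so they are decoded with url.QueryUnescape first.
func objectURLs(body string) ([]*url.URL, error) {
	var ev s3Event
	if err := json.Unmarshal([]byte(body), &ev); err != nil {
		return nil, fmt.Errorf("decode event: %w", err)
	}
	urls := make([]*url.URL, 0, len(ev.Records))
	for _, r := range ev.Records {
		key, err := url.QueryUnescape(r.S3.Object.Key)
		if err != nil {
			return nil, fmt.Errorf("decode key %q: %w", r.S3.Object.Key, err)
		}
		// Build the URL directly so keys containing '?' or '#' stay in Path.
		urls = append(urls, &url.URL{Scheme: "s3", Host: r.S3.Bucket.Name, Path: "/" + key})
	}
	return urls, nil
}

func main() {
	body := `{"Records":[{"s3":{"bucket":{"name":"bqin.bucket.test"},"object":{"key":"data/user/part-0001.gz"}}}]}`
	urls, err := objectURLs(body)
	if err != nil {
		panic(err)
	}
	for _, u := range urls {
		fmt.Println(u) // s3://bqin.bucket.test/data/user/part-0001.gz
	}
}
```

S3 also sends an s3:TestEvent message without a Records array when notifications are first configured; this sketch returns an empty slice for it.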

Configuration

Configuring Amazon S3 Event Notifications.

  1. Create an SQS queue.
  2. Attach an SQS access policy to the queue (see "Example Walkthrough 1" in the Amazon S3 event notifications documentation).
  3. Enable event notifications on the S3 bucket.
  4. Create a temporary bucket on Google Cloud Storage and create the target dataset in BigQuery.
  5. Run the bqin process with a configuration that uses the SQS queue and the S3 bucket.
config.yaml
queue_name: my_queue_name    # SQS queue name

cloud:
  aws:
    region: ap-northeast-1

s3:
  bucket: bqin.bucket.test
  region: ap-northeast-1

big_query:
  project_id: bqin-test
  dataset: test

option:
  temporary_bucket: my_bucket_name # GCP temporary bucket
  gzip: true
  source_format: json # one of [csv, json, parquet]
  auto_detect: true # only works with csv or json

# define load rule
rules:
  - big_query: # standard rule
      table: user
    s3:
      key_prefix: data/user

  - big_query:  # expand values captured by key_regexp, e.g. for date-sharded tables
      table: $1_$2
    s3:
      key_regexp: data/(.+)/part-([0-9]+).gz

  - big_query: # override the default sections in this rule
      project_id: hoge
      dataset: bqin_test
      table: role
    s3:
      bucket: bqin.bucket.test
      key_regexp: data/(.+)/part-([0-9]+).csv
    option:
      gzip: false
      source_format: csv

The configuration file is parsed by kayac/go-config.

go-config expands environment variables in the configuration file using the syntax {{ env "FOO" }} or {{ must_env "FOO" }}.

Credentials

BQin requires credentials for AWS and GCP. They can be set in the cloud section of the configuration:

cloud:
  aws:
    region: ap-northeast-1
    access_key_id: {{ must_env "ACCESS_KEY_ID" }}
    secret_access_key: {{ must_env "SECRET_ACCESS_KEY" }}
  gcp:
    base64_credential: {{ must_env "GCP_CREDENTIAL_BASE64_JSON" }}

Note: For GCP, specify the Base64-encoded contents of the credentials JSON file.

Run

Normal mode

BQin waits for new SQS messages and processes them continuously.

$ bqin run -config config.yaml [-debug]

Batch mode

BQin receives and processes SQS messages, and exits once all messages in the queue have been read. For example, to process a dead-letter queue:

$ bqin batch -config config.yaml -queue <dlq-queue-name> [-debug]

Check Rule

To check which rule matches an S3 object, pipe its S3 URL to bqin check:

$ echo "s3://bucket.example.com/object.txt" | bqin check -config config.yaml

LICENCE

MIT

Author

KAYAC Inc.

Documentation

Index

Constants

const (
	S3URITemplate         = "s3://%s/%s"
	BigQueryTableTemplate = "%s.%s.%s"
)
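The two templates are presumably meant for fmt.Sprintf: one builds an S3 URI, the other a fully qualified BigQuery table ID (project.dataset.table). A small sketch with hypothetical helpers s3URI and tableID:

```go
package main

import "fmt"

// Copies of the two documented constants.
const (
	S3URITemplate         = "s3://%s/%s"
	BigQueryTableTemplate = "%s.%s.%s"
)

// s3URI formats a bucket and object key as an S3 URI (hypothetical helper).
func s3URI(bucket, key string) string {
	return fmt.Sprintf(S3URITemplate, bucket, key)
}

// tableID formats a BigQuery table ID as project.dataset.table (hypothetical helper).
func tableID(project, dataset, table string) string {
	return fmt.Sprintf(BigQueryTableTemplate, project, dataset, table)
}

func main() {
	fmt.Println(s3URI("bqin.bucket.test", "data/user/part-0001.gz")) // s3://bqin.bucket.test/data/user/part-0001.gz
	fmt.Println(tableID("bqin-test", "test", "user"))                // bqin-test.test.user
}
```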
const (
	Unknown SourceFormat = ""
	CSV                  = "csv"
	JSON                 = "json"
	Parquet              = "parquet"
)

Variables

var (
	ErrMaxRetry  = errors.New("max retry count reached")
	ErrNoMessage = errors.New("no sqs message")
)
var ErrInvalidHandle = errors.New("invalid handle")

Functions

This section is empty.

Types

type AWS added in v0.3.0

type AWS struct {
	Region                  string `yaml:"region,omitempty"`
	DisableSSL              bool   `yaml:"disable_ssl,omitempty"`
	S3ForcePathStyle        bool   `yaml:"s3_force_path_style,omitempty"`
	S3Endpoint              string `yaml:"s3_endpoint,omitempty"`
	SQSEndpoint             string `yaml:"sqs_endpoint,omitempty"`
	AccessKeyID             string `yaml:"access_key_id,omitempty"`
	SecretAccessKey         string `yaml:"secret_access_key,omitempty"`
	DisableShardConfigState bool   `yaml:"disable_shard_config_state,omitempty"`
}

type App

type App struct {
	*Receiver
	*Resolver
	*Transporter
	*Loader
}

func NewApp

func NewApp(conf *Config) *App

func (*App) Run added in v0.3.0

func (app *App) Run(ctx context.Context, opts ...RunOption) error

type Base64String added in v0.3.0

type Base64String []byte

func (Base64String) Bytes added in v0.3.0

func (s Base64String) Bytes() []byte

func (Base64String) IsEmpty added in v0.3.0

func (s Base64String) IsEmpty() bool

func (Base64String) String added in v0.3.0

func (s Base64String) String() string

func (*Base64String) UnmarshalYAML added in v0.3.0

func (s *Base64String) UnmarshalYAML(unmarshal func(interface{}) error) (err error)
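Base64String is what base64_credential decodes into, and its UnmarshalYAML signature matches the Unmarshaler interface of gopkg.in/yaml.v2. Below is a hypothetical body for it, assuming standard padded Base64 (base64.StdEncoding); the fake unmarshal function in main stands in for the YAML decoder so the example runs without third-party modules. String() is left out because its output format is not documented.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// Base64String holds the decoded bytes of a Base64 value from YAML.
type Base64String []byte

// UnmarshalYAML decodes the YAML scalar into a string, then Base64-decodes it
// (standard alphabet, padded).
func (s *Base64String) UnmarshalYAML(unmarshal func(interface{}) error) error {
	var encoded string
	if err := unmarshal(&encoded); err != nil {
		return err
	}
	decoded, err := base64.StdEncoding.DecodeString(encoded)
	if err != nil {
		return fmt.Errorf("base64_credential: %w", err)
	}
	*s = decoded
	return nil
}

// Bytes returns the decoded bytes.
func (s Base64String) Bytes() []byte { return []byte(s) }

// IsEmpty reports whether no value was decoded.
func (s Base64String) IsEmpty() bool { return len(s) == 0 }

func main() {
	// Stand-in for yaml.v2 handing over the scalar `eyJhIjoxfQ==`.
	fakeUnmarshal := func(v interface{}) error {
		*v.(*string) = "eyJhIjoxfQ=="
		return nil
	}
	var cred Base64String
	if err := cred.UnmarshalYAML(fakeUnmarshal); err != nil {
		panic(err)
	}
	fmt.Println(string(cred.Bytes())) // {"a":1}
}
```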

type Cloud added in v0.3.0

type Cloud struct {
	AWS *AWS `yaml:"aws,omitempty"`
	GCP *GCP `yaml:"gcp,omitempty"`
}

func (*Cloud) Validate added in v0.3.0

func (c *Cloud) Validate() error

type Config

type Config struct {
	QueueName string `yaml:"queue_name"`
	Cloud     *Cloud `yaml:"cloud"`

	Rules []*Rule `yaml:"rules"`
	Rule  `yaml:",inline"`
}

func LoadConfig

func LoadConfig(path string) (*Config, error)

func NewDefaultConfig added in v0.3.0

func NewDefaultConfig() *Config

func (*Config) Validate added in v0.1.3

func (c *Config) Validate() error

type Factory added in v0.3.0

type Factory struct {
	*Config
}

func (*Factory) NewAWSSession added in v0.3.0

func (f *Factory) NewAWSSession() *session.Session

func (*Factory) NewApp added in v0.3.0

func (f *Factory) NewApp() *App

func (*Factory) NewBigQueryOptions added in v0.3.0

func (f *Factory) NewBigQueryOptions() []option.ClientOption

func (*Factory) NewCloudStorageOptions added in v0.3.0

func (f *Factory) NewCloudStorageOptions() []option.ClientOption

func (*Factory) NewGCPOptions added in v0.3.0

func (f *Factory) NewGCPOptions() []option.ClientOption

func (*Factory) NewLoader added in v0.3.0

func (f *Factory) NewLoader() *Loader

func (*Factory) NewReceiver added in v0.3.0

func (f *Factory) NewReceiver() *Receiver

func (*Factory) NewResolver added in v0.3.0

func (f *Factory) NewResolver() *Resolver

func (*Factory) NewTransporter added in v0.3.0

func (f *Factory) NewTransporter() *Transporter

type GCP added in v0.3.0

type GCP struct {
	WithoutAuthentication bool         `yaml:"without_authentication,omitempty"`
	BigQueryEndpoint      string       `yaml:"big_query_endpoint,omitempty"`
	CloudStorageEndpoint  string       `yaml:"cloud_storage_endpoint,omitempty"`
	Base64Credential      Base64String `yaml:"base64_credential"`
}

type Job added in v0.3.0

type Job struct {
	*TransportJob
	*LoadingJob
}

func (*Job) String added in v0.3.0

func (job *Job) String() string

type JobOption added in v0.3.0

type JobOption struct {
	TemporaryBucket string       `yaml:"temporary_bucket" json:"temporary_bucket"`
	GZip            *bool        `yaml:"gzip,omitempty" json:"gzip,omitempty"`
	AutoDetect      *bool        `yaml:"auto_detect,omitempty" json:"auto_detect,omitempty"`
	SourceFormat    SourceFormat `yaml:"source_format" json:"source_format"`
}

func (*JobOption) Clone added in v0.3.0

func (o *JobOption) Clone() *JobOption

func (*JobOption) MergeIn added in v0.3.0

func (o *JobOption) MergeIn(other *JobOption)
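JobOption uses *bool for gzip and auto_detect, which lets a rule's explicit gzip: false be told apart from an unset value, as in the third rule of the sample config.yaml. The sketch below shows that idea with a trimmed, hypothetical jobOption type and an overlay function in which rule values win over defaults; the actual direction and details of MergeIn are an assumption.

```go
package main

import "fmt"

// jobOption is a trimmed, hypothetical copy of JobOption. GZip is a *bool so
// that "not set" (nil) differs from an explicit false.
type jobOption struct {
	TemporaryBucket string
	GZip            *bool
	SourceFormat    string
}

// overlay returns a copy of base in which every field set in rule wins.
func overlay(base, rule jobOption) jobOption {
	out := base
	if base.GZip != nil { // copy the pointee so out does not alias base
		v := *base.GZip
		out.GZip = &v
	}
	if rule.TemporaryBucket != "" {
		out.TemporaryBucket = rule.TemporaryBucket
	}
	if rule.GZip != nil {
		v := *rule.GZip
		out.GZip = &v
	}
	if rule.SourceFormat != "" {
		out.SourceFormat = rule.SourceFormat
	}
	return out
}

func boolPtr(b bool) *bool { return &b }

func main() {
	defaults := jobOption{TemporaryBucket: "my_bucket_name", GZip: boolPtr(true), SourceFormat: "json"}
	roleRule := jobOption{GZip: boolPtr(false), SourceFormat: "csv"}
	merged := overlay(defaults, roleRule)
	fmt.Println(merged.TemporaryBucket, *merged.GZip, merged.SourceFormat) // my_bucket_name false csv
}
```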

func (*JobOption) Validate added in v0.3.0

func (o *JobOption) Validate() error

type Loader added in v0.3.0

type Loader struct {
	// contains filtered or unexported fields
}

func NewLoader added in v0.3.0

func NewLoader(opts ...option.ClientOption) *Loader

func (*Loader) Load added in v0.3.0

func (l *Loader) Load(ctx context.Context, job *LoadingJob) error

type LoadingDestination added in v0.3.0

type LoadingDestination struct {
	ProjectID string `yaml:"project_id" json:"project_id"`
	Dataset   string `yaml:"dataset" json:"dataset"`
	Table     string `yaml:"table" json:"table"`
}

func (*LoadingDestination) Clone added in v0.3.0

func (*LoadingDestination) MergeIn added in v0.3.0

func (bq *LoadingDestination) MergeIn(other *LoadingDestination)

func (LoadingDestination) String added in v0.3.0

func (bq LoadingDestination) String() string

type LoadingJob added in v0.3.0

type LoadingJob struct {
	GCSRef *bigquery.GCSReference
	*LoadingDestination

	CreateDisposition bigquery.TableCreateDisposition
	WriteDisposition  bigquery.TableWriteDisposition
}

func NewLoadingJob added in v0.3.0

func NewLoadingJob(dest *LoadingDestination, objectURIs ...string) *LoadingJob

func (*LoadingJob) String added in v0.3.0

func (job *LoadingJob) String() string

type ReceiptHandle added in v0.3.0

type ReceiptHandle struct {
	// contains filtered or unexported fields
}

func (*ReceiptHandle) Cleanup added in v0.3.0

func (h *ReceiptHandle) Cleanup()

func (*ReceiptHandle) Complete added in v0.3.0

func (h *ReceiptHandle) Complete() error

func (*ReceiptHandle) Debugf added in v0.3.0

func (h *ReceiptHandle) Debugf(format string, args ...interface{})

func (*ReceiptHandle) Errorf added in v0.3.0

func (h *ReceiptHandle) Errorf(format string, args ...interface{})

func (*ReceiptHandle) Infof added in v0.3.0

func (h *ReceiptHandle) Infof(format string, args ...interface{})

type Receiver

type Receiver struct {
	// contains filtered or unexported fields
}

func NewReceiver added in v0.3.0

func NewReceiver(queueName string, sess *session.Session) *Receiver

func (*Receiver) GetQueueName added in v0.3.0

func (r *Receiver) GetQueueName() string

func (*Receiver) Receive

func (r *Receiver) Receive(ctx context.Context) ([]*url.URL, *ReceiptHandle, error)

func (*Receiver) SetQueueName added in v0.3.0

func (r *Receiver) SetQueueName(queueName string)

type Resolver added in v0.3.0

type Resolver struct {
	// contains filtered or unexported fields
}

func NewResolver added in v0.3.0

func NewResolver(rules []*Rule) *Resolver

func (*Resolver) Resolve added in v0.3.0

func (r *Resolver) Resolve(urls []*url.URL) []*Job

type Rule

type Rule struct {
	S3       *S3Soruce           `yaml:"s3"`
	BigQuery *LoadingDestination `yaml:"big_query"`
	Option   *JobOption          `yaml:"option"`
	// contains filtered or unexported fields
}

func (*Rule) Clone added in v0.1.3

func (r *Rule) Clone() *Rule

func (*Rule) Match

func (r *Rule) Match(u *url.URL) (bool, []string)

func (*Rule) MergeIn added in v0.1.3

func (r *Rule) MergeIn(other *Rule)

func (*Rule) String

func (r *Rule) String() string

func (*Rule) Validate added in v0.1.3

func (r *Rule) Validate() error

type RunOption added in v0.3.0

type RunOption interface {
	Apply(*RunSettings)
}

func WithExitError added in v0.3.0

func WithExitError(flag bool) RunOption

func WithExitNoMessage added in v0.3.0

func WithExitNoMessage(flag bool) RunOption

func WithQueueName added in v0.3.0

func WithQueueName(queueName string) RunOption

type RunSettings added in v0.3.0

type RunSettings struct {
	ExitNoMessage bool
	ExitError     bool
	QueueName     string
}

func (*RunSettings) Apply added in v0.3.0

func (s *RunSettings) Apply(o *RunSettings)
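RunOption follows Go's functional options pattern. The sketch below shows one way the With... constructors could be built, assuming an internal func adapter; optionFunc and newRunSettings are hypothetical names. Because *RunSettings has an Apply(*RunSettings) method, a *RunSettings value itself satisfies RunOption; the sketch omits that method since its exact semantics are not documented.

```go
package main

import "fmt"

// RunSettings mirrors the documented fields.
type RunSettings struct {
	ExitNoMessage bool
	ExitError     bool
	QueueName     string
}

// RunOption matches the documented interface.
type RunOption interface {
	Apply(*RunSettings)
}

// optionFunc adapts a plain function to RunOption (hypothetical helper).
type optionFunc func(*RunSettings)

func (f optionFunc) Apply(s *RunSettings) { f(s) }

func WithExitNoMessage(flag bool) RunOption {
	return optionFunc(func(s *RunSettings) { s.ExitNoMessage = flag })
}

func WithExitError(flag bool) RunOption {
	return optionFunc(func(s *RunSettings) { s.ExitError = flag })
}

func WithQueueName(queueName string) RunOption {
	return optionFunc(func(s *RunSettings) { s.QueueName = queueName })
}

// newRunSettings starts from the configured queue and applies opts in order,
// so a later option overrides an earlier one.
func newRunSettings(configuredQueue string, opts ...RunOption) *RunSettings {
	s := &RunSettings{QueueName: configuredQueue}
	for _, o := range opts {
		o.Apply(s)
	}
	return s
}

func main() {
	// Roughly what a batch run against a dead-letter queue might request.
	s := newRunSettings("my_queue_name", WithExitNoMessage(true), WithQueueName("my_queue_name-dlq"))
	fmt.Printf("%+v\n", *s) // {ExitNoMessage:true ExitError:false QueueName:my_queue_name-dlq}
}
```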

type S3Object added in v0.1.3

type S3Object struct {
	Bucket string `json:"bucket"`
	Object string `json:"object"`
}

func (S3Object) String added in v0.1.3

func (s S3Object) String() string

type S3Soruce

type S3Soruce struct {
	Region    string `yaml:"region"`
	Bucket    string `yaml:"bucket"`
	KeyPrefix string `yaml:"key_prefix"`
	KeyRegexp string `yaml:"key_regexp"`
}

func (*S3Soruce) Clone added in v0.1.3

func (s3 *S3Soruce) Clone() *S3Soruce

func (*S3Soruce) MergeIn added in v0.1.3

func (s3 *S3Soruce) MergeIn(other *S3Soruce)

func (S3Soruce) String

func (s3 S3Soruce) String() string

type SourceFormat added in v0.2.0

type SourceFormat string

func (SourceFormat) Is added in v0.2.0

func (f SourceFormat) Is(others ...SourceFormat) bool

func (SourceFormat) IsSupport added in v0.2.0

func (f SourceFormat) IsSupport() bool
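In the constant block above, only Unknown is declared with the SourceFormat type; CSV, JSON and Parquet are given their own values, so Go treats them as untyped string constants. They still convert implicitly wherever a SourceFormat is expected. The sketch below types all four explicitly and gives plausible bodies for Is and IsSupport, assuming IsSupport means one of csv, json or parquet, the formats listed in the README.

```go
package main

import "fmt"

// SourceFormat names a BigQuery source format.
type SourceFormat string

// Each constant is typed explicitly here.
const (
	Unknown SourceFormat = ""
	CSV     SourceFormat = "csv"
	JSON    SourceFormat = "json"
	Parquet SourceFormat = "parquet"
)

// Is reports whether f equals any of others.
func (f SourceFormat) Is(others ...SourceFormat) bool {
	for _, o := range others {
		if f == o {
			return true
		}
	}
	return false
}

// IsSupport reports whether f is one of the formats listed in the README.
func (f SourceFormat) IsSupport() bool {
	return f.Is(CSV, JSON, Parquet)
}

func main() {
	for _, f := range []SourceFormat{"csv", "parquet", "avro", Unknown} {
		// auto_detect only works with CSV and JSON, per the README.
		fmt.Printf("%q supported=%v autodetect=%v\n", f, f.IsSupport(), f.Is(CSV, JSON))
	}
}
```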

type TransportJob added in v0.3.0

type TransportJob struct {
	Source      *url.URL
	Destination *url.URL
}

func (*TransportJob) String added in v0.3.0

func (job *TransportJob) String() string

type TransportJobHandle added in v0.3.0

type TransportJobHandle struct {
	// contains filtered or unexported fields
}

func (*TransportJobHandle) Cleanup added in v0.3.0

func (h *TransportJobHandle) Cleanup(ctx context.Context) error

type Transporter added in v0.3.0

type Transporter struct {
	// contains filtered or unexported fields
}

func NewTransporter added in v0.3.0

func NewTransporter(sess *session.Session, opts ...option.ClientOption) *Transporter

func (*Transporter) Transport added in v0.3.0

func (t *Transporter) Transport(ctx context.Context, job *TransportJob) (*TransportJobHandle, error)

Directories

Path Synopsis
cmd
internal
