README

bqloader

PkgGoDev Main Branch Workflow Go Report Card codecov GitHub GitHub tag (latest SemVer)

bqloader is a simple ETL framework running on Cloud Functions to load data from Cloud Storage into BigQuery.

Installation

go get -u go.nownabe.dev/bqloader

Getting Started with Pre-configured Handlers

See the example for full instructions.

To load some types of CSV files, you can use pre-configured handlers. See the full list.

package myfunc

import (
	"context"
	"os"

	"go.nownabe.dev/bqloader"
	"go.nownabe.dev/bqloader/contrib/handlers"
)

// loader is the BQLoader that BQLoad dispatches events to.
var loader bqloader.BQLoader

// init creates the loader and registers the pre-configured handlers.
// It panics if the loader cannot be created, so a misconfigured function
// fails at startup instead of on the first event.
func init() {
	var err error

	loader, err = bqloader.New()
	if err != nil {
		panic(err)
	}

	// t builds destination tables in the project and dataset taken from the environment.
	t := handlers.TableGenerator(os.Getenv("BIGQUERY_PROJECT_ID"), os.Getenv("BIGQUERY_DATASET_ID"))
	n := &bqloader.SlackNotifier{
		Token:   os.Getenv("SLACK_TOKEN"),
		Channel: os.Getenv("SLACK_CHANNEL"),
	}

	handlers.MustAddHandlers(context.Background(), loader,
		/*
			These build handlers to load CSVs, given four arguments:
			handler name, a pattern to file path on Cloud Storage, a BigQuery table and a notifier.
		*/
		handlers.SBISumishinNetBankStatement("SBI Bank", `^sbi_bank/`, t("sbi_bank"), n),
		handlers.SMBCCardStatement("SMBC Card", `^smbc_card/`, t("smbc_card"), n),
	)
}

// BQLoad is the entrypoint for Cloud Functions.
// It passes the event to the loader and returns the loader's error.
func BQLoad(ctx context.Context, e bqloader.Event) error {
	return loader.Handle(ctx, e)
}

Getting Started with Custom Handlers

(See the Quickstart example for full instructions.)

To load other CSVs, import the package go.nownabe.dev/bqloader and write your custom handler.

package myfunc

import (
	"context"
	"os"
	"regexp"
	"time"

	"golang.org/x/text/encoding/japanese"
	"golang.org/x/xerrors"

	"go.nownabe.dev/bqloader"
)

// loader is the BQLoader that BQLoad dispatches events to.
var loader bqloader.BQLoader

// init creates the loader and registers the custom handler.
// It panics if the loader cannot be created, so a misconfigured function
// fails at startup instead of on the first event.
func init() {
	var err error

	loader, err = bqloader.New()
	if err != nil {
		panic(err)
	}

	loader.MustAddHandler(context.Background(), newHandler())
}

// newHandler builds the handler for files under "example_bank/".
// The destination table and the Slack settings are read from the environment.
func newHandler() *bqloader.Handler {
	/*
		This projector converts date fields formatted as "2006/01/02"
		at the first column into strings like "2006-01-02" that satisfy
		the BigQuery date type.
	*/
	projector := func(_ context.Context, r []string) ([]string, error) {
		// Guard against an empty record so r[0] cannot panic.
		if len(r) == 0 {
			return nil, xerrors.New("record has no columns")
		}

		t, err := time.Parse("2006/01/02", r[0])
		if err != nil {
			return nil, xerrors.Errorf("Column 0 cannot parse as a date: %w", err)
		}

		r[0] = t.Format("2006-01-02")

		return r, nil
	}

	return &bqloader.Handler{
		Name:     "quickstart",                         // Handler name used in logs and notifications.
		Pattern:  regexp.MustCompile("^example_bank/"), // This handler processes files matched to this pattern.
		Encoding: japanese.ShiftJIS,                    // Source file encoding.
		Parser:   bqloader.CSVParser(),                 // Parser parses source file into records.
		Notifier: &bqloader.SlackNotifier{
			Token:   os.Getenv("SLACK_TOKEN"),
			Channel: os.Getenv("SLACK_CHANNEL"),
		},
		Projector:       projector, // Projector transforms each row.
		SkipLeadingRows: 1,         // Skip header row.

		// Destination.
		Project: os.Getenv("BIGQUERY_PROJECT_ID"),
		Dataset: os.Getenv("BIGQUERY_DATASET_ID"),
		Table:   os.Getenv("BIGQUERY_TABLE_ID"),
	}
}

// BQLoad is the entrypoint for Cloud Functions.
// It passes the event to the loader and returns the loader's error.
func BQLoad(ctx context.Context, e bqloader.Event) error {
	return loader.Handle(ctx, e)
}

Diagram

diagram

Documentation

Overview

Package bqloader is a simple ETL framework running on Cloud Functions to load data from Cloud Storage into BigQuery.

Getting started with pre-configured handlers

See the example for full instructions. https://github.com/nownabe/go-bqloader/tree/main/examples/pre_configured_handlers

To load some types of CSV formats, you can use pre-configured handlers. See the full list on GitHub. https://github.com/nownabe/go-bqloader/tree/main/contrib/handlers

package myfunc

import (
	"context"
	"os"

	"go.nownabe.dev/bqloader"
	"go.nownabe.dev/bqloader/contrib/handlers"
)

// loader is the BQLoader that BQLoad dispatches events to.
var loader bqloader.BQLoader

// init creates the loader and registers the pre-configured handlers.
// It panics if the loader cannot be created, so a misconfigured function
// fails at startup instead of on the first event.
func init() {
	var err error

	loader, err = bqloader.New()
	if err != nil {
		panic(err)
	}

	// t builds destination tables in the project and dataset taken from the environment.
	t := handlers.TableGenerator(os.Getenv("BIGQUERY_PROJECT_ID"), os.Getenv("BIGQUERY_DATASET_ID"))
	n := &bqloader.SlackNotifier{
		Token:   os.Getenv("SLACK_TOKEN"),
		Channel: os.Getenv("SLACK_CHANNEL"),
	}

	handlers.MustAddHandlers(context.Background(), loader,
		// These build handlers to load CSVs, given four arguments:
		// handler name, a pattern to file path on Cloud Storage, a BigQuery table and a notifier.
		handlers.SBISumishinNetBankStatement("SBI Bank", `^sbi_bank/`, t("sbi_bank"), n),
		handlers.SMBCCardStatement("SMBC Card", `^smbc_card/`, t("smbc_card"), n),
	)
}

// BQLoad is the entrypoint for Cloud Functions.
// It passes the event to the loader and returns the loader's error.
func BQLoad(ctx context.Context, e bqloader.Event) error {
	return loader.Handle(ctx, e)
}

Getting started with custom handlers

See the Quickstart example for full instructions. https://github.com/nownabe/go-bqloader/tree/main/examples/quickstart

To simply transform and load CSV files, import the package `go.nownabe.dev/bqloader` and write your own handler.

package myfunc

import (
	"context"
	"os"
	"regexp"
	"time"

	"golang.org/x/text/encoding/japanese"
	"golang.org/x/xerrors"

	"go.nownabe.dev/bqloader"
)

// loader is the BQLoader that BQLoad dispatches events to.
var loader bqloader.BQLoader

// init creates the loader and registers the custom handler.
// It panics if the loader cannot be created, so a misconfigured function
// fails at startup instead of on the first event.
func init() {
	var err error

	loader, err = bqloader.New()
	if err != nil {
		panic(err)
	}

	loader.MustAddHandler(context.Background(), newHandler())
}

// newHandler builds the handler for files under "example_bank/".
// The destination table and the Slack settings are read from the environment.
func newHandler() *bqloader.Handler {
	// This projector converts date fields formatted as "2006/01/02"
	// at the first column into strings like "2006-01-02" that satisfy
	// the BigQuery date type.
	projector := func(_ context.Context, r []string) ([]string, error) {
		// Guard against an empty record so r[0] cannot panic.
		if len(r) == 0 {
			return nil, xerrors.New("record has no columns")
		}

		t, err := time.Parse("2006/01/02", r[0])
		if err != nil {
			return nil, xerrors.Errorf("Column 0 cannot parse as a date: %w", err)
		}

		r[0] = t.Format("2006-01-02")

		return r, nil
	}

	return &bqloader.Handler{
		Name:     "quickstart",                         // Handler name used in logs and notifications.
		Pattern:  regexp.MustCompile("^example_bank/"), // This handler processes files matched to this pattern.
		Encoding: japanese.ShiftJIS,                    // Source file encoding.
		Parser:   bqloader.CSVParser(),                 // Parser parses source file into records.
		Notifier: &bqloader.SlackNotifier{
			Token:   os.Getenv("SLACK_TOKEN"),
			Channel: os.Getenv("SLACK_CHANNEL"),
		},
		Projector:       projector, // Projector transforms each row.
		SkipLeadingRows: 1,         // Skip header row.

		// Destination.
		Project: os.Getenv("BIGQUERY_PROJECT_ID"),
		Dataset: os.Getenv("BIGQUERY_DATASET_ID"),
		Table:   os.Getenv("BIGQUERY_TABLE_ID"),
	}
}

// BQLoad is the entrypoint for Cloud Functions.
// It passes the event to the loader and returns the loader's error.
func BQLoad(ctx context.Context, e bqloader.Event) error {
	return loader.Handle(ctx, e)
}

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BQLoader

type BQLoader interface {
	AddHandler(context.Context, *Handler) error
	Handle(context.Context, Event) error
	MustAddHandler(context.Context, *Handler)
}

    BQLoader loads data from Cloud Storage to BigQuery table.

    func New

    func New(opts ...Option) (BQLoader, error)

      New builds a new BQLoader.

      type Event

      type Event struct {
      	Name        string    `json:"name"`
      	Bucket      string    `json:"bucket"`
      	TimeCreated time.Time `json:"timeCreated"`
      	// contains filtered or unexported fields
      }

        Event is an event from Cloud Storage.

        func (*Event) FullPath

        func (e *Event) FullPath() string

          FullPath returns the full path of the storage object, beginning with gs://.

          type Extractor

          type Extractor interface {
          	Extract(context.Context, Event) (io.Reader, func(), error)
          }

            Extractor extracts data from source such as Cloud Storage.

            type Handler

            type Handler struct {
            	// Name is the handler's name.
            	Name string
            
            	// Pattern is matched against the file path on Cloud Storage;
            	// the handler processes files that match it.
            	Pattern         *regexp.Regexp
            	// Encoding is the source file encoding.
            	Encoding        encoding.Encoding
            	// Parser parses the source file into records.
            	Parser          Parser
            	// Notifier notifies results for each event.
            	Notifier        Notifier
            	// Projector transforms each row.
            	Projector       Projector
            	// SkipLeadingRows is the number of leading rows to skip,
            	// e.g. 1 to skip a header row.
            	SkipLeadingRows uint
            	// Preprocessor preprocesses the event before it is handled.
            	// NOTE(review): exact call order is not visible here; confirm.
            	Preprocessor    Preprocessor
            
            	// BatchSize specifies how many records are processed in a goroutine.
            	// Default is 10000.
            	BatchSize int
            
            	// Project specifies GCP project name of destination BigQuery table.
            	Project string
            
            	// Dataset specifies BigQuery dataset ID of destination table.
            	Dataset string
            
            	// Table specifies BigQuery table ID as destination.
            	Table string
            
            	// Extractor extracts data from a source such as Cloud Storage.
            	Extractor Extractor
            	// Loader loads projected data into a destination such as BigQuery.
            	Loader    Loader
            	// contains filtered or unexported fields
            }

              Handler defines how to handle events which match specified pattern.

              func (*Handler) Handle

              func (h *Handler) Handle(ctx context.Context, e Event) error

                Handle handles events.

                func (*Handler) SetConcurrency

                func (h *Handler) SetConcurrency(n int)

                  SetConcurrency sets the handler's concurrency directly. Normally, set the concurrency on BQLoader with the WithConcurrency option instead.

                  type Loader

                  type Loader interface {
                  	Load(context.Context, [][]string) error
                  }

                    Loader loads projected data into a destination such as BigQuery.

                    type Notifier

                    type Notifier interface {
                    	Notify(context.Context, *Result) error
                    }

                      Notifier notifies results for each event.

                      type Option

                      type Option interface {
                      	// contains filtered or unexported methods
                      }

                        Option configures BQLoader.

                        func WithConcurrency

                        func WithConcurrency(n int) Option

                          WithConcurrency configures the concurrency of projectors. Before setting this, confirm GOMAXPROCS.

                          func WithLogLevel

                          func WithLogLevel(l string) Option

                            WithLogLevel configures log level to print logs. Allowed values are trace, debug, info, warn, error, fatal or panic.

                            func WithPrettyLogging

                            func WithPrettyLogging() Option

                              WithPrettyLogging configures BQLoader to print human friendly logs.

                              type Parser

                              type Parser func(context.Context, io.Reader) ([][]string, error)

                                Parser parses files from storage.

                                func CSVParser

                                func CSVParser() Parser

                                  CSVParser provides a parser to parse CSV files.

                                  type Preprocessor

                                  type Preprocessor func(context.Context, Event) (context.Context, error)

                                    Preprocessor preprocesses an event and stores data into a map.

                                    type Projector

                                    type Projector func(context.Context, []string) ([]string, error)

                                      Projector transforms source records into records for destination.

                                      type Result

                                      type Result struct {
                                      	Event   Event
                                      	Handler *Handler
                                      	Error   error
                                      }

                                        Result is a result for each event.

                                        type SlackNotifier

                                        type SlackNotifier struct {
                                        	Channel string
                                        	Token   string
                                        
                                        	// Optional.
                                        	IconEmoji string
                                        
                                        	// Optional.
                                        	Username string
                                        
                                        	// Optional.
                                        	HTTPClient *http.Client
                                        	// contains filtered or unexported fields
                                        }

                                          SlackNotifier is a notifier for Slack. SlackNotifier requires bot token and permissions. Recommended permissions are chat:write, chat:write.customize and chat:write.public.

                                          func (*SlackNotifier) Notify

                                          func (n *SlackNotifier) Notify(ctx context.Context, r *Result) error

                                            Notify notifies results to Slack channel.

                                            Directories

                                            Path Synopsis
                                            contrib
                                            handlers
                                            Package handlers includes pre-configured handlers for bqloader.
                                            Package handlers includes pre-configured handlers for bqloader.