bqloader

bqloader is a simple ETL framework running on Cloud Functions to load data from Cloud Storage into BigQuery.

Installation

go get -u go.nownabe.dev/bqloader

Getting Started

(See the Quickstart example for full instructions: https://github.com/nownabe/go-bqloader/tree/main/examples/quickstart)

For simple CSV transforming and loading, import the package go.nownabe.dev/bqloader and write your handler.

package myfunc

import (
	"context"
	"os"
	"regexp"
	"strings"
	"time"

	"golang.org/x/text/encoding/japanese"
	"golang.org/x/xerrors"

	"go.nownabe.dev/bqloader"
)

var loader bqloader.BQLoader

func init() {
	var err error
	loader, err = bqloader.New()
	if err != nil {
		panic(err)
	}
	loader.MustAddHandler(context.Background(), newHandler())
}

func newHandler() *bqloader.Handler {
	/*
		This projector converts date fields formatted as "2006/01/02"
		in the first column into strings like "2006-01-02" that satisfy
		the BigQuery DATE type.
	*/
	projector := func(_ context.Context, r []string) ([]string, error) {
		t, err := time.Parse("2006/01/02", r[0])
		if err != nil {
			return nil, xerrors.Errorf("column 0 cannot be parsed as a date: %w", err)
		}

		r[0] = t.Format("2006-01-02")

		return r, nil
	}

	return &bqloader.Handler{
		Name:     "quickstart",                         // Handler name used in logs and notifications.
		Pattern:  regexp.MustCompile("^example_bank/"), // This handler processes files matching this pattern.
		Encoding: japanese.ShiftJIS,                    // Source file encoding.
		Parser:   bqloader.CSVParser(),                 // Parser parses source file into records.
		Notifier: &bqloader.SlackNotifier{
			Token:   os.Getenv("SLACK_TOKEN"),
			Channel: os.Getenv("SLACK_CHANNEL"),
		},
		Projector:       projector, // Projector transforms each row.
		SkipLeadingRows: 1,         // Skip header row.

		// Destination.
		Project: os.Getenv("BIGQUERY_PROJECT_ID"),
		Dataset: os.Getenv("BIGQUERY_DATASET_ID"),
		Table:   os.Getenv("BIGQUERY_TABLE_ID"),
	}
}

// BQLoad is the entrypoint for Cloud Functions.
func BQLoad(ctx context.Context, e bqloader.Event) error {
	return loader.Handle(ctx, e)
}

Documentation

Overview

Package bqloader is a simple ETL framework running on Cloud Functions to load data from Cloud Storage into BigQuery.

Getting started

See the Quickstart example for full instructions: https://github.com/nownabe/go-bqloader/tree/main/examples/quickstart


Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BQLoader

type BQLoader interface {
	AddHandler(context.Context, *Handler) error
	Handle(context.Context, Event) error
	MustAddHandler(context.Context, *Handler)
}

BQLoader loads data from Cloud Storage into a BigQuery table.
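
If panicking in init is not acceptable, AddHandler can be used instead of MustAddHandler and the error handled explicitly. A minimal sketch (setup is a hypothetical name; newHandler builds a *Handler as in the getting-started example above):

func setup(ctx context.Context) (bqloader.BQLoader, error) {
	loader, err := bqloader.New()
	if err != nil {
		return nil, err
	}
	// AddHandler reports registration errors instead of panicking.
	if err := loader.AddHandler(ctx, newHandler()); err != nil {
		return nil, err
	}
	return loader, nil
}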

func New

func New(opts ...Option) (BQLoader, error)

New builds a new BQLoader.
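
New accepts the options documented below. A sketch combining them (the concrete values are arbitrary examples):

loader, err := bqloader.New(
	bqloader.WithLogLevel("debug"), // one of trace, debug, info, warn, error, fatal, panic
	bqloader.WithPrettyLogging(),   // human-friendly log output
	bqloader.WithConcurrency(4),    // concurrency of projectors
)
if err != nil {
	panic(err)
}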

type Event

type Event struct {
	Name        string    `json:"name"`
	Bucket      string    `json:"bucket"`
	TimeCreated time.Time `json:"timeCreated"`

	// contains filtered or unexported fields

}

Event is an event from Cloud Storage.

func (*Event) FullPath

func (e *Event) FullPath() string

FullPath returns the full path of the storage object, beginning with gs://.
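
For example (the bucket and object names are placeholders, and the output is assumed from the gs:// form described above; requires the fmt import):

e := &bqloader.Event{Name: "example_bank/2021-01.csv", Bucket: "my-bucket"}
fmt.Println(e.FullPath()) // gs://my-bucket/example_bank/2021-01.csv (assumed output)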

type Handler

type Handler struct {
	// Name is the handler's name.
	Name string

	Pattern         *regexp.Regexp
	Encoding        encoding.Encoding
	Parser          Parser
	Notifier        Notifier
	Projector       Projector
	SkipLeadingRows uint
	Preprocessor    Preprocessor

	// BatchSize specifies how many records are processed in a goroutine.
	// Default is 10000.
	BatchSize int

	// Project specifies GCP project name of destination BigQuery table.
	Project string

	// Dataset specifies BigQuery dataset ID of the destination table.
	Dataset string

	// Table specifies BigQuery table ID as destination.
	Table string

	// contains filtered or unexported fields

}

Handler defines how to handle events that match the specified pattern.

type Notifier

type Notifier interface {
	Notify(context.Context, *Result) error
}

Notifier notifies results for each event.
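
Notifier is an interface, so destinations other than Slack can be supported by implementing Notify. A minimal sketch of a notifier that only writes results to the standard log (logNotifier is a hypothetical type; assumes the context, log, and bqloader imports):

type logNotifier struct{}

func (logNotifier) Notify(_ context.Context, r *bqloader.Result) error {
	if r.Error != nil {
		log.Printf("%s: failed to load %s: %v", r.Handler.Name, r.Event.FullPath(), r.Error)
		return nil
	}
	log.Printf("%s: loaded %s", r.Handler.Name, r.Event.FullPath())
	return nil
}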

type Option

type Option interface {
	// contains filtered or unexported methods

}

Option configures BQLoader.

func WithConcurrency

func WithConcurrency(n int) Option

WithConcurrency configures the concurrency of projectors. Before setting this, confirm GOMAXPROCS.

func WithLogLevel

func WithLogLevel(l string) Option

WithLogLevel configures log level to print logs. Allowed values are trace, debug, info, warn, error, fatal or panic.

func WithPrettyLogging

func WithPrettyLogging() Option

WithPrettyLogging configures BQLoader to print human-friendly logs.

type Parser

type Parser func(context.Context, io.Reader) ([][]string, error)

Parser parses files from storage.

func CSVParser

func CSVParser() Parser

CSVParser provides a parser to parse CSV files.
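
Parser is a plain function type, so formats other than comma-separated CSV can be handled with a custom parser. A minimal sketch of a tab-separated parser built on encoding/csv (tsvParser is a hypothetical name, not part of the package):

func tsvParser() bqloader.Parser {
	return func(_ context.Context, r io.Reader) ([][]string, error) {
		cr := csv.NewReader(r)
		cr.Comma = '\t' // parse tab-separated values instead of commas
		return cr.ReadAll()
	}
}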

type Preprocessor

type Preprocessor func(context.Context, Event) (context.Context, error)

Preprocessor preprocesses an event and stores data into a map.
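
From the signature, a Preprocessor receives the event before records are processed and returns a (possibly derived) context. A sketch that extracts a year-month string from the object name and passes it along via the returned context (the key type and the extraction logic are illustrative assumptions; requires the path and strings imports):

type contextKey string

// preprocess derives "2021-01" from an object name such as
// "example_bank/2021-01.csv" and stores it in the returned context.
func preprocess(ctx context.Context, e bqloader.Event) (context.Context, error) {
	month := strings.TrimSuffix(path.Base(e.Name), ".csv")
	return context.WithValue(ctx, contextKey("month"), month), nil
}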

type Projector

type Projector func(context.Context, []string) ([]string, error)

Projector transforms source records into records for destination.
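
Besides the date conversion shown in the getting-started example, a Projector is a natural place for other per-row cleanup. A sketch that strips thousands separators from the second column (the column index and format are assumptions for illustration):

stripCommas := func(_ context.Context, r []string) ([]string, error) {
	if len(r) < 2 {
		return nil, xerrors.Errorf("unexpected number of columns: %d", len(r))
	}
	// "1,234,567" -> "1234567" so BigQuery can load the value as a number.
	r[1] = strings.ReplaceAll(r[1], ",", "")
	return r, nil
}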

type Result

type Result struct {
	Event   Event
	Handler *Handler
	Error   error
}

Result is a result for each event.

type SlackNotifier

type SlackNotifier struct {
	Channel string
	Token   string

	// Optional.
	IconEmoji string

	// Optional.
	Username string

	// Optional.
	HTTPClient *http.Client

	// contains filtered or unexported fields

}

SlackNotifier is a notifier for Slack. SlackNotifier requires a bot token and permissions. Recommended permissions are chat:write, chat:write.customize and chat:write.public.
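
A SlackNotifier with the optional fields set might look like this (the emoji, username, timeout, and environment variable names are placeholders):

notifier := &bqloader.SlackNotifier{
	Token:      os.Getenv("SLACK_TOKEN"),
	Channel:    os.Getenv("SLACK_CHANNEL"),
	IconEmoji:  ":hammer_and_wrench:",
	Username:   "bqloader",
	HTTPClient: &http.Client{Timeout: 10 * time.Second},
}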

func (*SlackNotifier) Notify

func (n *SlackNotifier) Notify(ctx context.Context, r *Result) error

Notify sends the result to the Slack channel.