bqstreamer

README

BigQuery Streamer

Stream insert data into BigQuery quickly and concurrently, using InsertAll().

Features

  • Inserts multiple rows in bulk.
  • Uses a configurable number of workers (i.e. goroutines) to queue and insert rows.
  • Production-ready and thoroughly tested. We at Rounds use it in our data-gathering workflow.
  • BigQuery errors are sent to a unified channel so you can read and decide how to handle them.

Getting Started

  1. Install Go (version 1.3 or newer). We recommend using gvm to manage your Go versions.
  2. Execute go get -t ./... to download all necessary packages.
  3. Acquire Google OAuth2/JWT credentials, so you can connect to BigQuery.
  4. Copy and run one of the examples (MultiStreamer or Streamer); a condensed sketch follows below.
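
For orientation, here is a condensed sketch of the typical flow, based on the full MultiStreamer example further below (imports and the error-channel reader are omitted for brevity; the parameter values mirror that example):

// Authenticate, then start a MultiStreamer with 10 workers that
// flushes every 500 rows or every second, whichever comes first.
jwtConfig, err := bqstreamer.NewJWTConfig("path_to_key.json")
if err != nil {
	log.Fatalln(err)
}
ms, err := bqstreamer.NewMultiStreamer(jwtConfig, 10, 500, 1*time.Second, 1*time.Second, 10)
if err != nil {
	log.Fatalln(err)
}
ms.Start()
defer ms.Stop()

// Queue rows; they are inserted in bulk once a threshold is reached.
ms.QueueRow("project-id", "dataset-id", "table-id", map[string]bigquery.JsonValue{"key": "value"})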

How Does It Work?

There are two types you can use: Streamer and MultiStreamer.

Streamer

A Streamer is a single worker which reads rows, queues them, and inserts them (also called flushing) in bulk into BigQuery once a certain threshold is reached. Thresholds are either a number of queued rows, or time-based: flushing once a certain amount of time has passed.

This provides control over flushing: inserts happen in predictable batch sizes and at a predictable rate. Please note that Google enforces quota policies on the size and frequency of inserts.

In addition, the Streamer knows how to handle BigQuery server errors (HTTP 500 and the like), and attempts to retry insertions several times on such failures.

It also sends errors on an error channel, which can be read and handled.

MultiStreamer

A MultiStreamer operates multiple Streamers concurrently (i.e. workers). It reads rows and distributes them to the Streamers.

This allows a higher insert throughput, with numerous workers queueing rows and inserting concurrently.

Like Streamer, errors are reported from each worker and sent to a unified error channel, where you can decide to read and handle them if necessary.

Contribute

Please check the issues page which might have some TODOs. Feel free to file new bugs and ask for improvements. We welcome pull requests!

Test
# Run unit tests, and check coverage.
$ go test -v -cover

# Run integration tests. This requires an active project, dataset and pem key.
# Make sure you edit the project, dataset, and table name in the .sh file.
$ ./integration_test.sh
$ ./multi_integration_test.sh

Documentation

Overview

Package bqstreamer implements a concurrent stream (bulk) inserter to Google BigQuery.

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewBigQueryService

func NewBigQueryService(c *jwt.Config) (service *bigquery.Service, err error)

NewBigQueryService returns a new BigQuery service (client), authenticated via OAuth2/JWT.

NOTE: This function authenticates with the Google OAuth2 service, and is therefore susceptible to network delays and may block.

func NewJWTConfig

func NewJWTConfig(keyPath string) (c *jwt.Config, err error)

NewJWTConfig returns a new JWT configuration from a JSON key, acquired via https://console.developers.google.com.

A config is used to authenticate with Google OAuth2.

Types

type AllRowsRejectedError

type AllRowsRejectedError struct {
	// contains filtered or unexported fields
}

AllRowsRejectedError is returned when all rows in an insert have been rejected, meaning no insert retry will occur.

func (*AllRowsRejectedError) DatasetID

func (err *AllRowsRejectedError) DatasetID() string

func (*AllRowsRejectedError) Error

func (err *AllRowsRejectedError) Error() string

func (*AllRowsRejectedError) ProjectID

func (err *AllRowsRejectedError) ProjectID() string

func (*AllRowsRejectedError) TableID

func (err *AllRowsRejectedError) TableID() string

type MultiStreamer

type MultiStreamer struct {

	// Errors are reported to this channel.
	Errors chan error
	// contains filtered or unexported fields
}

A MultiStreamer operates multiple Streamers, also called workers, or sub-streamers. The MultiStreamer feeds rows to a single rowChannel, and all sub-streamers read from it together. This improves scalability by allowing a higher message throughput.

TODO Improve by managing a channel of sub-streamers, notifying the multi-streamer when they're ready to read rows, and then letting them read one-by-one (the first will read a chunk and go streaming, then the next one will read and stream, etc.) Right now they're all reading together simultaneously, racing for messages. See here: http://nesv.github.io/golang/2014/02/25/worker-queues-in-go.html
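
For intuition, the snippet below is a minimal, self-contained sketch of the current fan-out pattern described above, independent of this package's internals: several workers all receive from one shared channel, and the runtime hands each value to an arbitrary ready worker (it uses only the standard fmt and sync packages).

// Illustrative only: three workers racing to receive from one shared channel.
rows := make(chan string)
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
	wg.Add(1)
	go func(id int) {
		defer wg.Done()
		for row := range rows {
			fmt.Printf("worker %d got row %q\n", id, row)
		}
	}(i)
}
for _, r := range []string{"a", "b", "c", "d"} {
	rows <- r // an arbitrary ready worker receives each row
}
close(rows)
wg.Wait()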

Example

This example uses a MultiStreamer. A single row is queued, and will be flushed once a time threshold has passed, or if the MultiStreamer is explicitly closed.

Starting a MultiStreamer is a non-blocking operation (unlike Streamer), so there's no need to run it in its own goroutine.

You should probably use it instead of Streamer, as it provides better concurrency and speed.

// Init OAuth2/JWT. This is required for authenticating with BigQuery.
// https://cloud.google.com/bigquery/authorization
// https://developers.google.com/console/help/new/#generatingoauth2
jwtConfig, err := NewJWTConfig("path_to_key.json")
if err != nil {
	log.Fatalln(err)
}

// Set MultiStreamer configuration.
numStreamers := 10                  // Number of concurrent sub-streamers (workers) to use.
maxRows := 500                      // Amount of rows queued before forcing insert to BigQuery.
maxDelay := 1 * time.Second         // Time to pass between forcing insert to BigQuery.
sleepBeforeRetry := 1 * time.Second // Time to wait between failed insert retries.
maxRetryInsert := 10                // Maximum amount of failed insert retries before discarding rows and moving on.

// Init a new multi-streamer.
ms, err := NewMultiStreamer(
	jwtConfig, numStreamers, maxRows, maxDelay, sleepBeforeRetry, maxRetryInsert)
if err != nil {
	log.Fatalln(err)
}

// Start multi-streamer and workers.
ms.Start()
defer ms.Stop()

// Worker errors are reported to MultiStreamer.Errors channel.
// This inits a goroutine that reads from this channel and logs errors.
//
// It can be closed by sending "true" to the shutdownErrorChan channel.
shutdownErrorChan := make(chan bool)
go func() {
	var err error

	readErrors := true
	for readErrors {
		select {
		case <-shutdownErrorChan:
			readErrors = false
		case err = <-ms.Errors:
			log.Println(err)
		}
	}
}()
defer func() { shutdownErrorChan <- true }()

// Queue a single row.
// Insert will happen once maxDelay time has passed,
// or maxRows rows have been queued.
ms.QueueRow(
	"project-id", "dataset-id", "table-id",
	map[string]bigquery.JsonValue{"key": "value"},
)
Output:

func NewMultiStreamer

func NewMultiStreamer(
	jwtConfig *jwt.Config,
	numStreamers int,
	maxRows int,
	maxDelay time.Duration,
	sleepBeforeRetry time.Duration,
	maxRetryInsert int) (*MultiStreamer, error)

NewMultiStreamer returns a new MultiStreamer.

func (*MultiStreamer) QueueRow

func (b *MultiStreamer) QueueRow(projectID, datasetID, tableID string, jsonRow map[string]bigquery.JsonValue)

QueueRow queues a single row, which will be read and inserted by one of the sub-streamers.

func (*MultiStreamer) Start

func (b *MultiStreamer) Start()

Start starts the sub-streamers, making them read from a common row channel, and output to the same error channel.

func (*MultiStreamer) Stop

func (b *MultiStreamer) Stop()

Stop stops all sub-streamers. Note all sub-streamers will flush to BigQuery before stopping.

type RowError

type RowError struct {
	// contains filtered or unexported fields
}

RowError is a specific row insert error, returned after inserting multiple rows.

func (*RowError) BQError

func (err *RowError) BQError() bigquery.ErrorProto

func (*RowError) DatasetID

func (err *RowError) DatasetID() string

func (*RowError) Error

func (err *RowError) Error() string

func (*RowError) Index

func (err *RowError) Index() int64

func (*RowError) JsonValue

func (err *RowError) JsonValue() map[string]bigquery.JsonValue

func (*RowError) ProjectID

func (err *RowError) ProjectID() string

func (*RowError) TableID

func (err *RowError) TableID() string

type Streamer

type Streamer struct {

	// Max delay between flushes to BigQuery.
	MaxDelay time.Duration `validate:"min=1"`

	// Sleep delay after a rejected insert and before retry.
	SleepBeforeRetry time.Duration `validate:"min=1"`

	// Maximum retry insert attempts for non-rejected row insert errors.
	// e.g. GoogleAPI HTTP errors, generic HTTP errors, etc.
	MaxRetryInsert int `validate:"min=0"`

	// Errors are reported to this channel.
	Errors chan error

	// Start read-queue-stream loop function.
	Start func()

	// Stop read-queue-stream loop function.
	Stop func()
	// contains filtered or unexported fields
}

A Streamer is a BigQuery stream inserter, which queues rows and stream-inserts them to BigQuery in bulk by calling InsertAll().

Example

This example uses a single Streamer. A single row is queued, and will be flushed once a time threshold has passed, or if the streamer is explicitly closed.

Note starting a Streamer is a blocking operation, so it needs to run in its own goroutine.

You should probably use MultiStreamer, as it provides better concurrency and speed, but Streamer is there if you need to.

// Init OAuth2/JWT. This is required for authenticating with BigQuery.
// See the following URLs for more info:
// https://cloud.google.com/bigquery/authorization
// https://developers.google.com/console/help/new/#generatingoauth2
jwtConfig, err := NewJWTConfig("path_to_key.json")
if err != nil {
	log.Fatalln(err)
}

// Init a BigQuery client.
service, err := NewBigQueryService(jwtConfig)
if err != nil {
	log.Fatalln(err)
}

// Set streamer configuration.
maxRows := 500                      // Amount of rows queued before forcing insert to BigQuery.
maxDelay := 1 * time.Second         // Time to pass between forcing insert to BigQuery.
sleepBeforeRetry := 1 * time.Second // Time to wait between failed insert retries.
maxRetryInsert := 10                // Maximum amount of failed insert retries before discarding rows and moving on.

// Init a new streamer.
s, err := NewStreamer(service, maxRows, maxDelay, sleepBeforeRetry, maxRetryInsert)
if err != nil {
	log.Fatalln(err)
}

// Start the streamer.
// A Streamer (NOT a MultiStreamer) is blocking,
// so it needs to be started in its own goroutine.
go s.Start()
defer s.Stop()

// Queue a single row.
// Insert will happen once maxDelay time has passed,
// or maxRows rows have been queued.
s.QueueRow(
	"project-id", "dataset-id", "table-id",
	map[string]bigquery.JsonValue{"key": "value"},
)
Output:

func NewStreamer

func NewStreamer(
	service *bigquery.Service,
	maxRows int,
	maxDelay time.Duration,
	sleepBeforeRetry time.Duration,
	maxRetryInsert int) (b *Streamer, err error)

NewStreamer returns a new Streamer.

func (*Streamer) QueueRow

func (b *Streamer) QueueRow(projectID, datasetID, tableID string, jsonRow map[string]bigquery.JsonValue)

QueueRow sends a single row to the row channel, which will be queued and inserted in bulk with other queued rows.

type TooManyFailedInsertRetriesError

type TooManyFailedInsertRetriesError struct {
	// contains filtered or unexported fields
}

TooManyFailedInsertRetriesError is returned when an insert has failed several times, and the streamer stops retrying it.

func (*TooManyFailedInsertRetriesError) DatasetID

func (err *TooManyFailedInsertRetriesError) DatasetID() string

func (*TooManyFailedInsertRetriesError) Error

func (*TooManyFailedInsertRetriesError) NumFailedRetries

func (err *TooManyFailedInsertRetriesError) NumFailedRetries() int

func (*TooManyFailedInsertRetriesError) ProjectID

func (err *TooManyFailedInsertRetriesError) ProjectID() string

func (*TooManyFailedInsertRetriesError) TableID

func (err *TooManyFailedInsertRetriesError) TableID() string
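
As a usage sketch, the different error types above can be distinguished with a type switch when reading from the Errors channel. This is illustrative rather than part of the package: it assumes the channel carries pointer values (as the pointer-receiver methods suggest) and that ms is a started MultiStreamer.

for err := range ms.Errors {
	switch e := err.(type) {
	case *AllRowsRejectedError:
		// No retry will occur; all rows in this insert were rejected.
		log.Printf("all rows rejected for %s.%s.%s", e.ProjectID(), e.DatasetID(), e.TableID())
	case *RowError:
		// A specific row was rejected within a multi-row insert.
		log.Printf("row %d rejected: %s", e.Index(), e.Error())
	case *TooManyFailedInsertRetriesError:
		// The streamer gave up retrying this insert.
		log.Printf("discarding insert after %d retries on %s.%s.%s",
			e.NumFailedRetries(), e.ProjectID(), e.DatasetID(), e.TableID())
	default:
		log.Println(err)
	}
}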
