bqstreamer

Version: v2.0.1+incompatible

Warning: this package is not in the latest version of its module.
Published: Aug 29, 2016 License: MIT Imports: 13 Imported by: 0

README

BigQuery Streamer

Stream-insert data into BigQuery quickly and concurrently, using InsertAll().

Features

  • Insert rows from multiple tables, datasets, and projects in bulk. There is no need to manage data structures or sort rows by table: bqstreamer does it for you.
  • Multiple background workers (i.e. goroutines) to enqueue and insert rows.
  • Inserts can be performed synchronously (blocking) or in the background (asynchronously).
  • Perform insert operations in batches of predefined size, according to BigQuery's quota policy.
  • Handle and retry BigQuery server errors.
  • Backoff interval between failed insert attempts.
  • Error reporting.
  • Production ready: we at Rounds use it in our data-gathering workflow.
  • Thorough testing and documentation for great good!

Getting Started

  1. Install Go, version 1.5 or later.
  2. Clone this repository and download dependencies, using the import path for the version you need:
       • Version v2: go get gopkg.in/rounds/go-bqstreamer.v2
       • Version v1: go get gopkg.in/rounds/go-bqstreamer.v1
  3. Acquire Google OAuth2/JWT credentials, so you can authenticate with BigQuery.

How Does It Work?

There are two types of inserters you can use:

  1. SyncWorker, which is a single blocking (synchronous) worker.
       • It enqueues rows and performs insert operations in a blocking manner.
  2. AsyncWorkerGroup, which employs multiple background SyncWorkers.
       • The AsyncWorkerGroup enqueues rows, and its background workers pull and insert them in a fan-out model.
       • Each background worker executes an insert operation once its row-count or time threshold is reached.
       • Errors are reported to an error channel for processing by the user.
       • This provides a higher insert throughput for larger scale scenarios.
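The fan-out model above can be sketched with the standard library alone. This is a minimal, hypothetical illustration rather than bqstreamer's actual code: Row, worker, and runGroup are invented names, and insert only counts rows where a real worker would call BigQuery's insertAll. Each worker flushes its buffer when it holds maxRows rows, when maxDelay elapses, or when the shared channel is closed.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Row is a simplified, hypothetical stand-in for bqstreamer's Row.
type Row struct {
	Table string
	Value string
}

// worker pulls rows from a shared channel and buffers them. It flushes the
// buffer through insert once maxRows rows are buffered, once maxDelay has
// elapsed, or when the channel is closed (after which it returns).
func worker(rows <-chan Row, maxRows int, maxDelay time.Duration,
	insert func([]Row) error, errs chan<- error, wg *sync.WaitGroup) {
	defer wg.Done()
	ticker := time.NewTicker(maxDelay)
	defer ticker.Stop()

	var buf []Row
	flush := func() {
		if len(buf) == 0 {
			return
		}
		if err := insert(buf); err != nil {
			errs <- err // blocks unless someone reads errs
		}
		buf = nil
	}

	for {
		select {
		case r, ok := <-rows:
			if !ok {
				flush() // group closed: insert what is left
				return
			}
			buf = append(buf, r)
			if len(buf) >= maxRows {
				flush() // row threshold reached
			}
		case <-ticker.C:
			flush() // time threshold reached
		}
	}
}

// runGroup fans input out to numWorkers workers and returns how many rows
// were handed to insert in total.
func runGroup(input []Row, numWorkers, maxRows int, maxDelay time.Duration) int {
	rows := make(chan Row)
	errs := make(chan error)

	var mu sync.Mutex
	inserted := 0
	insert := func(batch []Row) error {
		mu.Lock()
		defer mu.Unlock()
		inserted += len(batch) // a real worker would call insertAll here
		return nil
	}

	// Error handler: reads errs until it is closed.
	errsDone := make(chan struct{})
	go func() {
		defer close(errsDone)
		for err := range errs {
			fmt.Println("insert error:", err)
		}
	}()

	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go worker(rows, maxRows, maxDelay, insert, errs, &wg)
	}
	for _, r := range input {
		rows <- r // enqueue
	}
	close(rows) // like Close(): workers flush and exit
	wg.Wait()
	close(errs) // safe: no worker can send anymore
	<-errsDone
	return inserted
}

func main() {
	input := make([]Row, 1234)
	for i := range input {
		input[i] = Row{Table: fmt.Sprintf("table-%d", i%3), Value: fmt.Sprint(i)}
	}
	fmt.Println(runGroup(input, 10, 500, time.Second)) // prints 1234
}
```

Closing the error channel only after every worker has returned reflects the rule stated for SetAsyncErrorChannel: the group never closes that channel, so its owner must, and only once nothing can send on it anymore.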

Examples

Check the GoDoc examples section.

Contribute

  1. Please check the issues page.
  2. Report bugs and request improvements.
  3. Pull requests welcome!

Test
# Run unit tests and check coverage.
$ make test

# Run integration tests.
# This requires an active project, dataset, and table, plus a JSON key file.
$ export BQSTREAMER_PROJECT=my-project
$ export BQSTREAMER_DATASET=my-dataset
$ export BQSTREAMER_TABLE=my-table
$ export BQSTREAMER_KEY=my-key.json
$ make testintegration

Documentation

Overview

Package bqstreamer implements synchronous and asynchronous stream-inserters for Google BigQuery.

https://cloud.google.com/bigquery/

Stream-insert is performed using InsertAll(): https://cloud.google.com/bigquery/docs/reference/v2/tabledata/insertAll

This package provides two inserter types that can be used to insert rows into tables:

1. SyncWorker

  • A single blocking (synchronous) worker.
  • Enqueues rows and performs insert operations in a blocking manner.

2. AsyncWorkerGroup

  • Wraps multiple SyncWorkers.
  • Enqueues rows and performs insert operations in the background.
  • Background workers execute insert operations according to the number of enqueued rows or a time threshold.
  • Errors are reported to an error channel for processing by the user.
  • This provides a higher insert throughput for larger scale scenarios.

Constants

const (
	DefaultAsyncNumWorkerss = 10
	DefaultAsyncMaxRows     = 500
	DefaultAsyncMaxDelay    = 5 * time.Second
)
const (
	// BigQuery has a quota policy regarding how big and often inserts should
	// be. See the following article for more info:
	//
	// https://cloud.google.com/bigquery/quota-policy#streaminginserts
	DefaultSyncMaxRetries    = 3
	DefaultSyncRetryInterval = 5 * time.Second
)

Variables

This section is empty.

Functions

func NewJWTConfig

func NewJWTConfig(keyPath string) (c *jwt.Config, err error)

NewJWTConfig returns a new JWT configuration from a JSON key, acquired via https://console.developers.google.com.

It returns a jwt.Config, used to authenticate with Google OAuth2.

Types

type AsyncOptionFunc

type AsyncOptionFunc func(*AsyncWorkerGroup) error

func SetAsyncErrorChannel

func SetAsyncErrorChannel(errChan chan *InsertErrors) AsyncOptionFunc

SetAsyncErrorChannel sets the asynchronous workers' error channel.

Use this option when you want all workers to report errors to a unified channel.

NOTE the error channel is not closed when the AsyncWorkerGroup closes. It is the responsibility of the user to close it.

func SetAsyncIgnoreUnknownValues

func SetAsyncIgnoreUnknownValues(ignore bool) AsyncOptionFunc

SetAsyncIgnoreUnknownValues sets whether to accept rows that contain values that do not match the table schema. The unknown values are ignored.

Default is false, which treats unknown values as errors.

func SetAsyncMaxDelay

func SetAsyncMaxDelay(delay time.Duration) AsyncOptionFunc

SetAsyncMaxDelay sets the maximum time delay a worker should wait before an insert operation is executed.

NOTE value must be a positive time.Duration.

func SetAsyncMaxRetries

func SetAsyncMaxRetries(retries int) AsyncOptionFunc

SetAsyncMaxRetries sets the maximum number of times a failed insert operation can be retried before its rows are dropped and the insert operation is abandoned entirely.

NOTE value must be a non-negative int.

func SetAsyncMaxRows

func SetAsyncMaxRows(rowLen int) AsyncOptionFunc

SetAsyncMaxRows sets the maximum number of rows a worker can enqueue before an insert operation is executed.

NOTE this threshold is not per-table: it applies to the total number of rows enqueued by a single worker, across all tables.

NOTE value must be a non-negative int.

func SetAsyncNumWorkers

func SetAsyncNumWorkers(workers int) AsyncOptionFunc

SetAsyncNumWorkers sets the number of background workers.

NOTE value must be a positive int.

func SetAsyncRetryInterval

func SetAsyncRetryInterval(sleep time.Duration) AsyncOptionFunc

SetAsyncRetryInterval sets the time delay before retrying a failed insert operation (if required).

NOTE value must be a positive time.Duration.

func SetAsyncSkipInvalidRows

func SetAsyncSkipInvalidRows(skip bool) AsyncOptionFunc

SetAsyncSkipInvalidRows sets whether to insert all valid rows of a request, even if invalid rows exist.

The default value is false, which causes the entire request to fail if any invalid rows exist.
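The option functions above follow Go's functional-options pattern: each setter returns a closure that validates its argument and writes it into the group, and the constructor applies the closures in order. The sketch below is a hypothetical, simplified version of that pattern; group, optionFunc, setNumWorkers, setMaxRows, setMaxDelay, and newGroup are invented names, and it assumes (as the NOTE lines suggest) that an invalid value makes the constructor return an error. The defaults mirror the DefaultAsync* constants listed above.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// group is a hypothetical stand-in for AsyncWorkerGroup's settings.
type group struct {
	numWorkers int
	maxRows    int
	maxDelay   time.Duration
}

// optionFunc has the same shape as AsyncOptionFunc: it configures the group
// and may reject an invalid value.
type optionFunc func(*group) error

func setNumWorkers(n int) optionFunc {
	return func(g *group) error {
		if n <= 0 {
			return errors.New("number of workers must be positive")
		}
		g.numWorkers = n
		return nil
	}
}

func setMaxRows(n int) optionFunc {
	return func(g *group) error {
		if n < 0 {
			return errors.New("max rows must be non-negative")
		}
		g.maxRows = n
		return nil
	}
}

func setMaxDelay(d time.Duration) optionFunc {
	return func(g *group) error {
		if d <= 0 {
			return errors.New("max delay must be positive")
		}
		g.maxDelay = d
		return nil
	}
}

// newGroup starts from the documented defaults and applies each option in
// order, returning the first validation error it encounters.
func newGroup(options ...optionFunc) (*group, error) {
	g := &group{numWorkers: 10, maxRows: 500, maxDelay: 5 * time.Second}
	for _, opt := range options {
		if err := opt(g); err != nil {
			return nil, err
		}
	}
	return g, nil
}

func main() {
	g, err := newGroup(setNumWorkers(4), setMaxDelay(time.Second))
	fmt.Println(g.numWorkers, g.maxRows, g.maxDelay, err) // prints "4 500 1s <nil>"

	_, err = newGroup(setNumWorkers(0))
	fmt.Println(err) // prints "number of workers must be positive"
}
```

Returning an error from each option, rather than panicking, lets the constructor reject a bad configuration before any worker is started.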

type AsyncWorkerGroup

type AsyncWorkerGroup struct {
	// contains filtered or unexported fields
}

AsyncWorkerGroup asynchronously streams rows to BigQuery in bulk.

Example

This example initializes an AsyncWorkerGroup, sets up an error handling goroutine, and enqueues a single row.

An insert operation to BigQuery will be executed once either a maximum delay time has passed, maximum rows have been enqueued, or the AsyncWorkerGroup has been closed.

// Initialize an error channel,
// into which all AsyncWorkers will report their errors.
//
// NOTE this channel must be read from, otherwise the workers will block and hang.
errChan := make(chan *InsertErrors)

// errDone is closed once the error handling goroutine has returned.
errDone := make(chan struct{})

// Error handling goroutine,
// which just fetches errors and throws them away.
// It returns once errChan has been closed.
go func() {
	defer close(errDone)
	for range errChan {
	}
}()

jwtConfig, err := NewJWTConfig("path_to_key.json")
if err != nil {
	log.Fatalln(err)
}

// Initialize a worker group.
g, err := NewAsyncWorkerGroup(
	jwtConfig,
	SetAsyncNumWorkers(10),               // Number of background workers in the group.
	SetAsyncMaxRows(500),                 // Maximum rows a worker enqueues before executing an insert operation to BigQuery.
	SetAsyncMaxDelay(1*time.Second),      // Maximum time a worker waits before executing an insert operation.
	SetAsyncRetryInterval(1*time.Second), // Time to wait between failed insert retries.
	SetAsyncMaxRetries(10),               // Maximum number of times a failed insert is retried.
	SetAsyncIgnoreUnknownValues(true),    // Ignore unknown fields when inserting rows.
	SetAsyncSkipInvalidRows(true),        // Skip bad rows when inserting.
	SetAsyncErrorChannel(errChan),        // Set unified error channel.
)

if err != nil {
	log.Fatalln(err)
}

// Start AsyncWorkerGroup.
// Start() starts the background workers and returns immediately.
g.Start()

// On return: Close() blocks until all workers have inserted any remaining
// rows to BigQuery and closed. Only then is it safe to close errChan, which
// the AsyncWorkerGroup never closes itself. Finally, wait for the error
// handling goroutine to finish.
defer func() {
	g.Close()
	close(errChan)
	<-errDone
}()

// Enqueue a single row.
//
// An insert operation will be executed once the time delay defined by
// SetAsyncMaxDelay is reached,
// or enough rows have been queued (not shown in this example).
g.Enqueue(
	NewRow(
		"my-project",
		"my-dataset",
		"my-table",
		map[string]bigquery.JsonValue{"key": "value"},
	))

func NewAsyncWorkerGroup

func NewAsyncWorkerGroup(jwtConfig *jwt.Config, options ...AsyncOptionFunc) (*AsyncWorkerGroup, error)

NewAsyncWorkerGroup returns a new AsyncWorkerGroup using the given OAuth2/JWT configuration.

func (*AsyncWorkerGroup) Close

func (s *AsyncWorkerGroup) Close()

Close inserts any remaining rows enqueued by all workers, then closes them.

NOTE that the AsyncWorkerGroup cannot be restarted. If you wish to perform any additional inserts to BigQuery, a new one must be initialized.

func (*AsyncWorkerGroup) Enqueue

func (s *AsyncWorkerGroup) Enqueue(row Row)

func (*AsyncWorkerGroup) Start

func (s *AsyncWorkerGroup) Start()

Start starts all background workers.

Workers read enqueued rows and insert them into BigQuery once either of the following happens:

  • The configured maximum delay has passed.
  • The configured maximum number of rows has been enqueued by a worker.

Insert errors will be reported to the error channel if set.

type InsertErrors

type InsertErrors struct {
	Tables []*TableInsertErrors
}

InsertErrors is returned from an insert operation. It provides functions to iterate over the errors relating to that operation.

BigQuery has a complex error hierarchy:

Insert --> Tables --> Table Insert Attempt --> Rows --> Row Error

During an insert operation, multiple tables are inserted in bulk into BigQuery, using a separate request for each table. Each table-specific insert operation consists of one or more insert attempts (requests): a table insert can be retried until an attempt succeeds or too many attempts have failed. Failures can occur for various reasons, e.g. server errors or a malformed payload.

InsertErrors allows iterating over all tables, attempts, rows, and row errors associated with an insert operation.

Example

This example demonstrates how to handle insert errors, returned from an insert operation.

jwtConfig, err := NewJWTConfig("path_to_key.json")
if err != nil {
	log.Fatalln(err)
}

w, err := NewSyncWorker(jwtConfig.Client(oauth2.NoContext))
if err != nil {
	log.Fatalln(err)
}

// Enqueue rows for multiple tables.
w.Enqueue(NewRow("my-project", "my-dataset", "my-table-1", map[string]bigquery.JsonValue{"key-1": "value-1"}))
w.Enqueue(NewRow("my-project", "my-dataset", "my-table-2", map[string]bigquery.JsonValue{"key-2": "value-2"}))

// Perform an insert operation
insertErrs := w.Insert()

// Go over all tables' insert attempts, and log all errors.
for _, table := range insertErrs.All() {
	for _, attempt := range table.Attempts() {
		// Log insert attempt error.
		if err := attempt.Error(); err != nil {
			log.Printf("%s.%s.%s: %s\n", attempt.Project, attempt.Dataset, attempt.Table, err)
		}

		// Iterate over all rows in attempt.
		for _, row := range attempt.All() {
			// Iterate over all errors in row and log.
			for _, err := range row.All() {
				log.Printf("%s.%s.%s[%s]: %s\n", attempt.Project, attempt.Dataset, attempt.Table, row.InsertID, err)
			}
		}
	}
}

func (*InsertErrors) All

func (insert *InsertErrors) All() []*TableInsertErrors

All returns all remaining tables (those that have not been iterated over using Next()).

Calling Next() or All() again afterwards will yield a failed (empty) result.

func (*InsertErrors) Next

func (insert *InsertErrors) Next() (*TableInsertErrors, bool)

Next iterates over all inserted tables once, returning a single TableInsertErrors every call. Calling Next() multiple times will consequently return more tables, until all have been returned.

The function returns true if a non-nil value was fetched. Once the iterator has been exhausted, (nil, false) will be returned on every subsequent call.
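All() and Next() share one cursor, so every table is returned at most once, whichever method returns it. The following is a minimal sketch of that consume-once behavior, assuming a simple slice-backed implementation; tableErrors is a hypothetical type that holds table names instead of *TableInsertErrors values, so an exhausted Next() returns ("", false) where the real method returns (nil, false). The same semantics are documented for RowErrors and TableInsertAttemptErrors below.

```go
package main

import "fmt"

// tableErrors is a hypothetical stand-in for InsertErrors. It holds table
// names instead of *TableInsertErrors values.
type tableErrors struct {
	tables []string
}

// Next returns the next table not yet returned and true, or ("", false)
// once the iterator is exhausted.
func (e *tableErrors) Next() (string, bool) {
	if len(e.tables) == 0 {
		return "", false
	}
	t := e.tables[0]
	e.tables = e.tables[1:]
	return t, true
}

// All returns every table not yet returned by Next, then exhausts the
// iterator so later calls to Next or All return nothing.
func (e *tableErrors) All() []string {
	rest := e.tables
	e.tables = nil
	return rest
}

func main() {
	e := &tableErrors{tables: []string{"t1", "t2", "t3"}}
	first, ok := e.Next()
	fmt.Println(first, ok) // prints "t1 true"
	fmt.Println(e.All())   // prints "[t2 t3]"
	_, ok = e.Next()
	fmt.Println(ok)           // prints "false"
	fmt.Println(len(e.All())) // prints "0"
}
```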

type Row

type Row struct {
	ProjectID,
	DatasetID,
	TableID string
	Data map[string]bigquery.JsonValue

	// Used for deduplication:
	// https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataconsistency
	InsertID string
}

Row associates a single BigQuery table row to a project, dataset and table.

func NewRow

func NewRow(projectID, datasetID, tableID string, data map[string]bigquery.JsonValue) Row

NewRow returns a new Row instance, with an automatically generated insert ID used for deduplication purposes.
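BigQuery uses a row's insert ID to detect duplicates on a best-effort basis, for example when a failed request is retried with the same rows. How bqstreamer generates its IDs is not documented here; the sketch below assumes a random 128-bit value encoded as hex, and newInsertID is a hypothetical helper name. Use NewRowWithID instead when your data already carries a stable unique key, so that re-sending the same logical row reuses its ID.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newInsertID returns a random 32-character hex string. This is a
// hypothetical helper; bqstreamer's own ID format may differ.
func newInsertID() (string, error) {
	b := make([]byte, 16) // 128 random bits
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return hex.EncodeToString(b), nil
}

func main() {
	id, err := newInsertID()
	if err != nil {
		panic(err)
	}
	fmt.Println(len(id), id)
}
```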

func NewRowWithID

func NewRowWithID(projectID, datasetID, tableID, insertID string, data map[string]bigquery.JsonValue) Row

NewRowWithID returns a new Row instance with given insert ID.

type RowErrors

type RowErrors struct {
	// A table insert operation can be split into multiple requests
	// if too many rows have been queued. This means that rows
	// containing errors cannot be identified by their table index.
	// Therefore, each row can be identified by its insert ID instead.
	InsertID string
	// contains filtered or unexported fields
}

RowErrors contains errors relating to a single row. Each row can have multiple errors associated with it.

func (*RowErrors) All

func (row *RowErrors) All() []*bigquery.ErrorProto

All returns all remaining row errors (those that have not been iterated over using Next()).

Calling Next() or All() again afterwards will yield a failed (empty) result.

func (*RowErrors) Next

func (row *RowErrors) Next() (*bigquery.ErrorProto, bool)

Next iterates over all row errors once, returning a single row error every call. Calling Next() multiple times will consequently return more row errors, until all row errors have been returned.

The function returns true if a non-nil value was fetched. Once the iterator has been exhausted, (nil, false) will be returned on every subsequent call.

type SyncOptionFunc

type SyncOptionFunc func(*SyncWorker) error

func SetSyncIgnoreUnknownValues

func SetSyncIgnoreUnknownValues(ignore bool) SyncOptionFunc

SetSyncIgnoreUnknownValues sets whether to accept rows that contain values that do not match the table schema. The unknown values are ignored. Default is false, which treats unknown values as errors.

func SetSyncMaxRetries

func SetSyncMaxRetries(retries int) SyncOptionFunc

SetSyncMaxRetries sets the maximum number of times a failed insert operation is retried before its rows are dropped and the insert operation is abandoned entirely.

NOTE value must be a non-negative int.

func SetSyncRetryInterval

func SetSyncRetryInterval(sleep time.Duration) SyncOptionFunc

SetSyncRetryInterval sets the time delay before retrying a failed insert operation (if required).

NOTE value must be a positive time.Duration.

func SetSyncSkipInvalidRows

func SetSyncSkipInvalidRows(skip bool) SyncOptionFunc

SetSyncSkipInvalidRows sets whether to insert all valid rows of a request, even if invalid rows exist. The default value is false, which causes the entire request to fail if any invalid rows exist.

type SyncWorker

type SyncWorker struct {
	// contains filtered or unexported fields
}

SyncWorker streams rows to BigQuery in bulk using synchronous calls.

Example

This example initializes a single SyncWorker, enqueues a single row, and executes an insert operation which inserts this row into its associated table in BigQuery.

// Init OAuth2/JWT. This is required for authenticating with BigQuery.
// See the following URLs for more info:
// https://cloud.google.com/bigquery/authorization
// https://developers.google.com/console/help/new/#generatingoauth2
jwtConfig, err := NewJWTConfig("path_to_key.json")
if err != nil {
	log.Fatalln(err)
}

// Init a new SyncWorker.
w, err := NewSyncWorker(
	jwtConfig.Client(oauth2.NoContext),  // http.Client authenticated via OAuth2.
	SetSyncRetryInterval(1*time.Second), // Time to wait between failed insert retries.
	SetSyncMaxRetries(10),               // Maximum amount of retries a failed insert is allowed to be retried.
	SetSyncIgnoreUnknownValues(true),    // Ignore unknown fields when inserting rows.
	SetSyncSkipInvalidRows(true),        // Skip bad rows when inserting.
)

if err != nil {
	log.Fatalln(err)
}

// Enqueue a single row.
w.Enqueue(
	NewRow(
		"my-project",
		"my-dataset",
		"my-table",
		map[string]bigquery.JsonValue{"key": "value"},
	))

// Alternatively, you can supply your own unique identifier to the row,
// for de-duplication purposes.
//
// See the following article for more info:
// https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataconsistency
//
// w.Enqueue(
// 	NewRowWithID(
// 		"my-project",
// 		"my-dataset",
// 		"my-table",
// 		"my-unique-row-id",
// 		map[string]bigquery.JsonValue{"key": "value"},
// 	))

// Execute an insert operation.
//
// NOTE this function returns insert errors,
// demonstrated in another example.
w.Insert()

// Alternatively, you can use InsertWithRetry,
// which will retry failed inserts in case of BigQuery server errors.
//
// insertErrs := w.InsertWithRetry()

func NewSyncWorker

func NewSyncWorker(client *http.Client, options ...SyncOptionFunc) (*SyncWorker, error)

NewSyncWorker returns a new SyncWorker.

func (*SyncWorker) Enqueue

func (w *SyncWorker) Enqueue(row Row)

Enqueue enqueues a row, to be inserted in bulk by the next insert operation.

func (*SyncWorker) Insert

func (w *SyncWorker) Insert() *InsertErrors

Insert executes an insert operation in bulk. It groups rows by table, and inserts each table's rows using a separate insert request. It also splits rows for the same table into multiple requests if too many rows have been queued, according to BigQuery's quota policy.

The insert blocks until a response is returned. The response contains insert and row errors for the inserted tables.
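Grouping rows by table and splitting oversized tables could look roughly like the sketch below. It is a hypothetical illustration, not the package's code: row, request, and splitByTable are invented names, and the per-request limit is a parameter (the example passes 500, matching DefaultAsyncMaxRows, but the exact limit Insert uses is not stated here). Tables are sorted so the output order is deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// row is a simplified, hypothetical stand-in for bqstreamer's Row.
type row struct {
	Project, Dataset, Table string
	InsertID                string
}

// request is one hypothetical insertAll call: a single table plus its rows.
type request struct {
	Table string // "project.dataset.table"
	Rows  []row
}

// splitByTable groups rows by destination table and splits each table's
// rows into requests of at most maxPerRequest rows.
func splitByTable(rows []row, maxPerRequest int) []request {
	if maxPerRequest <= 0 {
		maxPerRequest = len(rows) // treat a non-positive limit as "no splitting"
	}

	byTable := make(map[string][]row)
	for _, r := range rows {
		key := r.Project + "." + r.Dataset + "." + r.Table
		byTable[key] = append(byTable[key], r)
	}

	keys := make([]string, 0, len(byTable))
	for k := range byTable {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic table order

	var reqs []request
	for _, k := range keys {
		tableRows := byTable[k]
		for start := 0; start < len(tableRows); start += maxPerRequest {
			end := start + maxPerRequest
			if end > len(tableRows) {
				end = len(tableRows)
			}
			reqs = append(reqs, request{Table: k, Rows: tableRows[start:end]})
		}
	}
	return reqs
}

func main() {
	var rows []row
	for i := 0; i < 1200; i++ {
		rows = append(rows, row{"p", "d", "events", fmt.Sprint(i)})
	}
	rows = append(rows, row{"p", "d", "users", "u1"})

	for _, req := range splitByTable(rows, 500) {
		fmt.Println(req.Table, len(req.Rows))
	}
	// p.d.events 500
	// p.d.events 500
	// p.d.events 200
	// p.d.users 1
}
```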

func (*SyncWorker) InsertWithRetry

func (w *SyncWorker) InsertWithRetry() *InsertErrors

InsertWithRetry is similar to Insert(), but retries an insert operation multiple times on BigQuery server errors.

See the following article for more info: https://cloud.google.com/bigquery/troubleshooting-errors
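A retry loop of this kind might be sketched as follows. The assumptions are the sketch's own: serverError and isRetryable are hypothetical, treating HTTP 5xx responses as retryable is an illustrative choice rather than bqstreamer's documented rule, and insertWithRetry records every attempt's error in order, echoing how TableInsertErrors.Attempts() reports all attempts with only the last one possibly successful. The retry count and interval correspond to SetSyncMaxRetries and SetSyncRetryInterval.

```go
package main

import (
	"fmt"
	"time"
)

// serverError is a hypothetical error carrying an HTTP status code.
type serverError struct{ code int }

func (e *serverError) Error() string { return fmt.Sprintf("server error %d", e.code) }

// isRetryable treats HTTP 5xx responses as transient server errors
// (an assumption made for this sketch).
func isRetryable(err error) bool {
	se, ok := err.(*serverError)
	return ok && se.code >= 500
}

// insertWithRetry calls insert, then retries up to maxRetries more times
// while it fails with a retryable error, sleeping interval between attempts.
// It returns each attempt's error in execution order; only the last entry
// can be nil.
func insertWithRetry(insert func() error, maxRetries int, interval time.Duration) []error {
	var attempts []error
	for i := 0; ; i++ {
		err := insert()
		attempts = append(attempts, err)
		if err == nil || !isRetryable(err) || i >= maxRetries {
			return attempts
		}
		time.Sleep(interval)
	}
}

func main() {
	calls := 0
	flaky := func() error { // fails twice with 503, then succeeds
		calls++
		if calls <= 2 {
			return &serverError{code: 503}
		}
		return nil
	}
	attempts := insertWithRetry(flaky, 3, 10*time.Millisecond)
	fmt.Println(len(attempts), attempts[len(attempts)-1]) // prints "3 <nil>"
}
```

Client errors such as malformed payloads are returned after a single attempt, since resending the same request would fail the same way.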

func (*SyncWorker) RowLen

func (w *SyncWorker) RowLen() int

RowLen returns the number of enqueued rows in the worker, which haven't been inserted into BigQuery yet.

type TableInsertAttemptErrors

type TableInsertAttemptErrors struct {

	// The table name associated with the insert attempt.
	Table string

	// The dataset name associated with the insert attempt.
	Dataset string

	// The project associated with the insert attempt.
	Project string
	// contains filtered or unexported fields
}

TableInsertAttemptErrors contains errors relating to a single table insert attempt.

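NOTE despite its Error() method, it does not satisfy Go's built-in error interface, because Error() returns an error rather than a string.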

func (*TableInsertAttemptErrors) All

func (table *TableInsertAttemptErrors) All() []*RowErrors

All returns all remaining row errors (those that have not been iterated over using Next()).

Calling Next() or All() again afterwards will yield a failed (empty) result.

func (*TableInsertAttemptErrors) Error

func (table *TableInsertAttemptErrors) Error() error

Error returns a non-nil value when the table's insert attempt has failed completely.

NOTE that an attempt can return no error yet still fail to insert its rows. This can happen, for example, if the request includes malformed rows and SkipInvalidRows is set to false.

func (*TableInsertAttemptErrors) Next

func (table *TableInsertAttemptErrors) Next() (*RowErrors, bool)

Next iterates over the attempt's rows once, returning a single row every call. Calling Next() multiple times will consequently return more rows, until all have been returned.

The function returns true if a non-nil value was fetched. Once the iterator has been exhausted, (nil, false) will be returned on every subsequent call.

type TableInsertErrors

type TableInsertErrors struct {
	InsertAttempts []*TableInsertAttemptErrors
}

TableInsertErrors contains errors relating to a specific table from a bulk insert operation.

func (*TableInsertErrors) Attempts

func (table *TableInsertErrors) Attempts() []*TableInsertAttemptErrors

Attempts returns all insert attempts for a single table, in the order they were executed. Every attempt except the last one in the returned slice has failed. The last attempt may have either succeeded or failed, as indicated by its Error() method.

type TooManyFailedInsertRetriesError

type TooManyFailedInsertRetriesError struct {
	// Number of failed retries.
	NumFailedRetries int

	// The table name associated with the insert attempt.
	Table string

	// The dataset name associated with the insert attempt.
	Dataset string

	// The project associated with the insert attempt.
	Project string
}

TooManyFailedInsertRetriesError is returned when a specific insert attempt has been retried and failed multiple times, causing the worker to stop retrying and drop that table's insert operation entirely.

It implements the error interface.

func (*TooManyFailedInsertRetriesError) Error
