bgc

package module
v0.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 17, 2024 License: Apache-2.0 Imports: 26 Imported by: 4

README

Datastore Connectivity for BigQuery (bgc)

Datastore Connectivity library for BigQuery in Go. GoDoc

This library is compatible with Go 1.5+

Please refer to CHANGELOG.md if you encounter breaking changes.

This library uses SQL mode and the streaming API to insert data by default. To use legacy SQL, add the /* USE LEGACY SQL */ hint; in that case you will not be able to fetch repeated and nested fields.

Configuration parameters
insertMethod

To control insert method just provide config.parameters with the following value:

_table_name_.insertMethod = "load"

Note that if streaming is used, currently UPDATE and DELETE statements are not supported.

insertIdColumn

For streaming you can specify which column to use as insertId with the following config.parameters:

_table_name_.insertMethod = "stream"
_table_name_.insertIdColumn = "sessionId"
streamBatchCount

streamBatchCount controls row count in batch (default 9999)

insertWaitTimeoutInMs

When inserting data, this library checks for up to 60 seconds whether the data has been added. To control this behaviour you can set insertWaitTimeoutInMs (default 60 sec).

To disable this mechanism set: insertWaitTimeoutInMs: -1

insertMaxRetires

Retries the insert when a 503 internal error occurs.

datasetId

Default dataset

pageSize

Default 500

The maximum number of rows of data to return per page of results. In addition to this limit, responses are also limited to 10 MB.

Credentials

  1. Google secrets for service account

a) set GOOGLE_APPLICATION_CREDENTIALS environment variable

b) credential can be the name of a JSON secret file placed in the ~/.secret/ folder

config.yaml

driverName: bigquery
credentials: bq # place your big query secret json to ~/.secret/bg.json
parameters:
  datasetId: myDataset

c) full URL to secret file

config.yaml

driverName: bigquery
credentials: file://tmp/secret/mySecret.json
parameters:
  datasetId: myDataset

Secret file has to specify the following attributes:

// Config holds the Google Cloud service-account credential attributes
// that the secret JSON file has to specify.
type Config struct {
	//google cloud credential
	ClientEmail  string `json:"client_email,omitempty"` // service-account email
	TokenURL     string `json:"token_uri,omitempty"`    // OAuth2 token endpoint
	PrivateKey   string `json:"private_key,omitempty"`  // PEM-encoded private key
	PrivateKeyID string `json:"private_key_id,omitempty"`
	ProjectID    string `json:"project_id,omitempty"` // gofmt: realigned with sibling fields
}
  1. Private key (pem)

config.yaml

driverName: bigquery
credentials: bq # place your big query secret json to ~/.secret/bg.json
parameters:
  serviceAccountId: "***@developer.gserviceaccount.com"
  datasetId: MyDataset
  projectId: spheric-arcadia-98015
  privateKeyPath: /tmp/secret/bq.pem

Usage:

The following is a very simple example of Reading and Inserting data


package main

import (
    "github.com/viant/bgc"
    "github.com/viant/dsc"
    "time"
    "fmt"
    "log"
)


// MostLikedCity is an example nested record with a repeated field.
type MostLikedCity struct {
	City      string
	Visits    int
	Souvenirs []string
}

// Traveler is an example row type demonstrating nested (MostLikedCity)
// and repeated (Achievements, VisitedCities) BigQuery fields.
// gofmt: removed the stray double space after "type".
type Traveler struct {
	Id            int
	Name          string
	LastVisitTime time.Time
	Achievements  []string
	MostLikedCity MostLikedCity
	VisitedCities []struct {
		City   string
		Visits int
	}
}


// main demonstrates reading and inserting data with the bgc BigQuery driver.
func main() {

	// Build a manager config using the "bq" secret (~/.secret/bq.json)
	// and a default dataset.
	config, err := dsc.NewConfigWithParameters("bigquery", "",
		"bq", // google cloud secret placed in ~/.secret/bq.json
		map[string]string{
			"datasetId": "MyDataset",
		})
	if err != nil {
		log.Fatal(err)
	}

	factory := dsc.NewManagerFactory()
	manager, err := factory.Create(config)
	if err != nil {
		log.Fatalf("Failed to create manager %v", err)
	}

	// Read a single record into a struct.
	traveler := Traveler{}
	success, err := manager.ReadSingle(&traveler, "SELECT id, name, lastVisitTime, visitedCities, achievements, mostLikedCity FROM travelers WHERE id = ?", []interface{}{4}, nil)
	if err != nil {
		log.Fatal(err)
	}
	if !success {
		log.Println("no traveler found") // was: success declared and never used (compile error)
	}

	// Read all records into a slice.
	// Fixes vs. original snippet: err := redeclared err with no new
	// variable (compile error), &interest was undefined (should be
	// &travelers), column "iid" and the missing FROM clause.
	travelers := make([]Traveler, 0)
	err = manager.ReadAll(&travelers, "SELECT id, name, lastVisitTime, visitedCities, achievements, mostLikedCity FROM travelers", nil, nil)
	if err != nil {
		log.Fatal(err)
	}

	// ...

	inserted, updated, err := manager.PersistAll(&travelers, "travelers", nil)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("inserted: %v, updated: %v", inserted, updated) // was: results declared and never used
	// ...

	// Custom reading handler with a query-info type to collect
	// CacheHit, TotalRows and TotalBytesProcessed.
	var resultInfo = &bgc.QueryResultInfo{}
	var perf = make(map[string]int)
	err = manager.ReadAllWithHandler(`SELECT DATE(date), COUNT(*) FROM performance_agg WHERE DATE(date) = ?  GROUP BY 1`, []interface{}{
		"2018-05-03",
		resultInfo, // NOTE(review): resultInfo rides along with the parameters so the driver can populate it — confirm against bgc docs
	}, func(scanner dsc.Scanner) (toContinue bool, err error) {
		var date string
		var count int
		if err = scanner.Scan(&date, &count); err != nil {
			return false, err
		}
		perf[date] = count
		return true, nil
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("cache: %v,  rows: %v, bytes: %v", resultInfo.CacheHit, resultInfo.TotalRows, resultInfo.TotalBytesProcessed)

	// Print the DDL for an existing table via the datastore dialect.
	dialect := dsc.GetDatastoreDialect(config.DriverName)
	DDL, err := dialect.ShowCreateTable(manager, "performance_agg")
	fmt.Printf("%v %v\n", DDL, err)
}

GoCover

GoCover

License

The source code is made available under the terms of the Apache License, Version 2, as stated in the file LICENSE.

Individual files may be made available under their own specific license, all compatible with Apache License, Version 2. Please see individual files for details.

Credits and Acknowledgements

Library Author: Adrian Witas

Contributors: Mikhail Berlyant

Documentation

Index

Constants

View Source
const (
	// Keys recognized in config.parameters for credentials and defaults.
	ServiceAccountIdKey = "serviceAccountId" // pem-based auth: service account id
	PrivateKey          = "privateKey"       // pem-based auth: inline private key
	PrivateKeyPathKey   = "privateKeyPath"   // pem-based auth: path to the .pem file
	ProjectIDKey        = "projectId"
	DataSetIDKey        = "datasetId" // default dataset
	DateFormatKey       = "dateFormat"
	MaxResultsKey       = "maxResults"
)
View Source
const (
	// Insert-method values and the config.parameters keys that tune inserts.
	InsertMethodStream       = "stream" // streaming API (default); UPDATE/DELETE unsupported
	InsertMethodLoad         = "load"   // load-job based insert
	InsertWaitTimeoutInMsKey = "insertWaitTimeoutInMs"
	InsertIdColumn           = "insertIdColumn"
	StreamBatchCount         = "streamBatchCount" //can not be more than 10000

	// NOTE(review): keeps the original "Retires" spelling — it is the
	// published key/constant name, so renaming would break callers.
	InsertMaxRetires = "insertMaxRetires"
)

Variables

This section is empty.

Functions

func GetServiceAndContextForManager

func GetServiceAndContextForManager(manager dsc.Manager) (*bigquery.Service, context.Context, error)

GetServiceAndContextForManager returns big query service and context for passed in datastore manager.

Types

type ColumnInfo added in v0.2.1

// ColumnInfo describes a single table column.
type ColumnInfo struct {
	Name            string // column name
	DataType        string // column data type
	IsNullable      bool   // whether the column accepts NULL
	IsPartitioned   bool   // whether this column is the partitioning column
	ClusterPosition int    // position among the clustering columns, if any
}

type Compressed added in v0.2.0

type Compressed struct {
	// contains filtered or unexported fields
}

Compressed represents a compressed encoded payload

func NewCompressed added in v0.2.0

func NewCompressed(encoderFactory toolbox.EncoderFactory) *Compressed

NewCompressed returns a new Compressed struct

func (*Compressed) Append added in v0.2.0

func (c *Compressed) Append(data map[string]interface{}) error

Append appends data to the compressing stream

func (*Compressed) GetAndClose added in v0.2.0

func (c *Compressed) GetAndClose() (io.Reader, error)

GetAndClose returns the reader and closes the stream

type InsertTask

type InsertTask struct {
	// contains filtered or unexported fields
}

InsertTask represents insert streaming task.

func NewInsertTask

func NewInsertTask(manager dsc.Manager, table *dsc.TableDescriptor, waitForCompletion bool) (*InsertTask, error)

NewInsertTask creates a new streaming insert task; it takes a manager, a table descriptor with schema, and a waitForCompletion flag with time duration.

func (*InsertTask) Insert added in v0.2.0

func (it *InsertTask) Insert(reader io.Reader) error

Insert streams records read from the supplied reader into big query, returning an error on failure.

func (*InsertTask) InsertAll

func (it *InsertTask) InsertAll(data interface{}) (int, error)

InsertAll streams or load all records into big query, returns number records streamed or error.

func (*InsertTask) InsertSingle

func (it *InsertTask) InsertSingle(record map[string]interface{}) error

InsertSingle streams single records into big query.

func (*InsertTask) LoadAll added in v0.2.0

func (it *InsertTask) LoadAll(data interface{}) (int, error)

LoadAll loads all records into big query, returns number of records loaded or error.

func (*InsertTask) StreamAll added in v0.2.0

func (it *InsertTask) StreamAll(data interface{}) (int, error)

StreamAll streams all records into big query, returns number of records streamed or error.

type QueryIterator

type QueryIterator struct {
	Rows []*bigquery.TableRow
	// contains filtered or unexported fields
}

QueryIterator represents an iterator over big query result rows.

func NewQueryIterator

func NewQueryIterator(manager dsc.Manager, query string) (*QueryIterator, error)

NewQueryIterator creates a new query iterator for passed in datastore manager and query.

func (*QueryIterator) GetColumnTypes added in v0.8.0

func (qi *QueryIterator) GetColumnTypes() ([]dsc.ColumnType, error)

GetColumnTypes returns query column types, after the query has executed.

func (*QueryIterator) GetColumns

func (qi *QueryIterator) GetColumns() ([]string, error)

GetColumns returns query columns, after the query has executed.

func (*QueryIterator) HasNext

func (qi *QueryIterator) HasNext() bool

HasNext returns true if there is next row to fetch.

func (*QueryIterator) Next

func (qi *QueryIterator) Next() ([]interface{}, error)

Next returns next row.

type QueryResultInfo added in v0.2.0

// QueryResultInfo carries query execution statistics
// (cache hit, row count, bytes processed).
type QueryResultInfo struct {
	CacheHit            bool // whether the result was served from the query cache
	TotalRows           int  // total rows in the result
	TotalBytesProcessed int  // bytes processed by the query
}

QueryResultInfo represents query result info.

func (*QueryResultInfo) Set added in v0.7.0

func (i *QueryResultInfo) Set(info *QueryResultInfo)

Set sets info values

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL