bqworker

package module
v0.9.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 27, 2022 License: MIT Imports: 18 Imported by: 0

README

go-bqworker

license

go-esworker is an async worker that data can bulk insert, update to the BigQuery.
It's a library for golang.

Getting Started

Install
$ go get -u github.com/gjbae1212/go-bqworker
Usage
// pseudo code
package main
import (
    "context"
    "github.com/gjbae1212/go-bqworker"
)

func main() {
   schema := []*bqworker.TableSchema{your-tables-schema}
   cfg := bqworker.NewConfig("gcp proejct id", "gcp JWT bytes", schema , queue-size, worker-size, worker-queue, delay-time)
   
   streamer, _ := bqworker.NewStreamer(cfg, func(err error){ })
   
   ctx := context.Background()
   
   // row is an object implemented bqworker.Row. 
    
   // async
   streamer.AddRow(ctx, row)     
   // sync 
   streamer.AddRowSync(ctx, row)
}  

LICENSE

This project is following The MIT.

Documentation

Index

Constants

View Source
const (
	ChecksumFieldName = "_checksum"
)

Variables

View Source
var (
	ErrInvalidParams               = errors.New("[err] invalid params")
	ErrTimeout                     = errors.New("[err] timeout")
	ErrNotFoundBigQueryClient      = errors.New("[err] not found bigquery client")
	ErrNotFoundBigQueryTableSchema = errors.New("[err] not found bigquery table schema")
)

Functions

This section is empty.

Types

type ErrorHandler

type ErrorHandler func(error)

type Option added in v0.5.0

type Option interface {
	// contains filtered or unexported methods
}

Option is an interface for dependency injection.

type OptionFunc added in v0.5.0

type OptionFunc func(cfg *config)

func WithDispatcherLoopWaitDuration added in v0.5.0

func WithDispatcherLoopWaitDuration(duration time.Duration) OptionFunc

WithDispatcherLoopWaitDuration returns a function which injects worker wait duration to streamer.

func WithErrorHandler added in v0.5.0

func WithErrorHandler(f ErrorHandler) OptionFunc

WithErrorHandler returns a function which injects error handler to streamer.

func WithMaxRetry added in v0.9.2

func WithMaxRetry(retry int) OptionFunc

WithMaxRetry returns a function which injects max retry to streamer.

func WithQueueSize added in v0.5.0

func WithQueueSize(size int) OptionFunc

WithQueueSize returns a function which injects queue size to streamer.

func WithWorkerSize added in v0.5.0

func WithWorkerSize(size int) OptionFunc

WithWorkerSize returns a function which injects worker size to streamer.

func WithWorkerStack added in v0.5.0

func WithWorkerStack(stack int) OptionFunc

WithWorkerStack returns a function which injects worker stack to streamer.

func WithWorkerWaitDuration added in v0.5.0

func WithWorkerWaitDuration(duration time.Duration) OptionFunc

WithWorkerWaitDuration returns a function which injects worker wait duration to streamer.

type Row

type Row interface {
	bigquery.ValueSaver

	ProjectId() string      // GCP ProjectId
	Schema() *TableSchema   // BigQuery Table Schema
	PublishedAt() time.Time // PublishedAt is time, such as sends row to Bigquery.
	InsertId() string       // InsertId is unique id in BigQuery table.
}

type Streamer

type Streamer interface {
	AddRow(ctx context.Context, row Row) error     // Add bigquery row to dispatcher
	AddRowSync(ctx context.Context, row Row) error // Add bigquery row to dispatcher sync.
	Start() error                                  // Start Streamer
	Stop() error                                   // Stop Streamer
}

func NewStreamer

func NewStreamer(tableSchemaGroup []*TableSchemaGroup, opts ...Option) (Streamer, error)

NewStreamer returns bigquery streamer which inserts data with bulk parallel.

type TablePeriod

type TablePeriod int
const (
	NotExist TablePeriod = iota
	Daily
	Monthly
	Yearly
)

type TableSchema

type TableSchema struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewTableSchema added in v0.5.0

func NewTableSchema(projectId string, datasetId, prefix string,
	meta *bigquery.TableMetadata, period TablePeriod, checksumField bool) (*TableSchema, error)

NewTableSchema creates table schema using bigquery.

func (*TableSchema) ChecksumField added in v0.9.0

func (ts *TableSchema) ChecksumField() bool

func (*TableSchema) DatasetId

func (ts *TableSchema) DatasetId() string

func (*TableSchema) Meta

func (ts *TableSchema) Meta() *bigquery.TableMetadata

func (*TableSchema) Period

func (ts *TableSchema) Period() TablePeriod

func (*TableSchema) Prefix

func (ts *TableSchema) Prefix() string

func (*TableSchema) ProjectId added in v0.5.0

func (ts *TableSchema) ProjectId() string

type TableSchemaGroup added in v0.5.0

type TableSchemaGroup struct {
	// contains filtered or unexported fields
}

func NewTableSchemaGroup added in v0.6.0

func NewTableSchemaGroup(projectId, credential string, tableSchemaList []*TableSchema) (*TableSchemaGroup, error)

NewTableSchemaGroup creates table schema group using bigquery.

func (*TableSchemaGroup) Credential added in v0.6.0

func (tsg *TableSchemaGroup) Credential() string

func (*TableSchemaGroup) ProjectId added in v0.5.0

func (tsg *TableSchemaGroup) ProjectId() string

func (*TableSchemaGroup) TableSchemaList added in v0.7.0

func (tsg *TableSchemaGroup) TableSchemaList() []*TableSchema

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL