bqworker

package module
v0.10.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 8, 2021 License: MIT Imports: 12 Imported by: 0

README

go-bqworker

license

go-esworker is an async worker that data can bulk insert, update to the BigQuery.
It's a library for golang.

Getting Started

Install
$ go get -u github.com/gjbae1212/go-bqworker
Usage
// pseudo code
package main
import (
    "context"
    "github.com/gjbae1212/go-bqworker"
)

func main() {
   schema := []*bqworker.TableSchema{your-tables-schema}
   cfg := bqworker.NewConfig("gcp proejct id", "gcp JWT bytes", schema , queue-size, worker-size, worker-queue, delay-time)
   
   streamer, _ := bqworker.NewStreamer(cfg, func(err error){ })
   
   ctx := context.Background()
   
   // row is an object implemented bqworker.Row. 
    
   // async
   streamer.AddRow(ctx, row)     
   // sync 
   streamer.AddRowSync(ctx, row)
}  

LICENSE

This project is following The MIT.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInvalidParams = errors.New("[err] invalid params")
	ErrTimeout       = errors.New("[err] timeout")
)

Functions

This section is empty.

Types

type Config

type Config struct {
	// contains filtered or unexported fields
}

func NewConfig

func NewConfig(projectId string, jwtbys []byte, schemas []*TableSchema, queueSize, workerSize, workerStack int, workerDelay time.Duration) (*Config, error)

NewConfig returns a config object which injects dependency to bigquery streamer.

type ErrorHandler

type ErrorHandler func(error)

type Job

type Job struct {
	Msg *Message
}

message

type Message

type Message struct {
	DatasetId string
	TableId   string
	Data      Row
}

type Row

type Row interface {
	Save() (row map[string]bigquery.Value, insertID string, err error)
	Schema() (schema *TableSchema, err error)
	PublishedAt() time.Time
	InsertId() string
}

type Streamer

type Streamer interface {
	AddRow(ctx context.Context, row Row) error
	AddRowSync(ctx context.Context, row Row) error
}

func NewStreamer

func NewStreamer(cfg *Config, errFunc ErrorHandler) (Streamer, error)

NewStreamer returns bigquery streamer which inserts data with bulk parallel.

type TablePeriod

type TablePeriod int
const (
	NotExist TablePeriod = iota
	Daily
	Monthly
	Yearly
)

type TableSchema

type TableSchema struct {
	DatasetId     string                  // bigquery datasetId
	Prefix        string                  // bigquery table prefix
	Meta          *bigquery.TableMetadata // bigquery table meta
	Period        TablePeriod             // TablePeriod
	AddNewColumns bool                    // add new columns to existing table
}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

type WorkerDispatcher

type WorkerDispatcher struct {
	// contains filtered or unexported fields
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL