proton

package module
v2.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 5, 2025 License: Apache-2.0 Imports: 28 Imported by: 5

README

Proton Go Driver

Introduction

Proton is a unified streaming and historical data processing engine in a single binary. Its historical store is built on ClickHouse.

This project provides a Go driver for interacting with Proton; the code is based on https://github.com/ClickHouse/clickhouse-go.

Installation

To get started, you need to have Go installed. Then, import the Proton Database Go Driver using Go Modules:

go get github.com/timeplus-io/proton-go-driver/v2

Quick Start

  1. Run Proton with Docker: docker run -d -p 8463:8463 --pull always --name proton ghcr.io/timeplus-io/proton:develop
  2. Run the following Go code:
package main

import (
	"fmt"
	"github.com/timeplus-io/proton-go-driver/v2"
)

// main connects to a Proton server on 127.0.0.1:8463 as the "default" user,
// runs a constant query and prints the value it returns.
func main() {
	conn := proton.OpenDB(&proton.Options{
		Addr: []string{"127.0.0.1:8463"},
		Auth: proton.Auth{
			Username: "default",
			Password: "",
		},
	})
	// OpenDB returns a *sql.DB; close it so pooled connections are released.
	defer conn.Close()

	var value int
	// Scan is where a connection or query failure surfaces. Without this
	// check a failure would silently print the zero value.
	if err := conn.QueryRow("SELECT 300").Scan(&value); err != nil {
		fmt.Println("query failed:", err)
		return
	}
	fmt.Println(value)
}

The code above should print 300, which shows that everything is working.

Connecting to Proton Database

To connect to the Proton database, create a connection using the following code:

// Open a database/sql handle to a Proton server listening on 127.0.0.1:8463.
conn := proton.OpenDB(&proton.Options{
    Addr: []string{"127.0.0.1:8463"},
    Auth: proton.Auth{
        Database: "default",
        Username: "default",
        Password: "",
    },
    DialTimeout: 5 * time.Second,
    // Name the field explicitly: a keyed literal keeps compiling if the
    // struct gains fields and satisfies go vet's composites check.
    Compression: &proton.Compression{
        Method: proton.CompressionLZ4,
    },
})
// Pool limits are configured on the *sql.DB returned by OpenDB.
conn.SetMaxIdleConns(5)
conn.SetMaxOpenConns(10)
conn.SetConnMaxLifetime(time.Hour)
// Attach a progress callback to ctx (assumed to be a context.Context already
// in scope). The random check logs roughly 30% of the progress updates.
ctx = proton.Context(ctx, proton.WithProgress(func(p *proton.Progress) {
    if rand.Float32() < 0.3 {
        log.Println("progress:", p)
    }
}))

Create Stream

Before working with streaming data, you need to create a stream to hold it. Here's an example:

if _, err := conn.ExecContext(ctx, "DROP STREAM IF EXISTS car"); err != nil {
    return err
}
if _, err := conn.ExecContext(ctx, "CREATE STREAM IF NOT EXISTS car(id int64, speed float64)"); err != nil {
    return err
}

Batch Insertion

// Begin opens the transaction that scopes the batch.
scope, err := conn.Begin()
if err != nil {
    log.Fatal(err)
}
batch, err := scope.PrepareContext(ctx, "INSERT INTO car (id, speed, _tp_time) values")
// A failed prepare must stop here; batch is not usable otherwise.
if err != nil {
    log.Fatal(err)
}
for i := 0; i < 20; i++ {
    // Random speed in the range [50, 70).
    speed := rand.Float64()*20 + 50
    // The loop counter supplies the id column, converted to match the
    // int64 type the stream declares for it.
    if _, err := batch.Exec(int64(i), speed, time.Now()); err != nil {
        log.Fatal(err)
    }
    time.Sleep(100 * time.Millisecond)
}
err = scope.Commit()
if err != nil {
    log.Fatal(err)
}

Streaming Query

// QueryDDL is the streaming query text run below.
const QueryDDL = `SELECT id, avg(speed), window_start, window_end
    FROM session(car, 1h, [speed >= 60, speed < 60))
    GROUP BY id, window_start, window_end`
// getConnection and SpeedingCarRcd are not shown in this snippet; the caller
// is expected to provide them.
conn, ctx := getConnection(context.Background())
ctx, cancel := context.WithCancel(ctx)
// Release the context on every exit path, not only when the timer below
// fires. Calling cancel more than once is safe.
defer cancel()
rows, err := conn.QueryContext(ctx, QueryDDL)
if err != nil {
    log.Fatal(err)
}
defer rows.Close()
// Cancel the query after 20 seconds; the row loop below ends once
// rows.Next returns false.
go func() {
    time.Sleep(20 * time.Second)
    cancel()
}()
for rows.Next() {
    var car SpeedingCarRcd
    if err := rows.Scan(&car.Id, &car.Speed, &car.Start, &car.End); err != nil {
        log.Fatal(err)
    }
    log.Printf("%+v", car)
}
// rows.Err reports any error that ended the iteration.
err = rows.Err()
if err != nil {
    log.Fatal(err)
}

Note: To cancel a streaming query, call the cancel function returned by context.WithCancel.

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Package-level error values. Compare against them with errors.Is.
var (
	// ErrBatchAlreadySent reports that the batch has already been sent.
	ErrBatchAlreadySent               = errors.New("proton: batch has already been sent")
	// ErrAcquireConnTimeout reports a timeout while acquiring a connection;
	// the message suggests raising the max open connections or the dial timeout.
	ErrAcquireConnTimeout             = errors.New("proton: acquire conn timeout. you can increase the number of max open conn or the dial timeout")
	// ErrUnsupportedServerRevision reports a server revision the driver does not support.
	ErrUnsupportedServerRevision      = errors.New("proton: unsupported server revision")
	// ErrBindMixedNamedAndNumericParams reports that named and numeric
	// parameters were mixed while binding.
	ErrBindMixedNamedAndNumericParams = errors.New("proton [bind]: mixed named and numeric parameters")
)
View Source
var CompressionLZ4 compress.Method = compress.LZ4

Functions

func Context

func Context(parent context.Context, options ...QueryOption) context.Context

func ExtractJSONPath added in v2.1.0

func ExtractJSONPath(o *JSON, path string) (value interface{}, ok bool)

ExtractJSONPath is a convenience function for asserting a path to a specific type. The underlying value is also extracted from its Dynamic wrapper if present.

func Named

func Named(name string, value interface{}) driver.NamedValue

func Open

func Open(opt *Options) (driver.Conn, error)

func OpenDB

func OpenDB(opt *Options) *sql.DB

Types

type Auth

// Auth groups the database name and credentials supplied through Options.Auth.
type Auth struct {
	Database string
	Username string
	Password string
}

type Compression

// Compression holds the compression method supplied through
// Options.Compression; the package exports CompressionLZ4 as a Method value.
type Compression struct {
	Method compress.Method
}

type Conn

type Conn = driver.Conn

type ConnOpenStrategy

// ConnOpenStrategy is the type of the Options.ConnOpenStrategy field.
// NOTE(review): presumably it controls how the addresses in Options.Addr are
// chosen when opening a connection — confirm against the implementation.
type ConnOpenStrategy uint8
const (
	// ConnOpenInOrder is the zero value of ConnOpenStrategy.
	ConnOpenInOrder ConnOpenStrategy = iota
	ConnOpenRoundRobin
)

type Dynamic added in v2.1.0

type Dynamic = chcol.Dynamic

Dynamic is an alias for the Variant type

func NewDynamic added in v2.1.0

func NewDynamic(v interface{}) Dynamic

NewDynamic creates a new Dynamic with the given value

func NewDynamicWithType added in v2.1.0

func NewDynamicWithType(v interface{}, chType string) Dynamic

NewDynamicWithType creates a new Dynamic with the given value and ClickHouse type

type Exception

type Exception = proto.Exception

type JSON added in v2.1.0

type JSON = chcol.JSON

JSON represents a ClickHouse JSON type that can hold multiple possible types

func NewJSON added in v2.1.0

func NewJSON() *JSON

NewJSON creates a new empty JSON value

type JSONDeserializer added in v2.1.0

type JSONDeserializer = chcol.JSONDeserializer

JSONDeserializer interface allows a struct to load its data from an optimized JSON structure instead of relying on recursive reflection to set its fields.

type JSONSerializer added in v2.1.0

type JSONSerializer = chcol.JSONSerializer

JSONSerializer interface allows a struct to be manually converted to an optimized JSON structure instead of relying on recursive reflection. Note that the struct must be a pointer in order for the interface to be matched, reflection will be used otherwise.

type Log

// Log is the value handed to the callback registered with WithLogs.
// NOTE(review): field meanings are inferred from their names only;
// TimeMicro is presumably the sub-second part of Time — confirm.
type Log struct {
	Time      time.Time
	TimeMicro uint32
	Hostname  string
	QueryID   string
	ThreadID  uint64
	Priority  int8
	Source    string
	Text      string
}

type OpError

// OpError is an error type (see its Error method) that carries an operation
// name, a column name and an underlying error.
// NOTE(review): presumably Err is the cause wrapped by this error — confirm.
type OpError struct {
	Op         string
	ColumnName string
	Err        error
}

func (*OpError) Error

func (e *OpError) Error() string

type Options

// Options is the configuration accepted by Open and OpenDB. ParseDSN builds
// an Options value from a DSN string.
type Options struct {
	TLS              *tls.Config
	// Addr lists server addresses in host:port form, e.g. "127.0.0.1:8463".
	Addr             []string
	Auth             Auth
	// DialContext has the signature of a custom dialer.
	// NOTE(review): presumably it replaces the default dialer when set — confirm.
	DialContext      func(ctx context.Context, addr string) (net.Conn, error)
	Debug            bool
	Settings         Settings
	Compression      *Compression
	DialTimeout      time.Duration // default 1 second
	MaxOpenConns     int           // default MaxIdleConns + 5
	MaxIdleConns     int           // default 5
	ConnMaxLifetime  time.Duration // default 1 hour
	ConnOpenStrategy ConnOpenStrategy
}

func ParseDSN

func ParseDSN(dsn string) (*Options, error)

type ProfileEvent

// ProfileEvent is the element type of the slice handed to the callback
// registered with WithProfileEvents.
// NOTE(review): field meanings are inferred from their names only — confirm.
type ProfileEvent struct {
	Hostname    string
	CurrentTime time.Time
	ThreadID    uint64
	Type        string
	Name        string
	Value       int64
}

type ProfileInfo

type ProfileInfo = proto.ProfileInfo

type Progress

type Progress = proto.Progress

type QueryOption

type QueryOption func(*QueryOptions) error

func WithExternalTable

func WithExternalTable(t ...*external.Table) QueryOption

func WithLogs

func WithLogs(fn func(*Log)) QueryOption

func WithProfileEvents

func WithProfileEvents(fn func([]ProfileEvent)) QueryOption

func WithProfileInfo

func WithProfileInfo(fn func(*ProfileInfo)) QueryOption

func WithProgress

func WithProgress(fn func(*Progress)) QueryOption

func WithQueryID

func WithQueryID(queryID string) QueryOption

func WithQuotaKey

func WithQuotaKey(quotaKey string) QueryOption

func WithSettings

func WithSettings(settings Settings) QueryOption

func WithSpan

func WithSpan(span trace.SpanContext) QueryOption

func WithStdAsync

func WithStdAsync(wait bool) QueryOption

type QueryOptions

type QueryOptions struct {
	// contains filtered or unexported fields
}

type ServerVersion

type ServerVersion = proto.ServerHandshake

type Settings

type Settings map[string]interface{}

type Variant added in v2.1.0

type Variant = chcol.Variant

Variant represents a ClickHouse Variant type that can hold multiple possible types

func NewVariant added in v2.1.0

func NewVariant(v interface{}) Variant

NewVariant creates a new Variant with the given value

func NewVariantWithType added in v2.1.0

func NewVariantWithType(v interface{}, chType string) Variant

NewVariantWithType creates a new Variant with the given value and ClickHouse type

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL