
README

ksqlDB Go library


This is an unconnected fork of Robin Moffatt's client and will be developed on its own.

Thank you Robin and all other contributors for their work!

Attention - WIP

If you use this library, be warned: the client will be completely overhauled!

This client is not production-ready, and its interfaces may change without notice!

⚠️ Disclaimer #1: This is a personal project and not supported or endorsed by Confluent.

Migration?

Check out ksqldb-migrate, a tool to run your ksqlDB migrations.

Description

This is a Go client for ksqlDB.

  • Execute a statement (/ksql endpoint)
  • Run push and pull queries (/query-stream endpoint)
  • Close push query (/close-query endpoint)
  • Terminate a cluster (/ksql/terminate endpoint)
  • Introspect query status (/status endpoint)
  • Introspect server status (/info endpoint)
  • Introspect cluster status (/clusterStatus endpoint)
  • Get the validity of a property (/is_valid_property)

Deprecation: the Run a query endpoint (/query) is deprecated and will not be implemented.

KSqlParser
  • The lexer is case-insensitive, like the Confluent Java lexer (e.g. SELECT * FROM BLA is identical to select * from bla). (since v0.0.4)
  • Parse your KSQL statements with the provided parser.ParseSql method.
  • Push, Pull, and Execute queries are parsed by default.
  • <client-instance>.EnableParseSQL(false) disables the parser; passing true re-enables it (see the sketch below).
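
A minimal sketch of toggling the parser on a client instance (kcl is a KsqldbClient created as shown in "Create a ksqlDB Client" below):

// disable SQL parsing for subsequent push, pull and execute requests
kcl.EnableParseSQL(false)

// re-enable it later if needed
kcl.EnableParseSQL(true)
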
Supported ksqlDB versions
  • tested with ksqldb: v0.22.0, v0.21.0

Installation

Module install:

This client is a Go module, so you can use it simply by adding the following import to your code:

import "github.com/thmeitz/ksqldb-go"

Then run a build to have this client automatically added to your go.mod file as a dependency.

Manual install:

go get -u github.com/thmeitz/ksqldb-go

or use the client in your code and run

go mod tidy

How to use the ksqldb-go package?

Create a ksqlDB Client
Breaking Change v0.0.4

The HTTP client now has its own package

import (
  "github.com/Masterminds/log-go"
  "github.com/Masterminds/log-go/impl/logrus"
  "github.com/thmeitz/ksqldb-go"
  "github.com/thmeitz/ksqldb-go/net"
)

var (
	logger = logrus.NewStandard()
)
// then later in your code...
func main() {
  options := net.Options{
    // if you need a login, set this; otherwise it's not necessary
    Credentials: net.Credentials{Username: "myuser", Password: "mypassword"},
    // defaults to http://localhost:8088
    BaseUrl:     "http://my-super-shiny-ksqldbserver:8082",
    // this is needed because the ksqlDB API communicates over HTTP/2 only
    // default value in v0.0.4
    AllowHTTP:   true,
  }

  // only log.Logger is allowed or nil (since v0.0.4)
  // logrus is in maintenance mode, so I'll use zap in the future
  client, err := net.NewHTTPClient(options, nil)
  if err != nil {
     log.Fatal(err)
  }
  defer client.Close()

  // then make a pull, push, execute request
}

For no authentication, remove Credentials from the options.
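
For example, a minimal options struct without authentication might look like this:

options := net.Options{
  // no Credentials field - the client connects unauthenticated
  BaseUrl:   "http://localhost:8088",
  AllowHTTP: true,
}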

QueryBuilder
Breaking Change v0.0.4

The QueryBuilder was too complicated, so I've refactored it

SQL strings should be built with the QueryBuilder. Otherwise the system is open to SQL injection (see go-webapp-scp.pdf).

You can pass multiple parameters: QueryBuilder("insert into bla values(?,?,?,?,?)", nil, 1, 2.5686, "string", true).

nil will be converted to NULL.

The number of parameters must match the number of placeholders in the SQL statement; if not, an error is returned.

//see file: examples/cobra-test/cmd/pull.go

k := `SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','Europe/London') AS WINDOW_START,
TIMESTAMPTOSTRING(WINDOWEND,'HH:mm:ss','Europe/London') AS WINDOW_END,
DOG_SIZE, DOGS_CT FROM DOGS_BY_SIZE
WHERE DOG_SIZE=?;`

stmnt, err := ksqldb.QueryBuilder(k, "middle")
if err != nil {
	log.Fatal(err)
}

fmt.Println(*stmnt)
Pull query

options := net.Options{
	Credentials: net.Credentials{Username: "user", Password: "password"},
	BaseUrl:     "http://localhost:8088",
	AllowHTTP:   true,
}

kcl, err := ksqldb.NewClientWithOptions(options)
if err != nil {
	log.Fatal(err)
}
defer kcl.Close()

query := `select timestamptostring(windowstart,'yyyy-MM-dd HH:mm:ss','Europe/London') as window_start,
timestamptostring(windowend,'HH:mm:ss','Europe/London') as window_end, dog_size, dogs_ct
from dogs_by_size where dog_size=?;`

stmnt, err := ksqldb.QueryBuilder(query, dogsize)
if err != nil {
	log.Fatal(err)
}

ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
defer cancel()

qOpts := (&ksqldb.QueryOptions{Sql: *stmnt}).EnablePullQueryTableScan(false)

_, r, err := kcl.Pull(ctx, *qOpts)
if err != nil {
	log.Fatal(err)
}

var windowStart string
var windowEnd string
var dogSize string
var dogsCt float64
for _, row := range r {

	if row != nil {
		// Should do some type assertions here
		windowStart = row[0].(string)
		windowEnd = row[1].(string)
		dogSize = row[2].(string)
		dogsCt = row[3].(float64)
		log.Infof("🐶 There are %v dogs size %v between %v and %v", dogsCt, dogSize, windowStart, windowEnd)
	}
}
Push query
options := net.Options{
	Credentials: net.Credentials{Username: "user", Password: "password"},
	BaseUrl:     "http://localhost:8088",
	AllowHTTP:   true,
}

kcl, err := ksqldb.NewClientWithOptions(options)
if err != nil {
	log.Fatal(err)
}
defer kcl.Close()

// you can disable parsing with `kcl.EnableParseSQL(false)`
query := "select rowtime, id, name, dogsize, age from dogs emit changes;"

rowChannel := make(chan ksqldb.Row)
headerChannel := make(chan ksqldb.Header, 1)

// This Go routine will handle rows as and when they
// are sent to the channel
go func() {
	var dataTs float64
	var id string
	var name string
	var dogSize string
	var age string
	for row := range rowChannel {
		if row != nil {
			// Should do some type assertions here
			dataTs = row[0].(float64)
			id = row[1].(string)
			name = row[2].(string)
			dogSize = row[3].(string)
			age = row[4].(string)

			// Handle the timestamp
			t := int64(dataTs)
			ts := time.Unix(t/1000, 0).Format(time.RFC822)

			log.Infof("🐾 New dog at %v: '%v' is %v and %v (id %v)\n", ts, name, dogSize, age, id)
		}
	}
}()

ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
defer cancel()

e := kcl.Push(ctx, ksqldb.QueryOptions{Sql: query}, rowChannel, headerChannel)
if e != nil {
	log.Fatal(e)
}
Execute a command
  options := net.Options{
		Credentials: net.Credentials{Username: "user", Password: "password"},
		BaseUrl:     "http://localhost:8088",
		AllowHTTP:   true,
	}

	kcl, err := ksqldb.NewClientWithOptions(options)
	if err != nil {
		log.Fatal(err)
	}
	defer kcl.Close()

	resp, err := kcl.Execute(ksqldb.ExecOptions{KSql: `
		CREATE SOURCE CONNECTOR DOGS WITH (
		'connector.class'                = 'io.mdrogalis.voluble.VolubleSourceConnector',
		'key.converter'                  = 'org.apache.kafka.connect.storage.StringConverter',
		'value.converter'                = 'org.apache.kafka.connect.json.JsonConverter',
		'value.converter.schemas.enable' = 'false',
		'genkp.dogs.with'                = '#{Internet.uuid}',
		'genv.dogs.name.with'            = '#{Dog.name}',
		'genv.dogs.dogsize.with'         = '#{Dog.size}',
		'genv.dogs.age.with'             = '#{Dog.age}',
		'topic.dogs.throttle.ms'         = 1000
		);
		`})
	if err != nil {
		// log.Fatalf already exits, so no separate os.Exit call is needed
		log.Fatalf("create source connector dogs failed: %v", err)
	}

Examples

You can find the examples in the examples directory.

See the test environment here

Cobra example

The cobra-test example shows basic usage of the ksqldb-go package. To run it, you need a running Kafka environment.

The example splits the different use cases into Cobra commands.

Start docker-compose.

docker-compose up -d
go run ./examples/cobra-test

It outputs:

ksqldb-go example with cobra

Usage:
  cobra-test [command]

Available Commands:
  check             check a <example>.ksql file with the integrated parser
  cluster-status    get cluster status
  completion        generate the autocompletion script for the specified shell
  health            display the server state of your servers
  help              Help about any command
  info              Displays your server infos
  pull              print the dog stats
  push              push dogs example
  setup             setup a dummy connector
  terminate-cluster terminates your cluster
  validate          validates a property

Flags:
      --config string      config file (default is $HOME/.cobra-test.yaml)
  -h, --help               help for cobra-test
      --host string        set the ksqldb host (default "http://localhost:8088")
      --logformat string   set log format [text|json] (default "text")
      --loglevel string    set log level [info|debug|error|trace] (default "debug")
      --password string    set the ksqldb user password
      --username string    set the ksqldb user name

Use "cobra-test [command] --help" for more information about a command.

The cobra-test setup command sets up everything needed by the cobra-test pull and push commands, so run it first.

KSql Grammar example

This example was written to test and fix the Antlr4 code-generation problems for Go. We changed the Antlr4 grammar file because of some type issues (type is a reserved word in Go). The Antlr4 code generation also introduced bugs that we had to fix manually (Antlr4 does not emit the needed package names). So be careful when you use our Makefile to regenerate the KSqlParser: it will break the code!

We copied the Antlr4 grammar file from Confluent's original sources.

The parser is used to check KSql syntax. If there are syntax errors, they are collected and reported back to you.

Docker compose

It contains the following product versions:

  • zookeeper (6.2.1)
  • schema-registry (6.2.1)
  • ksqldb server (0.21.0)
  • kafka-connect (6.2.1)
  • ksqldb-cli (0.21.0)
  • kafdrop (latest)
ksqldb

I've added the following options to docker-compose to enable the ClusterStatus endpoint.

KSQL_OPTS: "-Dksql.heartbeat.enable=true -Dksql.lag.reporting.enable=true"
ksqldb-cli

For testing purposes I've added ksqldb-cli to the docker-compose.yml file.

docker exec -it ksqldb-cli ksql http://ksqldb:8088

This starts the interactive ksqlDB console.

OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.

                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =        The Database purpose-built       =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2021 Confluent Inc.

CLI v0.21.0, Server v0.21.0 located at http://ksqldb:8088
Server Status: RUNNING

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>

Kafdrop

Kafdrop is a web UI for viewing Kafka topics and browsing consumer groups. The tool displays information such as brokers, topics, partitions, consumers, and lets you view messages.

Kafdrop runs on port 9000 on your localhost.

http://localhost:9000

TODO

See https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-clients/contributing/

License

Apache License Version 2.0

Documentation


Constants

const (
	QUERY_STREAM_ENDPOINT      = "/query-stream"
	QUERY_ENDPOINT             = "/query"
	INSERTS_ENDPOINT           = "/inserts-stream"
	CLOSE_QUERY_ENDPOINT       = "/close-query"
	KSQL_ENDPOINT              = "/ksql"
	INFO_ENDPOINT              = "/info"
	STATUS_ENDPOINT            = "/status"
	HEALTHCHECK_ENDPOINT       = "/healthcheck"
	CLUSTER_STATUS_ENDPOINT    = "/clusterStatus"
	PROP_VALIDITY_ENPOINT      = "/is_valid_property"
	TERMINATE_CLUSTER_ENDPOINT = "/ksql/terminate"
)
const (
	KSQL_QUERY_PULL_TABLE_SCAN_ENABLED   = "ksql.query.pull.table.scan.enabled"
	KSQL_STREAMS_AUTO_OFFSET_RESET       = "ksql.streams.auto.offset.reset"
	KSQL_IDLE_CONNECTION_TIMEOUT_SECONDS = "ksql.idle.connection.timeout.seconds"
	DEFAULT_IDLE_CONNECTION_TIMEOUT      = int64(600) // 10 minutes
)
const (
	QBErr             = "qbErr"
	QBUnsupportedType = "unsupported param type"
	EMPTY_STATEMENT   = "empty ksql statement"
)

Variables

var (
	ErrNotFound = errors.New("no result found")
)

Functions

func QueryBuilder

func QueryBuilder(stmnt string, params ...interface{}) (*string, error)

QueryBuilder replaces ? with the correct types in the sql statement
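
A short usage sketch (the statement is illustrative):

stmnt, err := ksqldb.QueryBuilder("select * from dogs where dog_size=? and age=?;", "middle", 3)
if err != nil {
	log.Fatal(err)
}
fmt.Println(*stmnt)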

Types

type ActiveStandbyPerQuery

type ActiveStandbyPerQuery struct {
	ActiveStores      []string
	ActivePartitions  []TopicPartition
	StandByStore      []string
	StandByPartitions []string
}

type ActiveStandbyPerQueryMap

type ActiveStandbyPerQueryMap map[string]ActiveStandbyPerQuery

type BodyReader

type BodyReader func(io.Reader) ([]byte, error)

type ClusterNode

type ClusterNode struct {
	HostAlive             bool
	LastStatusUpdateMs    int64
	HostStoreLags         HostStoreLags
	ActiveStandbyPerQuery ActiveStandbyPerQueryMap
}

type ClusterNodeMap

type ClusterNodeMap map[string]ClusterNode

type ClusterStatus

type ClusterStatus struct {
	Host ClusterNodeMap `mapstructure:",remain"`
}

type ClusterStatusResponse

type ClusterStatusResponse struct {
	ClusterStatus ClusterStatus
}

type Column

type Column struct {
	Name string
	Type string
}

Column represents the metadata for a column in a Row

type CommandStatus

type CommandStatus struct {
	Message string
	Status  string
}

type ExecOptions

type ExecOptions struct {
	KSql                  string              `json:"ksql"`
	StreamsProperties     PropertyMap         `json:"streamsProperties,omitempty"`
	SessionVariables      SessionVariablesMap `json:"sessionVariables,omitempty"`
	CommandSequenceNumber int64               `json:"commandSequenceNumber,omitempty"`
}

func (*ExecOptions) EmptyQuery

func (o *ExecOptions) EmptyQuery() bool

func (*ExecOptions) SanitizeQuery

func (o *ExecOptions) SanitizeQuery()

type Field

type Field struct {
	Name   string
	Schema Schema
}

type Header

type Header struct {
	QueryId string   `json:"queryId"`
	Columns []Column `json:"Columns"`
}

Header represents a header returned from a query

type HostStoreLags

type HostStoreLags struct {
	StateStoreLags StateStoreLagMap
	UpdateTimeMs   uint64
}

type KsqlResponse

type KsqlResponse struct {
	StatementText         string
	Warnings              []string
	Type                  string            `json:"@type"`
	CommandId             string            `json:"commandId,omitempty"`
	CommandSequenceNumber int64             `json:"commandSequenceNumber,omitempty"` // -1 if the operation was unsuccessful
	CommandStatus         CommandStatus     `json:"commandStatus,omitempty"`
	Stream                *StreamSlice      `json:"streams,omitempty"`
	Tables                *TableSlice       `json:"tables,omitempty"`
	Queries               *QuerySlice       `json:"queries,omitempty"`
	QueryDescription      *QueryDescription `json:"queryDescription,omitempty"`
}

type KsqlResponseSlice

type KsqlResponseSlice []KsqlResponse

type KsqlServerInfo

type KsqlServerInfo struct {
	Version        string `json:"version"`
	KafkaClusterID string `json:"kafkaClusterId"`
	KsqlServiceID  string `json:"ksqlServiceId"`
	ServerStatus   string `json:"serverStatus,omitempty"`
}

KsqlServerInfo

type KsqlServerInfoResponse

type KsqlServerInfoResponse struct {
	KsqlServerInfo KsqlServerInfo `json:"KsqlServerInfo"`
}

KsqlServerInfoResponse

type Ksqldb

type Ksqldb interface {
	// GetServerInfo returns informations about the ksqlDB Server
	// @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/info-endpoint/
	GetServerInfo() (*KsqlServerInfo, error)

	// GetServerStatus returns server status
	// @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/info-endpoint/
	GetServerStatus() (*ServerStatusResponse, error)

	// GetClusterStatus
	// @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/cluster-status-endpoint/
	GetClusterStatus() (*ClusterStatusResponse, error)

	// TerminateCluster terminates a ksqldb cluster - READ THE DOCS before you call this endpoint
	// @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/terminate-endpoint/
	TerminateCluster(topics ...string) (*KsqlResponseSlice, error)

	// ValidateProperty validates a property
	// @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/is_valid_property-endpoint/
	ValidateProperty(property string) (*bool, error)

	// Pull data
	// @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/streaming-endpoint/
	Pull(context.Context, string, bool) (Header, Payload, error)

	// Push data
	// @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/streaming-endpoint/
	Push(context.Context, string, chan<- Row, chan<- Header) error

	// ClosePushQuery terminates push query explicitly
	// @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/streaming-endpoint/#terminating-queries
	ClosePushQuery(context.Context, string) error

	// GetQueryStatus
	// @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/status-endpoint/
	GetQueryStatus(string) (*QueryStatus, error)

	// EnableParseSQL enables/disables query parsing for push/pull/execute requests
	EnableParseSQL(bool)

	// ParseSQLEnabled returns true if query parsing is enabled or not
	ParseSQLEnabled() bool

	// Close closes net.HTTPClient transport
	Close()
}

type KsqldbClient

type KsqldbClient struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(http net.HTTPClient) (KsqldbClient, error)

NewClient returns a new KsqldbClient with the given net.HTTPClient

func NewClientWithOptions

func NewClientWithOptions(options net.Options) (KsqldbClient, error)

NewClientWithOptions returns a new @KsqldbClient with Options

func (*KsqldbClient) Close

func (cl *KsqldbClient) Close()

Close closes the underlying http transport

func (*KsqldbClient) ClosePushQuery

func (api *KsqldbClient) ClosePushQuery(ctx context.Context, queryID string) error

ClosePushQuery terminates a push query explicitly

func (*KsqldbClient) EnableParseSQL

func (cl *KsqldbClient) EnableParseSQL(activate bool)

EnableParseSQL enables / disables sql parsing

func (*KsqldbClient) Execute

func (api *KsqldbClient) Execute(options ExecOptions) (*KsqlResponseSlice, error)

Execute will execute a ksqlDB statement. All statements, except those starting with SELECT, can be run on this endpoint. To run SELECT statements, use the Push or Pull functions.

To use this function pass in the @ExecOptions.

Ref: https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/ksql-endpoint/

func (*KsqldbClient) GetQueryStatus

func (api *KsqldbClient) GetQueryStatus(commandId string) (*QueryStatus, error)

GetQueryStatus returns the current command status for a CREATE, DROP, or TERMINATE statement.

CREATE, DROP, and TERMINATE statements return an object that indicates the current state of statement execution. A statement can be in one of the following states:

QUEUED, PARSING, EXECUTING: The statement was accepted by the server and is being processed.

SUCCESS: The statement was successfully processed.

ERROR: There was an error processing the statement. The statement was not executed.

TERMINATED: The query started by the statement was terminated. Only returned for CREATE STREAM|TABLE AS SELECT.

If a CREATE, DROP, or TERMINATE statement returns a command status with state QUEUED, PARSING, or EXECUTING from the @Execute endpoint, you can use the @GetQueryStatus endpoint to poll the status of the command.
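
A sketch of that polling pattern (kcl is a KsqldbClient, the statement is illustrative, imports and backoff are omitted):

resp, err := kcl.Execute(ksqldb.ExecOptions{KSql: "CREATE STREAM dogs_copy AS SELECT * FROM dogs;"})
if err != nil {
	log.Fatal(err)
}

commandId := (*resp)[0].CommandId
for {
	status, err := kcl.GetQueryStatus(commandId)
	if err != nil {
		log.Fatal(err)
	}
	if status.Status == "SUCCESS" || status.Status == "ERROR" || status.Status == "TERMINATED" {
		log.Infof("final state: %v - %v", status.Status, status.Message)
		break
	}
	time.Sleep(500 * time.Millisecond)
}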

func (*KsqldbClient) GetServerInfo

func (api *KsqldbClient) GetServerInfo() (*KsqlServerInfo, error)

GetServerInfo returns information about your ksqlDB server.

func (*KsqldbClient) GetServerStatus

func (api *KsqldbClient) GetServerStatus() (*ServerStatusResponse, error)

GetServerStatus provides status information about your server.

func (*KsqldbClient) ParseSQLEnabled

func (cl *KsqldbClient) ParseSQLEnabled() bool

ParseSQLEnabled returns true if sql parsing is enabled; false otherwise

func (*KsqldbClient) Pull

func (api *KsqldbClient) Pull(ctx context.Context, options QueryOptions) (header Header, payload Payload, err error)

Pull queries are like "traditional" RDBMS queries in which the query terminates once the state has been queried.

To use this function, pass in a context and the QueryOptions, which hold the SQL query statement and whether full table scans should be enabled.

The function returns a ksqldb.Header and ksqldb.Payload which will hold one or more rows of data. You will need to define variables to hold each column's value. You can adopt this pattern to do this:

var col1 string
var col2 float64
for _, row := range r {
	col1 = row[0].(string)
	col2 = row[1].(float64)
	// Do other stuff with the data here
}

func (*KsqldbClient) Push

func (api *KsqldbClient) Push(ctx context.Context, options QueryOptions,
	rowChannel chan<- Row, headerChannel chan<- Header) (err error)

Push queries are continuous queries in which new events or changes to a table's state are pushed to the client. You can think of them as subscribing to a stream of changes.

Since push queries never end, this function expects a channel to which it can write new rows of data as and when they are received.

To use this function, pass in a context, the QueryOptions holding the SQL query statement, and two channels:

* ksqldb.Row - rows of data
* ksqldb.Header - header (including column definitions)

If you don't want to block before receiving row data then make this channel buffered.

The channel is populated with ksqldb.Row which represents one row of data. You will need to define variables to hold each column's value. You can adopt this pattern to do this:

var DATA_TS float64
var ID string
for row := range rc {
	if row != nil {
		DATA_TS = row[0].(float64)
		ID = row[1].(string)
		// Do other stuff with the data here
	}
}

func (*KsqldbClient) TerminateCluster

func (api *KsqldbClient) TerminateCluster(topics ...string) (*KsqlResponseSlice, error)

func (*KsqldbClient) ValidateProperty

func (api *KsqldbClient) ValidateProperty(property string) (*bool, error)

ValidateProperty tells you whether a property is prohibited from being set. If it is prohibited, the ksqlDB server API returns a 400 error.
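
A short usage sketch (kcl is a KsqldbClient; the property name uses the exported constant above):

valid, err := kcl.ValidateProperty(ksqldb.KSQL_QUERY_PULL_TABLE_SCAN_ENABLED)
if err != nil {
	log.Fatal(err)
}
log.Infof("property is valid: %v", *valid)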

type LagByPartition

type LagByPartition struct {
	Partition Partition
}

type LagByPartitionMap

type LagByPartitionMap map[string]LagByPartition

type NewClientFactory

type NewClientFactory interface {
	// NewClient factory
	NewClient(net.HTTPClient) (*KsqldbClient, error)
}

type NewClientWithOptionsFactory

type NewClientWithOptionsFactory interface {
	// NewClientWithOptions factory
	NewClientWithOptions(options net.Options) (*Ksqldb, error)
}

type Partition

type Partition struct {
	CurrentOffsetPosition uint64
	EndOffsetPosition     uint64
	OffsetLag             uint64
}

type PartitionMap

type PartitionMap map[string]Partition

type Payload

type Payload []Row

Payload represents multiple rows

type PropertyMap

type PropertyMap map[string]string

type Query

type Query struct {
	QueryString string
	Sinks       string
	ID          string // The query ID
}

type QueryDescription

type QueryDescription struct {
	StatementText string
	Fields        []Field
	Sources       []string
	Sinks         []string
	ExecutionPlan string
	Topology      string
}

type QueryOptions

type QueryOptions struct {
	Sql        string      `json:"sql"`
	Properties PropertyMap `json:"properties"`
}

func NewDefaultPullQueryOptions

func NewDefaultPullQueryOptions(sql string) (options QueryOptions)

func NewDefaultPushQueryOptions

func NewDefaultPushQueryOptions(sql string) (options QueryOptions)

func (*QueryOptions) AutoOffsetReset

func (q *QueryOptions) AutoOffsetReset(offset StreamOffset) *QueryOptions

AutoOffsetReset sets the offset to latest | earliest

Determines what to do when there is no initial offset in Apache Kafka® or if the current offset doesn't exist on the server. The default value in ksqlDB is `latest`, which means all Kafka topics are read from the latest available offset.
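
For example, a sketch that reads a push query from the earliest offset (the SQL is illustrative):

opts := ksqldb.NewDefaultPushQueryOptions("select * from dogs emit changes;")
opts.AutoOffsetReset(ksqldb.EARLIEST) // read the topic from the beginning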

func (*QueryOptions) EmptyQuery

func (o *QueryOptions) EmptyQuery() bool

func (*QueryOptions) EnablePullQueryTableScan

func (q *QueryOptions) EnablePullQueryTableScan(enable bool) *QueryOptions

EnablePullQueryTableScan controls whether table scans are permitted when executing pull queries.

Without this enabled, only key lookups are used.

Enabling table scans removes various restrictions on what types of queries are allowed.

In particular, these pull query types are now permitted:

- No WHERE clause

- Range queries on keys

- Equality and range queries on non-key columns

- Multi-column key queries without specifying all key columns

There may be significant performance implications to using these types of queries, depending on the size of the data and other workloads running, so use this config carefully.
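
For example, enabling table scans for a pull query with no WHERE clause (kcl and ctx are assumed to exist; the SQL is illustrative):

qOpts := (&ksqldb.QueryOptions{Sql: "select * from dogs_by_size;"}).EnablePullQueryTableScan(true)
_, rows, err := kcl.Pull(ctx, *qOpts)
// handle err and iterate over rows as shown in the Pull example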

func (*QueryOptions) SanitizeQuery

func (q *QueryOptions) SanitizeQuery()

func (*QueryOptions) SetIdleConnectionTimeout

func (q *QueryOptions) SetIdleConnectionTimeout(seconds int64) *QueryOptions

SetIdleConnectionTimeout sets the timeout for idle connections

A connection is idle if there is no data in either direction on that connection for the duration of the timeout.

This configuration can be helpful if you are issuing push queries that only receive data infrequently from the server, as otherwise those connections will be severed when the timeout (default 10 minutes) is hit.

Decreasing this timeout enables closing connections more aggressively to save server resources.

Increasing this timeout makes the server more tolerant of low-data volume use cases.
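
For example, raising the idle timeout to 30 minutes for a low-volume push query (the SQL is illustrative):

opts := ksqldb.NewDefaultPushQueryOptions("select * from dogs emit changes;")
opts.SetIdleConnectionTimeout(1800) // 30 minutes instead of the 10-minute default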

type QuerySlice

type QuerySlice []Query

type QueryStatus

type QueryStatus struct {
	Status  string `json:"status"`
	Message string `json:"message"`
}

type RequestParams

type RequestParams map[string]interface{}

type RespUnmarshaller

type RespUnmarshaller func([]byte, interface{}) error

type Response

type Response map[string]interface{}

type ResponseError

type ResponseError struct {
	ErrType string `json:"@type"`
	ErrCode int    `json:"error_code"`
	Message string `json:"message"`
}

func (ResponseError) Error

func (e ResponseError) Error() string

type Row

type Row []interface{}

Row represents a row returned from a query

type Schema

type Schema struct {
	Type string
}

this is not complete yet

type ServerStatusResponse

type ServerStatusResponse struct {
	IsHealthy *bool `json:"isHealthy"`
	Details   struct {
		Metastore struct {
			IsHealthy *bool `json:"isHealthy"`
		} `json:"metastore"`
		Kafka struct {
			IsHealthy *bool `json:"isHealthy"`
		} `json:"kafka"`
	} `json:"details"`
	KsqlServiceID string `json:"ksqlServiceId"`
}

ServerStatusResponse

type SessionVariablesMap

type SessionVariablesMap map[string]interface{}

type StateStoreLag

type StateStoreLag struct {
	LagByPartition LagByPartitionMap
	Size           uint64
}

type StateStoreLagMap

type StateStoreLagMap map[string]StateStoreLag

type Stream

type Stream struct {
	Name   string
	Topic  string
	Format string
	Type   string
}

type StreamOffset

type StreamOffset string
const (
	EARLIEST StreamOffset = "earliest"
	LATEST   StreamOffset = "latest"
)

type StreamSlice

type StreamSlice []Stream

type Table

type Table struct {
	Name       string
	Topic      string
	Format     string
	Type       string
	IsWindowed bool
}

type TableSlice

type TableSlice []Table

type TerminateClusterTopics

type TerminateClusterTopics struct {
	DeleteTopicList []string `json:"deleteTopicList,omitempty"`
}

func (*TerminateClusterTopics) Add

func (tct *TerminateClusterTopics) Add(topics ...string)

func (*TerminateClusterTopics) Size

func (tct *TerminateClusterTopics) Size() int

type TopicPartition

type TopicPartition struct {
	Topic     string
	Partition uint64
}

Directories

Path Synopsis
examples
net
