Documentation
¶
Index ¶
- Constants
- Variables
- func QueryBuilder(stmnt string, params ...interface{}) (*string, error)
- type ActiveStandbyPerQuery
- type ActiveStandbyPerQueryMap
- type BodyReader
- type ClusterNode
- type ClusterNodeMap
- type ClusterStatus
- type ClusterStatusResponse
- type Column
- type CommandStatus
- type ExecOptions
- type Field
- type Header
- type HostStoreLags
- type KsqlResponse
- type KsqlResponseSlice
- type KsqlServerInfo
- type KsqlServerInfoResponse
- type Ksqldb
- type KsqldbClient
- func (cl *KsqldbClient) Close()
- func (cl *KsqldbClient) EnableParseSQL(activate bool)
- func (api *KsqldbClient) Execute(options ExecOptions) (*KsqlResponseSlice, error)
- func (api *KsqldbClient) GetClusterStatus() (*ClusterStatusResponse, error)
- func (api *KsqldbClient) GetQueryStatus(commandId string) (*QueryStatus, error)
- func (api *KsqldbClient) GetServerInfo() (*KsqlServerInfo, error)
- func (api *KsqldbClient) GetServerStatus() (*ServerStatusResponse, error)
- func (cl *KsqldbClient) ParseSQLEnabled() bool
- func (api *KsqldbClient) Pull(ctx context.Context, options QueryOptions) (header Header, payload Payload, err error)
- func (api *KsqldbClient) Push(ctx context.Context, sql string, rowChannel chan<- Row, ...) (err error)
- func (api *KsqldbClient) TerminateCluster(topics ...string) (*KsqlResponseSlice, error)
- func (api *KsqldbClient) ValidateProperty(property string) (*bool, error)
- type LagByPartition
- type LagByPartitionMap
- type NewClientFactory
- type NewClientWithOptionsFactory
- type Partition
- type PartitionMap
- type Payload
- type PropertyMap
- type Query
- type QueryDescription
- type QueryOptions
- type QuerySlice
- type QueryStatus
- type RequestParams
- type RespUnmarshaller
- type Response
- type ResponseError
- type Row
- type Schema
- type ServerStatusResponse
- type SessionVariablesMap
- type StateStoreLag
- type StateStoreLagMap
- type Stream
- type StreamSlice
- type Table
- type TableSlice
- type TerminateClusterTopics
- type TopicPartition
Constants ¶
const (
QUERY_STREAM_ENDPOINT = "/query-stream"
QUERY_ENDPOINT = "/query"
INSERTS_ENDPOINT = "/inserts-stream"
CLOSE_QUERY_ENDPOINT = "/close-query"
KSQL_ENDPOINT = "/ksql"
INFO_ENDPOINT = "/info"
STATUS_ENDPOINT = "/status"
HEALTHCHECK_ENDPOINT = "/healthcheck"
CLUSTER_STATUS_ENDPOINT = "/clusterStatus"
PROP_VALIDITY_ENPOINT = "/is_valid_property"
TERMINATE_CLUSTER_ENDPOINT = "/ksql/terminate"
)
const (
QBErr = "qbErr"
QBUnsupportedType = "unsupported param type"
EMPTY_STATEMENT = "empty ksql statement"
)
const (
HEARTBEAT_TRESHOLD = 9 // After 9 minutes the connection will be closed
)
const (
KSQL_QUERY_PULL_TABLE_SCAN_ENABLED = "ksql.query.pull.table.scan.enabled"
)
Variables ¶
var (
ErrNotFound = errors.New("no result found")
)
Functions ¶
func QueryBuilder ¶ added in v0.0.3
QueryBuilder replaces ? with the correct types in the sql statement
Types ¶
type ActiveStandbyPerQuery ¶ added in v0.0.4
type ActiveStandbyPerQuery struct {
ActiveStores []string
ActivePartitions []TopicPartition
StandByStore []string
StandByPartitions []string
}
type ActiveStandbyPerQueryMap ¶ added in v0.0.4
type ActiveStandbyPerQueryMap map[string]ActiveStandbyPerQuery
type ClusterNode ¶ added in v0.0.4
type ClusterNode struct {
HostAlive bool
LastStatusUpdateMs int64
HostStoreLags HostStoreLags
ActiveStandbyPerQuery ActiveStandbyPerQueryMap
}
type ClusterNodeMap ¶ added in v0.0.4
type ClusterNodeMap map[string]ClusterNode
type ClusterStatus ¶ added in v0.0.4
type ClusterStatus struct {
Host ClusterNodeMap `mapstructure:",remain"`
}
type ClusterStatusResponse ¶ added in v0.0.4
type ClusterStatusResponse struct {
ClusterStatus ClusterStatus
}
type CommandStatus ¶ added in v0.0.4
type ExecOptions ¶ added in v0.0.4
type ExecOptions struct {
KSql string `json:"ksql"`
StreamsProperties PropertyMap `json:"streamsProperties,omitempty"`
SessionVariables SessionVariablesMap `json:"sessionVariables,omitempty"`
CommandSequenceNumber int64 `json:"commandSequenceNumber,omitempty"`
}
func (*ExecOptions) EmptyQuery ¶ added in v0.0.4
func (o *ExecOptions) EmptyQuery() bool
func (*ExecOptions) SanitizeQuery ¶ added in v0.0.4
func (o *ExecOptions) SanitizeQuery()
type Header ¶
type Header struct {
// contains filtered or unexported fields
}
Header represents a header returned from a query
type HostStoreLags ¶ added in v0.0.4
type HostStoreLags struct {
StateStoreLags StateStoreLagMap
UpdateTimeMs uint64
}
type KsqlResponse ¶ added in v0.0.4
type KsqlResponse struct {
StatementText string
Warnings []string
Type string `json:"@type"`
CommandId string `json:"commandId,omitempty"`
CommandSequenceNumber int64 `json:"commandSequenceNumber,omitempty"` // -1 if the operation was unsuccessful
CommandStatus CommandStatus `json:"commandStatus,omitempty"`
Stream *StreamSlice `json:"streams,omitempty"`
Tables *TableSlice `json:"tables,omitempty"`
Queries *QuerySlice `json:"queries,omitempty"`
QueryDescription *QueryDescription `json:"queryDescription,omitempty"`
}
type KsqlResponseSlice ¶ added in v0.0.4
type KsqlResponseSlice []KsqlResponse
type KsqlServerInfo ¶ added in v0.0.4
type KsqlServerInfo struct {
Version string `json:"version"`
KafkaClusterID string `json:"kafkaClusterId"`
KsqlServiceID string `json:"ksqlServiceId"`
ServerStatus string `json:"serverStatus,omitempty"`
}
KsqlServerInfo holds the version, Kafka cluster ID, ksqlDB service ID and server status reported by the server.
type KsqlServerInfoResponse ¶ added in v0.0.4
type KsqlServerInfoResponse struct {
KsqlServerInfo KsqlServerInfo `json:"KsqlServerInfo"`
}
KsqlServerInfoResponse wraps a KsqlServerInfo under the "KsqlServerInfo" JSON key.
type Ksqldb ¶ added in v0.0.4
type Ksqldb interface {
// GetServerInfo returns information about the ksqlDB server
// @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/info-endpoint/
GetServerInfo() (*KsqlServerInfo, error)
// GetServerStatus returns server status
// @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/info-endpoint/
GetServerStatus() (*ServerStatusResponse, error)
// GetClusterStatus
// @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/cluster-status-endpoint/
GetClusterStatus() (*ClusterStatusResponse, error)
// TerminateCluster terminates a ksqldb cluster - READ THE DOCS before you call this endpoint
// @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/terminate-endpoint/
TerminateCluster(topics ...string) (*KsqlResponseSlice, error)
// ValidateProperty validates a property
// @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/is_valid_property-endpoint/
ValidateProperty(property string) (*bool, error)
// Pull data
// @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/streaming-endpoint/
Pull(context.Context, string, bool) (Header, Payload, error)
// Push data
// @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/streaming-endpoint/
Push(context.Context, string, chan<- Row, chan<- Header) error
// GetQueryStatus
// @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/status-endpoint/
GetQueryStatus(string) (*QueryStatus, error)
// EnableParseSQL enables/disables query parsing for push/pull/execute requests
EnableParseSQL(bool)
// ParseSQLEnabled returns true if query parsing is enabled; false otherwise
ParseSQLEnabled() bool
// Close closes net.HTTPClient transport
Close()
}
type KsqldbClient ¶ added in v0.0.4
type KsqldbClient struct {
// contains filtered or unexported fields
}
func NewClient ¶
func NewClient(http net.HTTPClient) (KsqldbClient, error)
NewClient returns a new KsqldbClient with the given net.HTTPClient
func NewClientWithOptions ¶ added in v0.0.4
func NewClientWithOptions(options net.Options) (KsqldbClient, error)
NewClientWithOptions returns a new @KsqldbClient with Options
func (*KsqldbClient) Close ¶ added in v0.0.4
func (cl *KsqldbClient) Close()
Close closes the underlying http transport
func (*KsqldbClient) EnableParseSQL ¶ added in v0.0.4
func (cl *KsqldbClient) EnableParseSQL(activate bool)
EnableParseSQL enables / disables sql parsing
func (*KsqldbClient) Execute ¶ added in v0.0.4
func (api *KsqldbClient) Execute(options ExecOptions) (*KsqlResponseSlice, error)
Execute will execute a ksqlDB statement. All statements, except those starting with SELECT, can be run on this endpoint. To run SELECT statements use the Push or Pull functions.
To use this function pass in the @ExecOptions.
Ref: https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/ksql-endpoint/
func (*KsqldbClient) GetClusterStatus ¶ added in v0.0.4
func (api *KsqldbClient) GetClusterStatus() (*ClusterStatusResponse, error)
GetClusterStatus @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/cluster-status-endpoint/
func (*KsqldbClient) GetQueryStatus ¶ added in v0.0.4
func (api *KsqldbClient) GetQueryStatus(commandId string) (*QueryStatus, error)
GetQueryStatus returns the current command status for a CREATE, DROP, or TERMINATE statement.
CREATE, DROP, and TERMINATE statements return an object that indicates the current state of statement execution. A statement can be in one of the following states:
QUEUED, PARSING, EXECUTING: The statement was accepted by the server and is being processed. SUCCESS: The statement was successfully processed. ERROR: There was an error processing the statement. The statement was not executed.
TERMINATED: The query started by the statement was terminated. Only returned for CREATE STREAM|TABLE AS SELECT.
If a CREATE, DROP, or TERMINATE statement returns a command status with state QUEUED, PARSING, or EXECUTING from the @Execute endpoint, you can use the @GetQueryStatus endpoint to poll the status of the command.
func (*KsqldbClient) GetServerInfo ¶ added in v0.0.4
func (api *KsqldbClient) GetServerInfo() (*KsqlServerInfo, error)
GetServerInfo returns information about the ksqlDB server
func (*KsqldbClient) GetServerStatus ¶ added in v0.0.4
func (api *KsqldbClient) GetServerStatus() (*ServerStatusResponse, error)
GetServerStatus provides status information about your server
func (*KsqldbClient) ParseSQLEnabled ¶ added in v0.0.4
func (cl *KsqldbClient) ParseSQLEnabled() bool
ParseSQLEnabled returns true if sql parsing is enabled; false otherwise
func (*KsqldbClient) Pull ¶ added in v0.0.4
func (api *KsqldbClient) Pull(ctx context.Context, options QueryOptions) (header Header, payload Payload, err error)
Pull queries are like "traditional" RDBMS queries in which the query terminates once the state has been queried.
To use this function pass in a context and a QueryOptions value holding the SQL query statement and its properties. Use QueryOptions.EnablePullQueryTableScan to control whether full table scans should be enabled.
The function returns a ksqldb.Header and ksqldb.Payload which will hold one or more rows of data. You will need to define variables to hold each column's value. You can adopt this pattern to do this:
var col1 string
var col2 float64
for _, row := range r {
col1 = row[0].(string)
col2 = row[1].(float64)
// Do other stuff with the data here
}
func (*KsqldbClient) Push ¶ added in v0.0.4
func (api *KsqldbClient) Push(ctx context.Context, sql string, rowChannel chan<- Row, headerChannel chan<- Header) (err error)
Push queries are continuous queries in which new events or changes to a table's state are pushed to the client. You can think of them as subscribing to a stream of changes.
Since push queries never end, this function expects a channel to which it can write new rows of data as and when they are received.
To use this function pass in a context, the SQL query statement, and two channels:
- ksqldb.Row - rows of data
- ksqldb.Header - header (including column definitions).
If you don't want to block before receiving row data then make this channel buffered.
The channel is populated with ksqldb.Row which represents one row of data. You will need to define variables to hold each column's value. You can adopt this pattern to do this:
var DATA_TS float64
var ID string
for row := range rc {
if row != nil {
DATA_TS = row[0].(float64)
ID = row[1].(string)
}
}
func (*KsqldbClient) TerminateCluster ¶ added in v0.0.4
func (api *KsqldbClient) TerminateCluster(topics ...string) (*KsqlResponseSlice, error)
func (*KsqldbClient) ValidateProperty ¶ added in v0.0.4
func (api *KsqldbClient) ValidateProperty(property string) (*bool, error)
ValidateProperty tells you whether a property is prohibited from being set. If it is prohibited, the ksqlDB server API returns a 400 error.
type LagByPartition ¶ added in v0.0.4
type LagByPartition struct {
Partition Partition
}
type LagByPartitionMap ¶ added in v0.0.4
type LagByPartitionMap map[string]LagByPartition
type NewClientFactory ¶ added in v0.0.4
type NewClientFactory interface {
// NewClient factory
NewClient(net.HTTPClient) (*KsqldbClient, error)
}
type NewClientWithOptionsFactory ¶ added in v0.0.4
type PartitionMap ¶ added in v0.0.4
type PropertyMap ¶ added in v0.0.4
type QueryDescription ¶ added in v0.0.4
type QueryOptions ¶ added in v0.0.4
type QueryOptions struct {
Sql string `json:"sql"`
Properties PropertyMap `json:"properties"`
}
func (*QueryOptions) EmptyQuery ¶ added in v0.0.4
func (o *QueryOptions) EmptyQuery() bool
func (*QueryOptions) EnablePullQueryTableScan ¶ added in v0.0.4
func (q *QueryOptions) EnablePullQueryTableScan(enable bool) *QueryOptions
EnablePullQueryTableScan controls whether table scans are permitted when executing pull queries.
Without this enabled, only key lookups are used.
Enabling table scans removes various restrictions on what types of queries are allowed.
In particular, these pull query types are now permitted:
- No WHERE clause
- Range queries on keys
- Equality and range queries on non-key columns
- Multi-column key queries without specifying all key columns
There may be significant performance implications to using these types of queries, depending on the size of the data and other workloads running, so use this config carefully.
func (*QueryOptions) SanitizeQuery ¶ added in v0.0.4
func (q *QueryOptions) SanitizeQuery()
type QuerySlice ¶ added in v0.0.4
type QuerySlice []Query
type QueryStatus ¶ added in v0.0.4
type RequestParams ¶ added in v0.0.3
type RequestParams map[string]interface{}
type RespUnmarshaller ¶ added in v0.0.4
type ResponseError ¶ added in v0.0.4
type ResponseError struct {
ErrType string `json:"@type"`
ErrCode int `json:"error_code"`
Message string `json:"message"`
}
func (ResponseError) Error ¶ added in v0.0.4
func (e ResponseError) Error() string
type ServerStatusResponse ¶ added in v0.0.4
type ServerStatusResponse struct {
IsHealthy *bool `json:"isHealthy"`
Details struct {
Metastore struct {
IsHealthy *bool `json:"isHealthy"`
} `json:"metastore"`
Kafka struct {
IsHealthy *bool `json:"isHealthy"`
} `json:"kafka"`
} `json:"details"`
KsqlServiceID string `json:"ksqlServiceId"`
}
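ServerStatusResponse reports the overall health of the server, the health of its metastore and Kafka details, and the ksqlDB service ID.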
type SessionVariablesMap ¶ added in v0.0.4
type SessionVariablesMap map[string]interface{}
type StateStoreLag ¶ added in v0.0.4
type StateStoreLag struct {
LagByPartition LagByPartitionMap
Size uint64
}
type StateStoreLagMap ¶ added in v0.0.4
type StateStoreLagMap map[string]StateStoreLag
type StreamSlice ¶ added in v0.0.4
type StreamSlice []Stream
type TableSlice ¶ added in v0.0.4
type TableSlice []Table
type TerminateClusterTopics ¶ added in v0.0.4
type TerminateClusterTopics struct {
DeleteTopicList []string `json:"deleteTopicList,omitempty"`
}
func (*TerminateClusterTopics) Add ¶ added in v0.0.4
func (tct *TerminateClusterTopics) Add(topics ...string)
func (*TerminateClusterTopics) Size ¶ added in v0.0.4
func (tct *TerminateClusterTopics) Size() int
