lenses

Version: v2.1.10+incompatible
Warning: this package is not in the latest version of its module.
Published: Oct 23, 2018 License: Apache-2.0 Imports: 28 Imported by: 0

README

Lenses Client (Go)

Landoop's Lenses REST API client, written in Go.

Installation

The only requirements are the Go programming language, version 1.10 or later, and a Lenses Box, version 2.0 or later.

$ go get -u github.com/landoop/lenses-go/cmd/lenses-cli

This command installs both the client library for development use and the CLI. The lenses-cli binary is placed in $GOPATH/bin, so add that directory to your $PATH if you haven't already.

CLI

Lenses offers a powerful CLI (command-line tool), built in Go, that uses the Lenses REST and WebSocket APIs to communicate with Apache Kafka. It exposes a straightforward way to perform common data engineering and site reliability engineering tasks, such as:

  • Automate your CI/CD (continuous-integration and continuous-delivery)
  • Create topics/acls/quotas/schemas/connectors/processors
  • Change or retrieve configurations to store in GitHub
Documentation

Please navigate to https://lenses.stream/dev/lenses-cli/ to learn how to install and use the lenses-cli.

Client

The lenses-go package lets Go developers communicate with Lenses by calling its REST and WebSocket APIs.

Getting started
import "github.com/landoop/lenses-go"
Authentication
// Prepare authentication using a raw username and password.
//
// Use it when Lenses is set up with "BASIC" or "LDAP" authentication.
auth := lenses.BasicAuthentication{Username: "user", Password: "pass"}

// Alternatively, use Kerberos with a username and password.
auth := lenses.KerberosAuthentication{
    ConfFile: "/etc/krb5.conf",
    Method:   lenses.KerberosWithPassword{
        Realm: "my.realm or default if empty",
        Username: "user",
        Password: "pass",
    },
}

// Or Kerberos with a keytab file.
auth := lenses.KerberosAuthentication{
    ConfFile: "/etc/krb5.conf",
    Method:   lenses.KerberosWithKeytab{KeytabFile: "/home/me/krb5_my_keytab.txt"},
}

// Or Kerberos with a credentials cache file.
auth := lenses.KerberosAuthentication{
    ConfFile: "/etc/krb5.conf",
    Method:   lenses.KerberosFromCCache{CCacheFile: "/tmp/krb5_my_cache_file.conf"},
}

Custom authentication can be implemented as well by providing an Auth(c *lenses.Client) error method; see the client_authentication.go file for more.

Config
// Prepare the client's configuration based on the host and the authentication above.
currentConfig := lenses.ClientConfig{Host: "domain.com", Authentication: auth, Timeout: "15s", Debug: true}

// Creating the client using the configuration.
client, err := lenses.OpenConnection(currentConfig)
if err != nil {
    // handle error.
}
Read Config from any io.Reader or file
// ReadConfig reads and decodes Config from an io.Reader based on a custom unmarshaler.
// This can be useful to read configuration via network or files (see `ReadConfigFromFile`).
// Sets the `outPtr`. Returns a non-nil error on any unmarshaler's errors.
ReadConfig(r io.Reader, unmarshaler UnmarshalFunc, outPtr *Config) error

// ReadConfigFromFile reads and decodes Config from a file based on a custom unmarshaler.
// `ReadConfigFromJSON` and `ReadConfigFromYAML` use it internally,
// but the end-developer can pass any custom decoder to read a configuration file
// with this function. Keep in mind that the default behavior of the fields
// depends on the existing unmarshalers; use their tag names to map your decoder's properties.
//
// Accepts the absolute or the relative path of the configuration file.
// Sets the `outPtr`. Returns a non-nil error if parsing or decoding the file failed or the file doesn't exist.
ReadConfigFromFile(filename string, unmarshaler UnmarshalFunc, outPtr *Config) error

// TryReadConfigFromFile will try to read a specific file and unmarshal to `Config`.
// It will try to read it with one of these built-in formats:
// 1. JSON
// 2. YAML
TryReadConfigFromFile(filename string, outPtr *Config) error
// TryReadConfigFromHome will try to read the `Config`
// from the current user's home directory/.lenses, the lookup is based on
// the common configuration filename pattern:
// lenses-cli.json, lenses-cli.yml or lenses.json and lenses.yml.
TryReadConfigFromHome(outPtr *Config) bool

// TryReadConfigFromExecutable will try to read the `Config`
// from the (client's caller's) executable path that started the current process.
// The lookup is based on the common configuration filename pattern:
// lenses-cli.json, lenses-cli.yml or lenses.json and lenses.yml.
TryReadConfigFromExecutable(outPtr *Config) bool

// TryReadConfigFromCurrentWorkingDir will try to read the `Config`
// from the current working directory; note that it may differ from the executable path.
// The lookup is based on the common configuration filename pattern:
// lenses-cli.json, lenses-cli.yml or lenses.json and lenses.yml.
TryReadConfigFromCurrentWorkingDir(outPtr *Config) bool

// ReadConfigFromJSON reads and decodes Config from a json file, i.e `configuration.json`.
//
// Accepts the absolute or the relative path of the configuration file.
// An error may occur when the file doesn't exist or is not formatted correctly.
ReadConfigFromJSON(filename string, outPtr *Config) error

// ReadConfigFromYAML reads and decodes Config from a yaml file, i.e `configuration.yml`.
//
// Accepts the absolute or the relative path of the configuration file.
// An error may occur when the file doesn't exist or is not formatted correctly.
ReadConfigFromYAML(filename string, outPtr *Config) error

Example Code:

# file: ./lenses.yml
CurrentContext: main
Contexts:
  main:
    Host: https://landoop.com
    Kerberos:
      ConfFile: /etc/krb5.conf
      WithPassword:
        Username: the_username
        Password: the_password
        Realm: empty_for_default

Usage:

var config lenses.Config
err := lenses.ReadConfigFromYAML("./lenses.yml", &config)
if err != nil {
    // handle error.
}

client, err := lenses.OpenConnection(*config.GetCurrent())

Config offers many more capabilities and helpers; you can review them quickly in the config.go source file.

API Calls

All lenses-go#Client methods return a typed value based on the call, plus an error as the second return value that reports any failure coming from the backend or the client. The client does not panic.

Go types are first class citizens here, we will not confuse you or let you work based on luck!

topics, err := client.GetTopics()
if err != nil {
    // handle error.
}

// Print the length of the topics we've just received from our Lenses Box.
print(len(topics))

As an example of how far the typing goes: Client#GetTopics returns []lenses.Topic, so you can safely navigate nested fields.

topics[0].ConsumersGroup[0].Coordinator.Host
Documentation

Detailed documentation can be found at godocs.

Versioning

Current: v2.1.9

Read more about Semantic Versioning 2.0.0

License

Distributed under Apache Version 2.0 License, click here for more details.

Documentation

Overview

Package lenses is Landoop's Lenses client API for Gophers.

Source code and other details for the project are available at GitHub:

https://github.com/landoop/lenses-go

Current Version

2.1.9

Installation

The only requirement is the Go Programming Language.

$ go get -u github.com/landoop/lenses-go/cmd/lenses-cli

Example code:

CLI:

https://lenses.stream/dev/lenses-cli/

Constants

const SchemaLatestVersion = "latest"

SchemaLatestVersion is the only valid string for the "versionID": the "latest" version string. It is used by `GetLatestSchema`.

const Version = "2.1.9"

Version is the current semantic version of the lenses client and cli.

Variables

ACLOperations is a map which contains the allowed ACL operations(values) per resource type(key).

Based on: https://docs.confluent.io/current/kafka/authorization.html#acl-format

var DefaultConfigurationHomeDir = filepath.Join(HomeDir(), ".lenses")

DefaultConfigurationHomeDir is the default configuration system directory, by default it's the $HOME/.lenses directory.

var DefaultContextKey = "master"

DefaultContextKey is used to set an empty client configuration when no custom context is available.

var DefaultQuotaConfigPropertiesToRemove = []string{"producer_byte_rate", "consumer_byte_rate", "request_percentage"}

DefaultQuotaConfigPropertiesToRemove is a set of hard-coded strings that the client will send on `DeleteQuotaXXX` functions. It contains the "producer_byte_rate", "consumer_byte_rate" and "request_percentage" as they're described at the `QuotaConfig` structure.

var ErrCredentialsMissing = fmt.Errorf("credentials missing or invalid")

ErrCredentialsMissing fires on login, when credentials are missing or are invalid or the specific user has no access to a specific action.

var ErrUnknownResponse = fmt.Errorf("unknown")

ErrUnknownResponse is fired when an unknown error causes an empty response, usually HTML content with a 404 status code. More information can be displayed if `ClientConfig#Debug` is enabled.

ValidCompatibilityLevels holds a list of the valid compatibility levels, see `CompatibilityLevel` type.

Functions

func ClientConfigMarshalJSON

func ClientConfigMarshalJSON(c ClientConfig) ([]byte, error)

ClientConfigMarshalJSON returns the JSON string as bytes of the given `ClientConfig` structure.

func ClientConfigMarshalYAML

func ClientConfigMarshalYAML(c ClientConfig) ([]byte, error)

ClientConfigMarshalYAML returns the YAML string as bytes of the given `ClientConfig` structure.

func ClientConfigUnmarshalJSON

func ClientConfigUnmarshalJSON(b []byte, c *ClientConfig) error

ClientConfigUnmarshalJSON parses the JSON-encoded `ClientConfig` and stores the result in the `ClientConfig` pointed to by "c".

func ConfigMarshalJSON

func ConfigMarshalJSON(c Config) ([]byte, error)

ConfigMarshalJSON returns the JSON encoding of "c" `Config`.

func ConfigMarshalYAML

func ConfigMarshalYAML(c Config) ([]byte, error)

ConfigMarshalYAML returns the YAML encoding of "c" `Config`.

func ConfigUnmarshalJSON

func ConfigUnmarshalJSON(b []byte, c *Config) error

ConfigUnmarshalJSON parses the JSON-encoded `Config` and stores the result in the `Config` pointed to by "c".

func ConfigUnmarshalYAML

func ConfigUnmarshalYAML(b []byte, c *Config) error

ConfigUnmarshalYAML parses the YAML-encoded `Config` and stores the result in the `Config` pointed to by "c".

func HomeDir

func HomeDir() (homeDir string)

HomeDir returns the home directory for the current user on this specific host machine.

func IsValidCompatibilityLevel

func IsValidCompatibilityLevel(compatibility string) bool

IsValidCompatibilityLevel checks if a compatibility of string form is a valid compatibility level value. See `ValidCompatibilityLevels` too.

func JSONAvroSchema

func JSONAvroSchema(avroSchema string) (json.RawMessage, error)

JSONAvroSchema converts and returns the json form of the "avroSchema" as []byte.
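
As a rough illustration of what such a conversion can involve, the sketch below checks that a schema string is valid JSON and returns it in compact form as a json.RawMessage. The helper name `toRawJSON` is hypothetical and is not part of lenses-go; the library's own JSONAvroSchema may behave differently.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
)

// toRawJSON is a hypothetical helper, not the library's implementation.
// It validates that s is JSON and returns it with insignificant
// whitespace removed.
func toRawJSON(s string) (json.RawMessage, error) {
	if !json.Valid([]byte(s)) {
		return nil, errors.New("schema is not valid JSON")
	}
	var buf bytes.Buffer
	if err := json.Compact(&buf, []byte(s)); err != nil {
		return nil, err
	}
	return json.RawMessage(buf.Bytes()), nil
}

func main() {
	raw, err := toRawJSON(`{ "type": "record", "name": "User", "fields": [] }`)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(raw))
}
```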

func ReadConfig

func ReadConfig(r io.Reader, unmarshaler UnmarshalFunc, outPtr *Config) error

ReadConfig reads and decodes Config from an io.Reader based on a custom unmarshaler. This can be useful to read configuration via network or files (see `ReadConfigFromFile`). Sets the `outPtr`. Returns a non-nil error on any unmarshaler's errors.
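
The reader-plus-unmarshaler pattern can be sketched with the standard library alone. Everything below is a stand-in: the `config` and `clientConfig` types are simplified, and the `unmarshalFunc` signature is an assumption, since the definition of the library's `UnmarshalFunc` is not shown on this page.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"strings"
)

// unmarshalFunc is an assumed decoder shape (it matches json.Unmarshal).
type unmarshalFunc func(data []byte, out interface{}) error

// clientConfig and config are simplified stand-ins for the library types.
type clientConfig struct {
	Host string
}

type config struct {
	CurrentContext string
	Contexts       map[string]clientConfig
}

// readConfig reads everything from r and hands the bytes to the decoder.
func readConfig(r io.Reader, unmarshal unmarshalFunc, out *config) error {
	data, err := ioutil.ReadAll(r)
	if err != nil {
		return err
	}
	return unmarshal(data, out)
}

// currentHost decodes raw JSON and returns the host of the current context.
func currentHost(raw string) (string, error) {
	var cfg config
	if err := readConfig(strings.NewReader(raw), json.Unmarshal, &cfg); err != nil {
		return "", err
	}
	ctx, ok := cfg.Contexts[cfg.CurrentContext]
	if !ok {
		return "", fmt.Errorf("context %q not found", cfg.CurrentContext)
	}
	return ctx.Host, nil
}

func main() {
	host, err := currentHost(`{"CurrentContext":"main","Contexts":{"main":{"Host":"https://landoop.com"}}}`)
	fmt.Println(host, err)
}
```

Because the decoder is passed in as a function, the same reading code can serve any format for which a compatible decoder exists.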

func ReadConfigFromFile

func ReadConfigFromFile(filename string, unmarshaler UnmarshalFunc, outPtr *Config) error

ReadConfigFromFile reads and decodes Config from a file based on a custom unmarshaler. `ReadConfigFromJSON` and `ReadConfigFromYAML` use it internally, but the end-developer can pass any custom decoder to read a configuration file with this function. Keep in mind that the default behavior of the fields depends on the existing unmarshalers; use their tag names to map your decoder's properties.

Accepts the absolute or the relative path of the configuration file. Sets the `outPtr`. Returns a non-nil error if parsing or decoding the file failed or the file doesn't exist.

func ReadConfigFromJSON

func ReadConfigFromJSON(filename string, outPtr *Config) error

ReadConfigFromJSON reads and decodes Config from a json file, i.e `configuration.json`.

Accepts the absolute or the relative path of the configuration file. An error may occur when the file doesn't exist or is not formatted correctly.

func ReadConfigFromYAML

func ReadConfigFromYAML(filename string, outPtr *Config) error

ReadConfigFromYAML reads and decodes Config from a yaml file, i.e `configuration.yml`.

Accepts the absolute or the relative path of the configuration file. An error may occur when the file doesn't exist or is not formatted correctly.

func TryReadConfigFromCurrentWorkingDir

func TryReadConfigFromCurrentWorkingDir(outPtr *Config) bool

TryReadConfigFromCurrentWorkingDir will try to read the `Config` from the current working directory; note that it may differ from the executable path. The lookup is based on the common configuration filename pattern: lenses-cli.json, lenses-cli.yml or lenses.json and lenses.yml.

func TryReadConfigFromExecutable

func TryReadConfigFromExecutable(outPtr *Config) bool

TryReadConfigFromExecutable will try to read the `Config` from the (client's caller's) executable path that started the current process. The lookup is based on the common configuration filename pattern: lenses-cli.json, lenses-cli.yml or lenses.json and lenses.yml.

func TryReadConfigFromFile

func TryReadConfigFromFile(filename string, outPtr *Config) (err error)

TryReadConfigFromFile will try to read a specific file and unmarshal it to `Config`. It will try to read it with one of these built-in lexers/formats:

1. JSON
2. YAML

func TryReadConfigFromHome

func TryReadConfigFromHome(outPtr *Config) bool

TryReadConfigFromHome will try to read the `Config` from the current user's home directory/.lenses, the lookup is based on the common configuration filename pattern: lenses-cli.json, lenses-cli.yml or lenses.json and lenses.yml.
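
The filename-pattern lookup shared by the TryReadConfigFrom... functions can be sketched as a loop over candidate names in a directory. The function `findConfigFile` is hypothetical, and the probing order is an assumption taken from the order in which the names are listed above.

```go
package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
)

// candidateNames holds the filenames named in the documentation.
// The order the library actually probes them in is an assumption.
var candidateNames = []string{"lenses-cli.json", "lenses-cli.yml", "lenses.json", "lenses.yml"}

// findConfigFile returns the first candidate that exists as a regular
// file inside dir.
func findConfigFile(dir string) (string, bool) {
	for _, name := range candidateNames {
		path := filepath.Join(dir, name)
		if info, err := os.Stat(path); err == nil && !info.IsDir() {
			return path, true
		}
	}
	return "", false
}

// demoLookup writes a lenses.yml into a temporary directory and looks it up.
func demoLookup() (string, bool) {
	dir, err := ioutil.TempDir("", "lenses-demo")
	if err != nil {
		return "", false
	}
	defer os.RemoveAll(dir)

	file := filepath.Join(dir, "lenses.yml")
	if err := ioutil.WriteFile(file, []byte("CurrentContext: main\n"), 0600); err != nil {
		return "", false
	}
	path, ok := findConfigFile(dir)
	return filepath.Base(path), ok
}

func main() {
	name, ok := demoLookup()
	fmt.Println(name, ok)
}
```

Returning a bool instead of an error mirrors the signatures above, where a missing file is an expected outcome and not a failure.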

Types

type ACL

type ACL struct {
	ResourceName   string            `json:"resourceName" yaml:"ResourceName" header:"Name"`           // required.
	ResourceType   ACLResourceType   `json:"resourceType" yaml:"ResourceType" header:"Type"`           // required.
	Principal      string            `json:"principal" yaml:"Principal" header:"Principal"`            // required.
	PermissionType ACLPermissionType `json:"permissionType" yaml:"PermissionType" header:"Permission"` // required.
	Host           string            `json:"host" yaml:"Host" header:"Host"`                           // required.
	Operation      ACLOperation      `json:"operation" yaml:"Operation" header:"Operation"`            // required.
}

ACL is the type which defines a single Apache Kafka Access Control List.

func (*ACL) Validate

func (acl *ACL) Validate() error

Validate validates the acl's resource type, permission type and operation. It returns an error if the operation is not valid for the resource type.
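
A check of this kind can be sketched as a lookup in a map from resource type to allowed operations. The `allowedOps` map below is a small illustrative subset and is not the contents of the library's `ACLOperations` variable; `validateOperation` is a hypothetical name.

```go
package main

import "fmt"

type resourceType string
type operation string

// allowedOps is an illustrative subset only. It does NOT reproduce the
// library's ACLOperations map.
var allowedOps = map[resourceType][]operation{
	"TOPIC": {"READ", "WRITE", "DESCRIBE", "ALL"},
	"GROUP": {"READ", "DESCRIBE", "ALL"},
}

// validateOperation reports whether op is allowed for the resource type rt.
func validateOperation(rt resourceType, op operation) error {
	ops, ok := allowedOps[rt]
	if !ok {
		return fmt.Errorf("unknown resource type %q", rt)
	}
	for _, allowed := range ops {
		if allowed == op {
			return nil
		}
	}
	return fmt.Errorf("operation %q is not valid for resource type %q", op, rt)
}

func main() {
	fmt.Println(validateOperation("TOPIC", "WRITE"))
	fmt.Println(validateOperation("GROUP", "WRITE"))
}
```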

type ACLOperation

type ACLOperation string

ACLOperation is a string and it defines the valid operations for ACL.

Based on: https://github.com/apache/kafka/blob/1.0/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java#L38

Read through `ACLOperations` to learn what operation is valid for each of the available resource types.

const (

	// ACLOperationAny is the "ANY" ACL operation.
	ACLOperationAny ACLOperation = "ANY"
	// ACLOperationAll is the "ALL" ACL operation.
	ACLOperationAll ACLOperation = "ALL"
	// ACLOperationRead is the "READ" ACL operation.
	ACLOperationRead ACLOperation = "READ"
	// ACLOperationWrite is the "WRITE" ACL operation.
	ACLOperationWrite ACLOperation = "WRITE"
	// ACLOperationCreate is the "CREATE" ACL operation.
	ACLOperationCreate ACLOperation = "CREATE"
	// ACLOperationDelete is the "DELETE" ACL operation.
	ACLOperationDelete ACLOperation = "DELETE"
	// ACLOperationAlter is the "ALTER" ACL operation.
	ACLOperationAlter ACLOperation = "ALTER"
	// ACLOperationDescribe is the "DESCRIBE" ACL operation.
	ACLOperationDescribe ACLOperation = "DESCRIBE"
	// ACLOperationClusterAction is the "CLUSTER_ACTION" ACL operation.
	ACLOperationClusterAction ACLOperation = "CLUSTER_ACTION"
	// ACLOperationDescribeConfigs is the "DESCRIBE_CONFIGS" ACL operation.
	ACLOperationDescribeConfigs ACLOperation = "DESCRIBE_CONFIGS"
	// ACLOperationAlterConfigs is the "ALTER_CONFIGS" ACL operation.
	ACLOperationAlterConfigs ACLOperation = "ALTER_CONFIGS"
	// ACLOperationIdempotentWrite is the "IDEMPOTENT_WRITE" ACL operation.
	ACLOperationIdempotentWrite ACLOperation = "IDEMPOTENT_WRITE"
)

type ACLPermissionType

type ACLPermissionType string

ACLPermissionType is a string and it defines the valid permission types for ACL.

Based on: https://github.com/apache/kafka/blob/1.0/core/src/main/scala/kafka/security/auth/PermissionType.scala

const (
	// ACLPermissionAllow is the "Allow" ACL permission type.
	ACLPermissionAllow ACLPermissionType = "Allow"
	// ACLPermissionDeny is the "Deny" ACL permission type.
	ACLPermissionDeny ACLPermissionType = "Deny"
)

type ACLResourceType

type ACLResourceType string

ACLResourceType is a string and it defines the valid resource types for ACL.

Based on: https://github.com/apache/kafka/blob/1.0/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java#L38

const (

	// ACLResourceAny is the "ANY" ACL resource type.
	ACLResourceAny ACLResourceType = "ANY"
	// ACLResourceTopic is the "TOPIC" ACL resource type.
	ACLResourceTopic ACLResourceType = "TOPIC"
	// ACLResourceGroup is the "GROUP" ACL resource type.
	ACLResourceGroup ACLResourceType = "GROUP"
	// ACLResourceCluster is the "CLUSTER" ACL resource type.
	ACLResourceCluster ACLResourceType = "CLUSTER"
	// ACLResourceTransactionalID is the "TRANSACTIONAL_ID" ACL resource type.
	ACLResourceTransactionalID ACLResourceType = "TRANSACTIONAL_ID"
	// ACLResourceDelegationToken is the "DELEGATION_TOKEN" ACL resource type,
	// available only on kafka version 1.1+.
	ACLResourceDelegationToken ACLResourceType = "DELEGATION_TOKEN"
)

type Alert

type Alert struct {
	// AlertID  is a unique identifier for the setting corresponding to this alert. See the available ids via `GetAlertSettings`.
	AlertID int `json:"alertId" yaml:"AlertID" header:"ID,text"`

	// Labels field is a list of key-value pairs. It must contain a non empty `Severity` value.
	Labels AlertLabels `json:"labels" yaml:"Labels" header:"inline"`
	// Annotations is a list of key-value pairs. It contains the summary, source, and docs fields.
	Annotations AlertAnnotations `json:"annotations" yaml:"Annotations"` // header:"inline"`
	// GeneratorURL is a unique URL identifying the creator of this alert.
	// It matches AlertManager requirements for providing this field.
	GeneratorURL string `json:"generatorURL" yaml:"GeneratorURL"` // header:"Gen URL"`

	// StartsAt is the time as string, in ISO format, for when the alert starts
	StartsAt string `json:"startsAt" yaml:"StartsAt" header:"Start,date"`
	// EndsAt is the time as string the alert ended at.
	EndsAt string `json:"endsAt" yaml:"EndsAt" header:"End,date"`
}

Alert is the request payload that is used to register an Alert via `RegisterAlert` and the response that client retrieves from the `GetAlerts`.

type AlertAnnotations

type AlertAnnotations struct {
	Summary string `json:"summary" yaml:"Summary" header:"Summary"`
	Source  string `json:"source,omitempty" yaml:"Source,omitempty" header:"Source,empty"`
	Docs    string `json:"docs,omitempty" yaml:"Docs,omitempty" header:"Docs,empty"`
}

AlertAnnotations holds the annotations for the `Alert`; at least Summary should be filled.

type AlertHandler

type AlertHandler func(Alert) error

AlertHandler is the type of func that can be registered to receive alerts via the `GetAlertsLive`.
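
A handler of this shape is typically invoked once per incoming alert, with a returned error acting as a stop signal. The sketch below uses stand-in types and a slice in place of a live stream; how `GetAlertsLive` really treats handler errors is not documented here, so the stop-on-first-error behaviour is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// alert is a reduced stand-in for lenses.Alert.
type alert struct {
	AlertID  int
	Severity string
}

type alertHandler func(alert) error

// dispatch feeds alerts to h one by one and stops at the first error.
// It returns how many alerts were handled successfully.
func dispatch(alerts []alert, h alertHandler) (int, error) {
	for i, a := range alerts {
		if err := h(a); err != nil {
			return i, err
		}
	}
	return len(alerts), nil
}

// stopOnCritical prints each alert and aborts on a CRITICAL one.
func stopOnCritical(a alert) error {
	if a.Severity == "CRITICAL" {
		return errors.New("stopping on critical alert")
	}
	fmt.Println("alert", a.AlertID, a.Severity)
	return nil
}

func main() {
	alerts := []alert{{1, "INFO"}, {2, "CRITICAL"}, {3, "INFO"}}
	handled, err := dispatch(alerts, stopOnCritical)
	fmt.Println(handled, err)
}
```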

type AlertLabels

type AlertLabels struct {
	Category string `json:"category,omitempty" yaml:"Category,omitempty" header:"Category"`
	Severity string `json:"severity" yaml:"Severity,omitempty" header:"Severity"`
	Instance string `json:"instance,omitempty" yaml:"Instance,omitempty" header:"Instance"`
}

AlertLabels holds the labels for the `Alert`; at least Severity should be filled.
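
To show how the JSON tags on `Alert`, `AlertLabels` and `AlertAnnotations` shape a request payload, the sketch below copies those tags onto local stand-in structs and marshals a minimal alert. The field values are made up for illustration, and the yaml and header tags are omitted.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// The structs below are local stand-ins that reuse the JSON tags
// documented for the library types.
type alertLabels struct {
	Category string `json:"category,omitempty"`
	Severity string `json:"severity"`
	Instance string `json:"instance,omitempty"`
}

type alertAnnotations struct {
	Summary string `json:"summary"`
	Source  string `json:"source,omitempty"`
	Docs    string `json:"docs,omitempty"`
}

type alert struct {
	AlertID      int              `json:"alertId"`
	Labels       alertLabels      `json:"labels"`
	Annotations  alertAnnotations `json:"annotations"`
	GeneratorURL string           `json:"generatorURL"`
	StartsAt     string           `json:"startsAt"`
	EndsAt       string           `json:"endsAt"`
}

// encodeAlert returns the JSON encoding of a as a string.
func encodeAlert(a alert) (string, error) {
	b, err := json.Marshal(a)
	return string(b), err
}

func main() {
	out, err := encodeAlert(alert{
		AlertID:     1000, // made-up ID, see GetAlertSettings for real ones
		Labels:      alertLabels{Severity: "HIGH"},
		Annotations: alertAnnotations{Summary: "Broker is down"},
		StartsAt:    "2018-10-23T00:00:00Z",
	})
	fmt.Println(out, err)
}
```

Fields tagged with omitempty (Category, Instance, Source, Docs) disappear from the payload when left empty, while Severity and Summary are always sent.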

type AlertSetting

type AlertSetting struct {
	ID                int               `json:"id" header:"ID,text"`
	Description       string            `json:"description" header:"Desc"`
	Category          string            `json:"category" header:"Category"`
	Enabled           bool              `json:"enabled" header:"Enabled"`
	IsAvailable       bool              `json:"isAvailable" header:"Available"`
	Docs              string            `json:"docs,omitempty" header:"Docs"`
	ConditionTemplate string            `json:"conditionTemplate,omitempty" header:"Cond Tmpl"`
	ConditionRegex    string            `json:"conditionRegex,omitempty" header:"Cond Regex"`
	Conditions        map[string]string `json:"conditions,omitempty" header:"Conds"`
}

AlertSetting describes the type of list entry of the `GetAlertSetting` and `CreateOrUpdateAlertSettingCondition`.

type AlertSettingConditions

type AlertSettingConditions map[string]string

AlertSettingConditions map with UUID as key and the condition as value, used on `GetAlertSettingConditions`.

type AlertSettings

type AlertSettings struct {
	Categories AlertSettingsCategoryMap `json:"categories" header:"inline"`
}

AlertSettings describes the type of list entry of the `GetAlertSettings`.

type AlertSettingsCategoryMap

type AlertSettingsCategoryMap struct {
	Infrastructure []AlertSetting `json:"Infrastructure" header:"Infrastructure"`
	Consumers      []AlertSetting `json:"Consumers" header:"Consumers"`
}

AlertSettingsCategoryMap describes the type of `AlertSettings`'s Categories.

type AuditEntry

type AuditEntry struct {
	Type      AuditEntryType    `json:"type" yaml:"Type" header:"Type"`
	Change    AuditEntryChange  `json:"change" yaml:"Change" header:"Change"`
	UserID    string            `json:"userId" yaml:"User" header:"User         "` /* make it a little bigger than expected, it looks slightly better for this field*/
	Timestamp int64             `json:"timestamp" yaml:"Timestamp" header:"Date,timestamp(ms|utc|02 Jan 2006 15:04)"`
	Content   map[string]string `json:"content" yaml:"Content" header:"Content"`
}

AuditEntry describes a lenses Audit Entry, used for audit logs API.
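
The header tag on Timestamp suggests the value is a Unix time in milliseconds, rendered in UTC with the layout `02 Jan 2006 15:04`. A minimal sketch of that conversion, assuming the millisecond unit is right, could look like this (`formatAuditTimestamp` is a hypothetical helper):

```go
package main

import (
	"fmt"
	"time"
)

// formatAuditTimestamp converts Unix milliseconds to a UTC string using
// the layout found in the AuditEntry header tag.
func formatAuditTimestamp(ms int64) string {
	t := time.Unix(0, ms*int64(time.Millisecond)).UTC()
	return t.Format("02 Jan 2006 15:04")
}

func main() {
	fmt.Println(formatAuditTimestamp(1540252800000)) // 23 Oct 2018 00:00
}
```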

type AuditEntryChange

type AuditEntryChange string

AuditEntryChange is the Go type that describes audit entry changes; see the `AuditEntry` structure for more.

const (
	AuditEntryAdd    AuditEntryChange = "ADD"
	AuditEntryRemove AuditEntryChange = "REMOVE"
	AuditEntryUpdate AuditEntryChange = "UPDATE"
	AuditEntryInsert AuditEntryChange = "INSERT"
)

The available audit entry changes. Available types: AuditEntryAdd, AuditEntryRemove, AuditEntryUpdate, AuditEntryInsert.

type AuditEntryHandler

type AuditEntryHandler func(AuditEntry) error

AuditEntryHandler is the type of the function, the listener which is the input parameter of the `GetAuditEntriesLive` API call.

type AuditEntryType

type AuditEntryType string

AuditEntryType is the Go type for audit entry types; see the `AuditEntry` structure for more.

const (
	AuditEntryTopic        AuditEntryType = "TOPIC"
	AuditEntryTopicData    AuditEntryType = "TOPIC_DATA"
	AuditEntryQuotas       AuditEntryType = "QUOTAS"
	AuditEntryBrokerConfig AuditEntryType = "BROKER_CONFIG"
	AuditEntryACL          AuditEntryType = "ACL"
	AuditEntrySchema       AuditEntryType = "SCHEMA"
	AuditEntryProcessor    AuditEntryType = "PROCESSOR"
	AuditEntryConnector    AuditEntryType = "CONNECTOR"
)

The available audit entry types. Available types: AuditEntryTopic, AuditEntryTopicData, AuditEntryQuotas, AuditEntryBrokerConfig, AuditEntryACL, AuditEntrySchema, AuditEntryProcessor, AuditEntryConnector.

type Authentication

type Authentication interface {
	// Auth accepts the current client and returns a non-nil error if authentication failed; otherwise
	// the authentication can alter the Client to do "something" before each request.
	Auth(c *Client) error
}

Authentication is an interface which all authentication methods should implement.

See `BasicAuthentication` and `KerberosAuthentication` too.

type AuthenticationFunc

type AuthenticationFunc func(*Client) error

AuthenticationFunc implements the Authentication, it can be used for single-line custom authentication methods.

func (AuthenticationFunc) Auth

func (auth AuthenticationFunc) Auth(c *Client) error

Auth implements the `Authentication` interface, it just calls the func.
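
The function-adapter pattern behind `AuthenticationFunc` can be sketched with local stand-ins. The `client` struct, its `headers` field, the header name and `tokenAuth` are all hypothetical; only the interface and adapter shapes follow the declarations above.

```go
package main

import (
	"errors"
	"fmt"
)

// client is a stand-in for lenses.Client; the headers field is hypothetical.
type client struct {
	headers map[string]string
}

// authentication mirrors the documented interface shape.
type authentication interface {
	Auth(c *client) error
}

// authenticationFunc adapts a plain function to the interface.
type authenticationFunc func(*client) error

// Auth just calls the underlying function.
func (f authenticationFunc) Auth(c *client) error { return f(c) }

// tokenAuth builds a custom authentication that stores a token
// in a made-up header.
func tokenAuth(token string) authentication {
	return authenticationFunc(func(c *client) error {
		if token == "" {
			return errors.New("empty token")
		}
		c.headers["X-Example-Token"] = token
		return nil
	})
}

func main() {
	c := &client{headers: map[string]string{}}
	err := tokenAuth("secret").Auth(c)
	fmt.Println(err, c.headers["X-Example-Token"])
}
```

The adapter avoids declaring a new struct type for every small authentication method: any function with the right signature satisfies the interface after a conversion.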

type BasicAuthentication

type BasicAuthentication struct {
	Username string `json:"username" yaml:"Username" survey:"username"`
	Password string `json:"password,omitempty" yaml:"Password" survey:"password"`
}

BasicAuthentication for Lenses, accepts raw username and password.

Use it when Lenses is set up with "BASIC" or "LDAP" authentication.

func (BasicAuthentication) Auth

func (auth BasicAuthentication) Auth(c *Client) error

Auth implements the `Authentication` for the `BasicAuthentication`.

type BoxConfig

type BoxConfig struct {
	ConnectClusters []BoxConnectClusterConfigProperty `json:"lenses.connect.clusters"`

	Version string `json:"lenses.version" header:"Version"`
	IP      string `json:"lenses.ip" header:"IP"`
	Port    int    `json:"lenses.port" header:"Port,text"`
	JMXPort int    `json:"lenses.jmx.port,omitempty" header:"JMX Port,text"`

	KafkaBrokers string `json:"lenses.kafka.brokers"`

	KubernetesConfigFile     string `json:"lenses.kubernetes.config.file,omitempty"`
	KubernetesImageName      string `json:"lenses.kubernetes.image.name,omitempty" header:"K8 Image"`
	KubernetesImageTag       string `json:"lenses.kubernetes.image.tag,omitempty" header:"K8 Tag"`
	KubernetesServiceAccount string `json:"lenses.kubernetes.service.account,omitempty" header:"K8 Service Acc"`

	LicenseFile        string                 `json:"lenses.license.file"`
	RootPath           string                 `json:"lenses.root.path,omitempty"`
	SchemaRegistryURLs []BoxURLConfigProperty `json:"lenses.schema.registry.urls"`
	SecurityMode       string                 `json:"lenses.security.mode" header:"Security Mode"`
	SQLExecutionMode   ExecutionMode          `json:"lenses.sql.execution.mode" header:"SQL Execution Mode"`

	TopicsAlertsSettings   string `json:"lenses.topics.alerts.settings"`
	TopicsAlertsStorage    string `json:"lenses.topics.alerts.storage"`
	TopicsAudits           string `json:"lenses.topics.audits"`
	TopicsCluster          string `json:"lenses.topics.cluster"`
	TopicsExternalMetrics  string `json:"lenses.topics.external.metrics"`
	TopicsExternalTopology string `json:"lenses.topics.external.topology"`
	TopicsLSQLStorage      string `json:"lenses.topics.lsql.storage"`
	TopicsMetadata         string `json:"lenses.topics.metadata"`
	TopicsMetrics          string `json:"lenses.topics.metrics"`
	TopicsProcessors       string `json:"lenses.topics.processors"`
	TopicsProfiles         string `json:"lenses.topics.profiles"`

	ZookeeperChroot string                 `json:"lenses.zookeeper.chroot"`
	ZookeeperHosts  []BoxURLConfigProperty `json:"lenses.zookeeper.hosts"`
}

BoxConfig contains the structure for the lenses box configuration, see the `GetConfig` call.

Note that this structure contains only the properties that are exposed via the API's response data.

type BoxConnectClusterConfigProperty

type BoxConnectClusterConfigProperty struct {
	Configs  string                 `json:"configs"`
	Name     string                 `json:"name"`
	Offsets  string                 `json:"offsets"`
	Statuses string                 `json:"statuses"`
	URLs     []BoxURLConfigProperty `json:"urls"`
}

BoxConnectClusterConfigProperty is the Box Config's embedded configuration for the Connect Clusters.

type BoxURLConfigProperty

type BoxURLConfigProperty struct {
	JMX string `json:"jmx"`
	URL string `json:"url"`
}

BoxURLConfigProperty is used by the BoxConfig to describe the URLs.

type BrokerConfig

type BrokerConfig struct {
	LogCleanerThreads int             `json:"log.cleaner.threads" yaml:"LogCleanerThreads" header:"Log Cleaner Threads"`
	CompressionType   CompressionType `json:"compression.type" yaml:"CompressionType" header:"Compression Type"`
	AdvertisedPort    int             `json:"advertised.port" yaml:"AdvertisedPort" header:"Advertised Port"`
}

BrokerConfig describes the kafka broker's configurations.

type Client

type Client struct {
	Config *ClientConfig

	// PersistentRequestModifier can be used to modify the *http.Request before sending it to the backend.
	PersistentRequestModifier RequestOption

	// Progress                  func(current, total int64)
	// User is generated on `lenses#OpenConnection` function based on the `Config#Authentication`.
	User User
	// contains filtered or unexported fields
}

Client is the lenses http client. It contains the necessary API calls to communicate and develop via lenses.

func OpenConnection

func OpenConnection(cfg ClientConfig, options ...ConnectionOption) (*Client, error)

OpenConnection creates and returns a new bridge to Landoop's Lenses API based on the passed `ClientConfig` and the (optional) options. It authenticates the user and returns a valid, ready-to-use `*lenses.Client`. If it fails to communicate with the server, it returns a nil client and a non-nil error.

Usage:

auth := lenses.BasicAuthentication{Username: "user", Password: "pass"}
config := lenses.ClientConfig{Host: "domain.com", Authentication: auth, Timeout: "15s"}
client, err := lenses.OpenConnection(config) // or (config, lenses.UsingClient/UsingToken)
if err != nil {
    panic(err)
}
client.DeleteTopic("topicName")

Read more by navigating to the `Client` type documentation.

func (*Client) CancelQuery

func (c *Client) CancelQuery(id int64) (bool, error)

CancelQuery stops a running query based on its ID. It returns true if the query was cancelled; otherwise it returns false and/or an error.

func (*Client) CreateConnector

func (c *Client) CreateConnector(clusterName, name string, config ConnectorConfig) (connector Connector, err error)

CreateConnector creates a new connector. It returns the current connector info if successful.

name (string) – Name of the connector to create.
config (map) – Config parameters for the connector. All values should be strings.

Read more at: https://docs.confluent.io/current/connect/restapi.html#post--connectors

See also `UpdateConnector`.

func (*Client) CreateOrUpdateACL

func (c *Client) CreateOrUpdateACL(acl ACL) error

CreateOrUpdateACL sets an Apache Kafka Access Control List. Use the defined types when needed, for example: `client.CreateOrUpdateACL(lenses.ACL{ResourceName: "transactions", ResourceType: lenses.ACLResourceTopic, Principal: "principalType:principalName", PermissionType: lenses.ACLPermissionAllow, Host: "*", Operation: lenses.ACLOperationRead})`

Note that on the "host" input argument you should use IP addresses as domain names are not supported at the moment by Apache Kafka.
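
Given that restriction, a caller may want to check the host value before sending the ACL. The sketch below is a hypothetical client-side pre-check, not something the library is known to do; it accepts the `*` wildcard used in the example above or any literal IP address.

```go
package main

import (
	"fmt"
	"net"
)

// validACLHost is a hypothetical pre-check: it accepts the "*" wildcard
// or a literal IPv4/IPv6 address and rejects domain names.
func validACLHost(host string) bool {
	if host == "*" {
		return true
	}
	return net.ParseIP(host) != nil
}

func main() {
	for _, h := range []string{"*", "10.0.0.7", "broker.example.com"} {
		fmt.Printf("%s -> %v\n", h, validACLHost(h))
	}
}
```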

func (*Client) CreateOrUpdateAlertSettingCondition

func (c *Client) CreateOrUpdateAlertSettingCondition(alertSettingID int, condition string) error

CreateOrUpdateAlertSettingCondition sets a condition(expression text) for a specific alert setting.

func (*Client) CreateOrUpdateQuotaForAllClients

func (c *Client) CreateOrUpdateQuotaForAllClients(config QuotaConfig) error

CreateOrUpdateQuotaForAllClients sets the default quota for all clients. Read more at: http://lenses.stream/using-lenses/user-guide/quotas.html.

func (*Client) CreateOrUpdateQuotaForAllUsers

func (c *Client) CreateOrUpdateQuotaForAllUsers(config QuotaConfig) error

CreateOrUpdateQuotaForAllUsers sets the default quota for all users. Read more at: http://lenses.stream/using-lenses/user-guide/quotas.html.

func (*Client) CreateOrUpdateQuotaForClient

func (c *Client) CreateOrUpdateQuotaForClient(clientID string, config QuotaConfig) error

CreateOrUpdateQuotaForClient sets the quota for a specific client. Read more at: http://lenses.stream/using-lenses/user-guide/quotas.html.

func (*Client) CreateOrUpdateQuotaForUser

func (c *Client) CreateOrUpdateQuotaForUser(user string, config QuotaConfig) error

CreateOrUpdateQuotaForUser sets a quota for a user. Read more at: http://lenses.stream/using-lenses/user-guide/quotas.html.

func (*Client) CreateOrUpdateQuotaForUserAllClients

func (c *Client) CreateOrUpdateQuotaForUserAllClients(user string, config QuotaConfig) error

CreateOrUpdateQuotaForUserAllClients sets a quota for a user for all clients. Read more at: http://lenses.stream/using-lenses/user-guide/quotas.html.

func (*Client) CreateOrUpdateQuotaForUserClient

func (c *Client) CreateOrUpdateQuotaForUserClient(user, clientID string, config QuotaConfig) error

CreateOrUpdateQuotaForUserClient sets the quota for a user/client pair. Read more at: http://lenses.stream/using-lenses/user-guide/quotas.html.

func (*Client) CreateOrUpdateTopicMetadata

func (c *Client) CreateOrUpdateTopicMetadata(metadata TopicMetadata) error

CreateOrUpdateTopicMetadata adds new topic metadata or updates existing topic metadata.

func (*Client) CreateProcessor

func (c *Client) CreateProcessor(name string, sql string, runners int, clusterName, namespace, pipeline string) error

CreateProcessor creates a new LSQL processor.

func (*Client) CreateTopic

func (c *Client) CreateTopic(topicName string, replication, partitions int, configs KV) error

CreateTopic creates a topic.

Arguments: topicName (string, required), replication (int), partitions (int), configs (topic key-value pairs).

Read more at: http://lenses.stream/dev/lenses-apis/rest-api/index.html#create-topic

func (*Client) CreateUserProfilePropertyValue

func (c *Client) CreateUserProfilePropertyValue(property, value string) error

CreateUserProfilePropertyValue adds a "value" to the user profile "property" entries.

func (*Client) DeleteACL

func (c *Client) DeleteACL(acl ACL) error

DeleteACL deletes an existing Apache Kafka Access Control List.

func (*Client) DeleteAlertSettingCondition

func (c *Client) DeleteAlertSettingCondition(alertSettingID int, conditionUUID string) error

DeleteAlertSettingCondition deletes a condition from an alert setting.

func (*Client) DeleteConnector

func (c *Client) DeleteConnector(clusterName, name string) error

DeleteConnector deletes a connector, halting all tasks and deleting its configuration. It returns a 409 (Conflict) status code error if a rebalance is in progress.

func (*Client) DeleteDynamicBrokerConfigs

func (c *Client) DeleteDynamicBrokerConfigs(brokerID int, configKeysToBeReseted ...string) error

DeleteDynamicBrokerConfigs deletes a configuration for a broker. Deleting a configuration dynamically reverts it to its default value.

func (*Client) DeleteDynamicClusterConfigs

func (c *Client) DeleteDynamicClusterConfigs(configKeysToBeReseted ...string) error

DeleteDynamicClusterConfigs deletes cluster configuration(s) dynamically. It reverts the configuration to its default value.

func (*Client) DeleteLatestSubjectVersion

func (c *Client) DeleteLatestSubjectVersion(subject string) (int, error)

DeleteLatestSubjectVersion deletes the latest version of the schema registered under this subject. This only deletes the version; the schema id remains intact, making it still possible to decode data using the schema id. This API is recommended only for development environments, or under extreme circumstances where it is required to delete a previously registered schema for compatibility purposes or to re-register a previously registered schema.

subject (string) – Name of the subject.

It returns the version (as number) of the deleted schema.

See `DeleteSubjectVersion` too.

func (*Client) DeleteProcessor

func (c *Client) DeleteProcessor(processorNameOrID string) error

DeleteProcessor removes a processor based on its name or its full id, depending on the lenses execution mode. Use `LookupProcessorIdentifier` to obtain the right identifier.

func (*Client) DeleteQuotaForAllClients

func (c *Client) DeleteQuotaForAllClients(propertiesToRemove ...string) error

DeleteQuotaForAllClients deletes the default quota for all clients.

If "propertiesToRemove" is not passed or is empty, the client sends all the available keys to be removed; see `DefaultQuotaConfigPropertiesToRemove` for more.
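The "send everything when nothing is passed" behaviour can be sketched with a variadic parameter and a default slice. This is a minimal illustration, not the library's code: `defaultPropertiesToRemove` and `propertiesOrDefault` are hypothetical names, and the listed keys are the standard Apache Kafka quota configs, assumed here to stand in for `DefaultQuotaConfigPropertiesToRemove`.

```go
package main

import "fmt"

// defaultPropertiesToRemove is a stand-in for the package's
// `DefaultQuotaConfigPropertiesToRemove`. The keys are the standard
// Apache Kafka quota configs and are an assumption of this sketch.
var defaultPropertiesToRemove = []string{
	"producer_byte_rate",
	"consumer_byte_rate",
	"request_percentage",
}

// propertiesOrDefault returns the caller's keys, or every default key
// when no keys were passed.
func propertiesOrDefault(propertiesToRemove ...string) []string {
	if len(propertiesToRemove) == 0 {
		return defaultPropertiesToRemove
	}
	return propertiesToRemove
}

func main() {
	// No arguments: all default keys are selected.
	fmt.Println(propertiesOrDefault())
	// One argument: only that key is selected.
	fmt.Println(propertiesOrDefault("producer_byte_rate"))
}
```

The same rule applies to every `DeleteQuotaFor...` method that accepts "propertiesToRemove".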

func (*Client) DeleteQuotaForAllUsers

func (c *Client) DeleteQuotaForAllUsers(propertiesToRemove ...string) error

DeleteQuotaForAllUsers deletes the default quota for all users. Read more at: http://lenses.stream/using-lenses/user-guide/quotas.html.

If "propertiesToRemove" is not passed or is empty, the client sends all the available keys to be removed; see `DefaultQuotaConfigPropertiesToRemove` for more.

func (*Client) DeleteQuotaForClient

func (c *Client) DeleteQuotaForClient(clientID string, propertiesToRemove ...string) error

DeleteQuotaForClient deletes quotas for a client id.

If "propertiesToRemove" is not passed or is empty, the client sends all the available keys to be removed; see `DefaultQuotaConfigPropertiesToRemove` for more.

func (*Client) DeleteQuotaForUser

func (c *Client) DeleteQuotaForUser(user string, propertiesToRemove ...string) error

DeleteQuotaForUser deletes a quota for a user. If "propertiesToRemove" is not passed or is empty, the client sends all the available keys to be removed; see `DefaultQuotaConfigPropertiesToRemove` for more.

func (*Client) DeleteQuotaForUserAllClients

func (c *Client) DeleteQuotaForUserAllClients(user string, propertiesToRemove ...string) error

DeleteQuotaForUserAllClients deletes the quota of a user for all client ids.

If "propertiesToRemove" is not passed or is empty, the client sends all the available keys to be removed; see `DefaultQuotaConfigPropertiesToRemove` for more.

func (*Client) DeleteQuotaForUserClient

func (c *Client) DeleteQuotaForUserClient(user, clientID string, propertiesToRemove ...string) error

DeleteQuotaForUserClient deletes the quota for a user/client pair.

If "propertiesToRemove" is not passed or is empty, the client sends all the available keys to be removed; see `DefaultQuotaConfigPropertiesToRemove` for more.

func (*Client) DeleteSubject

func (c *Client) DeleteSubject(subject string) (versions []int, err error)

DeleteSubject deletes the specified subject and its associated compatibility level if registered. It is recommended to use this API only when a topic needs to be recycled or in a development environment. It returns the versions of the schema deleted under this subject.

func (*Client) DeleteSubjectVersion

func (c *Client) DeleteSubjectVersion(subject string, versionID int) (int, error)

DeleteSubjectVersion deletes a specific version of the schema registered under this subject. This only deletes the version; the schema id remains intact, making it still possible to decode data using the schema id. This API is recommended only for development environments, or under extreme circumstances where it is required to delete a previously registered schema for compatibility purposes or to re-register a previously registered schema.

subject (string) – Name of the subject.

version (versionId) – Version of the schema to be deleted.

Valid values for versionID are between [1,2^31-1]. It returns the version (as number) of the deleted schema.

See `DeleteLatestSubjectVersion` too.
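The documented range for versionID, [1,2^31-1], can be checked on the caller's side before sending the request. A minimal sketch follows; `validVersionID` is a hypothetical helper and not part of the package.

```go
package main

import (
	"fmt"
	"math"
)

// validVersionID reports whether v lies in the documented range [1, 2^31-1].
// It is a hypothetical helper for illustration only.
func validVersionID(v int) bool {
	return v >= 1 && v <= math.MaxInt32
}

func main() {
	for _, v := range []int{-1, 0, 1, 42} {
		fmt.Printf("%d valid: %t\n", v, validVersionID(v))
	}
}
```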

func (*Client) DeleteTopic

func (c *Client) DeleteTopic(topicName string) error

DeleteTopic deletes a topic. It accepts the topicName, a required, non-empty string.

Read more at: http://lenses.stream/dev/lenses-apis/rest-api/index.html#delete-topic

func (*Client) DeleteTopicMetadata

func (c *Client) DeleteTopicMetadata(topicName string) error

DeleteTopicMetadata removes existing topic metadata.

func (*Client) DeleteTopicRecords

func (c *Client) DeleteTopicRecords(topicName string, fromPartition int, toOffset int64) error

DeleteTopicRecords deletes a topic's records from a partition up to an offset. If the user has no rights for that action it returns `ErrResourceNotAccessible`; if "toOffset" is negative it returns `ErrResourceNotGood`.

All input arguments are required.

func (*Client) DeleteUserProfilePropertyValue

func (c *Client) DeleteUserProfilePropertyValue(property, value string) error

DeleteUserProfilePropertyValue removes the "value" from the user profile "property" entries.

func (*Client) Do

func (c *Client) Do(method, path, contentType string, send []byte, options ...RequestOption) (*http.Response, error)

Do is the lower-level client call: it manually sends an HTTP request to the lenses box backend based on the `Client#Config` and returns an HTTP response.

func (*Client) EnableAlertSetting

func (c *Client) EnableAlertSetting(id int) error

EnableAlertSetting enables a specific alert setting based on its "id".

func (*Client) GetACLs

func (c *Client) GetACLs() ([]ACL, error)

GetACLs returns all the available Apache Kafka Access Control Lists.

func (*Client) GetAccessToken

func (c *Client) GetAccessToken() string

GetAccessToken returns the access token that was generated by `OpenConnection` or given by the configuration.

func (*Client) GetAlertSetting

func (c *Client) GetAlertSetting(id int) (setting AlertSetting, err error)

GetAlertSetting returns a specific alert setting based on its "id".

func (*Client) GetAlertSettingConditions

func (c *Client) GetAlertSettingConditions(id int) (AlertSettingConditions, error)

GetAlertSettingConditions returns alert setting's conditions as a map of strings.

func (*Client) GetAlertSettings

func (c *Client) GetAlertSettings() (AlertSettings, error)

GetAlertSettings returns all the configured alert settings. Alerts are divided into two categories:

* Infrastructure - These are out-of-the-box alerts that can be toggled on and off.
* Consumer group - These are user-defined alerts on consumer groups.

Alert notifications are the result of an `AlertSetting` Condition being met on an `AlertSetting`.

func (*Client) GetAlerts

func (c *Client) GetAlerts() (alerts []Alert, err error)

GetAlerts returns the registered alerts.

func (*Client) GetAlertsLive

func (c *Client) GetAlertsLive(handler AlertHandler) error

GetAlertsLive receives alert notifications in real-time from the server via a Server-Sent Events endpoint.

func (*Client) GetAuditEntries

func (c *Client) GetAuditEntries() (entries []AuditEntry, err error)

GetAuditEntries returns the last buffered audit entries.

Retrieves the last N audit entries created. See `GetAuditEntriesLive` for real-time notifications.

func (*Client) GetAuditEntriesLive

func (c *Client) GetAuditEntriesLive(handler AuditEntryHandler) error

GetAuditEntriesLive returns the live audit notifications, see `GetAuditEntries` too.

func (*Client) GetAvailableTopicConfigKeys

func (c *Client) GetAvailableTopicConfigKeys() ([]string, error)

GetAvailableTopicConfigKeys retrieves a list of available configs for topics.

func (*Client) GetConfig

func (c *Client) GetConfig() (cfg BoxConfig, err error)

GetConfig returns the whole configuration of the lenses box, which can differ from box to box and is read-only.

It returns a `BoxConfig`.

If you just need to retrieve the execution mode of the box use the `GetExecutionMode` instead.

func (*Client) GetConfigEntry

func (c *Client) GetConfigEntry(outPtr interface{}, keys ...string) error

GetConfigEntry reads the lenses back-end configuration and sets the value of a key, based on "keys", to the "outPtr".

func (*Client) GetConnectClusters

func (c *Client) GetConnectClusters() (clusters []ConnectCluster, err error)

GetConnectClusters returns the `lenses.connect.clusters` key from the lenses configuration (`GetConfig`).

func (*Client) GetConnector

func (c *Client) GetConnector(clusterName, name string) (connector Connector, err error)

GetConnector returns the information about the connector. See `Connector` type and read more at: https://docs.confluent.io/current/connect/restapi.html#get--connectors-(string-name)

func (*Client) GetConnectorConfig

func (c *Client) GetConnectorConfig(clusterName, name string) (cfg ConnectorConfig, err error)

GetConnectorConfig returns the configuration for the connector.

func (*Client) GetConnectorPlugins

func (c *Client) GetConnectorPlugins(clusterName string) (cp []ConnectorPlugin, err error)

GetConnectorPlugins returns a list of connector plugins installed in the Kafka Connect cluster. Note that the API only checks for connectors on the worker that handles the request, which means it is possible to see inconsistent results, especially during a rolling upgrade if you add new connector jars.

func (*Client) GetConnectorStatus

func (c *Client) GetConnectorStatus(clusterName, name string) (cs ConnectorStatus, err error)

GetConnectorStatus returns the current status of the connector, including whether it is running, failed or paused, which worker it is assigned to, error information if it has failed, and the state of all its tasks.

func (*Client) GetConnectorTaskStatus

func (c *Client) GetConnectorTaskStatus(clusterName, name string, taskID int) (cst ConnectorStatusTask, err error)

GetConnectorTaskStatus returns a task’s status.

func (*Client) GetConnectorTasks

func (c *Client) GetConnectorTasks(clusterName, name string) (m []map[string]interface{}, err error)

GetConnectorTasks returns a list of tasks currently running for the connector. Read more at: https://docs.confluent.io/current/connect/restapi.html#get--connectors-(string-name)-tasks.

func (*Client) GetConnectors

func (c *Client) GetConnectors(clusterName string) (names []string, err error)

GetConnectors returns the names of the active connectors as a list of strings.

Visit http://lenses.stream/dev/lenses-apis/rest-api/index.html#connector-api and https://docs.confluent.io/current/connect/restapi.html for a deeper understanding.

func (*Client) GetDynamicBrokerConfigs

func (c *Client) GetDynamicBrokerConfigs(brokerID int) (config BrokerConfig, err error)

GetDynamicBrokerConfigs returns the dynamically updated configurations for a kafka broker. It retrieves only the ones added/updated dynamically.

func (*Client) GetDynamicClusterConfigs

func (c *Client) GetDynamicClusterConfigs() (configs BrokerConfig, err error)

GetDynamicClusterConfigs returns the dynamically updated configurations for a kafka cluster. It retrieves only the ones added/updated dynamically.

func (*Client) GetExecutionMode

func (c *Client) GetExecutionMode() (ExecutionMode, error)

GetExecutionMode returns the execution mode; if no error is returned, the possible values are: `ExecutionModeInProc`, `ExecutionModeConnect` or `ExecutionModeKubernetes`.

func (*Client) GetGlobalCompatibilityLevel

func (c *Client) GetGlobalCompatibilityLevel() (level CompatibilityLevel, err error)

GetGlobalCompatibilityLevel returns the global compatibility level, "NONE", "FULL", "FORWARD" or "BACKWARD", as described at the `CompatibilityLevel` type.

func (*Client) GetLatestSchema

func (c *Client) GetLatestSchema(subject string) (Schema, error)

GetLatestSchema returns the latest version of a schema. See `GetSchemaAtVersion` to retrieve a subject schema by a specific version.

func (*Client) GetLicenseInfo

func (c *Client) GetLicenseInfo() (LicenseInfo, error)

GetLicenseInfo returns the license information for the connected lenses box.

func (*Client) GetLogsInfo

func (c *Client) GetLogsInfo() ([]LogLine, error)

GetLogsInfo returns the latest (512) INFO log lines.

func (*Client) GetLogsMetrics

func (c *Client) GetLogsMetrics() ([]LogLine, error)

GetLogsMetrics returns the latest (512) METRICS log lines.

func (*Client) GetProcessors

func (c *Client) GetProcessors() (ProcessorsResult, error)

GetProcessors returns a list of all available LSQL processors.

func (*Client) GetProcessorsLogs

func (c *Client) GetProcessorsLogs(clusterName, ns, podName string, follow bool, lines int, handler func(level string, log string) error) error

GetProcessorsLogs retrieves the LSQL processor logs if in kubernetes mode.

func (*Client) GetQuotas

func (c *Client) GetQuotas() ([]Quota, error)

GetQuotas returns a list of all available quotas.

func (*Client) GetRunningQueries

func (c *Client) GetRunningQueries() ([]LSQLRunningQuery, error)

GetRunningQueries returns a list of the currently running SQL queries.

func (*Client) GetSchema

func (c *Client) GetSchema(subjectID int) (string, error)

GetSchema returns the Avro schema string identified by the id. id (int) – the globally unique identifier of the schema.

func (*Client) GetSchemaAtVersion

func (c *Client) GetSchemaAtVersion(subject string, versionID int) (Schema, error)

GetSchemaAtVersion returns a specific version of a schema. See `GetLatestSchema` to retrieve the latest schema.

func (*Client) GetSubjectCompatibilityLevel

func (c *Client) GetSubjectCompatibilityLevel(subject string) (level CompatibilityLevel, err error)

GetSubjectCompatibilityLevel returns the compatibility level of a specific subject(schema) name.

func (*Client) GetSubjectVersions

func (c *Client) GetSubjectVersions(subject string) (versions []int, err error)

GetSubjectVersions returns all the versions of a subject(schema) based on its name.

func (*Client) GetSubjects

func (c *Client) GetSubjects() (subjects []string, err error)

GetSubjects returns a list of the available subjects(schemas). https://docs.confluent.io/current/schema-registry/docs/api.html#subjects

func (*Client) GetSupportedConnectors

func (c *Client) GetSupportedConnectors() ([]ConnectorInfoUI, error)

GetSupportedConnectors returns the list of the supported Kafka Connectors.

func (*Client) GetTopic

func (c *Client) GetTopic(topicName string) (topic Topic, err error)

GetTopic returns a topic's information, a `lenses.Topic` value.

Read more at: http://lenses.stream/dev/lenses-apis/rest-api/index.html#get-topic-information

func (*Client) GetTopicMetadata

func (c *Client) GetTopicMetadata(topicName string) (TopicMetadata, error)

GetTopicMetadata retrieves and returns a topic's metadata.

func (*Client) GetTopics

func (c *Client) GetTopics() (topics []Topic, err error)

GetTopics returns the list of topics.

func (*Client) GetTopicsMetadata

func (c *Client) GetTopicsMetadata() ([]TopicMetadata, error)

GetTopicsMetadata retrieves and returns all the topics' available metadata.

func (*Client) GetTopicsNames

func (c *Client) GetTopicsNames() ([]string, error)

GetTopicsNames returns the list of topics' names.

func (*Client) GetUserProfile

func (c *Client) GetUserProfile() (UserProfile, error)

GetUserProfile returns the user-specific favourites.

func (*Client) LSQL

func (c *Client) LSQL(
	sql string, withOffsets bool, statsEvery time.Duration,
	recordHandler LSQLRecordHandler,
	stopHandler LSQLStopHandler,
	stopErrHandler LSQLStopErrorHandler,
	statsHandler LSQLStatsHandler) error

LSQL runs a lenses query and fires the necessary handlers given by the caller. Example: `err := client.LSQL("SELECT * FROM reddit_posts LIMIT 50", true, 2*time.Second, recordHandler, stopHandler, stopErrHandler, statsHandler)`

func (*Client) LSQLWait

func (c *Client) LSQLWait(sql string, withOffsets bool, statsEvery time.Duration) (records []LSQLRecord, stats LSQLStats, stop LSQLStop, err error)

LSQLWait is the same as `LSQL` but waits until a stop or an error before returning the query's result records, the stats and the stop information.

func (*Client) Logout

func (c *Client) Logout() error

Logout invalidates the token and revokes its access. A new Client, using `OpenConnection`, should be created in order to continue after this call.

func (*Client) LookupProcessorIdentifier

func (c *Client) LookupProcessorIdentifier(id, name, clusterName, namespace string) (string, error)

LookupProcessorIdentifier is not a direct API call, although it fires requests to get the result. It's a helper which can be used as an input argument of the `DeleteProcessor` and `PauseProcessor` and `ResumeProcessor` and `UpdateProcessorRunners` functions.

Fill the id or name in any case. Fill the clusterName and namespace when in KUBERNETES execution mode.

func (*Client) PauseConnector

func (c *Client) PauseConnector(clusterName, name string) error

PauseConnector pauses the connector and its tasks, which stops message processing until the connector is resumed. This call is asynchronous and the tasks will not transition to the PAUSED state at the same time.

func (*Client) PauseProcessor

func (c *Client) PauseProcessor(processorID string) error

PauseProcessor pauses a processor. See `LookupProcessorIdentifier`.

func (*Client) ReadJSON

func (c *Client) ReadJSON(resp *http.Response, valuePtr interface{}) error

ReadJSON is one of the lower-level methods of the client, used to read the result of a `Client#Do`. It closes the body stream.

See `ReadResponseBody`, the lower-level method for reading a response, for more details.

func (*Client) ReadResponseBody

func (c *Client) ReadResponseBody(resp *http.Response) ([]byte, error)

ReadResponseBody is the lower-level method of the client, used to read the result of a `Client#Do`. It closes the body stream.

See `ReadJSON` too.
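The "decode, then always close the body" contract described for `ReadJSON` can be sketched with the standard library alone. This is an illustration under assumptions, not the library's implementation: `readJSON` and `fakeResponse` are hypothetical helpers, and the response is built in memory so that no server is required.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// readJSON is a hypothetical stand-in: it decodes the response body into
// valuePtr and closes the body whether or not decoding succeeds.
func readJSON(resp *http.Response, valuePtr interface{}) error {
	defer resp.Body.Close()
	return json.NewDecoder(resp.Body).Decode(valuePtr)
}

// fakeResponse builds an in-memory response so the sketch needs no server.
func fakeResponse(body string) *http.Response {
	return &http.Response{
		StatusCode: http.StatusOK,
		Body:       io.NopCloser(strings.NewReader(body)),
	}
}

func main() {
	var names []string
	if err := readJSON(fakeResponse(`["topic-a","topic-b"]`), &names); err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(names)
}
```

With the real client, the `*http.Response` would come from `Client#Do` rather than from a fabricated value.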

func (*Client) RegisterAlert

func (c *Client) RegisterAlert(alert Alert) error

RegisterAlert registers an Alert, returns an error on failure.

func (*Client) RegisterSchema

func (c *Client) RegisterSchema(subject string, avroSchema string) (int, error)

RegisterSchema registers a schema. The returned identifier should be used to retrieve this schema from the schemas resource and is different from the schema’s version which is associated with that name.

func (*Client) RestartConnector

func (c *Client) RestartConnector(clusterName, name string) error

RestartConnector restarts the connector and its tasks. It returns a 409 (Conflict) status code error if a rebalance is in progress.

func (*Client) RestartConnectorTask

func (c *Client) RestartConnectorTask(clusterName, name string, taskID int) error

RestartConnectorTask restarts an individual task.

func (*Client) ResumeConnector

func (c *Client) ResumeConnector(clusterName, name string) error

ResumeConnector resumes a paused connector or does nothing if the connector is not paused. This call is asynchronous and the tasks will not transition to the RUNNING state at the same time.

func (*Client) ResumeProcessor

func (c *Client) ResumeProcessor(processorID string) error

ResumeProcessor resumes a processor. See `LookupProcessorIdentifier`.

func (*Client) UpdateConnector

func (c *Client) UpdateConnector(clusterName, name string, config ConnectorConfig) (connector Connector, err error)

UpdateConnector sets the configuration of an existing connector.

It returns information about the connector after the change has been made, and an indicator of whether the connector was created or only its configuration was updated.

func (*Client) UpdateDynamicBrokerConfigs

func (c *Client) UpdateDynamicBrokerConfigs(brokerID int, toAddOrUpdate BrokerConfig) error

UpdateDynamicBrokerConfigs adds or updates broker configuration dynamically.

func (*Client) UpdateDynamicClusterConfigs

func (c *Client) UpdateDynamicClusterConfigs(toAddOrUpdate BrokerConfig) error

UpdateDynamicClusterConfigs adds or updates cluster configuration dynamically.

func (*Client) UpdateGlobalCompatibilityLevel

func (c *Client) UpdateGlobalCompatibilityLevel(level CompatibilityLevel) error

UpdateGlobalCompatibilityLevel sets a new global compatibility level. When there are multiple instances of schema registry running in the same cluster, the update request will be forwarded to one of the instances designated as the master. If the master is not available, the client will get an error code indicating that the forwarding has failed.

func (*Client) UpdateProcessorRunners

func (c *Client) UpdateProcessorRunners(processorID string, numberOfRunners int) error

UpdateProcessorRunners scales a processor to "numberOfRunners". See `LookupProcessorIdentifier`.

func (*Client) UpdateSubjectCompatibilityLevel

func (c *Client) UpdateSubjectCompatibilityLevel(subject string, level CompatibilityLevel) error

UpdateSubjectCompatibilityLevel modifies a specific subject(schema)'s compatibility level.

func (*Client) UpdateTopic

func (c *Client) UpdateTopic(topicName string, configsSlice []KV) error

UpdateTopic updates a topic's configuration. topicName, string. configsSlice, array of topic config key-values.

Read more at: http://lenses.stream/dev/lenses-apis/rest-api/index.html#update-topic-configuration

func (*Client) ValidateLSQL

func (c *Client) ValidateLSQL(sql string) (v LSQLValidation, err error)

ValidateLSQL validates, but does not execute, a specific LSQL query.

type ClientConfig

type ClientConfig struct {
	// Host is the network schema, address and port that your lenses backend box is listening on.
	Host string `json:"host" yaml:"Host" survey:"host"`

	// Authentication, in order to gain access using different kind of options.
	//
	// See `BasicAuthentication` and `KerberosAuthentication` or the example for more.
	Authentication Authentication `json:"-" yaml:"-" survey:"-"`

	// Token is the "X-Kafka-Lenses-Token" request header's value.
	// If not empty, overrides any `Authentication` settings.
	//
	// If `Token` is expired then all the calls will result in a 403 Forbidden HTTP error
	// and a manual renewal will be required.
	//
	// For general-purpose use cases the recommendation is to leave this field empty and
	// fill the `Authentication` field instead.
	Token string `json:"token,omitempty" yaml:"Token,omitempty" survey:"-"`

	// Timeout specifies the timeout for connection establishment.
	//
	// Empty timeout value means no timeout.
	//
	// Such as "300ms", "-1.5h" or "2h45m".
	// Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
	// Example: "5s" for 5 seconds, "5m" for 5 minutes and so on.
	Timeout string `json:"timeout,omitempty" yaml:"Timeout,omitempty" survey:"timeout"`

	// Insecure tells the client to connect even if the cert is invalid.
	// Set it to true if you get errors about invalid certificates for the specific host domain.
	//
	// Defaults to false.
	Insecure bool `json:"insecure,omitempty" yaml:"Insecure,omitempty" survey:"insecure"`
	// Debug activates the debug mode: it logs every request, the configuration (except the `Password`)
	// and the raw response, before it is decoded but after gzip reading.
	//
	// If this is enabled then the printer's output is not predictable for the end-user.
	// The output destination is always `os.Stdout`, which 99.9% of the time means the terminal,
	// so use it only for debugging.
	//
	// Defaults to false.
	Debug bool `json:"debug,omitempty" yaml:"Debug,omitempty" survey:"debug"`
}

ClientConfig contains the information a client needs to connect to the lenses backend box.
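The `Timeout` format (units "ns", "us", "ms", "s", "m", "h") matches what the standard library's `time.ParseDuration` accepts. The sketch below shows how such a string could be turned into a `time.Duration`; that the client parses it this way internally is an assumption, and `parseTimeout` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// parseTimeout converts a `Timeout`-style string into a time.Duration.
// An empty string maps to zero, which is read here as "no timeout".
// This is a hypothetical helper, not part of the package.
func parseTimeout(s string) (time.Duration, error) {
	if s == "" {
		return 0, nil
	}
	return time.ParseDuration(s)
}

func main() {
	for _, s := range []string{"", "300ms", "5s", "2h45m", "5 seconds"} {
		d, err := parseTimeout(s)
		if err != nil {
			fmt.Printf("%q is invalid: %v\n", s, err)
			continue
		}
		fmt.Printf("%q parses to %v\n", s, d)
	}
}
```

Validating the value this way before filling `ClientConfig.Timeout` catches inputs such as "5 seconds" early.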

func (*ClientConfig) Fill

func (c *ClientConfig) Fill(other ClientConfig) bool

Fill iterates over the fields of the "other" ClientConfig; for each field that is not empty, it sets the value on the corresponding field of the "c" ClientConfig.

It returns true if the final configuration is valid, as reported by `IsValid`.

func (*ClientConfig) FormatHost

func (c *ClientConfig) FormatHost()

FormatHost will try to make sure that the schema:host:port pattern is followed on the `Host` field.

func (*ClientConfig) IsBasicAuth

func (c *ClientConfig) IsBasicAuth() (BasicAuthentication, bool)

IsBasicAuth reports whether the authentication is basic.

func (*ClientConfig) IsKerberosAuth

func (c *ClientConfig) IsKerberosAuth() (KerberosAuthentication, bool)

IsKerberosAuth reports whether the authentication is kerberos-based.

func (*ClientConfig) IsValid

func (c *ClientConfig) IsValid() bool

IsValid returns true if the configuration contains the necessary fields, otherwise false.

type CompatibilityLevel

type CompatibilityLevel string

CompatibilityLevel describes the valid compatibility levels' type, it's just a string. Valid values are: `CompatibilityLevelNone`, `CompatibilityLevelFull`, `CompatibilityLevelForward`, `CompatibilityLevelBackward`, `CompatibilityLevelFullTransitive`, `CompatibilityLevelForwardTransitive`, `CompatibilityLevelBackwardTransitive`.

Read https://docs.confluent.io/current/schema-registry/docs/api.html#compatibility for more.

const (
	// CompatibilityLevelNone is the "NONE" compatibility level.
	CompatibilityLevelNone CompatibilityLevel = "NONE"
	// CompatibilityLevelFull is the "FULL" compatibility level.
	CompatibilityLevelFull CompatibilityLevel = "FULL"
	// CompatibilityLevelForward is the "FORWARD" compatibility level.
	CompatibilityLevelForward CompatibilityLevel = "FORWARD"
	// CompatibilityLevelBackward is the "BACKWARD" compatibility level.
	CompatibilityLevelBackward CompatibilityLevel = "BACKWARD"
	// CompatibilityLevelFullTransitive is the "FULL_TRANSITIVE" compatibility level.
	CompatibilityLevelFullTransitive CompatibilityLevel = "FULL_TRANSITIVE"
	// CompatibilityLevelForwardTransitive is the "FORWARD_TRANSITIVE" compatibility level.
	CompatibilityLevelForwardTransitive CompatibilityLevel = "FORWARD_TRANSITIVE"
	// CompatibilityLevelBackwardTransitive is the "BACKWARD_TRANSITIVE" compatibility level.
	CompatibilityLevelBackwardTransitive CompatibilityLevel = "BACKWARD_TRANSITIVE"
)
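Because `CompatibilityLevel` is a plain string type, any string converts to it, so user input may be worth checking against the seven documented values first. A minimal sketch follows; the type is re-declared locally so the example compiles on its own, and `isValidLevel` is a hypothetical helper that the package may or may not offer in some form.

```go
package main

import "fmt"

// CompatibilityLevel mirrors the package's string type so that this
// sketch is self-contained.
type CompatibilityLevel string

// validLevels holds the seven documented values.
var validLevels = map[CompatibilityLevel]bool{
	"NONE":                true,
	"FULL":                true,
	"FORWARD":             true,
	"BACKWARD":            true,
	"FULL_TRANSITIVE":     true,
	"FORWARD_TRANSITIVE":  true,
	"BACKWARD_TRANSITIVE": true,
}

// isValidLevel reports whether level is one of the documented values.
// The comparison is case-sensitive. Hypothetical helper.
func isValidLevel(level CompatibilityLevel) bool {
	return validLevels[level]
}

func main() {
	for _, in := range []string{"FULL", "full", "BACKWARD_TRANSITIVE", "PARTIAL"} {
		fmt.Printf("%q valid: %t\n", in, isValidLevel(CompatibilityLevel(in)))
	}
}
```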

type CompressionType

type CompressionType string

CompressionType is the Go type to safely describe and set the `CompressionType` field of the topic and broker configs. The available compression types are: `Uncompressed`, `Snappy`, `LZ4`, `Gzip` and `Producer`.

const (
	Uncompressed CompressionType = "uncompressed"
	Snappy       CompressionType = "snappy"
	LZ4          CompressionType = "lz4"
	Gzip         CompressionType = "gzip"
	Producer     CompressionType = "producer"
)

The available compression types for topics configs and broker's config.

type Config

type Config struct {
	CurrentContext string
	Contexts       map[string]*ClientConfig
}

Config contains the necessary information that `OpenConnection` needs to create a new client which connects and talks to the lenses backend box.

Optionally, the `Contexts` map of string and client configuration values can be filled to map different environments. Use the `WithContext` `ConnectionOption` to select a specific `ClientConfig`; otherwise the first one is selected. This will also amend the `CurrentContext` via the top-level `OpenConnection` function.

Config can be loaded via JSON or YAML.

func (*Config) Clone

func (c *Config) Clone() Config

Clone returns a deep clone of this `Config`.

func (*Config) CurrentContextExists

func (c *Config) CurrentContextExists() bool

CurrentContextExists just checks if the `CurrentContext` exists in the `Contexts` map.

func (*Config) FillCurrent

func (c *Config) FillCurrent(cfg ClientConfig)

FillCurrent fills the specific client configuration based on the `CurrentContext` if it's valid.

func (*Config) GetCurrent

func (c *Config) GetCurrent() *ClientConfig

GetCurrent returns the specific current client configuration based on the `CurrentContext`.

func (*Config) IsValid

func (c *Config) IsValid() bool

IsValid returns the result of the contexts' ClientConfig#IsValid.

func (*Config) RemoveContext

func (c *Config) RemoveContext(contextName string) bool

RemoveContext deletes a context based on its name/key. It will change the current context if there is an available context to set as current; if it can't find one then the operation stops. It returns true if the context was found and removed and the current context could be changed to something valid, otherwise false.

func (*Config) RemoveTokens

func (c *Config) RemoveTokens()

RemoveTokens removes the `Token` from all client configurations.

func (*Config) SetCurrent

func (c *Config) SetCurrent(currentContextName string)

SetCurrent overrides the `CurrentContext` and does nothing else.

type ConnectCluster

type ConnectCluster struct {
	Name     string `json:"name" header:"Name"`
	URL      string `json:"url"` //header:"URL"`
	Statuses string `json:"statuses" header:"Status"`
	Config   string `json:"config" header:"Config"`
	Offsets  string `json:"offsets" header:"Offsets,count"`
}

ConnectCluster contains the connect cluster information that is returned by the `GetConnectClusters` call.

type ConnectionOption

type ConnectionOption func(*Client)

ConnectionOption describes an optional runtime configurator that can be passed to `OpenConnection`. A custom `ConnectionOption` can be used as well; it is just a `func(*lenses.Client)`.

See `UsingClient` and `UsingToken` for use cases.
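
As a rough illustration of the option pattern described above, the sketch below applies options to a local stand-in `Client` type. The stand-in's fields, the `open` constructor and the `withUserAgent` option are assumptions made for this example only; they are not part of the lenses package.

```go
package main

import "fmt"

// Client is a local stand-in for lenses.Client. Its fields are assumptions
// made only for this sketch.
type Client struct {
	Token     string
	UserAgent string
}

// ConnectionOption mirrors the documented type: a func that mutates the client.
type ConnectionOption func(*Client)

// usingToken is a hypothetical option written in the style of UsingToken.
func usingToken(tok string) ConnectionOption {
	return func(c *Client) { c.Token = tok }
}

// withUserAgent is a hypothetical custom option.
func withUserAgent(ua string) ConnectionOption {
	return func(c *Client) { c.UserAgent = ua }
}

// open applies each option in order, the way an OpenConnection-style
// constructor could.
func open(opts ...ConnectionOption) *Client {
	c := &Client{UserAgent: "default"}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := open(usingToken("secret"), withUserAgent("my-tool/1.0"))
	fmt.Println(c.Token, c.UserAgent)
}
```

Because every option has the same signature, callers can mix built-in and custom options freely in a single call.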

func UsingClient

func UsingClient(httpClient *http.Client) ConnectionOption

UsingClient modifies the underlying HTTP client that lenses uses to contact the backend server.

func UsingToken

func UsingToken(tok string) ConnectionOption

UsingToken specifies a custom token that bypasses the "user" and "password". It may be useful for testing purposes.

func WithContext

func WithContext(contextName string) ConnectionOption

WithContext sets the current context, the environment to load configuration from.

See the `Config` structure and the `OpenConnection` function for more.

type Connector

type Connector struct {
	// https://docs.confluent.io/current/connect/restapi.html#get--connectors-(string-name)
	// Name of the created (or received) connector.
	ClusterName string `json:"clusterName,omitempty" header:"Cluster"` // internal use only, not set by response.
	Name        string `json:"name" header:"Name"`

	// Config parameters for the connector
	Config ConnectorConfig `json:"config,omitempty" header:"Configs,count"`
	// Tasks is the list of active tasks generated by the connector.
	Tasks []ConnectorTaskReadOnly `json:"tasks,omitempty" header:"Tasks,count"`
}

Connector contains the connector's information, used for both sending and receiving.

type ConnectorConfig

type ConnectorConfig map[string]interface{}

ConnectorConfig holds the configuration parameters for the connector.

For both send and receive: https://docs.confluent.io/current/connect/restapi.html#post--connectors

type ConnectorInfoUI

type ConnectorInfoUI struct {
	Class       string `json:"class"` // header:"Class"`
	Name        string `json:"name" header:"Name"`
	Type        string `json:"type" header:"Type"`
	Version     string `json:"version" header:"Version"`
	Author      string `json:"author,omitempty" header:"Author"`
	Description string `json:"description,omitempty" header:"Desc"`
	Docs        string `json:"docs,omitempty"` // header:"Docs"`
	UIEnabled   bool   `json:"uiEnabled" header:"UI Enabled"`
}

ConnectorInfoUI describes a supported Kafka Connector, result type of the `GetSupportedConnectors` call.

type ConnectorPlugin

type ConnectorPlugin struct {
	// Class is the connector class name.
	Class string `json:"class" header:"Class"`

	Type string `json:"type" header:"Type"`

	Version string `json:"version" header:"Version"`
}

ConnectorPlugin describes a single entry of the list returned by `GetConnectorPlugins`.

type ConnectorState

type ConnectorState string

ConnectorState indicates the state of a connector or of one of its tasks. As defined at: https://docs.confluent.io/current/connect/managing.html#connector-and-task-status

const (
	// UNASSIGNED state indicates that the connector/task has not yet been assigned to a worker.
	UNASSIGNED ConnectorState = "UNASSIGNED"
	// RUNNING state indicates that the connector/task is running.
	RUNNING ConnectorState = "RUNNING"
	// PAUSED state indicates that the connector/task has been administratively paused.
	PAUSED ConnectorState = "PAUSED"
	// FAILED state indicates that the connector/task has failed
	// (usually by raising an exception, which is reported in the status output).
	FAILED ConnectorState = "FAILED"
)

type ConnectorStatus

type ConnectorStatus struct {
	// Name is the name of the connector.
	Name      string                        `json:"name" header:"Name"`
	Connector ConnectorStatusConnectorField `json:"connector" header:"inline"`
	Tasks     []ConnectorStatusTask         `json:"tasks,omitempty" header:"Tasks,count"`
}

ConnectorStatus describes the data received from `GetConnectorStatus`.

type ConnectorStatusConnectorField

type ConnectorStatusConnectorField struct {
	State    string `json:"state" header:"State"`      // i.e RUNNING
	WorkerID string `json:"worker_id" header:"Worker"` // i.e fakehost:8083
}

ConnectorStatusConnectorField describes a connector's status, see `ConnectorStatus`.

type ConnectorStatusTask

type ConnectorStatusTask struct {
	ID       int    `json:"id" header:"ID,text"`                  // i.e 1
	State    string `json:"state" header:"State"`                 // i.e FAILED
	WorkerID string `json:"worker_id" header:"Worker"`            // i.e fakehost:8083
	Trace    string `json:"trace,omitempty" header:"Trace,empty"` // i.e org.apache.kafka.common.errors.RecordTooLargeException\n
}

ConnectorStatusTask describes a connector task's status, see `ConnectorStatus`.
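
To show how these three status types fit together, here is a minimal sketch that decodes a status document and collects the failed tasks. The structs are local copies of the documented ones with the `header` tags omitted; the `sample` document and the helper names `parseStatus` and `failedTaskIDs` are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented structs (header tags omitted).
type ConnectorStatusConnectorField struct {
	State    string `json:"state"`
	WorkerID string `json:"worker_id"`
}

type ConnectorStatusTask struct {
	ID       int    `json:"id"`
	State    string `json:"state"`
	WorkerID string `json:"worker_id"`
	Trace    string `json:"trace,omitempty"`
}

type ConnectorStatus struct {
	Name      string                        `json:"name"`
	Connector ConnectorStatusConnectorField `json:"connector"`
	Tasks     []ConnectorStatusTask         `json:"tasks,omitempty"`
}

// sample is made-up data in the shape described by the struct tags.
const sample = `{
  "name": "my-sink",
  "connector": {"state": "RUNNING", "worker_id": "fakehost:8083"},
  "tasks": [
    {"id": 0, "state": "RUNNING", "worker_id": "fakehost:8083"},
    {"id": 1, "state": "FAILED", "worker_id": "fakehost:8083",
     "trace": "org.apache.kafka.common.errors.RecordTooLargeException\n"}
  ]
}`

// parseStatus decodes a connector status document.
func parseStatus(raw string) (ConnectorStatus, error) {
	var s ConnectorStatus
	err := json.Unmarshal([]byte(raw), &s)
	return s, err
}

// failedTaskIDs returns the IDs of the tasks whose state is "FAILED".
func failedTaskIDs(s ConnectorStatus) []int {
	var ids []int
	for _, t := range s.Tasks {
		if t.State == "FAILED" {
			ids = append(ids, t.ID)
		}
	}
	return ids
}

func main() {
	s, err := parseStatus(sample)
	if err != nil {
		panic(err)
	}
	fmt.Println(s.Name, s.Connector.State, failedTaskIDs(s))
}
```

Note that a connector can report "RUNNING" while one of its tasks has failed, so both levels are worth checking.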

type ConnectorTaskReadOnly

type ConnectorTaskReadOnly struct {
	// Connector is the name of the connector the task belongs to.
	Connector string `json:"connector"`
	// Task is the Task ID within the connector.
	Task int `json:"task"`
}

ConnectorTaskReadOnly is the type returned as "tasks" from the connector. It is for read-only access and contains the basic information about the connector's task. It is usually returned as a slice of ConnectorTaskReadOnly.

See `Connector` type for more.

type Consumer

type Consumer struct {
	Topic                     string `json:"topic"`
	CurrentOffset             int64  `json:"currentOffset"`
	LogEndOffset              int64  `json:"longEndOffset"`
	Lag                       int64  `json:"lag"`
	ConsumerID                string `json:"consumerId"`
	Host                      string `json:"host"`
	ClientID                  string `json:"clientId"`
	MessagesPerSecond         int64  `json:"messagesPerSecond"`
	ProducerMessagesPerSecond int64  `json:"producerMessagesPerSecond"`
}

Consumer describes the consumer valid response data.

type ConsumerCoordinator

type ConsumerCoordinator struct {
	ID   int    `json:"id"`
	Host string `json:"host"`
	Port int    `json:"port"`
	Rack string `json:"rack"`
}

ConsumerCoordinator describes the consumer coordinator's valid response data.

type ConsumerGroupState

type ConsumerGroupState string

ConsumerGroupState describes the valid values of a `ConsumerGroupState`: `StateUnknown`,`StateStable`,`StateRebalancing`,`StateDead`,`StateNoActiveMembers`,`StateExistsNot`,`StateCoordinatorNotFound`.

const (
	// StateUnknown is a valid `ConsumerGroupState` value of "Unknown".
	StateUnknown ConsumerGroupState = "Unknown"
	// StateStable is a valid `ConsumerGroupState` value of "Stable".
	StateStable ConsumerGroupState = "Stable"
	// StateRebalancing is a valid `ConsumerGroupState` value of "Rebalancing".
	StateRebalancing ConsumerGroupState = "Rebalancing"
	// StateDead is a valid `ConsumerGroupState` value of "Dead".
	StateDead ConsumerGroupState = "Dead"
	// StateNoActiveMembers is a valid `ConsumerGroupState` value of "NoActiveMembers".
	StateNoActiveMembers ConsumerGroupState = "NoActiveMembers"
	// StateExistsNot is a valid `ConsumerGroupState` value of "ExistsNot".
	StateExistsNot ConsumerGroupState = "ExistsNot"
	// StateCoordinatorNotFound is a valid `ConsumerGroupState` value of "CoordinatorNotFound".
	StateCoordinatorNotFound ConsumerGroupState = "CoordinatorNotFound"
)

type ConsumersGroup

type ConsumersGroup struct {
	ID          string              `json:"id"`
	Coordinator ConsumerCoordinator `json:"coordinator"`
	// On consumers not active/committing offsets - we don't get any of the following info
	Active               bool               `json:"active"`
	State                ConsumerGroupState `json:"state"`
	Consumers            []string           `json:"consumers"`
	ConsumersCount       int                `json:"consumersCount,omitempty"`
	TopicPartitionsCount int                `json:"topicPartitionsCount,omitempty"`
	MinLag               int64              `json:"minLag,omitempty"`
	MaxLag               int64              `json:"maxLag,omitempty"`
}

ConsumersGroup describes the data that the `Topic`'s `ConsumersGroup` field contains.

type CreateProcessorPayload

type CreateProcessorPayload struct {
	Name        string `json:"name" yaml:"Name"` // required
	SQL         string `json:"sql" yaml:"SQL"`   // required
	Runners     int    `json:"runners" yaml:"Runners"`
	ClusterName string `json:"clusterName" yaml:"ClusterName"`
	Namespace   string `json:"namespace" yaml:"Namespace"`
	Pipeline    string `json:"pipeline" yaml:"Pipeline"` // defaults to Name if not set.
}

CreateProcessorPayload holds the data to be sent from `CreateProcessor`.

type CreateTopicPayload

type CreateTopicPayload struct {
	TopicName   string `json:"topicName" yaml:"Name"`
	Replication int    `json:"replication" yaml:"Replication"`
	Partitions  int    `json:"partitions" yaml:"Partitions"`
	Configs     KV     `json:"configs" yaml:"Configs"`
}

CreateTopicPayload contains the data that the `CreateTopic` accepts, as a single structure.
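
For reference, the following sketch shows the JSON that the struct tags above produce. It uses local copies of `CreateTopicPayload` and `KV`; the topic name and config values are illustrative, and `encodeTopic` is a helper invented for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// KV and CreateTopicPayload are local copies of the documented types.
type KV map[string]interface{}

type CreateTopicPayload struct {
	TopicName   string `json:"topicName" yaml:"Name"`
	Replication int    `json:"replication" yaml:"Replication"`
	Partitions  int    `json:"partitions" yaml:"Partitions"`
	Configs     KV     `json:"configs" yaml:"Configs"`
}

// encodeTopic renders the payload as a JSON request body.
func encodeTopic(p CreateTopicPayload) (string, error) {
	b, err := json.Marshal(p)
	return string(b), err
}

func main() {
	out, err := encodeTopic(CreateTopicPayload{
		TopicName:   "orders", // illustrative topic name
		Replication: 1,
		Partitions:  3,
		Configs:     KV{"cleanup.policy": "compact"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
	// prints {"topicName":"orders","replication":1,"partitions":3,"configs":{"cleanup.policy":"compact"}}
}
```

The JSON key is `topicName` while the YAML key is `Name`, so a payload loaded from a YAML file uses different field names than the request body.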

type CreateUpdateConnectorPayload

type CreateUpdateConnectorPayload struct {
	ClusterName string          `yaml:"ClusterName"`
	Name        string          `yaml:"Name"`
	Config      ConnectorConfig `yaml:"Config"`
}

CreateUpdateConnectorPayload can be used to hold the data for creating or updating a connector.

func (*CreateUpdateConnectorPayload) ApplyAndValidateName

func (c *CreateUpdateConnectorPayload) ApplyAndValidateName() error

ApplyAndValidateName applies some rules to make sure that the connector's data are setup correctly.

type ExecutionMode

type ExecutionMode string

ExecutionMode is the type for the config's execution modes, valid values are: IN_PROC/CONNECT/KUBERNETES.

const (
	// ExecutionModeInvalid represents no mode. It is here for invalid execution modes that
	// may be returned from the server, and it may be useful in the future.
	ExecutionModeInvalid ExecutionMode = "INVALID"
	// ExecutionModeInProcess represents the execution mode IN_PROC.
	ExecutionModeInProcess ExecutionMode = "IN_PROC"
	// ExecutionModeConnect represents the execution mode CONNECT.
	ExecutionModeConnect ExecutionMode = "CONNECT"
	// ExecutionModeKubernetes represents the execution mode KUBERNETES.
	ExecutionModeKubernetes ExecutionMode = "KUBERNETES"
)

func MatchExecutionMode

func MatchExecutionMode(modeStr string) (ExecutionMode, bool)

MatchExecutionMode returns the mode that matches the given string representation, together with a boolean that reports whether such a mode exists. The returned mode is always uppercase; the input argument is not case sensitive.

The value is just a string, but the dedicated type protects users from mistakes and makes it much easier to remove, change or replace a mode in future releases.
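
The matching behaviour described above could look roughly like the sketch below. This is not the library's code: `matchExecutionMode` is a local re-implementation, and returning `ExecutionModeInvalid` for unknown input is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// ExecutionMode and its constants are local copies of the documented ones.
type ExecutionMode string

const (
	ExecutionModeInvalid    ExecutionMode = "INVALID"
	ExecutionModeInProcess  ExecutionMode = "IN_PROC"
	ExecutionModeConnect    ExecutionMode = "CONNECT"
	ExecutionModeKubernetes ExecutionMode = "KUBERNETES"
)

// matchExecutionMode is a sketch of case-insensitive matching.
// Assumption: unknown input yields ExecutionModeInvalid and false.
func matchExecutionMode(modeStr string) (ExecutionMode, bool) {
	switch m := ExecutionMode(strings.ToUpper(modeStr)); m {
	case ExecutionModeInProcess, ExecutionModeConnect, ExecutionModeKubernetes:
		return m, true
	default:
		return ExecutionModeInvalid, false
	}
}

func main() {
	for _, in := range []string{"in_proc", "Connect", "docker"} {
		mode, ok := matchExecutionMode(in)
		fmt.Println(in, mode, ok)
	}
}
```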

type KV

type KV map[string]interface{}

KV shouldn't be needed now that `TopicConfig` exists, but the API returns different values when fetching topic configs than it accepts when creating a topic or updating topic configs.

type KerberosAuthentication

type KerberosAuthentication struct {
	ConfFile string                       `json:"confFile" yaml:"ConfFile" survey:"-"` // keep those, useful for marshal.
	Method   KerberosAuthenticationMethod `json:"-" yaml:"-" survey:"-"`
}

KerberosAuthentication can be used as an alternative to `BasicAuthentication` for a more secure way to connect to the lenses backend box.

func (KerberosAuthentication) Auth

func (auth KerberosAuthentication) Auth(c *Client) error

Auth implements the `Authentication` for the `KerberosAuthentication`.

func (KerberosAuthentication) FromCCache

func (auth KerberosAuthentication) FromCCache() (KerberosFromCCache, bool)

FromCCache reports whether the kerberos authentication is loaded from a ccache file.

func (KerberosAuthentication) WithKeytab

func (auth KerberosAuthentication) WithKeytab() (KerberosWithKeytab, bool)

WithKeytab reports whether the kerberos authentication is with a keytab file, username (and realm).

func (KerberosAuthentication) WithPassword

func (auth KerberosAuthentication) WithPassword() (KerberosWithPassword, bool)

WithPassword reports whether the kerberos authentication is with username, password (and realm).

type KerberosAuthenticationMethod

type KerberosAuthenticationMethod interface {
	NewClient() (client.Client, error)
}

KerberosAuthenticationMethod is the interface that all available kerberos authentication methods implement.

See `KerberosWithPassword`, `KerberosWithKeytab` and `KerberosFromCCache` for more.

type KerberosFromCCache

type KerberosFromCCache struct {
	// CCacheFile should be filled with the ccache file path.
	CCacheFile string `json:"ccacheFile" yaml:"CCacheFile" survey:"ccache"`
}

KerberosFromCCache is a `KerberosAuthenticationMethod` using a ccache file path.

The `KerberosAuthentication` calls its `NewClient`.

func (KerberosFromCCache) NewClient

func (m KerberosFromCCache) NewClient() (client.Client, error)

NewClient implements the `KerberosAuthenticationMethod` for the `KerberosFromCCache`.

type KerberosWithKeytab

type KerberosWithKeytab struct {
	Username string `json:"username" yaml:"Username" survey:"username"`

	// Realm is optional, if empty then default is used.
	Realm string `json:"realm" yaml:"Realm" survey:"realm"`
	// KeytabFile the keytab file path.
	KeytabFile string `json:"keytabFile" yaml:"KeytabFile" survey:"keytab"`
}

KerberosWithKeytab is a `KerberosAuthenticationMethod` using a username and a keytab file path and optionally a realm.

The `KerberosAuthentication` calls its `NewClient`.

func (KerberosWithKeytab) NewClient

func (m KerberosWithKeytab) NewClient() (client.Client, error)

NewClient implements the `KerberosAuthenticationMethod` for the `KerberosWithKeytab`.

type KerberosWithPassword

type KerberosWithPassword struct {
	Username string `json:"username" yaml:"Username" survey:"username"`
	Password string `json:"password,omitempty" yaml:"Password" survey:"password"`

	// Realm is optional, if empty then default is used.
	Realm string `json:"realm" yaml:"Realm" survey:"realm"`
}

KerberosWithPassword is a `KerberosAuthenticationMethod` using a username, password and optionally a realm.

The `KerberosAuthentication` calls its `NewClient`.

func (KerberosWithPassword) NewClient

func (m KerberosWithPassword) NewClient() (client.Client, error)

NewClient implements the `KerberosAuthenticationMethod` for the `KerberosWithPassword`.

type LSQLError

type LSQLError struct {
	FromLine   int    `json:"fromLine"`
	ToLine     int    `json:"toLine"`
	FromColumn int    `json:"fromColumn"`
	ToColumn   int    `json:"toColumn"`
	Message    string `json:"error"`
}

LSQLError is the form of the error record data that an LSQL call returns once.

func (LSQLError) Error

func (err LSQLError) Error() string

type LSQLOffset

type LSQLOffset struct {
	Partition int   `json:"partition" header:"Partition"`
	Min       int64 `json:"min" header:"Min"`
	Max       int64 `json:"max" header:"Max"`
}

LSQLOffset is the form of the offset record data that an LSQL call returns once.

type LSQLRecord

type LSQLRecord struct {
	Timestamp int64  `json:"timestamp"`
	Partition int    `json:"partition"`
	Key       string `json:"key"`
	Offset    int    `json:"offset"`
	Topic     string `json:"topic"`
	Value     string `json:"value"` // represents a json object, in raw string.
}

LSQLRecord, LSQLStop, LSQLOffset, LSQLError and the optional LSQLStats are the structures in which the SSE client stores the various pieces of LSQL information, see `LSQL` for more.

type LSQLRecordHandler

type LSQLRecordHandler func(LSQLRecord) error

LSQLRecordHandler, LSQLStopHandler, LSQLStopErrorHandler and, optionally, LSQLStatsHandler describe the types of functions that accept an LSQLRecord, LSQLStop, LSQLError and LSQLStats respectively. If a handler returns a non-nil error, the client stops reading from SSE. They are used by the `LSQL` function.
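
The stop-on-error contract can be sketched as follows. The `feed` loop is only a stand-in for the library's SSE reader, and `takeN` and `errStop` are hypothetical helpers; only the `LSQLRecord` fields and the handler signature come from the documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// LSQLRecord and LSQLRecordHandler are local copies of the documented types.
type LSQLRecord struct {
	Timestamp int64  `json:"timestamp"`
	Partition int    `json:"partition"`
	Key       string `json:"key"`
	Offset    int    `json:"offset"`
	Topic     string `json:"topic"`
	Value     string `json:"value"`
}

type LSQLRecordHandler func(LSQLRecord) error

// errStop is a hypothetical sentinel a handler returns to stop reading.
var errStop = errors.New("stop reading")

// feed is a stand-in for the read loop: it passes records to the handler
// and stops at the first non-nil error. It returns how many records were
// accepted before stopping.
func feed(records []LSQLRecord, h LSQLRecordHandler) (int, error) {
	for i, r := range records {
		if err := h(r); err != nil {
			return i, err
		}
	}
	return len(records), nil
}

// takeN returns a handler that collects n record values and then asks to stop.
func takeN(n int, out *[]string) LSQLRecordHandler {
	return func(r LSQLRecord) error {
		if len(*out) >= n {
			return errStop
		}
		*out = append(*out, r.Value)
		return nil
	}
}

func main() {
	var values []string
	records := []LSQLRecord{{Value: `{"a":1}`}, {Value: `{"a":2}`}, {Value: `{"a":3}`}}
	n, err := feed(records, takeN(2, &values))
	fmt.Println(n, err, values)
}
```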

type LSQLRunningQuery

type LSQLRunningQuery struct {
	ID        int64  `json:"id" header:"ID,text"`
	SQL       string `json:"sql" header:"SQL"`
	User      string `json:"user"  header:"User"`
	Timestamp int64  `json:"ts" header:"Timestamp,timestamp(ms|utc|02 Jan 2006 15:04)"`
}

LSQLRunningQuery is the form of the data that the `GetRunningQueries` returns.

type LSQLStats

type LSQLStats struct {
	// Number of records read from Kafka so far.
	TotalRecords int `json:"totalRecords"`
	// Number of records not matching the filter.
	RecordsSkipped int `json:"recordsSkipped"`
	// Max number of records to pull (driven by LIMIT X,
	// if LIMIT is not present it gets the default config in LENSES).
	RecordsLimit int `json:"recordsLimit"`
	// Data read so far in bytes.
	TotalBytes int64 `json:"totalBytes"`
	// Max data allowed in bytes  (driven by `max.bytes`= X,
	// if is not present it gets the default config in LENSES).
	MaxSize int64 `json:"maxSize"`
	// CurrentSize represents the data length accepted so far in bytes (these are records passing the filter).
	CurrentSize int64 `json:"currentSize"`
}

LSQLStats is the form of the stats record data that an LSQL call returns.

type LSQLStatsHandler

type LSQLStatsHandler func(LSQLStats) error

LSQLStatsHandler describes the form of the function that should be registered to accept stats record data from the LSQL call.

type LSQLStop

type LSQLStop struct {
	// If false `max.time` was reached.
	IsTimeRemaining bool `json:"isTimeRemaining" header:"Time Remaining"`
	// If true there was no more data on the topic and `max.zero.polls` was reached.
	IsTopicEnd bool `json:"isTopicEnd" header:"End"`
	// If true the query has been stopped by admin  (Cancel query equivalence).
	IsStopped bool `json:"isStopped" header:"Stopped"`
	// Number of records read from Kafka.
	TotalRecords int `json:"totalRecords" header:"Total /"`
	// Number of records not matching the filter.
	SkippedRecords int `json:"skippedRecords" header:"Skipped Records"`
	// Max number of records to pull (driven by LIMIT X,
	// if LIMIT is not present it gets the default config in LENSES).
	RecordsLimit int `json:"recordsLimit" header:"Records Limit"`
	// Total size in bytes read from Kafka.
	TotalSizeRead int64 `json:"totalSizeRead" header:"Total Size Read"`
	// Total size in bytes (Kafka size) for the records.
	Size int64 `json:"size" header:"Size"`
	// The topic offsets.
	// If query parameter `&offsets=true` is not present it won't pull the details.
	Offsets []LSQLOffset `json:"offsets" header:"Offsets,count"`
}

LSQLStop is the form of the stop record data that an LSQL call returns once.

type LSQLStopErrorHandler

type LSQLStopErrorHandler func(LSQLError) error

LSQLStopErrorHandler describes the form of the function that should be registered to accept error record data from the LSQL call once.

type LSQLStopHandler

type LSQLStopHandler func(LSQLStop) error

LSQLStopHandler describes the form of the function that should be registered to accept stop record data from the LSQL call once.

type LSQLValidation

type LSQLValidation struct {
	IsValid bool   `json:"isValid"`
	Line    int    `json:"line"`
	Column  int    `json:"column"`
	Message string `json:"message"`
}

LSQLValidation contains the necessary information about an invalid lenses query, see `ValidateLSQL`. Example Error:

{
    "IsValid": false,
    "Line": 4,
    "Column": 1,
    "Message": "Invalid syntax.Encountered \"LIIT\" at line 4, column 1.\nWas expecting one of:\n    <EOF> ... "
}

type LicenseInfo

type LicenseInfo struct {
	ClientID    string `json:"clientId" header:"ID,text"`
	IsRespected bool   `json:"isRespected" header:"Respected"`
	MaxBrokers  int    `json:"maxBrokers" header:"Max Brokers"`
	MaxMessages int    `json:"maxMessages,omitempty" header:"/ Messages"`
	Expiry      int64  `json:"expiry" header:"Expires,timestamp(ms|02 Jan 2006 15:04)"`

	// ExpiresAt is the time.Time expiration datetime (unix).
	ExpiresAt time.Time `json:"-"`

	// ExpiresDur is the duration that expires from now.
	ExpiresDur time.Duration `json:"-"`

	// YearsToExpire is the length of years that expires from now.
	YearsToExpire int `json:"yearsToExpire,omitempty"`
	// MonthsToExpire is the length of months that expires from now.
	MonthsToExpire int `json:"monthsToExpire,omitempty"`
	// DaysToExpire is the length of days that expires from now.
	DaysToExpire int `json:"daysToExpire,omitempty"`
}

LicenseInfo describes the data received from the `GetLicenseInfo`.
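
The relationship between `Expiry` and the derived time fields can be sketched like this. It assumes `Expiry` is a Unix timestamp in milliseconds, as the `timestamp(ms|...)` header tag suggests. The helpers `expiresAt` and `daysLeft` are hypothetical and do not reproduce how the library fills `YearsToExpire`, `MonthsToExpire` or `DaysToExpire`.

```go
package main

import (
	"fmt"
	"time"
)

// expiresAt converts a millisecond Unix timestamp to a UTC time.Time.
// Assumption: LicenseInfo.Expiry is expressed in milliseconds.
func expiresAt(expiryMs int64) time.Time {
	return time.Unix(expiryMs/1000, (expiryMs%1000)*int64(time.Millisecond)).UTC()
}

// daysLeft returns the number of whole days between now and the expiry,
// truncated toward zero. It is negative once the license has expired.
func daysLeft(expiryMs int64, now time.Time) int {
	return int(expiresAt(expiryMs).Sub(now).Hours() / 24)
}

func main() {
	const expiry = int64(1540252800000) // illustrative value
	at := expiresAt(expiry)
	fmt.Println(at)
	// Compare against a fixed "now" ten days earlier to keep the output stable.
	fmt.Println(daysLeft(expiry, at.Add(-10*24*time.Hour)))
}
```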

type LiveConfiguration

type LiveConfiguration struct {
	Host     string `json:"host" yaml:"Host" toml:"Host"`
	User     string `json:"user" yaml:"User" toml:"User"`
	Password string `json:"password" yaml:"Password" toml:"Password"`
	ClientID string `json:"clientId,omitempty" yaml:"ClientID" toml:"ClientID"`
	Debug    bool   `json:"debug" yaml:"Debug" toml:"Debug"`

	// HandshakeTimeout specifies the duration for the handshake to complete.
	HandshakeTimeout time.Duration
	// ReadBufferSize and WriteBufferSize specify I/O buffer sizes. If a buffer
	// size is zero, then a useful default size is used. The I/O buffer sizes
	// do not limit the size of the messages that can be sent or received.
	ReadBufferSize, WriteBufferSize int

	// TLSClientConfig specifies the TLS configuration to use with tls.Client.
	// If nil, the default configuration is used.
	TLSClientConfig *tls.Config
}

LiveConfiguration contains the contact information for the websocket communication. It contains the host (including the scheme), the user and password credentials and, optionally, the client id, which is the Kafka consumer group.

See `OpenLiveConnection` for more.

type LiveConnection

type LiveConnection struct {
	// contains filtered or unexported fields
}

LiveConnection is the websocket connection.

func OpenLiveConnection

func OpenLiveConnection(config LiveConfiguration) (*LiveConnection, error)

OpenLiveConnection starts the websocket communication and returns the client connection for further operations. An error will be returned if login failed.

The `Err` function is used to report any reader's error; the reader operates on its own goroutine.

The connection starts reading immediately. The implementation subscribes to the `Success` message to validate the login.

Usage:

c, err := lenses.OpenLiveConnection(lenses.LiveConfiguration{
   [...]
})
c.On(lenses.KafkaMessageResponse, func(pub lenses.LivePublisher, response lenses.LiveResponse) error {
   [...]
})
c.On(lenses.WildcardResponse, func(pub lenses.LivePublisher, response lenses.LiveResponse) error {
   [...catch all messages]
})
c.OnSuccess(func(pub lenses.LivePublisher, response lenses.LiveResponse) error {
   return pub.Publish(lenses.SubscribeRequest, 2, `{"sqls": ["SELECT * FROM reddit_posts LIMIT 3"]}`)
})

The shortcuts OnKafkaMessage, OnError, OnHeartbeat and OnInvalidRequest are available as well.

If at least one listener returned an error then the communication is terminated.

func (*LiveConnection) Close

func (c *LiveConnection) Close() error

Close closes the underlying websocket connection and stops receiving any new messages from the websocket server.

If `Close` is called more than once, it returns nil and nothing happens.

func (*LiveConnection) Err

func (c *LiveConnection) Err() <-chan error

Err can be used to receive the errors coming from the communication; the listeners' errors are sent to that channel too.

func (*LiveConnection) On

func (c *LiveConnection) On(typ ResponseType, cb LiveListener)

On adds a listener, a websocket message subscriber based on the given "typ" `ResponseType`. Use the `WildcardResponse` to subscribe to all message types.

func (*LiveConnection) OnError

func (c *LiveConnection) OnError(cb LiveListener)

OnError adds a listener, a websocket message subscriber based on the "ERROR" `ResponseType`.

func (*LiveConnection) OnHeartbeat

func (c *LiveConnection) OnHeartbeat(cb LiveListener)

OnHeartbeat adds a listener, a websocket message subscriber based on the "HEARTBEAT" `ResponseType`.

func (*LiveConnection) OnInvalidRequest

func (c *LiveConnection) OnInvalidRequest(cb LiveListener)

OnInvalidRequest adds a listener, a websocket message subscriber based on the "INVALIDREQUEST" `ResponseType`.

func (*LiveConnection) OnKafkaMessage

func (c *LiveConnection) OnKafkaMessage(cb LiveListener)

OnKafkaMessage adds a listener, a websocket message subscriber based on the "KAFKAMSG" `ResponseType`.

func (*LiveConnection) OnSuccess

func (c *LiveConnection) OnSuccess(cb LiveListener)

OnSuccess adds a listener, a websocket message subscriber based on the "SUCCESS" `ResponseType`.

func (*LiveConnection) Publish

func (c *LiveConnection) Publish(typ RequestType, correlationID int64, content string) error

Publish sends a `LiveRequest` based on the input arguments as JSON data to the websocket server.

func (*LiveConnection) Wait

func (c *LiveConnection) Wait(interruptSignal <-chan os.Signal) error

Wait waits until interruptSignal fires; if it is nil, it waits forever.

type LiveListener

type LiveListener func(LivePublisher, LiveResponse) error

LiveListener is the declaration for the subscriber. The subscriber is just a callback which fires whenever a websocket message with a particular `ResponseType` is sent by the websocket server.

See `On` too.

type LivePublisher

type LivePublisher interface {
	Publish(RequestType, int64, string) error
}

LivePublisher is the interface which the `LiveConnection` implements, it is used on `LiveListeners` to send requests to the websocket server.

type LiveRequest

type LiveRequest struct {
	// Type describes the action the back end will take in response to the request.
	// The available values are: "LOGIN", "SUBSCRIBE", "UNSUBSCRIBE",
	// "PUBLISH" and "COMMIT". The Go type is `RequestType`.
	Type RequestType `json:"type"`

	// CorrelationID is the unique identifier in order for the client to link the response
	// with the request made.
	CorrelationID int64 `json:"correlationId"`

	// Content contains the Json content of the actual request.
	// The content is strictly related to the type described shortly.
	Content string `json:"content"`

	// AuthToken is the unique token identifying the user making the request.
	// This token can only be obtained once the LOGIN request type has completed successfully.
	//
	// It's created automatically by the internal implementation,
	// on the `LivePublisher#Publish` which is used inside the `LiveListeners`.
	AuthToken string `json:"authToken"`
}

LiveRequest contains the necessary information that the back-end websocket server waits to be sent by the websocket client.
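
To make the wire format concrete, this sketch encodes a request using local copies of `LiveRequest` and `RequestType`. The token value is a placeholder, and `encodeRequest` and `decodeRequest` are helpers invented for the example. Since `Content` is a Go string, its JSON ends up as an escaped string inside the outer JSON document.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RequestType and LiveRequest are local copies of the documented types.
type RequestType string

const SubscribeRequest RequestType = "SUBSCRIBE"

type LiveRequest struct {
	Type          RequestType `json:"type"`
	CorrelationID int64       `json:"correlationId"`
	Content       string      `json:"content"`
	AuthToken     string      `json:"authToken"`
}

// encodeRequest renders the request as the JSON text sent over the websocket.
func encodeRequest(req LiveRequest) (string, error) {
	b, err := json.Marshal(req)
	return string(b), err
}

// decodeRequest is the inverse of encodeRequest, used here to show that the
// nested JSON in Content survives the round trip unchanged.
func decodeRequest(raw string) (LiveRequest, error) {
	var req LiveRequest
	err := json.Unmarshal([]byte(raw), &req)
	return req, err
}

func main() {
	out, err := encodeRequest(LiveRequest{
		Type:          SubscribeRequest,
		CorrelationID: 2,
		Content:       `{"sqls": ["SELECT * FROM reddit_posts LIMIT 3"]}`,
		AuthToken:     "token-from-login", // placeholder value
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```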

type LiveResponse

type LiveResponse struct {
	// Type describes what response content the client has
	// received. Available values are: "ERROR",
	// "INVALIDREQUEST", "KAFKAMSG", "HEARTBEAT" and "SUCCESS". The Go type is `ResponseType`.
	Type ResponseType `json:"type"`

	// CorrelationID is the unique identifier the client has provided in
	// the request associated with the response.
	CorrelationID int64 `json:"correlationId"`

	// Content contains the actual response content.
	// Each response type has its own content layout.
	Content json.RawMessage `json:"content"`
}

LiveResponse contains the necessary information that the websocket client expects to receive from the back-end websocket server.
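
Because `Content` is a `json.RawMessage`, the envelope can be decoded first and the content afterwards, once the type is known. The sketch below illustrates that two-step decoding with local copies of the types. The `sample` message and its content layout are made up, and `decodeResponse` and `contentFieldCount` are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ResponseType and LiveResponse are local copies of the documented types.
type ResponseType string

// KafkaMessageResponse mirrors the documented "KAFKAMSG" response type.
const KafkaMessageResponse ResponseType = "KAFKAMSG"

type LiveResponse struct {
	Type          ResponseType    `json:"type"`
	CorrelationID int64           `json:"correlationId"`
	Content       json.RawMessage `json:"content"`
}

// sample is made-up data; the real content layout of each response type is
// not described in this section.
const sample = `{"type":"KAFKAMSG","correlationId":2,"content":{"key":"k1","value":"v1"}}`

// decodeResponse parses only the envelope; Content stays as raw JSON.
func decodeResponse(raw string) (LiveResponse, error) {
	var r LiveResponse
	err := json.Unmarshal([]byte(raw), &r)
	return r, err
}

// contentFieldCount decodes Content as a JSON object and counts its
// top-level fields. It fails if Content is not an object.
func contentFieldCount(r LiveResponse) (int, error) {
	var obj map[string]json.RawMessage
	if err := json.Unmarshal(r.Content, &obj); err != nil {
		return 0, err
	}
	return len(obj), nil
}

func main() {
	r, err := decodeResponse(sample)
	if err != nil {
		panic(err)
	}
	n, err := contentFieldCount(r)
	if err != nil {
		panic(err)
	}
	fmt.Println(r.Type == KafkaMessageResponse, r.CorrelationID, n)
}
```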

type LogLine

type LogLine struct {
	Level      string `json:"level" header:"Level"`
	Thread     string `json:"thread"`
	Logger     string `json:"logger"`
	Message    string `json:"message" header:"Message"`
	Stacktrace string `json:"stacktrace"`
	Timestmap  int64  `json:"timestamp"`
	Time       string `json:"time" header:"Time"`
}

LogLine represents the return value(s) of the `GetLogsInfo` and `GetLogsMetrics` calls.

type PartitionMessage

type PartitionMessage struct {
	Partition int   `json:"partition"`
	Messages  int64 `json:"messages"`
	Begin     int64 `json:"begin"`
	End       int64 `json:"end"`
}

PartitionMessage describes a partition's message response data.

type ProcessorRunnerState

type ProcessorRunnerState struct {
	ID           string `json:"id"`
	Worker       string `json:"worker"`
	State        string `json:"state"`
	ErrorMessage string `json:"errorMsg"`
}

ProcessorRunnerState describes the state of a single processor runner, see `ProcessorStream` and `ProcessorsResult`.

type ProcessorStream

type ProcessorStream struct {
	ID              string `json:"id"` // header:"ID,text"`
	Name            string `json:"name" header:"Name"`
	DeploymentState string `json:"deploymentState" header:"State"`
	Runners         int    `json:"runners" header:"Runners"`
	User            string `json:"user" header:"Created By"`
	StartTimestamp  int64  `json:"startTs" header:"Started at,timestamp(ms|02 Jan 2006 15:04)"`
	StopTimestamp   int64  `json:"stopTs,omitempty"` // header:"Stopped,timestamp(ms|02 Jan 2006 15:04),No"`
	Uptime          int64  `json:"uptime" header:"Up time,unixduration"`

	Namespace   string `json:"namespace" header:"Namespace"`
	ClusterName string `json:"clusterName" header:"Cluster"`

	SQL string `json:"sql"` // header:"SQL"`

	TopicValueDecoder string `json:"topicValueDecoder"` // header:"Topic Decoder"`
	TopicKeyDecoder   string `json:"topicKeyDecoder"`   // header:"Topic Decoder"`
	Pipeline          string `json:"pipeline"`          // header:"Pipeline"`

	ToTopics               []string `json:"toTopics,omitempty"` // header:"To Topics"`
	FromTopics             []string `json:"fromTopics,omitempty"`
	LastActionMessage      string   `json:"lastActionMsg,omitempty"`      // header:"Last Action"`
	DeploymentErrorMessage string   `json:"deploymentErrorMsg,omitempty"` // header:"Depl Error"`

	RunnerState map[string]ProcessorRunnerState `json:"runnerState"`
}

ProcessorStream describes the processor stream, see `ProcessorsResult`.

type ProcessorTarget

type ProcessorTarget struct {
	Cluster    string   `json:"cluster"`
	Version    string   `json:"version,omitempty"`
	Namespaces []string `json:"namespaces"`
}

ProcessorTarget describes the processor target, see `ProcessorsResult`.

type ProcessorsResult

type ProcessorsResult struct {
	Targets []ProcessorTarget `json:"targets"`
	Streams []ProcessorStream `json:"streams"`
}

ProcessorsResult describes the data received from `GetProcessors`.

type Quota

type Quota struct {
	// EntityName is the Kafka client id for "CLIENT"
	// and "CLIENTS" and the user name for "USER", "USERS" and "USERCLIENT", see the `QuotaEntityXXX` constants.
	EntityName string `json:"entityName" yaml:"EntityName" header:"Name"`
	// EntityType can be either `QuotaEntityClient`, `QuotaEntityClients`,
	// `QuotaEntityClientsDefault`, `QuotaEntityUser`, `QuotaEntityUsers`, `QuotaEntityUserClient`
	// or `QuotaEntityUsersDefault`.
	EnityType QuotaEntityType `json:"entityType" yaml:"EntityType" header:"Type"`
	// Child is optional and only present for entityType `QuotaEntityUserClient` and is the client id.
	Child string `json:"child,omitempty" yaml:"Child"` // header:"Child"`
	// Properties  is a map of the quota constraints, the `QuotaConfig`.
	Properties QuotaConfig `json:"properties" yaml:"Properties" header:"inline"`
	// URL is the URL of this quota in Lenses.
	URL string `json:"url" yaml:"URL"`

	IsAuthorized bool `json:"isAuthorized" yaml:"IsAuthorized"`
}

Quota is the type which defines a single Quota.

type QuotaConfig

type QuotaConfig struct {
	// header note:
	// if "number" and no default value, then it will add "0", we use the empty space between commas to tell that the default value is space.
	ProducerByteRate  string `json:"producer_byte_rate" yaml:"ProducerByteRate" header:"Produce/sec, ,number"`
	ConsumerByteRate  string `json:"consumer_byte_rate" yaml:"ConsumerByteRate" header:"Consume/sec, ,number"`
	RequestPercentage string `json:"request_percentage" yaml:"RequestPercentage" header:"Request Percentage, ,number"`
}

QuotaConfig is a typed struct which defines the map of the quota constraints, producer_byte_rate, consumer_byte_rate and request_percentage.

type QuotaEntityType

type QuotaEntityType string

QuotaEntityType is a string and it defines the valid entity types for a single Quota.

const (
	// QuotaEntityClient is the "CLIENT" Quota entity type.
	QuotaEntityClient QuotaEntityType = "CLIENT"
	// QuotaEntityClients is the "CLIENTS" Quota entity type.
	QuotaEntityClients QuotaEntityType = "CLIENTS"
	// QuotaEntityClientsDefault is the "CLIENTS DEFAULT" Quota entity type.
	QuotaEntityClientsDefault QuotaEntityType = "CLIENTS DEFAULT"
	// QuotaEntityUser is the "USER" Quota entity type.
	QuotaEntityUser QuotaEntityType = "USER"
	// QuotaEntityUsers is the "USERS" Quota entity type.
	QuotaEntityUsers QuotaEntityType = "USERS"
	// QuotaEntityUserClient is the "USERCLIENT" Quota entity type.
	QuotaEntityUserClient QuotaEntityType = "USERCLIENT"
	// QuotaEntityUsersDefault is the "USERS DEFAULT" Quota entity type.
	QuotaEntityUsersDefault QuotaEntityType = "USERS DEFAULT"
)
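
Because `QuotaEntityType` is a plain string type, any string converts to it, so a caller may want to check a value against the documented constants before using it. The following is a minimal sketch of such a check. The constants are copied locally so the snippet compiles on its own, and `isKnownEntityType` is a hypothetical helper that is not part of lenses-go.

```go
package main

import "fmt"

// QuotaEntityType is a local copy of the documented type, for illustration only.
type QuotaEntityType string

const (
	QuotaEntityClient         QuotaEntityType = "CLIENT"
	QuotaEntityClients        QuotaEntityType = "CLIENTS"
	QuotaEntityClientsDefault QuotaEntityType = "CLIENTS DEFAULT"
	QuotaEntityUser           QuotaEntityType = "USER"
	QuotaEntityUsers          QuotaEntityType = "USERS"
	QuotaEntityUserClient     QuotaEntityType = "USERCLIENT"
	QuotaEntityUsersDefault   QuotaEntityType = "USERS DEFAULT"
)

// isKnownEntityType is a hypothetical helper (not provided by lenses-go).
// It reports whether t is one of the seven documented entity types.
func isKnownEntityType(t QuotaEntityType) bool {
	switch t {
	case QuotaEntityClient, QuotaEntityClients, QuotaEntityClientsDefault,
		QuotaEntityUser, QuotaEntityUsers, QuotaEntityUserClient, QuotaEntityUsersDefault:
		return true
	}
	return false
}

func main() {
	fmt.Println(isKnownEntityType("USERCLIENT")) // true
	fmt.Println(isKnownEntityType("GROUP"))      // false
}
```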

type RequestOption

type RequestOption func(r *http.Request) error

RequestOption is a func which receives the current HTTP request and may alter it. If it returns a non-nil error, `Client#Do` fails with that error.

type RequestType

type RequestType string

RequestType is the corresponding action/message type for the request sent to the back-end server.

const (
	// SubscribeRequest is the "SUBSCRIBE" action type sent to the back-end server.
	SubscribeRequest RequestType = "SUBSCRIBE"
	// UnsubscribeRequest is the "UNSUBSCRIBE" action type sent to the back-end server.
	UnsubscribeRequest RequestType = "UNSUBSCRIBE"
	// PublishRequest is the "PUBLISH" action type sent to the back-end server.
	PublishRequest RequestType = "PUBLISH"
	// CommitRequest is the "COMMIT" action type sent to the back-end server.
	CommitRequest RequestType = "COMMIT"
	// LoginRequest is the "LOGIN" action type sent to the back-end server.
	LoginRequest RequestType = "LOGIN"
)

type ResourceError

type ResourceError struct {
	StatusCode int    `json:"statusCode" header:"Status Code"`
	Method     string `json:"method" header:"Method"`
	URI        string `json:"uri" header:"Target"`
	Body       string `json:"message" header:"Message"`
}

ResourceError is returned by all API calls when an error status code is received.

func NewResourceError

func NewResourceError(statusCode int, uri, method, body string) ResourceError

NewResourceError is a helper that creates a new `ResourceError` to return from custom calls; it is "cli-compatible".

func (ResourceError) Code

func (err ResourceError) Code() int

Code returns the status code.

func (ResourceError) Error

func (err ResourceError) Error() string

Error returns the error's message body. When the above rule is applied, the result's first letter is lowercase, and it never ends with the punctuation marks '.' or '!'.

func (ResourceError) String

func (err ResourceError) String() string

String returns the detailed cause of the error.

type ResponseType

type ResponseType string

ResponseType is the message type of a response sent from the back-end server to the client.

const (
	// WildcardResponse is a custom type only for the go library
	// which can be passed to the `On` event in order to catch all the incoming messages and fire the corresponding callback response handler.
	WildcardResponse ResponseType = "*"
	// ErrorResponse is the "ERROR" receive message type.
	ErrorResponse ResponseType = "ERROR"
	// InvalidRequestResponse is the "INVALIDREQUEST" receive message type.
	InvalidRequestResponse ResponseType = "INVALIDREQUEST"
	// KafkaMessageResponse is the "KAFKAMSG" receive message type.
	KafkaMessageResponse ResponseType = "KAFKAMSG"
	// HeartbeatResponse is the "HEARTBEAT" receive message type.
	HeartbeatResponse ResponseType = "HEARTBEAT"
	// SuccessResponse is the "SUCCESS" receive message type.
	SuccessResponse ResponseType = "SUCCESS"
)
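
To make the role of `WildcardResponse` concrete, here is a small sketch of an event dispatcher in which handlers registered under `"*"` receive every incoming message in addition to the type-specific handlers. The `handler` and `dispatcher` types and their methods are hypothetical and only imitate the idea of an `On` registration; they are not the library's live connection API.

```go
package main

import "fmt"

// ResponseType and the constants are local copies of the documented ones.
type ResponseType string

const (
	WildcardResponse     ResponseType = "*"
	ErrorResponse        ResponseType = "ERROR"
	KafkaMessageResponse ResponseType = "KAFKAMSG"
	HeartbeatResponse    ResponseType = "HEARTBEAT"
)

// handler is a hypothetical callback signature.
type handler func(typ ResponseType, payload string)

// dispatcher is a hypothetical registry of handlers keyed by response type.
type dispatcher struct {
	handlers map[ResponseType][]handler
}

// On registers h for typ; WildcardResponse registers it for every type.
func (d *dispatcher) On(typ ResponseType, h handler) {
	if d.handlers == nil {
		d.handlers = make(map[ResponseType][]handler)
	}
	d.handlers[typ] = append(d.handlers[typ], h)
}

// fire calls the handlers for typ, then the wildcard handlers,
// and returns how many handlers ran.
func (d *dispatcher) fire(typ ResponseType, payload string) int {
	n := 0
	for _, h := range d.handlers[typ] {
		h(typ, payload)
		n++
	}
	if typ != WildcardResponse {
		for _, h := range d.handlers[WildcardResponse] {
			h(typ, payload)
			n++
		}
	}
	return n
}

func main() {
	d := &dispatcher{}
	d.On(KafkaMessageResponse, func(typ ResponseType, payload string) {
		fmt.Println("record:", payload)
	})
	d.On(WildcardResponse, func(typ ResponseType, payload string) {
		fmt.Println("saw message of type", typ)
	})

	d.fire(KafkaMessageResponse, `{"key":"k","value":"v"}`)
	d.fire(HeartbeatResponse, "")
}
```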

type Schema

type Schema struct {
	ID int `json:"id,omitempty" yaml:"ID,omitempty" header:"ID,text"`
	// Name is the name the schema is registered under.
	Name string `json:"subject,omitempty" yaml:"Name" header:"Name"` // Name is the "subject" argument in client-code, this structure is being used on CLI for yaml-file based loading.
	// Version of the returned schema.
	Version int `json:"version" header:"Version"`
	// AvroSchema is the Avro schema string.
	AvroSchema string `json:"schema" yaml:"AvroSchema"`
}

Schema describes a schema; see `GetSchema` for more.

type Topic

type Topic struct {
	TopicName            string             `json:"topicName" header:"Name"`
	KeyType              string             `json:"keyType" header:"Key /,NULL"`        // maybe string-based enum?
	ValueType            string             `json:"valueType" header:"Value Type,NULL"` // maybe string-based enum?
	Partitions           int                `json:"partitions" header:"Part"`
	Replication          int                `json:"replication" header:"Repl"`
	IsControlTopic       bool               `json:"isControlTopic"`
	KeySchema            string             `json:"keySchema,omitempty"`
	ValueSchema          string             `json:"valueSchema,omitempty"`
	MessagesPerSecond    int64              `json:"messagesPerSecond" header:"msg/sec"`
	TotalMessages        int64              `json:"totalMessages" header:"Total Msg"`
	Timestamp            int64              `json:"timestamp"`
	Config               []KV               `json:"config" header:"Configs,count"`
	ConsumersGroup       []ConsumersGroup   `json:"consumers"`
	MessagesPerPartition []PartitionMessage `json:"messagesPerPartition"`
	IsMarkedForDeletion  bool               `json:"isMarkedForDeletion" header:"Marked Del"`
}

Topic describes the data that `GetTopic` returns.

type TopicCleanupPolicy

type TopicCleanupPolicy string

TopicCleanupPolicy is the Go type that safely describes the topic config's `CleanupPolicy` field. The available policies are: `TopicDeletePolicy` and `TopicCompactPolicy`.

const (
	TopicDeletePolicy  TopicCleanupPolicy = "delete"
	TopicCompactPolicy TopicCleanupPolicy = "compact"
)

The available cleanup policies for topics.

type TopicConfig

type TopicConfig struct {
	// KV contains all the available topic config keys as sent by the backend,
	// even those not declared in the struct.
	// It is mostly useful for debugging, e.g. when the available topic config keys have changed but the structured data has not.
	// Another use case is to accomplish custom formats, which is not really necessary for end-users.
	// The CLI also uses it to make sure that no invalid config key is passed into flags.
	KV KV

	// The maximum difference allowed between the timestamp when a broker receives a message
	// and the timestamp specified in the message.
	// If MessageTimestampType=CreateTime, a message will be rejected if the difference in timestamp exceeds this threshold.
	// This configuration is ignored if MessageTimestampType=LogAppendTime (see below).
	//
	// Defaults to 9223372036854775807.
	MessageTimestampDifferenceMaxMs int64 `json:"message.timestamp.difference.max.ms"`
	// This is the largest message size Kafka will allow to be appended.
	// Note that if you increase this size you must also increase your consumer's fetch size so they can fetch messages this large.
	//
	// Defaults to 1000012.
	MaxMessageBytes int64 `json:"max.message.bytes"`
	// This configuration controls the size of the index that maps offsets to file positions.
	// We preallocate this index file and shrink it only after log rolls. You generally should not need to change this setting.
	//
	// Defaults to 10485760.
	SegmentIndexBytes int64 `json:"segment.index.bytes"`
	// The maximum random jitter subtracted from the scheduled segment roll time to avoid thundering herds of segment rolling.
	//
	// Defaults to 0.
	SegmentJitterMs int64 `json:"segment.jitter.ms"`
	// This configuration controls how frequently the log compactor will attempt to clean the log (assuming log compaction is enabled).
	// By default we will avoid cleaning a log where more than 50% of the log has been compacted.
	// This ratio bounds the maximum space wasted in the log by duplicates (at 50% at most 50% of the log could be duplicates).
	// A higher ratio will mean fewer, more efficient cleanings but will mean more wasted space in the log.
	//
	// Defaults to 0.5.
	MinCleanableDirtyRatio float32 `json:"min.cleanable.dirty.ratio"`
	// This configuration controls the maximum size a log can grow to before we will discard old log segments to free up space
	// if we are using the "delete" retention policy. By default there is no size limit only a time limit.
	//
	// Defaults to -1.
	RetentionBytes int64 `json:"retention.bytes"`
	// A list of replicas for which log replication should be throttled on the follower side.
	// The list should describe a set of replicas in the form [PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:...
	// or alternatively the wildcard '*' can be used to throttle all replicas for this topic.
	//
	// Empty by default, use "*" for wildcard.
	FollowerReplicationThrottledReplicas string `json:"follower.replication.throttled.replicas"`
	// The time to wait before deleting a file from the filesystem.
	//
	// Defaults to 60000.
	FileDeleteDelayMs int64 `json:"file.delete.delay.ms"`
	// Specify the final compression type for a given topic.
	// This configuration accepts the standard compression codecs (`Gzip`, `Snappy`, `LZ4`).
	// It additionally accepts 'uncompressed' which is equivalent to no compression;
	// and `Producer` which means retain the original compression codec set by the producer.
	//
	// Defaults to `Producer` ("producer").
	CompressionType CompressionType `json:"compression.type"`
	// The minimum time a message will remain uncompacted in the log.
	// Only applicable for logs that are being compacted.
	//
	// Defaults to 0.
	MinCompactionLagMs int64 `json:"min.compaction.lag.ms"`
	// This setting allows specifying a time interval at which we will force an fsync of data written to the log.
	// For example if this was set to 1000 we would fsync after 1000 ms had passed.
	// In general we recommend you not set this and use replication for durability and
	// allow the operating system's background flush capabilities as it is more efficient.
	//
	// Defaults to 9223372036854775807.
	FlushMs int64 `json:"flush.ms"`
	// A string that is either  `TopicDeletePolicy` or `TopicCompactPolicy`.
	// This string designates the retention policy to use on old log segments.
	// The default policy `TopicDeletePolicy` ("delete") will discard old segments when their retention time or size limit has been reached.
	// The `TopicCompactPolicy` ("compact") setting will enable log compaction on the topic.
	//
	// Defaults to `TopicDeletePolicy` ("delete").
	CleanupPolicy TopicCleanupPolicy `json:"cleanup.policy"`
	// Define whether the timestamp in the message is message create time or log append time.
	// The value should be either `TopicMessageCreateTime` ("CreateTime") or `TopicMessageLogAppendTime` ("LogAppendTime").
	//
	// Defaults to `TopicMessageCreateTime`.
	MessageTimestampType TopicMessageTimestampType `json:"message.timestamp.type"`
	// Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort,
	// even though doing so may result in data loss.
	//
	// Defaults to true.
	UncleanLeaderElectionEnable bool `json:"unclean.leader.election.enable"`
	// This setting allows specifying an interval at which we will force an fsync of data written to the log.
	// For example if this was set to 1 we would fsync after every message;
	// if it were 5 we would fsync after every five messages.
	// In general we recommend you not set this and use replication for durability and
	// allow the operating system's background flush capabilities as it is more efficient.
	//
	// Note: this setting can be overridden on a per-topic basis.
	//
	// Defaults to 9223372036854775807.
	FlushMessages int64 `json:"flush.messages"`
	// This configuration controls the maximum time we will retain a log before we will discard old log segments
	// to free up space if you are using the "delete" retention policy.
	// This represents an SLA on how soon consumers must read their data.
	//
	// Defaults to 604800000.
	RetentionMs int64 `json:"retention.ms"`
	// When a producer sets acks to "all" (or "-1"), min.insync.replicas specifies the minimum number of replicas
	// that must acknowledge a write for the write to be considered successful.
	// If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend).
	// When used together, min.insync.replicas and acks allow you to enforce greater durability guarantees.
	// A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with acks of "all".
	// This will ensure that the producer raises an exception if a majority of replicas do not receive a write.
	//
	// Defaults to 1.
	MinInsyncReplicas int `json:"min.insync.replicas"`
	// Specify the message format version the broker will use to append messages to the logs.
	// The value should be a valid ApiVersion. Some examples are: 0.8.2, 0.9.0.0, 0.10.0, check ApiVersion for more details.
	// By setting a particular message format version, the user is certifying that all the existing messages on disk are smaller than or equal to the specified version.
	// Setting this value incorrectly will cause consumers with older versions to break as they will receive messages with a format that they don't understand.
	//
	// Defaults to the version that corresponds to the broker version.
	MessageFormatVersion string `json:"message.format.version"`
	// A list of replicas for which log replication should be throttled on the leader side.
	// The list should describe a set of replicas in the form [PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:...
	// or alternatively the wildcard '*' can be used to throttle all replicas for this topic.
	//
	// Empty by default.
	LeaderReplicationThrottledReplicas string `json:"leader.replication.throttled.replicas"`
	// The amount of time to retain delete tombstone markers for log compacted topics.
	// This setting also gives a bound on the time in which a consumer must
	// complete a read if they begin from offset 0 to ensure that they get a valid snapshot of
	// the final stage (otherwise delete tombstones may be collected before they complete their scan).
	//
	// Defaults to 86400000 (24 hours).
	DeleteRetentionMs int64 `json:"delete.retention.ms"`
	// Indicates whether the file should be preallocated when a new segment is created.
	//
	// Defaults to false.
	Preallocate bool `json:"preallocate"`
	// This setting controls how frequently Kafka adds an index entry to its offset index.
	// The default setting ensures that we index a message roughly every 4096 bytes.
	// More indexing allows reads to jump closer to the exact position in the log but makes the index larger.
	// You probably don't need to change this.
	//
	// Defaults to 4096.
	IndexIntervalBytes int64 `json:"index.interval.bytes"`
	// This configuration controls the segment file size for the log.
	// Retention and cleaning are always done a file at a time, so a larger segment size means fewer files but less granular control over retention.
	//
	// Defaults to 1073741824.
	SegmentBytes int64 `json:"segment.bytes"`
	// This configuration controls the period of time after which Kafka will force the log to roll
	// even if the segment file isn't full to ensure that retention can delete or compact old data.
	//
	// Defaults to 604800000.
	SegmentMs int64 `json:"segment.ms"`
}

TopicConfig describes the topic's `Config` field.
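
The struct tags above use Kafka's dotted key names, which `encoding/json` accepts as field names. The sketch below decodes a sample payload into a small subset of these fields to show how the mapping works. `topicConfigSubset` and `decodeTopicConfig` are hypothetical, the payload is made up, and it assumes the numeric values arrive as JSON numbers, which the real backend response may or may not do.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// topicConfigSubset is a hypothetical struct carrying a few of the
// documented TopicConfig fields with the same JSON tags.
type topicConfigSubset struct {
	MaxMessageBytes   int64  `json:"max.message.bytes"`
	CleanupPolicy     string `json:"cleanup.policy"`
	RetentionMs       int64  `json:"retention.ms"`
	MinInsyncReplicas int    `json:"min.insync.replicas"`
}

// decodeTopicConfig is a hypothetical helper that unmarshals raw JSON
// keyed by Kafka config names into the subset struct.
func decodeTopicConfig(raw []byte) (topicConfigSubset, error) {
	var cfg topicConfigSubset
	err := json.Unmarshal(raw, &cfg)
	return cfg, err
}

func main() {
	// Sample payload, assumed shape: numbers are JSON numbers, not strings.
	raw := []byte(`{
		"max.message.bytes": 1000012,
		"cleanup.policy": "compact",
		"retention.ms": 604800000,
		"min.insync.replicas": 2
	}`)

	cfg, err := decodeTopicConfig(raw)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("%+v\n", cfg)
}
```

Keys that are present in the payload but absent from the struct are silently ignored by `json.Unmarshal`, which is one reason a catch-all such as the `KV` field can be useful next to the typed fields.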

type TopicMessageTimestampType

type TopicMessageTimestampType string

TopicMessageTimestampType is the type that safely describes the topic config's `MessageTimestampType` field. The available message timestamp types are: `TopicMessageCreateTime` and `TopicMessageLogAppendTime`.

const (
	TopicMessageCreateTime    TopicMessageTimestampType = "CreateTime"
	TopicMessageLogAppendTime TopicMessageTimestampType = "LogAppendTime"
)

The available timestamp types for topic's messages.

type TopicMetadata

type TopicMetadata struct {
	TopicName string `json:"topicName" yaml:"TopicName" header:"Topic"`
	KeyType   string `json:"keyType,omitempty" yaml:"KeyType" header:"Key /,NULL"`
	ValueType string `json:"valueType,omitempty" yaml:"ValueType" header:"Value Type,NULL"`

	ValueSchemaRaw string `json:"valueSchema,omitempty" yaml:"ValueSchema,omitempty"` // for response read.
	KeySchemaRaw   string `json:"keySchema,omitempty" yaml:"KeySchema,omitempty"`     // for response read.
}

TopicMetadata describes the data received from `GetTopicsMetadata` and the payload sent by `CreateTopicMetadata`.

type UnmarshalFunc

type UnmarshalFunc func(in []byte, outPtr *Config) error

UnmarshalFunc is the most standard way to declare a Decoder/Unmarshaler to read the configurations and more. See `ReadConfig` and `ReadConfigFromFile` for more.
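
Any function with this signature can serve as a decoder, so adapting a standard library unmarshaler is a one-liner. The sketch below wraps `json.Unmarshal` that way. The `Config` struct here is a stand-in with made-up fields (the real `lenses.Config` differs), and `readConfig` is a hypothetical counterpart that only imitates how a function like `ReadConfig` could use the decoder.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Config is a stand-in with hypothetical fields; the real lenses.Config differs.
type Config struct {
	Host  string `json:"host"`
	Debug bool   `json:"debug"`
}

// UnmarshalFunc is a local copy of the documented signature.
type UnmarshalFunc func(in []byte, outPtr *Config) error

// jsonUnmarshaler adapts encoding/json to the UnmarshalFunc signature.
func jsonUnmarshaler(in []byte, outPtr *Config) error {
	return json.Unmarshal(in, outPtr)
}

// readConfig is a hypothetical helper: it decodes in with the given
// unmarshaler and returns the filled Config.
func readConfig(in []byte, unmarshal UnmarshalFunc) (Config, error) {
	var c Config
	if err := unmarshal(in, &c); err != nil {
		return Config{}, err
	}
	return c, nil
}

func main() {
	// The host value is a placeholder.
	c, err := readConfig([]byte(`{"host":"http://localhost:9991","debug":true}`), jsonUnmarshaler)
	if err != nil {
		fmt.Println("read failed:", err)
		return
	}
	fmt.Printf("%+v\n", c)
}
```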

type UpdateTopicPayload

type UpdateTopicPayload struct {
	Name    string `json:"name,omitempty" yaml:"Name"` // empty for request send, filled for cli.
	Configs []KV   `json:"configs,omitempty" yaml:"Configs"`
}

UpdateTopicPayload contains the data that the `UpdateTopic` accepts, as a single structure.

type User

type User struct {
	Token                string   `json:"token"`
	Name                 string   `json:"user" header:"Name"`
	SchemaRegistryDelete bool     `json:"schemaRegistryDelete" header:"Schema Registry Delete"`
	Roles                []string `json:"roles" header:"Roles"`
}

User represents the user of the client.

type UserProfile

type UserProfile struct {
	Topics       []string `json:"topics" header:"Topics"`
	Schemas      []string `json:"schemas" header:"Schemas"`
	Transformers []string `json:"transformers" header:"Transformers"`
}

UserProfile contains all the user-specific favourites; it holds only Kafka-related info.

Directories

Path Synopsis
cmd
lenses-cli
Package main provides the command line based tool for the Landoop's Lenses client REST API.
