pulsar

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 28, 2019 License: Apache-2.0 Imports: 24 Imported by: 6

Documentation

Index

Constants

View Source
const (
	DefaultWebServiceURL       = "http://localhost:8080"
	DefaultHTTPTimeOutDuration = 5 * time.Minute
)
View Source
const (
	JavaRuntime   = "JAVA"
	PythonRuntime = "PYTHON"
	GoRuntime     = "GO"
)
View Source
const (
	FirstBoundary string = "0x00000000"
	LastBoundary  string = "0xffffffff"
)
View Source
const (
	PublishTimeHeader = "X-Pulsar-Publish-Time"
	BatchHeader       = "X-Pulsar-Num-Batch-Message"
	PropertyPrefix    = "X-Pulsar-PROPERTY-"
)
View Source
const (
	PUBLICTENANT           = "public"
	DEFAULTNAMESPACE       = "default"
	PARTITIONEDTOPICSUFFIX = "-partition-"
)
View Source
const DefaultAPIVersion = "v2"
View Source
const PATTEN = "^[-=:.\\w]*$"

allowed characters for property, namespace, cluster and topic names are alphanumeric (a-zA-Z0-9) and these special chars -=:. and % is allowed as part of valid URL encoding

View Source
const WindowConfigKey = "__WINDOWCONFIGS__"

Variables

View Source
var EXAMPLES = "EXAMPLES:"
View Source
var Earliest = MessageID{-1, -1, -1, -1}
View Source
var Latest = MessageID{0x7fffffffffffffff, 0x7fffffffffffffff, -1, -1}
View Source
var OUTPUT = "OUTPUT:"
View Source
var PERMISSION = "REQUIRED PERMISSION:"
View Source
var ReleaseVersion = "None"
View Source
var SCOPE = "SCOPE:"
View Source
var SPACES = "    "
View Source
var USEDFOR = "USED FOR:"

Functions

func IsAdminError

func IsAdminError(err error) bool

Types

type APIVersion

type APIVersion int
const (
	V1 APIVersion = iota
	V2
	V3
)

func (APIVersion) String

func (v APIVersion) String() string

type AllocatorStats

type AllocatorStats struct {
	NumDirectArenas      int              `json:"numDirectArenas"`
	NumHeapArenas        int              `json:"numHeapArenas"`
	NumThreadLocalCaches int              `json:"numThreadLocalCaches"`
	NormalCacheSize      int              `json:"normalCacheSize"`
	SmallCacheSize       int              `json:"smallCacheSize"`
	TinyCacheSize        int              `json:"tinyCacheSize"`
	DirectArenas         []PoolArenaStats `json:"directArenas"`
	HeapArenas           []PoolArenaStats `json:"heapArenas"`
}

type AuthAction

type AuthAction string

func ParseAuthAction

func ParseAuthAction(action string) (AuthAction, error)

func (AuthAction) String

func (a AuthAction) String() string

type AuthPolicies

type AuthPolicies struct {
	NamespaceAuth         map[string]AuthAction            `json:"namespace_auth"`
	DestinationAuth       map[string]map[string]AuthAction `json:"destination_auth"`
	SubscriptionAuthRoles map[string][]string              `json:"subscription_auth_roles"`
}

func NewAuthPolicies

func NewAuthPolicies() *AuthPolicies

type AutoFailoverPolicyData

type AutoFailoverPolicyData struct {
	PolicyType AutoFailoverPolicyType `json:"policy_type"`
	Parameters map[string]string      `json:"parameters"`
}

type AutoFailoverPolicyType

type AutoFailoverPolicyType string
const (
	MinAvailable AutoFailoverPolicyType = "min_available"
)

type BacklogQuota

type BacklogQuota struct {
	Limit  int64           `json:"limit"`
	Policy RetentionPolicy `json:"policy"`
}

func NewBacklogQuota

func NewBacklogQuota(limit int64, policy RetentionPolicy) BacklogQuota

type BacklogQuotaType

type BacklogQuotaType string
const DestinationStorage BacklogQuotaType = "destination_storage"

type BookieAffinityGroupData

type BookieAffinityGroupData struct {
	BookkeeperAffinityGroupPrimary   string `json:"bookkeeperAffinityGroupPrimary"`
	BookkeeperAffinityGroupSecondary string `json:"bookkeeperAffinityGroupSecondary"`
}

type BrokerAssignment

type BrokerAssignment string
const (
	Primary   BrokerAssignment = "primary"
	Secondary BrokerAssignment = "secondary"
	Shared    BrokerAssignment = "shared"
)

type BrokerData

type BrokerData struct {
	URL         string `json:"brokerUrl"`
	ConfigName  string `json:"configName"`
	ConfigValue string `json:"configValue"`
}

type BrokerNamespaceIsolationData

type BrokerNamespaceIsolationData struct {
	BrokerName     string   `json:"brokerName"`
	PolicyName     string   `json:"policyName"`
	IsPrimary      bool     `json:"isPrimary"`
	NamespaceRegex []string `json:"namespaceRegex"`
}

type BrokerStats

type BrokerStats interface {
	// GetMetrics returns Monitoring metrics
	GetMetrics() ([]Metrics, error)

	// GetMBeans requests JSON string server mbean dump
	GetMBeans() ([]Metrics, error)

	// GetTopics returns JSON string topics stats
	GetTopics() (string, error)

	// GetLoadReport returns load report of broker
	GetLoadReport() (*LocalBrokerData, error)

	// GetAllocatorStats returns stats from broker
	GetAllocatorStats(allocatorName string) (*AllocatorStats, error)
}

BrokerStats is admin interface for broker stats management

type BrokerStatsData

type BrokerStatsData struct {
	Indent bool `json:"indent"`
}

type Brokers

type Brokers interface {
	// GetActiveBrokers returns the list of active brokers in the cluster.
	GetActiveBrokers(cluster string) ([]string, error)

	// GetDynamicConfigurationNames returns list of updatable configuration name
	GetDynamicConfigurationNames() ([]string, error)

	// GetOwnedNamespaces returns the map of owned namespaces and their status from a single broker in the cluster
	GetOwnedNamespaces(cluster, brokerURL string) (map[string]NamespaceOwnershipStatus, error)

	// UpdateDynamicConfiguration updates dynamic configuration value in to Zk that triggers watch on
	// brokers and all brokers can update {@link ServiceConfiguration} value locally
	UpdateDynamicConfiguration(configName, configValue string) error

	// DeleteDynamicConfiguration deletes dynamic configuration value in to Zk. It will not impact current value
	// in broker but next time when broker restarts, it applies value from configuration file only.
	DeleteDynamicConfiguration(configName string) error

	// GetRuntimeConfigurations returns values of runtime configuration
	GetRuntimeConfigurations() (map[string]string, error)

	// GetInternalConfigurationData returns the internal configuration data
	GetInternalConfigurationData() (*InternalConfigurationData, error)

	// GetAllDynamicConfigurations returns values of all overridden dynamic-configs
	GetAllDynamicConfigurations() (map[string]string, error)

	// HealthCheck run a health check on the broker
	HealthCheck() error
}

Brokers is admin interface for brokers management

type BundlesData

type BundlesData struct {
	Boundaries []string `json:"boundaries"`
	NumBundles int      `json:"numBundles"`
}

func NewBundlesData

func NewBundlesData(boundaries []string) BundlesData

func NewBundlesDataWithNumBundles

func NewBundlesDataWithNumBundles(numBundles int) *BundlesData

func NewDefaultBoundle

func NewDefaultBoundle() *BundlesData

type Client

type Client interface {
	Clusters() Clusters
	Functions() Functions
	Tenants() Tenants
	Topics() Topics
	Subscriptions() Subscriptions
	Sources() Sources
	Sinks() Sinks
	Namespaces() Namespaces
	Schemas() Schema
	NsIsolationPolicy() NsIsolationPolicy
	Brokers() Brokers
	BrokerStats() BrokerStats
	ResourceQuotas() ResourceQuotas
	FunctionsWorker() FunctionsWorker
}

Client provides a client to the Pulsar Restful API

func New

func New(config *Config) (Client, error)

New returns a new client

type ClusterData

type ClusterData struct {
	Name                string   `json:"-"`
	ServiceURL          string   `json:"serviceUrl"`
	ServiceURLTls       string   `json:"serviceUrlTls"`
	BrokerServiceURL    string   `json:"brokerServiceUrl"`
	BrokerServiceURLTls string   `json:"brokerServiceUrlTls"`
	PeerClusterNames    []string `json:"peerClusterNames"`
}

ClusterData information on a cluster

type Clusters

type Clusters interface {
	// List returns the list of clusters
	List() ([]string, error)

	// Get the configuration data for the specified cluster
	Get(string) (ClusterData, error)

	// Create a new cluster
	Create(ClusterData) error

	// Delete an existing cluster
	Delete(string) error

	// Update the configuration for a cluster
	Update(ClusterData) error

	// UpdatePeerClusters updates peer cluster names.
	UpdatePeerClusters(string, []string) error

	// GetPeerClusters returns peer-cluster names
	GetPeerClusters(string) ([]string, error)

	// CreateFailureDomain creates a domain into cluster
	CreateFailureDomain(FailureDomainData) error

	// GetFailureDomain returns the domain registered into a cluster
	GetFailureDomain(clusterName, domainName string) (FailureDomainData, error)

	// ListFailureDomains returns all registered domains in cluster
	ListFailureDomains(string) (FailureDomainMap, error)

	// DeleteFailureDomain deletes a domain in cluster
	DeleteFailureDomain(FailureDomainData) error

	// UpdateFailureDomain updates a domain into cluster
	UpdateFailureDomain(FailureDomainData) error
}

Clusters is admin interface for clusters management

type Config

type Config struct {
	WebServiceURL string
	HTTPTimeout   time.Duration
	HTTPClient    *http.Client
	APIVersion    APIVersion

	Auth       *auth.TLSAuthProvider
	AuthParams string
	TLSOptions *TLSOptions
	TokenAuth  *auth.TokenAuthProvider
}

Config is used to configure the admin client

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns a default configuration for the pulsar admin client

type ConnectorDefinition

type ConnectorDefinition struct {
	// The name of the connector type
	Name string `json:"name"`

	// Description to be used for user help
	Description string `json:"description"`

	// The class name for the connector source implementation
	// <p>If not defined, it will be assumed this connector cannot act as a data source
	SourceClass string `json:"sourceClass"`

	// The class name for the connector sink implementation
	// <p>If not defined, it will be assumed this connector cannot act as a data sink
	SinkClass string `json:"sinkClass"`
}

Basic information about a Pulsar connector

type ConsumerConfig

type ConsumerConfig struct {
	SchemaType        string
	SerdeClassName    string
	IsRegexPattern    bool
	ReceiverQueueSize int
}

type ConsumerStats

type ConsumerStats struct {
	BlockedConsumerOnUnAckedMsgs bool              `json:"blockedConsumerOnUnackedMsgs"`
	AvailablePermits             int               `json:"availablePermits"`
	UnAckedMessages              int               `json:"unackedMessages"`
	MsgRateOut                   float64           `json:"msgRateOut"`
	MsgThroughputOut             float64           `json:"msgThroughputOut"`
	MsgRateRedeliver             float64           `json:"msgRateRedeliver"`
	ConsumerName                 string            `json:"consumerName"`
	Metadata                     map[string]string `json:"metadata"`
}

type CursorInfo

type CursorInfo struct {
	Version                   int                `json:"version"`
	CreationDate              string             `json:"creationDate"`
	ModificationDate          string             `json:"modificationDate"`
	CursorsLedgerID           int64              `json:"cursorsLedgerId"`
	MarkDelete                PositionInfo       `json:"markDelete"`
	IndividualDeletedMessages []MessageRangeInfo `json:"individualDeletedMessages"`
	Properties                map[string]int64
}

type CursorStats

type CursorStats struct {
	MarkDeletePosition                       string           `json:"markDeletePosition"`
	ReadPosition                             string           `json:"readPosition"`
	WaitingReadOp                            bool             `json:"waitingReadOp"`
	PendingReadOps                           int              `json:"pendingReadOps"`
	MessagesConsumedCounter                  int64            `json:"messagesConsumedCounter"`
	CursorLedger                             int64            `json:"cursorLedger"`
	CursorLedgerLastEntry                    int64            `json:"cursorLedgerLastEntry"`
	IndividuallyDeletedMessages              string           `json:"individuallyDeletedMessages"`
	LastLedgerWitchTimestamp                 string           `json:"lastLedgerWitchTimestamp"`
	State                                    string           `json:"state"`
	NumberOfEntriesSinceFirstNotAckedMessage int64            `json:"numberOfEntriesSinceFirstNotAckedMessage"`
	TotalNonContiguousDeletedMessagesRange   int              `json:"totalNonContiguousDeletedMessagesRange"`
	Properties                               map[string]int64 `json:"properties"`
}

type DispatchRate

type DispatchRate struct {
	DispatchThrottlingRateInMsg  int   `json:"dispatchThrottlingRateInMsg"`
	DispatchThrottlingRateInByte int64 `json:"dispatchThrottlingRateInByte"`
	RatePeriodInSecond           int   `json:"ratePeriodInSecond"`
}

func NewDispatchRate

func NewDispatchRate() *DispatchRate

type Error

type Error struct {
	Reason string `json:"reason"`
	Code   int
}

func (Error) Error

func (e Error) Error() string

type Example

type Example struct {
	Desc    string
	Command string
}

type ExceptionInformation

type ExceptionInformation struct {
	ExceptionString string `json:"exceptionString"`
	TimestampMs     int64  `json:"timestampMs"`
}

type FailureDomainData

type FailureDomainData struct {
	ClusterName string   `json:"-"`
	DomainName  string   `json:"-"`
	BrokerList  []string `json:"brokers"`
}

Failure Domain information

type FailureDomainMap

type FailureDomainMap map[string]FailureDomainData

type FunctionConfig

type FunctionConfig struct {
	TimeoutMs     *int64  `json:"timeoutMs" yaml:"timeoutMs"`
	TopicsPattern *string `json:"topicsPattern" yaml:"topicsPattern"`
	// Whether the subscriptions the functions created/used should be deleted when the functions is deleted
	CleanupSubscription bool `json:"cleanupSubscription" yaml:"cleanupSubscription"`
	RetainOrdering      bool `json:"retainOrdering" yaml:"retainOrdering"`
	AutoAck             bool `json:"autoAck" yaml:"autoAck"`
	Parallelism         int  `json:"parallelism" yaml:"parallelism"`
	MaxMessageRetries   int  `json:"maxMessageRetries" yaml:"maxMessageRetries"`

	Output string `json:"output" yaml:"output"`

	OutputSerdeClassName string `json:"outputSerdeClassName" yaml:"outputSerdeClassName"`
	LogTopic             string `json:"logTopic" yaml:"logTopic"`
	ProcessingGuarantees string `json:"processingGuarantees" yaml:"processingGuarantees"`

	// Represents either a builtin schema type (eg: 'avro', 'json', etc) or the class name for a Schema implementation
	OutputSchemaType string `json:"outputSchemaType" yaml:"outputSchemaType"`

	Runtime         string `json:"runtime" yaml:"runtime"`
	DeadLetterTopic string `json:"deadLetterTopic" yaml:"deadLetterTopic"`
	SubName         string `json:"subName" yaml:"subName"`
	FQFN            string `json:"fqfn" yaml:"fqfn"`
	Jar             string `json:"jar" yaml:"jar"`
	Py              string `json:"py" yaml:"py"`
	Go              string `json:"go" yaml:"go"`
	// Any flags that you want to pass to the runtime.
	// note that in thread mode, these flags will have no impact
	RuntimeFlags string `json:"runtimeFlags" yaml:"runtimeFlags"`

	Tenant    string `json:"tenant" yaml:"tenant"`
	Namespace string `json:"namespace" yaml:"namespace"`
	Name      string `json:"name" yaml:"name"`
	ClassName string `json:"className" yaml:"className"`

	Resources          *Resources             `json:"resources" yaml:"resources"`
	WindowConfig       *WindowConfig          `json:"windowConfig" yaml:"windowConfig"`
	Inputs             []string               `json:"inputs" yaml:"inputs"`
	UserConfig         map[string]interface{} `json:"userConfig" yaml:"userConfig"`
	CustomSerdeInputs  map[string]string      `json:"customSerdeInputs" yaml:"customSerdeInputs"`
	CustomSchemaInputs map[string]string      `json:"customSchemaInputs" yaml:"customSchemaInputs"`

	// A generalized way of specifying inputs
	InputSpecs map[string]ConsumerConfig `json:"inputSpecs" yaml:"inputSpecs"`

	// This is a map of secretName(aka how the secret is going to be
	// accessed in the function via context) to an object that
	// encapsulates how the secret is fetched by the underlying
	// secrets provider. The type of an value here can be found by the
	// SecretProviderConfigurator.getSecretObjectType() method.
	Secrets map[string]interface{} `json:"secrets" yaml:"secrets"`
}

type FunctionData

type FunctionData struct {
	UpdateAuthData       bool `json:"updateAuthData"`
	RetainOrdering       bool `json:"retainOrdering"`
	Watch                bool `json:"watch"`
	AutoAck              bool `json:"autoAck"`
	Parallelism          int  `json:"parallelism"`
	WindowLengthCount    int  `json:"windowLengthCount"`
	SlidingIntervalCount int  `json:"slidingIntervalCount"`
	MaxMessageRetries    int  `json:"maxMessageRetries"`

	TimeoutMs                 int64   `json:"timeoutMs"`
	SlidingIntervalDurationMs int64   `json:"slidingIntervalDurationMs"`
	WindowLengthDurationMs    int64   `json:"windowLengthDurationMs"`
	RAM                       int64   `json:"ram"`
	Disk                      int64   `json:"disk"`
	CPU                       float64 `json:"cpu"`
	SubsName                  string  `json:"subsName"`
	DeadLetterTopic           string  `json:"deadLetterTopic"`
	Key                       string  `json:"key"`
	State                     string  `json:"state"`
	TriggerValue              string  `json:"triggerValue"`
	TriggerFile               string  `json:"triggerFile"`
	Topic                     string  `json:"topic"`

	UserCodeFile         string          `json:"-"`
	FQFN                 string          `json:"fqfn"`
	Tenant               string          `json:"tenant"`
	Namespace            string          `json:"namespace"`
	FuncName             string          `json:"functionName"`
	InstanceID           string          `json:"instance_id"`
	ClassName            string          `json:"className"`
	Jar                  string          `json:"jarFile"`
	Py                   string          `json:"pyFile"`
	Go                   string          `json:"goFile"`
	Inputs               string          `json:"inputs"`
	TopicsPattern        string          `json:"topicsPattern"`
	Output               string          `json:"output"`
	LogTopic             string          `json:"logTopic"`
	SchemaType           string          `json:"schemaType"`
	CustomSerDeInputs    string          `json:"customSerdeInputString"`
	CustomSchemaInput    string          `json:"customSchemaInputString"`
	OutputSerDeClassName string          `json:"outputSerdeClassName"`
	FunctionConfigFile   string          `json:"fnConfigFile"`
	ProcessingGuarantees string          `json:"processingGuarantees"`
	UserConfig           string          `json:"userConfigString"`
	FuncConf             *FunctionConfig `json:"-"`
}

FunctionData information for a Pulsar Function

type FunctionInstanceStats

type FunctionInstanceStats struct {
	FunctionInstanceStatsDataBase

	InstanceID int64 `json:"instanceId"`

	Metrics FunctionInstanceStatsData `json:"metrics"`
}

type FunctionInstanceStatsData

type FunctionInstanceStatsData struct {
	OneMin FunctionInstanceStatsDataBase `json:"oneMin"`

	// Timestamp of when the function was last invoked for instance
	LastInvocation int64 `json:"lastInvocation"`

	// Map of user defined metrics
	UserMetrics map[string]float64 `json:"userMetrics"`

	FunctionInstanceStatsDataBase
}

type FunctionInstanceStatsDataBase

type FunctionInstanceStatsDataBase struct {
	// Total number of records function received from source for instance
	ReceivedTotal int64 `json:"receivedTotal"`

	// Total number of records successfully processed by user function for instance
	ProcessedSuccessfullyTotal int64 `json:"processedSuccessfullyTotal"`

	// Total number of system exceptions thrown for instance
	SystemExceptionsTotal int64 `json:"systemExceptionsTotal"`

	// Total number of user exceptions thrown for instance
	UserExceptionsTotal int64 `json:"userExceptionsTotal"`

	// Average process latency for function for instance
	AvgProcessLatency float64 `json:"avgProcessLatency"`
}

type FunctionInstanceStatus

type FunctionInstanceStatus struct {
	InstanceID int                        `json:"instanceId"`
	Status     FunctionInstanceStatusData `json:"status"`
}

type FunctionInstanceStatusData

type FunctionInstanceStatusData struct {
	Running                  bool                   `json:"running"`
	Err                      string                 `json:"error"`
	NumRestarts              int64                  `json:"numRestarts"`
	NumReceived              int64                  `json:"numReceived"`
	NumSuccessfullyProcessed int64                  `json:"numSuccessfullyProcessed"`
	NumUserExceptions        int64                  `json:"numUserExceptions"`
	LatestUserExceptions     []ExceptionInformation `json:"latestUserExceptions"`
	NumSystemExceptions      int64                  `json:"numSystemExceptions"`
	LatestSystemExceptions   []ExceptionInformation `json:"latestSystemExceptions"`
	AverageLatency           float64                `json:"averageLatency"`
	LastInvocationTime       int64                  `json:"lastInvocationTime"`
	WorkerID                 string                 `json:"workerId"`
}

type FunctionState

type FunctionState struct {
	Key         string `json:"key"`
	StringValue string `json:"stringValue"`
	ByteValue   []byte `json:"byteValue"`
	NumValue    int64  `json:"numberValue"`
	Version     int64  `json:"version"`
}

type FunctionStats

type FunctionStats struct {
	// Overall total number of records function received from source
	ReceivedTotal int64 `json:"receivedTotal"`

	// Overall total number of records successfully processed by user function
	ProcessedSuccessfullyTotal int64 `json:"processedSuccessfullyTotal"`

	// Overall total number of system exceptions thrown
	SystemExceptionsTotal int64 `json:"systemExceptionsTotal"`

	// Overall total number of user exceptions thrown
	UserExceptionsTotal int64 `json:"userExceptionsTotal"`

	// Average process latency for function
	AvgProcessLatency float64 `json:"avgProcessLatency"`

	// Timestamp of when the function was last invoked by any instance
	LastInvocation int64 `json:"lastInvocation"`

	OneMin FunctionInstanceStatsDataBase `json:"oneMin"`

	Instances []FunctionInstanceStats `json:"instances"`

	FunctionInstanceStats
}

func (*FunctionStats) AddInstance

func (fs *FunctionStats) AddInstance(functionInstanceStats FunctionInstanceStats)

func (*FunctionStats) CalculateOverall

func (fs *FunctionStats) CalculateOverall() *FunctionStats

type FunctionStatus

type FunctionStatus struct {
	NumInstances int                      `json:"numInstances"`
	NumRunning   int                      `json:"numRunning"`
	Instances    []FunctionInstanceStatus `json:"instances"`
}

type Functions

type Functions interface {
	// CreateFunc create a new function.
	CreateFunc(data *FunctionConfig, fileName string) error

	// CreateFuncWithURL create a new function by providing url from which fun-pkg can be downloaded.
	// supported url: http/file
	// eg:
	//  File: file:/dir/fileName.jar
	//  Http: http://www.repo.com/fileName.jar
	//
	// @param functionConfig
	//      the function configuration object
	// @param pkgURL
	//      url from which pkg can be downloaded
	CreateFuncWithURL(data *FunctionConfig, pkgURL string) error

	// StopFunction stop all function instances
	StopFunction(tenant, namespace, name string) error

	// StopFunctionWithID stop function instance
	StopFunctionWithID(tenant, namespace, name string, instanceID int) error

	// DeleteFunction delete an existing function
	DeleteFunction(tenant, namespace, name string) error

	// StartFunction start all function instances
	StartFunction(tenant, namespace, name string) error

	// StartFunctionWithID start function instance
	StartFunctionWithID(tenant, namespace, name string, instanceID int) error

	// RestartFunction restart all function instances
	RestartFunction(tenant, namespace, name string) error

	// RestartFunctionWithID restart function instance
	RestartFunctionWithID(tenant, namespace, name string, instanceID int) error

	// GetFunctions returns the list of functions
	GetFunctions(tenant, namespace string) ([]string, error)

	// GetFunction returns the configuration for the specified function
	GetFunction(tenant, namespace, name string) (FunctionConfig, error)

	// GetFunctionStatus returns the current status of a function
	GetFunctionStatus(tenant, namespace, name string) (FunctionStatus, error)

	// GetFunctionStatusWithInstanceID returns the current status of a function instance
	GetFunctionStatusWithInstanceID(tenant, namespace, name string, instanceID int) (FunctionInstanceStatusData, error)

	// GetFunctionStats returns the current stats of a function
	GetFunctionStats(tenant, namespace, name string) (FunctionStats, error)

	// GetFunctionStatsWithInstanceID gets the current stats of a function instance
	GetFunctionStatsWithInstanceID(tenant, namespace, name string, instanceID int) (FunctionInstanceStatsData, error)

	// GetFunctionState fetch the current state associated with a Pulsar Function
	//
	// Response Example:
	// 		{ "value : 12, version : 2"}
	GetFunctionState(tenant, namespace, name, key string) (FunctionState, error)

	// PutFunctionState puts the given state associated with a Pulsar Function
	PutFunctionState(tenant, namespace, name string, state FunctionState) error

	// TriggerFunction triggers the function by writing to the input topic
	TriggerFunction(tenant, namespace, name, topic, triggerValue, triggerFile string) (string, error)

	// UpdateFunction updates the configuration for a function.
	UpdateFunction(functionConfig *FunctionConfig, fileName string, updateOptions *UpdateOptions) error

	// UpdateFunctionWithURL updates the configuration for a function.
	//
	// Update a function by providing url from which fun-pkg can be downloaded. supported url: http/file
	// eg:
	// File: file:/dir/fileName.jar
	// Http: http://www.repo.com/fileName.jar
	UpdateFunctionWithURL(functionConfig *FunctionConfig, pkgURL string, updateOptions *UpdateOptions) error
}

Functions is admin interface for functions management

type FunctionsWorker

type FunctionsWorker interface {
	// Get all functions stats on a worker
	GetFunctionsStats() ([]*WorkerFunctionInstanceStats, error)

	// Get worker metrics
	GetMetrics() ([]*Metrics, error)

	// Get List of all workers belonging to this cluster
	GetCluster() ([]*WorkerInfo, error)

	// Get the worker who is the leader of the clusterv
	GetClusterLeader() (*WorkerInfo, error)

	// Get the function assignment among the cluster
	GetAssignments() (map[string][]string, error)
}

type GetSchemaResponse

type GetSchemaResponse struct {
	Version    int64             `json:"version"`
	Type       string            `json:"type"`
	Timestamp  int64             `json:"timestamp"`
	Data       string            `json:"data"`
	Properties map[string]string `json:"properties"`
}

type InternalConfigurationData

type InternalConfigurationData struct {
	ZookeeperServers          string `json:"zookeeperServers"`
	ConfigurationStoreServers string `json:"configurationStoreServers"`
	LedgersRootPath           string `json:"ledgersRootPath"`
	StateStorageServiceURL    string `json:"stateStorageServiceUrl"`
}

type KeyValue

type KeyValue struct {
	Key                  *string  `protobuf:"bytes,1,req,name=key" json:"key,omitempty"`
	Value                *string  `protobuf:"bytes,2,req,name=value" json:"value,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

nolint

func (*KeyValue) ProtoMessage

func (*KeyValue) ProtoMessage()

func (*KeyValue) Reset

func (m *KeyValue) Reset()

func (*KeyValue) String

func (m *KeyValue) String() string

type LedgerInfo

type LedgerInfo struct {
	LedgerID  int64 `json:"ledgerId"`
	Entries   int64 `json:"entries"`
	Size      int64 `json:"size"`
	Timestamp int64 `json:"timestamp"`
}

type LocalBrokerData

type LocalBrokerData struct {
	// URLs to satisfy contract of ServiceLookupData (used by NamespaceService).
	WebServiceURL              string `json:"webServiceUrl"`
	WebServiceURLTLS           string `json:"webServiceUrlTls"`
	PulsarServiceURL           string `json:"pulsarServiceUrl"`
	PulsarServiceURLTLS        string `json:"pulsarServiceUrlTls"`
	PersistentTopicsEnabled    bool   `json:"persistentTopicsEnabled"`
	NonPersistentTopicsEnabled bool   `json:"nonPersistentTopicsEnabled"`

	// Most recently available system resource usage.
	CPU          ResourceUsage `json:"cpu"`
	Memory       ResourceUsage `json:"memory"`
	DirectMemory ResourceUsage `json:"directMemory"`
	BandwidthIn  ResourceUsage `json:"bandwidthIn"`
	BandwidthOut ResourceUsage `json:"bandwidthOut"`

	// Message data from the most recent namespace bundle stats.
	MsgThroughputIn  float64 `json:"msgThroughputIn"`
	MsgThroughputOut float64 `json:"msgThroughputOut"`
	MsgRateIn        float64 `json:"msgRateIn"`
	MsgRateOut       float64 `json:"msgRateOut"`

	// Timestamp of last update.
	LastUpdate int64 `json:"lastUpdate"`

	// The stats given in the most recent invocation of update.
	LastStats    map[string]*NamespaceBundleStats `json:"lastStats"`
	NumTopics    int                              `json:"numTopics"`
	NumBundles   int                              `json:"numBundles"`
	NumConsumers int                              `json:"numConsumers"`
	NumProducers int                              `json:"numProducers"`

	// All bundles belonging to this broker.
	Bundles []string `json:"bundles"`

	// The bundles gained since the last invocation of update.
	LastBundleGains []string `json:"lastBundleGains"`

	// The bundles lost since the last invocation of update.
	LastBundleLosses []string `json:"lastBundleLosses"`

	// The version string that this broker is running, obtained from the Maven build artifact in the POM
	BrokerVersionString string `json:"brokerVersionString"`

	// This place-holder requires to identify correct LoadManagerReport type while deserializing
	LoadReportType string `json:"loadReportType"`

	// the external protocol data advertised by protocol handlers.
	Protocols map[string]string `json:"protocols"`
}

func NewLocalBrokerData

func NewLocalBrokerData() LocalBrokerData

type LongDescription

type LongDescription struct {
	CommandUsedFor    string
	CommandPermission string
	CommandExamples   []Example
	CommandOutput     []Output
	CommandScope      string
}

func (*LongDescription) ExampleToString

func (desc *LongDescription) ExampleToString() string

func (*LongDescription) ToString

func (desc *LongDescription) ToString() string

type LongRunningProcessStatus

type LongRunningProcessStatus struct {
	Status    Status `json:"status"`
	LastError string `json:"lastError"`
}

type LookupData

type LookupData struct {
	BrokerURL    string `json:"brokerUrl"`
	BrokerURLTLS string `json:"brokerUrlTls"`
	HTTPURL      string `json:"httpUrl"`
	HTTPURLTLS   string `json:"httpUrlTls"`
}

type ManagedLedgerInfo

type ManagedLedgerInfo struct {
	Version            int                   `json:"version"`
	CreationDate       string                `json:"creationDate"`
	ModificationData   string                `json:"modificationData"`
	Ledgers            []LedgerInfo          `json:"ledgers"`
	TerminatedPosition PositionInfo          `json:"terminatedPosition"`
	Cursors            map[string]CursorInfo `json:"cursors"`
}

type Message

type Message struct {
	// contains filtered or unexported fields
}

func NewMessage

func NewMessage(topic string, id MessageID, payload []byte, properties map[string]string) *Message

func (*Message) GetMessageID

func (m *Message) GetMessageID() MessageID

func (*Message) GetPayload

func (m *Message) GetPayload() []byte

func (*Message) GetProperties

func (m *Message) GetProperties() map[string]string

type MessageID

type MessageID struct {
	LedgerID         int64 `json:"ledgerId"`
	EntryID          int64 `json:"entryId"`
	PartitionedIndex int   `json:"partitionedIndex"`
	BatchIndex       int   `json:"-"`
}

func ParseMessageID

func ParseMessageID(str string) (*MessageID, error)

func (MessageID) String

func (m MessageID) String() string

type MessageRangeInfo

type MessageRangeInfo struct {
	From      PositionInfo `json:"from"`
	To        PositionInfo `json:"to"`
	Offloaded bool         `json:"offloaded"`
}

type Metrics

type Metrics struct {
	Metrics    map[string]interface{} `json:"metrics"`
	Dimensions map[string]string      `json:"dimensions"`
}

func NewMetrics

func NewMetrics(dimensionMap map[string]string) *Metrics

type NameSpaceName

type NameSpaceName struct {
	// contains filtered or unexported fields
}

func GetNameSpaceName

func GetNameSpaceName(tenant, namespace string) (*NameSpaceName, error)

func GetNamespaceName

func GetNamespaceName(completeName string) (*NameSpaceName, error)

func (*NameSpaceName) String

func (n *NameSpaceName) String() string

type NamespaceBundleStats

type NamespaceBundleStats struct {
	MsgRateIn        float64 `json:"msgRateIn"`
	MsgThroughputIn  float64 `json:"msgThroughputIn"`
	MsgRateOut       float64 `json:"msgRateOut"`
	MsgThroughputOut float64 `json:"msgThroughputOut"`
	ConsumerCount    int     `json:"consumerCount"`
	ProducerCount    int     `json:"producerCount"`
	TopicsNum        int64   `json:"topics"`
	CacheSize        int64   `json:"cacheSize"`

	// Consider the throughput equal if difference is less than 100 KB/s
	ThroughputDifferenceThreshold float64 `json:"throughputDifferenceThreshold"`
	// Consider the msgRate equal if the difference is less than 100
	MsgRateDifferenceThreshold float64 `json:"msgRateDifferenceThreshold"`
	// Consider the total topics/producers/consumers equal if the difference is less than 500
	TopicConnectionDifferenceThreshold int64 `json:"topicConnectionDifferenceThreshold"`
	// Consider the cache size equal if the difference is less than 100 kb
	CacheSizeDifferenceThreshold int64 `json:"cacheSizeDifferenceThreshold"`
}

func NewNamespaceBundleStats

func NewNamespaceBundleStats() *NamespaceBundleStats

type NamespaceIsolationData

type NamespaceIsolationData struct {
	Namespaces         []string               `json:"namespaces"`
	Primary            []string               `json:"primary"`
	Secondary          []string               `json:"secondary"`
	AutoFailoverPolicy AutoFailoverPolicyData `json:"auto_failover_policy"`
}

func CreateNamespaceIsolationData

func CreateNamespaceIsolationData(namespaces, primary, secondry []string, autoFailoverPolicyTypeName string,
	autoFailoverPolicyParams map[string]string) (*NamespaceIsolationData, error)

type NamespaceOwnershipStatus

type NamespaceOwnershipStatus struct {
	BrokerAssignment BrokerAssignment `json:"broker_assignment"`
	IsControlled     bool             `json:"is_controlled"`
	IsActive         bool             `json:"is_active"`
}

type Namespaces

type Namespaces interface {
	// GetNamespaces returns the list of all the namespaces for a certain tenant
	GetNamespaces(tenant string) ([]string, error)

	// GetTopics returns the list of all the topics under a certain namespace
	GetTopics(namespace string) ([]string, error)

	// GetPolicies returns the dump all the policies specified for a namespace
	GetPolicies(namespace string) (*Policies, error)

	// CreateNamespace creates a new empty namespace with no policies attached
	CreateNamespace(namespace string) error

	// CreateNsWithNumBundles creates a new empty namespace with no policies attached
	CreateNsWithNumBundles(namespace string, numBundles int) error

	// CreateNsWithPolices creates a new namespace with the specified policies
	CreateNsWithPolices(namespace string, polices Policies) error

	// CreateNsWithBundlesData creates a new empty namespace with no policies attached
	CreateNsWithBundlesData(namespace string, bundleData *BundlesData) error

	// DeleteNamespace deletes an existing namespace
	DeleteNamespace(namespace string) error

	// DeleteNamespaceBundle deletes an existing bundle in a namespace
	DeleteNamespaceBundle(namespace string, bundleRange string) error

	// SetNamespaceMessageTTL sets the messages Time to Live for all the topics within a namespace
	SetNamespaceMessageTTL(namespace string, ttlInSeconds int) error

	// GetNamespaceMessageTTL returns the message TTL for a namespace
	GetNamespaceMessageTTL(namespace string) (int, error)

	// GetRetention returns the retention configuration for a namespace
	GetRetention(namespace string) (*RetentionPolicies, error)

	// SetRetention sets the retention configuration for all the topics on a namespace
	SetRetention(namespace string, policy RetentionPolicies) error

	// GetBacklogQuotaMap returns backlog quota map on a namespace
	GetBacklogQuotaMap(namespace string) (map[BacklogQuotaType]BacklogQuota, error)

	// SetBacklogQuota sets a backlog quota for all the topics on a namespace
	SetBacklogQuota(namespace string, backlogQuota BacklogQuota) error

	// RemoveBacklogQuota removes a backlog quota policy from a namespace
	RemoveBacklogQuota(namespace string) error

	// SetSchemaValidationEnforced sets schema validation enforced for namespace
	SetSchemaValidationEnforced(namespace NameSpaceName, schemaValidationEnforced bool) error

	// GetSchemaValidationEnforced returns schema validation enforced for namespace
	GetSchemaValidationEnforced(namespace NameSpaceName) (bool, error)

	// SetSchemaAutoUpdateCompatibilityStrategy sets the strategy used to check the a new schema provided
	// by a producer is compatible with the current schema before it is installed
	SetSchemaAutoUpdateCompatibilityStrategy(namespace NameSpaceName, strategy SchemaCompatibilityStrategy) error

	// GetSchemaAutoUpdateCompatibilityStrategy returns the strategy used to check the a new schema provided
	// by a producer is compatible with the current schema before it is installed
	GetSchemaAutoUpdateCompatibilityStrategy(namespace NameSpaceName) (SchemaCompatibilityStrategy, error)

	// ClearOffloadDeleteLag clears the offload deletion lag for a namespace.
	ClearOffloadDeleteLag(namespace NameSpaceName) error

	// SetOffloadDeleteLag sets the offload deletion lag for a namespace
	SetOffloadDeleteLag(namespace NameSpaceName, timeMs int64) error

	// GetOffloadDeleteLag returns the offload deletion lag for a namespace, in milliseconds
	GetOffloadDeleteLag(namespace NameSpaceName) (int64, error)

	// SetOffloadThreshold sets the offloadThreshold for a namespace
	SetOffloadThreshold(namespace NameSpaceName, threshold int64) error

	// GetOffloadThreshold returns the offloadThreshold for a namespace
	GetOffloadThreshold(namespace NameSpaceName) (int64, error)

	// SetCompactionThreshold sets the compactionThreshold for a namespace
	SetCompactionThreshold(namespace NameSpaceName, threshold int64) error

	// GetCompactionThreshold returns the compactionThreshold for a namespace
	GetCompactionThreshold(namespace NameSpaceName) (int64, error)

	// SetMaxConsumersPerSubscription sets maxConsumersPerSubscription for a namespace.
	SetMaxConsumersPerSubscription(namespace NameSpaceName, max int) error

	// GetMaxConsumersPerSubscription returns the maxConsumersPerSubscription for a namespace.
	GetMaxConsumersPerSubscription(namespace NameSpaceName) (int, error)

	// SetMaxConsumersPerTopic sets maxConsumersPerTopic for a namespace.
	SetMaxConsumersPerTopic(namespace NameSpaceName, max int) error

	// GetMaxConsumersPerTopic returns the maxProducersPerTopic for a namespace.
	GetMaxConsumersPerTopic(namespace NameSpaceName) (int, error)

	// SetMaxProducersPerTopic sets maxProducersPerTopic for a namespace.
	SetMaxProducersPerTopic(namespace NameSpaceName, max int) error

	// GetMaxProducersPerTopic returns the maxProducersPerTopic for a namespace.
	GetMaxProducersPerTopic(namespace NameSpaceName) (int, error)

	// GetNamespaceReplicationClusters returns the replication clusters for a namespace
	GetNamespaceReplicationClusters(namespace string) ([]string, error)

	// SetNamespaceReplicationClusters returns the replication clusters for a namespace
	SetNamespaceReplicationClusters(namespace string, clusterIds []string) error

	// SetNamespaceAntiAffinityGroup sets anti-affinity group name for a namespace
	SetNamespaceAntiAffinityGroup(namespace string, namespaceAntiAffinityGroup string) error

	// GetAntiAffinityNamespaces returns all namespaces that grouped with given anti-affinity group
	GetAntiAffinityNamespaces(tenant, cluster, namespaceAntiAffinityGroup string) ([]string, error)

	// GetNamespaceAntiAffinityGroup returns anti-affinity group name for a namespace
	GetNamespaceAntiAffinityGroup(namespace string) (string, error)

	// DeleteNamespaceAntiAffinityGroup deletes anti-affinity group name for a namespace
	DeleteNamespaceAntiAffinityGroup(namespace string) error

	// SetDeduplicationStatus sets the deduplication status for all topics within a namespace
	// When deduplication is enabled, the broker will prevent to store the same Message multiple times
	SetDeduplicationStatus(namespace string, enableDeduplication bool) error

	// SetPersistence sets the persistence configuration for all the topics on a namespace
	SetPersistence(namespace string, persistence PersistencePolicies) error

	// GetPersistence returns the persistence configuration for a namespace
	GetPersistence(namespace string) (*PersistencePolicies, error)

	// SetBookieAffinityGroup sets bookie affinity group for a namespace to isolate namespace write to bookies that are
	// part of given affinity group
	SetBookieAffinityGroup(namespace string, bookieAffinityGroup BookieAffinityGroupData) error

	// DeleteBookieAffinityGroup deletes bookie affinity group configured for a namespace
	DeleteBookieAffinityGroup(namespace string) error

	// GetBookieAffinityGroup returns bookie affinity group configured for a namespace
	GetBookieAffinityGroup(namespace string) (*BookieAffinityGroupData, error)

	// Unload a namespace from the current serving broker
	Unload(namespace string) error

	// UnloadNamespaceBundle unloads namespace bundle
	UnloadNamespaceBundle(namespace, bundle string) error

	// SplitNamespaceBundle splits namespace bundle
	SplitNamespaceBundle(namespace, bundle string, unloadSplitBundles bool) error

	// GetNamespacePermissions returns permissions on a namespace
	GetNamespacePermissions(namespace NameSpaceName) (map[string][]AuthAction, error)

	// GrantNamespacePermission grants permission on a namespace.
	GrantNamespacePermission(namespace NameSpaceName, role string, action []AuthAction) error

	// RevokeNamespacePermission revokes permissions on a namespace.
	RevokeNamespacePermission(namespace NameSpaceName, role string) error

	// GrantSubPermission grants permission to role to access subscription's admin-api
	GrantSubPermission(namespace NameSpaceName, sName string, roles []string) error

	// RevokeSubPermission revoke permissions on a subscription's admin-api access
	RevokeSubPermission(namespace NameSpaceName, sName, role string) error

	// SetSubscriptionAuthMode sets the given subscription auth mode on all topics on a namespace
	SetSubscriptionAuthMode(namespace NameSpaceName, mode SubscriptionAuthMode) error

	// SetEncryptionRequiredStatus sets the encryption required status for all topics within a namespace
	SetEncryptionRequiredStatus(namespace NameSpaceName, encrypt bool) error

	// UnsubscribeNamespace unsubscribe the given subscription on all topics on a namespace
	UnsubscribeNamespace(namespace NameSpaceName, sName string) error

	// UnsubscribeNamespaceBundle unsubscribe the given subscription on all topics on a namespace bundle
	UnsubscribeNamespaceBundle(namespace NameSpaceName, bundle, sName string) error

	// ClearNamespaceBundleBacklogForSubscription clears backlog for a given subscription on all
	// topics on a namespace bundle
	ClearNamespaceBundleBacklogForSubscription(namespace NameSpaceName, bundle, sName string) error

	// ClearNamespaceBundleBacklog clears backlog for all topics on a namespace bundle
	ClearNamespaceBundleBacklog(namespace NameSpaceName, bundle string) error

	// ClearNamespaceBacklogForSubscription clears backlog for a given subscription on all topics on a namespace
	ClearNamespaceBacklogForSubscription(namespace NameSpaceName, sName string) error

	// ClearNamespaceBacklog clears backlog for all topics on a namespace
	ClearNamespaceBacklog(namespace NameSpaceName) error

	// SetReplicatorDispatchRate sets replicator-Message-dispatch-rate (Replicators under this namespace
	// can dispatch this many messages per second)
	SetReplicatorDispatchRate(namespace NameSpaceName, rate DispatchRate) error

	// Get replicator-Message-dispatch-rate (Replicators under this namespace
	// can dispatch this many messages per second)
	GetReplicatorDispatchRate(namespace NameSpaceName) (DispatchRate, error)

	// SetSubscriptionDispatchRate sets subscription-Message-dispatch-rate (subscriptions under this namespace
	// can dispatch this many messages per second)
	SetSubscriptionDispatchRate(namespace NameSpaceName, rate DispatchRate) error

	// GetSubscriptionDispatchRate returns subscription-Message-dispatch-rate (subscriptions under this namespace
	// can dispatch this many messages per second)
	GetSubscriptionDispatchRate(namespace NameSpaceName) (DispatchRate, error)

	// SetSubscribeRate sets namespace-subscribe-rate (topics under this namespace will limit by subscribeRate)
	SetSubscribeRate(namespace NameSpaceName, rate SubscribeRate) error

	// GetSubscribeRate returns namespace-subscribe-rate (topics under this namespace allow subscribe
	// times per consumer in a period)
	GetSubscribeRate(namespace NameSpaceName) (SubscribeRate, error)

	// SetDispatchRate sets Message-dispatch-rate (topics under this namespace can dispatch
	// this many messages per second)
	SetDispatchRate(namespace NameSpaceName, rate DispatchRate) error

	// GetDispatchRate returns Message-dispatch-rate (topics under this namespace can dispatch
	// this many messages per second)
	GetDispatchRate(namespace NameSpaceName) (DispatchRate, error)
}

Namespaces is admin interface for namespaces management

type NamespacesData

type NamespacesData struct {
	Enable                         bool     `json:"enable"`
	Unload                         bool     `json:"unload"`
	NumBundles                     int      `json:"numBundles"`
	BookkeeperEnsemble             int      `json:"bookkeeperEnsemble"`
	BookkeeperWriteQuorum          int      `json:"bookkeeperWriteQuorum"`
	MessageTTL                     int      `json:"messageTTL"`
	BookkeeperAckQuorum            int      `json:"bookkeeperAckQuorum"`
	ManagedLedgerMaxMarkDeleteRate float64  `json:"managedLedgerMaxMarkDeleteRate"`
	ClusterIds                     string   `json:"clusterIds"`
	RetentionTimeStr               string   `json:"retentionTimeStr"`
	LimitStr                       string   `json:"limitStr"`
	PolicyStr                      string   `json:"policyStr"`
	AntiAffinityGroup              string   `json:"antiAffinityGroup"`
	Tenant                         string   `json:"tenant"`
	Cluster                        string   `json:"cluster"`
	Bundle                         string   `json:"bundle"`
	Clusters                       []string `json:"clusters"`
}

type NsIsolationPoliciesData

type NsIsolationPoliciesData struct {
	Namespaces                 []string `json:"namespaces"`
	Primary                    []string `json:"primary"`
	Secondary                  []string `json:"secondary"`
	AutoFailoverPolicyTypeName string   `json:"autoFailoverPolicyTypeName"`
	AutoFailoverPolicyParams   string   `json:"autoFailoverPolicyParams"`
}

type NsIsolationPolicy

type NsIsolationPolicy interface {
	// Create a namespace isolation policy for a cluster
	CreateNamespaceIsolationPolicy(cluster, policyName string, namespaceIsolationData NamespaceIsolationData) error

	// Delete a namespace isolation policy for a cluster
	DeleteNamespaceIsolationPolicy(cluster, policyName string) error

	// Get a single namespace isolation policy for a cluster
	GetNamespaceIsolationPolicy(cluster, policyName string) (*NamespaceIsolationData, error)

	// Get the namespace isolation policies of a cluster
	GetNamespaceIsolationPolicies(cluster string) (map[string]NamespaceIsolationData, error)

	// Returns list of active brokers with namespace-isolation policies attached to it.
	GetBrokersWithNamespaceIsolationPolicy(cluster string) ([]BrokerNamespaceIsolationData, error)

	// Returns active broker with namespace-isolation policies attached to it.
	GetBrokerWithNamespaceIsolationPolicy(cluster, broker string) (*BrokerNamespaceIsolationData, error)
}

type OffloadProcessStatus

type OffloadProcessStatus struct {
	Status                  Status    `json:"status"`
	LastError               string    `json:"lastError"`
	FirstUnOffloadedMessage MessageID `json:"firstUnoffloadedMessage"`
}

type Output

type Output struct {
	Desc string
	Out  string
}

type PartitionedTopicMetadata

type PartitionedTopicMetadata struct {
	Partitions int `json:"partitions"`
}

Topic data

type PartitionedTopicStats

type PartitionedTopicStats struct {
	MsgRateIn           float64                      `json:"msgRateIn"`
	MsgRateOut          float64                      `json:"msgRateOut"`
	MsgThroughputIn     float64                      `json:"msgThroughputIn"`
	MsgThroughputOut    float64                      `json:"msgThroughputOut"`
	AverageMsgSize      float64                      `json:"averageMsgSize"`
	StorageSize         int64                        `json:"storageSize"`
	Publishers          []PublisherStats             `json:"publishers"`
	Subscriptions       map[string]SubscriptionStats `json:"subscriptions"`
	Replication         map[string]ReplicatorStats   `json:"replication"`
	DeDuplicationStatus string                       `json:"deduplicationStatus"`
	Metadata            PartitionedTopicMetadata     `json:"metadata"`
	Partitions          map[string]TopicStats        `json:"partitions"`
}

type PersistencePolicies

type PersistencePolicies struct {
	BookkeeperEnsemble             int     `json:"bookkeeperEnsemble"`
	BookkeeperWriteQuorum          int     `json:"bookkeeperWriteQuorum"`
	BookkeeperAckQuorum            int     `json:"bookkeeperAckQuorum"`
	ManagedLedgerMaxMarkDeleteRate float64 `json:"managedLedgerMaxMarkDeleteRate"`
}

func NewPersistencePolicies

func NewPersistencePolicies(bookkeeperEnsemble, bookkeeperWriteQuorum, bookkeeperAckQuorum int,
	managedLedgerMaxMarkDeleteRate float64) PersistencePolicies

type PersistentTopicInternalStats

type PersistentTopicInternalStats struct {
	WaitingCursorsCount                int                    `json:"waitingCursorsCount"`
	PendingAddEntriesCount             int                    `json:"pendingAddEntriesCount"`
	EntriesAddedCounter                int64                  `json:"entriesAddedCounter"`
	NumberOfEntries                    int64                  `json:"numberOfEntries"`
	TotalSize                          int64                  `json:"totalSize"`
	CurrentLedgerEntries               int64                  `json:"currentLedgerEntries"`
	CurrentLedgerSize                  int64                  `json:"currentLedgerSize"`
	LastLedgerCreatedTimestamp         string                 `json:"lastLedgerCreatedTimestamp"`
	LastLedgerCreationFailureTimestamp string                 `json:"lastLedgerCreationFailureTimestamp"`
	LastConfirmedEntry                 string                 `json:"lastConfirmedEntry"`
	State                              string                 `json:"state"`
	Ledgers                            []LedgerInfo           `json:"ledgers"`
	Cursors                            map[string]CursorStats `json:"cursors"`
}

type Policies

type Policies struct {
	Bundles                     *BundlesData                      `json:"bundles"`
	Persistence                 *PersistencePolicies              `json:"persistence"`
	RetentionPolicies           *RetentionPolicies                `json:"retention_policies"`
	SchemaValidationEnforced    bool                              `json:"schema_validation_enforced"`
	DeduplicationEnabled        bool                              `json:"deduplicationEnabled"`
	Deleted                     bool                              `json:"deleted"`
	EncryptionRequired          bool                              `json:"encryption_required"`
	MessageTTLInSeconds         int                               `json:"message_ttl_in_seconds"`
	MaxProducersPerTopic        int                               `json:"max_producers_per_topic"`
	MaxConsumersPerTopic        int                               `json:"max_consumers_per_topic"`
	MaxConsumersPerSubscription int                               `json:"max_consumers_per_subscription"`
	CompactionThreshold         int64                             `json:"compaction_threshold"`
	OffloadThreshold            int64                             `json:"offload_threshold"`
	OffloadDeletionLagMs        int64                             `json:"offload_deletion_lag_ms"`
	AntiAffinityGroup           string                            `json:"antiAffinityGroup"`
	ReplicationClusters         []string                          `json:"replication_clusters"`
	LatencyStatsSampleRate      map[string]int                    `json:"latency_stats_sample_rate"`
	BacklogQuotaMap             map[BacklogQuotaType]BacklogQuota `json:"backlog_quota_map"`
	TopicDispatchRate           map[string]DispatchRate           `json:"topicDispatchRate"`
	SubscriptionDispatchRate    map[string]DispatchRate           `json:"subscriptionDispatchRate"`
	ReplicatorDispatchRate      map[string]DispatchRate           `json:"replicatorDispatchRate"`
	ClusterSubscribeRate        map[string]SubscribeRate          `json:"clusterSubscribeRate"`
	SchemaCompatibilityStrategy SchemaCompatibilityStrategy       `json:"schema_auto_update_compatibility_strategy"`
	AuthPolicies                AuthPolicies                      `json:"auth_policies"`
	SubscriptionAuthMode        SubscriptionAuthMode              `json:"subscription_auth_mode"`
}

func NewDefaultPolicies

func NewDefaultPolicies() *Policies

type PoolArenaStats

type PoolArenaStats struct {
	NumTinySubpages            int                  `json:"numTinySubpages"`
	NumSmallSubpages           int                  `json:"numSmallSubpages"`
	NumChunkLists              int                  `json:"numChunkLists"`
	TinySubpages               []PoolSubpageStats   `json:"tinySubpages"`
	SmallSubpages              []PoolSubpageStats   `json:"smallSubpages"`
	ChunkLists                 []PoolChunkListStats `json:"chunkLists"`
	NumAllocations             int64                `json:"numAllocations"`
	NumTinyAllocations         int64                `json:"numTinyAllocations"`
	NumSmallAllocations        int64                `json:"numSmallAllocations"`
	NumNormalAllocations       int64                `json:"numNormalAllocations"`
	NumHugeAllocations         int64                `json:"numHugeAllocations"`
	NumDeallocations           int64                `json:"numDeallocations"`
	NumTinyDeallocations       int64                `json:"numTinyDeallocations"`
	NumSmallDeallocations      int64                `json:"numSmallDeallocations"`
	NumNormalDeallocations     int64                `json:"numNormalDeallocations"`
	NumHugeDeallocations       int64                `json:"numHugeDeallocations"`
	NumActiveAllocations       int64                `json:"numActiveAllocations"`
	NumActiveTinyAllocations   int64                `json:"numActiveTinyAllocations"`
	NumActiveSmallAllocations  int64                `json:"numActiveSmallAllocations"`
	NumActiveNormalAllocations int64                `json:"numActiveNormalAllocations"`
	NumActiveHugeAllocations   int64                `json:"numActiveHugeAllocations"`
}

type PoolChunkListStats

type PoolChunkListStats struct {
	MinUsage int              `json:"minUsage"`
	MaxUsage int              `json:"maxUsage"`
	Chunks   []PoolChunkStats `json:"chunks"`
}

type PoolChunkStats

type PoolChunkStats struct {
	Usage     int `json:"usage"`
	ChunkSize int `json:"chunkSize"`
	FreeBytes int `json:"freeBytes"`
}

type PoolSubpageStats

type PoolSubpageStats struct {
	MaxNumElements int `json:"maxNumElements"`
	NumAvailable   int `json:"numAvailable"`
	ElementSize    int `json:"elementSize"`
	PageSize       int `json:"pageSize"`
}

type PositionInfo

type PositionInfo struct {
	LedgerID int64 `json:"ledgerId"`
	EntryID  int64 `json:"entryId"`
}

type PostSchemaPayload

type PostSchemaPayload struct {
	SchemaType string            `json:"type"`
	Schema     string            `json:"schema"`
	Properties map[string]string `json:"properties"`
}

Payload with information about a schema

type PublisherStats

type PublisherStats struct {
	ProducerID      int64             `json:"producerId"`
	MsgRateIn       float64           `json:"msgRateIn"`
	MsgThroughputIn float64           `json:"msgThroughputIn"`
	AverageMsgSize  float64           `json:"averageMsgSize"`
	Metadata        map[string]string `json:"metadata"`
}

type ReplicatorStats

type ReplicatorStats struct {
	Connected                 bool    `json:"connected"`
	MsgRateIn                 float64 `json:"msgRateIn"`
	MsgRateOut                float64 `json:"msgRateOut"`
	MsgThroughputIn           float64 `json:"msgThroughputIn"`
	MsgThroughputOut          float64 `json:"msgThroughputOut"`
	MsgRateExpired            float64 `json:"msgRateExpired"`
	ReplicationBacklog        int64   `json:"replicationBacklog"`
	ReplicationDelayInSeconds int64   `json:"replicationDelayInSeconds"`
	InboundConnection         string  `json:"inboundConnection"`
	InboundConnectedSince     string  `json:"inboundConnectedSince"`
	OutboundConnection        string  `json:"outboundConnection"`
	OutboundConnectedSince    string  `json:"outboundConnectedSince"`
}

type ResourceQuota

type ResourceQuota struct {
	// messages published per second
	MsgRateIn float64 `json:"msgRateIn"`
	// messages consumed per second
	MsgRateOut float64 `json:"msgRateOut"`
	// incoming bytes per second
	BandwidthIn float64 `json:"bandwidthIn"`
	// outgoing bytes per second
	BandwidthOut float64 `json:"bandwidthOut"`
	// used memory in Mbytes
	Memory float64 `json:"memory"`
	// allow the quota be dynamically re-calculated according to real traffic
	Dynamic bool `json:"dynamic"`
}

func NewResourceQuota

func NewResourceQuota() *ResourceQuota

type ResourceQuotaData

type ResourceQuotaData struct {
	Names        string `json:"names"`
	Bundle       string `json:"bundle"`
	MsgRateIn    int64  `json:"msgRateIn"`
	MsgRateOut   int64  `json:"msgRateOut"`
	BandwidthIn  int64  `json:"bandwidthIn"`
	BandwidthOut int64  `json:"bandwidthOut"`
	Memory       int64  `json:"memory"`
	Dynamic      bool   `json:"dynamic"`
}

type ResourceQuotas

type ResourceQuotas interface {
	// Get default resource quota for new resource bundles.
	GetDefaultResourceQuota() (*ResourceQuota, error)

	// Set default resource quota for new namespace bundles.
	SetDefaultResourceQuota(quota ResourceQuota) error

	// Get resource quota of a namespace bundle.
	GetNamespaceBundleResourceQuota(namespace, bundle string) (*ResourceQuota, error)

	// Set resource quota for a namespace bundle.
	SetNamespaceBundleResourceQuota(namespace, bundle string, quota ResourceQuota) error

	// Reset resource quota for a namespace bundle to default value.
	ResetNamespaceBundleResourceQuota(namespace, bundle string) error
}

type ResourceUsage

type ResourceUsage struct {
	Usage float64 `json:"usage"`
	Limit float64 `json:"limit"`
}

func (*ResourceUsage) CompareTo

func (ru *ResourceUsage) CompareTo(o *ResourceUsage) int

func (*ResourceUsage) PercentUsage

func (ru *ResourceUsage) PercentUsage() float32

func (*ResourceUsage) Reset

func (ru *ResourceUsage) Reset()

type Resources

type Resources struct {
	CPU  float64 `json:"cpu"`
	Disk int64   `json:"disk"`
	RAM  int64   `json:"ram"`
}

func NewDefaultResources

func NewDefaultResources() *Resources

type RetentionPolicies

type RetentionPolicies struct {
	RetentionTimeInMinutes int   `json:"retentionTimeInMinutes"`
	RetentionSizeInMB      int64 `json:"retentionSizeInMB"`
}

func NewRetentionPolicies

func NewRetentionPolicies(retentionTimeInMinutes int, retentionSizeInMB int) RetentionPolicies

type RetentionPolicy

type RetentionPolicy string
const (
	ProducerRequestHold     RetentionPolicy = "producer_request_hold"
	ProducerException       RetentionPolicy = "producer_exception"
	ConsumerBacklogEviction RetentionPolicy = "consumer_backlog_eviction"
)

type Schema

type Schema interface {
	// GetSchemaInfo retrieves the latest schema of a topic
	GetSchemaInfo(topic string) (*SchemaInfo, error)

	// GetSchemaInfoWithVersion retrieves the latest schema with version of a topic
	GetSchemaInfoWithVersion(topic string) (*SchemaInfoWithVersion, error)

	// GetSchemaInfoByVersion retrieves the schema of a topic at a given <tt>version</tt>
	GetSchemaInfoByVersion(topic string, version int64) (*SchemaInfo, error)

	// DeleteSchema deletes the schema associated with a given <tt>topic</tt>
	DeleteSchema(topic string) error

	// CreateSchemaByPayload creates a schema for a given <tt>topic</tt>
	CreateSchemaByPayload(topic string, schemaPayload PostSchemaPayload) error
}

Schema is admin interface for schema management

type SchemaCompatibilityStrategy

type SchemaCompatibilityStrategy string
const (
	AutoUpdateDisabled SchemaCompatibilityStrategy = "AutoUpdateDisabled"
	Backward           SchemaCompatibilityStrategy = "Backward"
	Forward            SchemaCompatibilityStrategy = "Forward"
	Full               SchemaCompatibilityStrategy = "Full"
	AlwaysCompatible   SchemaCompatibilityStrategy = "AlwaysCompatible"
	BackwardTransitive SchemaCompatibilityStrategy = "BackwardTransitive"
	ForwardTransitive  SchemaCompatibilityStrategy = "ForwardTransitive"
	FullTransitive     SchemaCompatibilityStrategy = "FullTransitive"
)

func ParseSchemaAutoUpdateCompatibilityStrategy

func ParseSchemaAutoUpdateCompatibilityStrategy(str string) (SchemaCompatibilityStrategy, error)

func (SchemaCompatibilityStrategy) String

type SchemaData

type SchemaData struct {
	Version         int64  `json:"version"`
	Filename        string `json:"filename"`
	Jar             string `json:"jar"`
	Type            string `json:"type"`
	Classname       string `json:"classname"`
	AlwaysAllowNull bool   `json:"alwaysAllowNull"`
	DryRun          bool   `json:"dryRun"`
}

type SchemaInfo

type SchemaInfo struct {
	Name       string            `json:"name"`
	Schema     []byte            `json:"schema"`
	Type       string            `json:"type"`
	Properties map[string]string `json:"properties"`
}

type SchemaInfoWithVersion

type SchemaInfoWithVersion struct {
	Version    int64       `json:"version"`
	SchemaInfo *SchemaInfo `json:"schemaInfo"`
}

type SingleMessageMetadata

type SingleMessageMetadata struct {
	Properties   []*KeyValue `protobuf:"bytes,1,rep,name=properties" json:"properties,omitempty"`
	PartitionKey *string     `protobuf:"bytes,2,opt,name=partition_key,json=partitionKey" json:"partition_key,omitempty"`
	PayloadSize  *int32      `protobuf:"varint,3,req,name=payload_size,json=payloadSize" json:"payload_size,omitempty"`
	CompactedOut *bool       `protobuf:"varint,4,opt,name=compacted_out,json=compactedOut,def=0" json:"compacted_out,omitempty"`
	// the timestamp that this event occurs. it is typically set by applications.
	// if this field is omitted, `publish_time` can be used for the purpose of `event_time`.
	EventTime              *uint64 `protobuf:"varint,5,opt,name=event_time,json=eventTime,def=0" json:"event_time,omitempty"`
	PartitionKeyB64Encoded *bool   `` /* 131-byte string literal not displayed */
	// Specific a key to overwrite the message key which used for ordering dispatch in Key_Shared mode.
	OrderingKey          []byte   `protobuf:"bytes,7,opt,name=ordering_key,json=orderingKey" json:"ordering_key,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

nolint

func (*SingleMessageMetadata) GetPayloadSize

func (m *SingleMessageMetadata) GetPayloadSize() int32

func (*SingleMessageMetadata) ProtoMessage

func (*SingleMessageMetadata) ProtoMessage()

func (*SingleMessageMetadata) Reset

func (m *SingleMessageMetadata) Reset()

func (*SingleMessageMetadata) String

func (m *SingleMessageMetadata) String() string

type SinkConfig

type SinkConfig struct {
	TopicsPattern *string    `json:"topicsPattern" yaml:"topicsPattern"`
	Resources     *Resources `json:"resources" yaml:"resources"`
	TimeoutMs     *int64     `json:"timeoutMs" yaml:"timeoutMs"`

	// Whether the subscriptions the functions created/used should be deleted when the functions is deleted
	CleanupSubscription bool `json:"cleanupSubscription" yaml:"cleanupSubscription"`

	RetainOrdering bool   `json:"retainOrdering" yaml:"retainOrdering"`
	AutoAck        bool   `json:"autoAck" yaml:"autoAck"`
	Parallelism    int    `json:"parallelism" yaml:"parallelism"`
	Tenant         string `json:"tenant" yaml:"tenant"`
	Namespace      string `json:"namespace" yaml:"namespace"`
	Name           string `json:"name" yaml:"name"`
	ClassName      string `json:"className" yaml:"className"`

	Archive                string                    `json:"archive" yaml:"archive"`
	ProcessingGuarantees   string                    `json:"processingGuarantees" yaml:"processingGuarantees"`
	SourceSubscriptionName string                    `json:"sourceSubscriptionName" yaml:"sourceSubscriptionName"`
	RuntimeFlags           string                    `json:"runtimeFlags" yaml:"runtimeFlags"`
	Inputs                 []string                  `json:"inputs" yaml:"inputs"`
	TopicToSerdeClassName  map[string]string         `json:"topicToSerdeClassName" yaml:"topicToSerdeClassName"`
	TopicToSchemaType      map[string]string         `json:"topicToSchemaType" yaml:"topicToSchemaType"`
	InputSpecs             map[string]ConsumerConfig `json:"inputSpecs" yaml:"inputSpecs"`
	Configs                map[string]interface{}    `json:"configs" yaml:"configs"`

	// This is a map of secretName(aka how the secret is going to be
	// accessed in the function via context) to an object that
	// encapsulates how the secret is fetched by the underlying
	// secrets provider. The type of an value here can be found by the
	// SecretProviderConfigurator.getSecretObjectType() method.
	Secrets map[string]interface{} `json:"secrets" yaml:"secrets"`
}

type SinkData

type SinkData struct {
	UpdateAuthData          bool        `json:"updateAuthData"`
	RetainOrdering          bool        `json:"retainOrdering"`
	AutoAck                 bool        `json:"autoAck"`
	Parallelism             int         `json:"parallelism"`
	RAM                     int64       `json:"ram"`
	Disk                    int64       `json:"disk"`
	TimeoutMs               int64       `json:"timeoutMs"`
	CPU                     float64     `json:"cpu"`
	Tenant                  string      `json:"tenant"`
	Namespace               string      `json:"namespace"`
	Name                    string      `json:"name"`
	SinkType                string      `json:"sinkType"`
	Inputs                  string      `json:"inputs"`
	TopicsPattern           string      `json:"topicsPattern"`
	SubsName                string      `json:"subsName"`
	CustomSerdeInputString  string      `json:"customSerdeInputString"`
	CustomSchemaInputString string      `json:"customSchemaInputString"`
	ProcessingGuarantees    string      `json:"processingGuarantees"`
	Archive                 string      `json:"archive"`
	ClassName               string      `json:"className"`
	SinkConfigFile          string      `json:"sinkConfigFile"`
	SinkConfigString        string      `json:"sinkConfigString"`
	InstanceID              string      `json:"instanceId"`
	SinkConf                *SinkConfig `json:"-"`
}

type SinkInstanceStatus

type SinkInstanceStatus struct {
	InstanceID int                      `json:"instanceId"`
	Status     SourceInstanceStatusData `json:"status"`
}

type SinkInstanceStatusData

type SinkInstanceStatusData struct {
	// Is this instance running?
	Running bool `json:"running"`

	// Do we have any error while running this instance
	Err string `json:"error"`

	// Number of times this instance has restarted
	NumRestarts int64 `json:"numRestarts"`

	// Number of messages read from Pulsar
	NumReadFromPulsar int64 `json:"numReadFromPulsar"`

	// Number of times there was a system exception handling messages
	NumSystemExceptions int64 `json:"numSystemExceptions"`

	// A list of the most recent system exceptions
	LatestSystemExceptions []ExceptionInformation `json:"latestSystemExceptions"`

	// Number of times there was a sink exception
	NumSinkExceptions int64 `json:"numSinkExceptions"`

	// A list of the most recent sink exceptions
	LatestSinkExceptions []ExceptionInformation `json:"latestSinkExceptions"`

	// Number of messages written to sink
	NumWrittenToSink int64 `json:"numWrittenToSink"`

	// When was the last time we received a Message from Pulsar
	LastReceivedTime int64 `json:"lastReceivedTime"`

	WorkerID string `json:"workerId"`
}

type SinkStatus

type SinkStatus struct {
	// The total number of sink instances that ought to be running
	NumInstances int `json:"numInstances"`

	// The number of source instances that are actually running
	NumRunning int `json:"numRunning"`

	Instances []*SinkInstanceStatus `json:"instances"`
}

type Sinks

type Sinks interface {
	// ListSinks returns the list of all the Pulsar Sinks.
	ListSinks(tenant, namespace string) ([]string, error)

	// GetSink returns the configuration for the specified sink
	GetSink(tenant, namespace, Sink string) (SinkConfig, error)

	// CreateSink creates a new sink
	CreateSink(config *SinkConfig, fileName string) error

	// CreateSinkWithURL creates a new sink by providing url from which fun-pkg can be downloaded. supported url: http/file
	CreateSinkWithURL(config *SinkConfig, pkgURL string) error

	// UpdateSink updates the configuration for a sink.
	UpdateSink(config *SinkConfig, fileName string, options *UpdateOptions) error

	// UpdateSinkWithURL updates a sink by providing url from which fun-pkg can be downloaded. supported url: http/file
	UpdateSinkWithURL(config *SinkConfig, pkgURL string, options *UpdateOptions) error

	// DeleteSink deletes an existing sink
	DeleteSink(tenant, namespace, Sink string) error

	// GetSinkStatus returns the current status of a sink.
	GetSinkStatus(tenant, namespace, Sink string) (SinkStatus, error)

	// GetSinkStatusWithID returns the current status of a sink instance.
	GetSinkStatusWithID(tenant, namespace, Sink string, id int) (SinkInstanceStatusData, error)

	// RestartSink restarts all sink instances
	RestartSink(tenant, namespace, Sink string) error

	// RestartSinkWithID restarts sink instance
	RestartSinkWithID(tenant, namespace, Sink string, id int) error

	// StopSink stops all sink instances
	StopSink(tenant, namespace, Sink string) error

	// StopSinkWithID stops sink instance
	StopSinkWithID(tenant, namespace, Sink string, id int) error

	// StartSink starts all sink instances
	StartSink(tenant, namespace, Sink string) error

	// StartSinkWithID starts sink instance
	StartSinkWithID(tenant, namespace, Sink string, id int) error

	// GetBuiltInSinks fetches a list of supported Pulsar IO sinks currently running in cluster mode
	GetBuiltInSinks() ([]*ConnectorDefinition, error)

	// ReloadBuiltInSinks reload the available built-in connectors, include Source and Sink
	ReloadBuiltInSinks() error
}

Sinks is admin interface for sinks management

type SourceConfig

type SourceConfig struct {
	Tenant    string `json:"tenant" yaml:"tenant"`
	Namespace string `json:"namespace" yaml:"namespace"`
	Name      string `json:"name" yaml:"name"`
	ClassName string `json:"className" yaml:"className"`

	TopicName      string `json:"topicName" yaml:"topicName"`
	SerdeClassName string `json:"serdeClassName" yaml:"serdeClassName"`
	SchemaType     string `json:"schemaType" yaml:"schemaType"`

	Configs map[string]interface{} `json:"configs" yaml:"configs"`

	// This is a map of secretName(aka how the secret is going to be
	// accessed in the function via context) to an object that
	// encapsulates how the secret is fetched by the underlying
	// secrets provider. The type of an value here can be found by the
	// SecretProviderConfigurator.getSecretObjectType() method.
	Secrets map[string]interface{} `json:"secrets" yaml:"secrets"`

	Parallelism          int        `json:"parallelism" yaml:"parallelism"`
	ProcessingGuarantees string     `json:"processingGuarantees" yaml:"processingGuarantees"`
	Resources            *Resources `json:"resources" yaml:"resources"`
	Archive              string     `json:"archive" yaml:"archive"`
	// Any flags that you want to pass to the runtime.
	RuntimeFlags string `json:"runtimeFlags" yaml:"runtimeFlags"`
}

type SourceData

type SourceData struct {
	Tenant                   string  `json:"tenant"`
	Namespace                string  `json:"namespace"`
	Name                     string  `json:"name"`
	SourceType               string  `json:"sourceType"`
	ProcessingGuarantees     string  `json:"processingGuarantees"`
	DestinationTopicName     string  `json:"destinationTopicName"`
	DeserializationClassName string  `json:"deserializationClassName"`
	SchemaType               string  `json:"schemaType"`
	Parallelism              int     `json:"parallelism"`
	Archive                  string  `json:"archive"`
	ClassName                string  `json:"className"`
	SourceConfigFile         string  `json:"sourceConfigFile"`
	CPU                      float64 `json:"cpu"`
	RAM                      int64   `json:"ram"`
	Disk                     int64   `json:"disk"`
	SourceConfigString       string  `json:"sourceConfigString"`

	SourceConf *SourceConfig `json:"-"`
	InstanceID string        `json:"instanceId"`

	UpdateAuthData bool `json:"updateAuthData"`
}

type SourceInstanceStatus

type SourceInstanceStatus struct {
	InstanceID int                      `json:"instanceId"`
	Status     SourceInstanceStatusData `json:"status"`
}

type SourceInstanceStatusData

type SourceInstanceStatusData struct {
	Running                bool                   `json:"running"`
	Err                    string                 `json:"error"`
	NumRestarts            int64                  `json:"numRestarts"`
	NumReceivedFromSource  int64                  `json:"numReceivedFromSource"`
	NumSystemExceptions    int64                  `json:"numSystemExceptions"`
	LatestSystemExceptions []ExceptionInformation `json:"latestSystemExceptions"`
	NumSourceExceptions    int64                  `json:"numSourceExceptions"`
	LatestSourceExceptions []ExceptionInformation `json:"latestSourceExceptions"`
	NumWritten             int64                  `json:"numWritten"`
	LastReceivedTime       int64                  `json:"lastReceivedTime"`
	WorkerID               string                 `json:"workerId"`
}

type SourceStatus

type SourceStatus struct {
	NumInstances int                     `json:"numInstances"`
	NumRunning   int                     `json:"numRunning"`
	Instances    []*SourceInstanceStatus `json:"instances"`
}

type Sources

type Sources interface {
	// ListSources returns the list of all the Pulsar Sources.
	ListSources(tenant, namespace string) ([]string, error)

	// GetSource return the configuration for the specified source
	GetSource(tenant, namespace, source string) (SourceConfig, error)

	// CreateSource creates a new source
	CreateSource(config *SourceConfig, fileName string) error

	// CreateSourceWithURL creates a new source by providing url from which fun-pkg can be downloaded.
	// supported url: http/file
	CreateSourceWithURL(config *SourceConfig, pkgURL string) error

	// UpdateSource updates the configuration for a source.
	UpdateSource(config *SourceConfig, fileName string, options *UpdateOptions) error

	// UpdateSourceWithURL updates a source by providing url from which fun-pkg can be downloaded. supported url: http/file
	UpdateSourceWithURL(config *SourceConfig, pkgURL string, options *UpdateOptions) error

	// DeleteSource deletes an existing source
	DeleteSource(tenant, namespace, source string) error

	// GetSourceStatus returns the current status of a source.
	GetSourceStatus(tenant, namespace, source string) (SourceStatus, error)

	// GetSourceStatusWithID returns the current status of a source instance.
	GetSourceStatusWithID(tenant, namespace, source string, id int) (SourceInstanceStatusData, error)

	// RestartSource restarts all source instances
	RestartSource(tenant, namespace, source string) error

	// RestartSourceWithID restarts source instance
	RestartSourceWithID(tenant, namespace, source string, id int) error

	// StopSource stops all source instances
	StopSource(tenant, namespace, source string) error

	// StopSourceWithID stops source instance
	StopSourceWithID(tenant, namespace, source string, id int) error

	// StartSource starts all source instances
	StartSource(tenant, namespace, source string) error

	// StartSourceWithID starts source instance
	StartSourceWithID(tenant, namespace, source string, id int) error

	// GetBuiltInSources fetches a list of supported Pulsar IO sources currently running in cluster mode
	GetBuiltInSources() ([]*ConnectorDefinition, error)

	// ReloadBuiltInSources reloads the available built-in connectors, include Source and Sink
	ReloadBuiltInSources() error
}

Sources is admin interface for sources management

type Status

type Status string
const (
	NOTRUN  Status = "NOT_RUN"
	RUNNING Status = "RUNNING"
	SUCCESS Status = "SUCCESS"
	ERROR   Status = "ERROR"
)

func (Status) String

func (s Status) String() string

type SubscribeRate

type SubscribeRate struct {
	SubscribeThrottlingRatePerConsumer int `json:"subscribeThrottlingRatePerConsumer"`
	RatePeriodInSecond                 int `json:"ratePeriodInSecond"`
}

func NewSubscribeRate

func NewSubscribeRate() *SubscribeRate

type SubscriptionAuthMode

type SubscriptionAuthMode string
const (
	None   SubscriptionAuthMode = "None"
	Prefix SubscriptionAuthMode = "Prefix"
)

func ParseSubscriptionAuthMode

func ParseSubscriptionAuthMode(s string) (SubscriptionAuthMode, error)

func (SubscriptionAuthMode) String

func (s SubscriptionAuthMode) String() string

type SubscriptionStats

type SubscriptionStats struct {
	BlockedSubscriptionOnUnackedMsgs bool            `json:"blockedSubscriptionOnUnackedMsgs"`
	IsReplicated                     bool            `json:"isReplicated"`
	MsgRateOut                       float64         `json:"msgRateOut"`
	MsgThroughputOut                 float64         `json:"msgThroughputOut"`
	MsgRateRedeliver                 float64         `json:"msgRateRedeliver"`
	MsgRateExpired                   float64         `json:"msgRateExpired"`
	MsgBacklog                       int64           `json:"msgBacklog"`
	MsgDelayed                       int64           `json:"msgDelayed"`
	UnAckedMessages                  int64           `json:"unackedMessages"`
	SubType                          string          `json:"type"`
	ActiveConsumerName               string          `json:"activeConsumerName"`
	Consumers                        []ConsumerStats `json:"consumers"`
}

type Subscriptions

type Subscriptions interface {
	// Create a new subscription on a topic
	Create(TopicName, string, MessageID) error

	// Delete a subscription.
	// Delete a persistent subscription from a topic. There should not be any active consumers on the subscription
	Delete(TopicName, string) error

	// List returns the list of subscriptions
	List(TopicName) ([]string, error)

	// ResetCursorToMessageID resets cursor position on a topic subscription
	// @param
	// messageID reset subscription to messageId (or previous nearest messageId if given messageId is not valid)
	ResetCursorToMessageID(TopicName, string, MessageID) error

	// ResetCursorToTimestamp resets cursor position on a topic subscription
	// @param
	// time reset subscription to position closest to time in ms since epoch
	ResetCursorToTimestamp(TopicName, string, int64) error

	// ClearBacklog skips all messages on a topic subscription
	ClearBacklog(TopicName, string) error

	// SkipMessages skips messages on a topic subscription
	SkipMessages(TopicName, string, int64) error

	// ExpireMessages expires all messages older than given N (expireTimeInSeconds) seconds for a given subscription
	ExpireMessages(TopicName, string, int64) error

	// ExpireAllMessages expires all messages older than given N (expireTimeInSeconds) seconds for all
	// subscriptions of the persistent-topic
	ExpireAllMessages(TopicName, int64) error

	// PeekMessages peeks messages from a topic subscription
	PeekMessages(TopicName, string, int) ([]*Message, error)
}

Subscriptions is admin interface for subscriptions management

type TLSOptions

type TLSOptions struct {
	TrustCertsFilePath      string
	AllowInsecureConnection bool
}

type TenantData

type TenantData struct {
	Name            string   `json:"-"`
	AdminRoles      []string `json:"adminRoles"`
	AllowedClusters []string `json:"allowedClusters"`
}

Tenant args

type Tenants

type Tenants interface {
	// Create a new tenant
	Create(TenantData) error

	// Delete an existing tenant
	Delete(string) error

	// Update the admins for a tenant
	Update(TenantData) error

	//List returns the list of tenants
	List() ([]string, error)

	// Get returns the config of the tenant.
	Get(string) (TenantData, error)
}

Tenants is admin interface for tenants management

type TopicDomain

type TopicDomain string

func ParseTopicDomain

func ParseTopicDomain(domain string) (TopicDomain, error)

func (TopicDomain) String

func (t TopicDomain) String() string

type TopicName

type TopicName struct {
	// contains filtered or unexported fields
}

func GetTopicName

func GetTopicName(completeName string) (*TopicName, error)

The topic name can be in two different forms, one is fully qualified topic name, the other one is short topic name

func (*TopicName) GetDomain

func (t *TopicName) GetDomain() TopicDomain

func (*TopicName) GetEncodedTopic

func (t *TopicName) GetEncodedTopic() string

func (*TopicName) GetLocalName

func (t *TopicName) GetLocalName() string

func (*TopicName) GetPartition

func (t *TopicName) GetPartition(index int) (*TopicName, error)

func (*TopicName) GetRestPath

func (t *TopicName) GetRestPath() string

func (*TopicName) IsPersistent

func (t *TopicName) IsPersistent() bool

func (*TopicName) String

func (t *TopicName) String() string

type TopicStats

type TopicStats struct {
	MsgRateIn           float64                      `json:"msgRateIn"`
	MsgRateOut          float64                      `json:"msgRateOut"`
	MsgThroughputIn     float64                      `json:"msgThroughputIn"`
	MsgThroughputOut    float64                      `json:"msgThroughputOut"`
	AverageMsgSize      float64                      `json:"averageMsgSize"`
	StorageSize         int64                        `json:"storageSize"`
	Publishers          []PublisherStats             `json:"publishers"`
	Subscriptions       map[string]SubscriptionStats `json:"subscriptions"`
	Replication         map[string]ReplicatorStats   `json:"replication"`
	DeDuplicationStatus string                       `json:"deduplicationStatus"`
}

type TopicStatsStream

type TopicStatsStream struct {
	TopicsMap map[string]map[string]map[string]TopicStats `json:"topicStatsBuf"`
}

type Topics

type Topics interface {
	// Create a topic
	Create(TopicName, int) error

	// Delete a topic
	Delete(TopicName, bool, bool) error

	// Update number of partitions of a non-global partitioned topic
	// It requires partitioned-topic to be already exist and number of new partitions must be greater than existing
	// number of partitions. Decrementing number of partitions requires deletion of topic which is not supported.
	Update(TopicName, int) error

	// GetMetadata returns metadata of a partitioned topic
	GetMetadata(TopicName) (PartitionedTopicMetadata, error)

	// List returns the list of topics under a namespace
	List(NameSpaceName) ([]string, []string, error)

	// GetInternalInfo returns the internal metadata info for the topic
	GetInternalInfo(TopicName) (ManagedLedgerInfo, error)

	// GetPermissions returns permissions on a topic
	// Retrieve the effective permissions for a topic. These permissions are defined by the permissions set at the
	// namespace level combined (union) with any eventual specific permission set on the topic.
	GetPermissions(TopicName) (map[string][]AuthAction, error)

	// GrantPermission grants a new permission to a client role on a single topic
	GrantPermission(TopicName, string, []AuthAction) error

	// RevokePermission revokes permissions to a client role on a single topic. If the permission
	// was not set at the topic level, but rather at the namespace level, this operation will
	// return an error (HTTP status code 412).
	RevokePermission(TopicName, string) error

	// Lookup a topic returns the broker URL that serves the topic
	Lookup(TopicName) (LookupData, error)

	// GetBundleRange returns a bundle range of a topic
	GetBundleRange(TopicName) (string, error)

	// GetLastMessageID returns the last commit message Id of a topic
	GetLastMessageID(TopicName) (MessageID, error)

	// GetStats returns the stats for the topic
	// All the rates are computed over a 1 minute window and are relative the last completed 1 minute period
	GetStats(TopicName) (TopicStats, error)

	// GetInternalStats returns the internal stats for the topic.
	GetInternalStats(TopicName) (PersistentTopicInternalStats, error)

	// GetPartitionedStats returns the stats for the partitioned topic
	// All the rates are computed over a 1 minute window and are relative the last completed 1 minute period
	GetPartitionedStats(TopicName, bool) (PartitionedTopicStats, error)

	// Terminate the topic and prevent any more messages being published on it
	Terminate(TopicName) (MessageID, error)

	// Offload triggers offloading messages in topic to longterm storage
	Offload(TopicName, MessageID) error

	// OffloadStatus checks the status of an ongoing offloading operation for a topic
	OffloadStatus(TopicName) (OffloadProcessStatus, error)

	// Unload a topic
	Unload(TopicName) error

	// Compact triggers compaction to run for a topic. A single topic can only have one instance of compaction
	// running at any time. Any attempt to trigger another will be met with a ConflictException.
	Compact(TopicName) error

	// CompactStatus checks the status of an ongoing compaction for a topic
	CompactStatus(TopicName) (LongRunningProcessStatus, error)
}

Topics is admin interface for topics management

type UpdateOptions

type UpdateOptions struct {
	UpdateAuthData bool
}

Options while updating the sink

func NewUpdateOptions

func NewUpdateOptions() *UpdateOptions

type WindowConfig

type WindowConfig struct {
	WindowLengthCount             int
	WindowLengthDurationMs        int64
	SlidingIntervalCount          int
	SlidingIntervalDurationMs     int64
	LateDataTopic                 string
	MaxLagMs                      int64
	WatermarkEmitIntervalMs       int64
	TimestampExtractorClassName   string
	ActualWindowFunctionClassName string
}

func NewDefaultWindowConfing

func NewDefaultWindowConfing() *WindowConfig

type WorkerFunctionInstanceStats

type WorkerFunctionInstanceStats struct {
	Name    string                    `json:"name"`
	Metrics FunctionInstanceStatsData `json:"metrics"`
}

type WorkerInfo

type WorkerInfo struct {
	WorkerID       string `json:"workerId"`
	WorkerHostname string `json:"workerHostname"`
	Port           int    `json:"port"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL