Documentation ¶
Index ¶
- Constants
- Variables
- func IsAdminError(err error) bool
- type APIVersion
- type AllocatorStats
- type AuthAction
- type AuthPolicies
- type AutoFailoverPolicyData
- type AutoFailoverPolicyType
- type BacklogQuota
- type BacklogQuotaType
- type BookieAffinityGroupData
- type BrokerAssignment
- type BrokerData
- type BrokerNamespaceIsolationData
- type BrokerStats
- type BrokerStatsData
- type Brokers
- type BundlesData
- type Client
- type ClusterData
- type Clusters
- type Config
- type ConnectorDefinition
- type ConsumerConfig
- type ConsumerStats
- type CursorInfo
- type CursorStats
- type DispatchRate
- type Error
- type Example
- type ExceptionInformation
- type FailureDomainData
- type FailureDomainMap
- type FunctionConfig
- type FunctionData
- type FunctionInstanceStats
- type FunctionInstanceStatsData
- type FunctionInstanceStatsDataBase
- type FunctionInstanceStatus
- type FunctionInstanceStatusData
- type FunctionState
- type FunctionStats
- type FunctionStatus
- type Functions
- type FunctionsWorker
- type GetSchemaResponse
- type InternalConfigurationData
- type KeyValue
- type LedgerInfo
- type LocalBrokerData
- type LongDescription
- type LongRunningProcessStatus
- type LookupData
- type ManagedLedgerInfo
- type Message
- type MessageID
- type MessageRangeInfo
- type Metrics
- type NameSpaceName
- type NamespaceBundleStats
- type NamespaceIsolationData
- type NamespaceOwnershipStatus
- type Namespaces
- type NamespacesData
- type NsIsolationPoliciesData
- type NsIsolationPolicy
- type OffloadProcessStatus
- type Output
- type PartitionedTopicMetadata
- type PartitionedTopicStats
- type PersistencePolicies
- type PersistentTopicInternalStats
- type Policies
- type PoolArenaStats
- type PoolChunkListStats
- type PoolChunkStats
- type PoolSubpageStats
- type PositionInfo
- type PostSchemaPayload
- type PublisherStats
- type ReplicatorStats
- type ResourceQuota
- type ResourceQuotaData
- type ResourceQuotas
- type ResourceUsage
- type Resources
- type RetentionPolicies
- type RetentionPolicy
- type Schema
- type SchemaCompatibilityStrategy
- type SchemaData
- type SchemaInfo
- type SchemaInfoWithVersion
- type SingleMessageMetadata
- type SinkConfig
- type SinkData
- type SinkInstanceStatus
- type SinkInstanceStatusData
- type SinkStatus
- type Sinks
- type SourceConfig
- type SourceData
- type SourceInstanceStatus
- type SourceInstanceStatusData
- type SourceStatus
- type Sources
- type Status
- type SubscribeRate
- type SubscriptionAuthMode
- type SubscriptionStats
- type Subscriptions
- type TLSOptions
- type TenantData
- type Tenants
- type TopicDomain
- type TopicName
- func (t *TopicName) GetDomain() TopicDomain
- func (t *TopicName) GetEncodedTopic() string
- func (t *TopicName) GetLocalName() string
- func (t *TopicName) GetPartition(index int) (*TopicName, error)
- func (t *TopicName) GetRestPath() string
- func (t *TopicName) IsPersistent() bool
- func (t *TopicName) String() string
- type TopicStats
- type TopicStatsStream
- type Topics
- type UpdateOptions
- type WindowConfig
- type WorkerFunctionInstanceStats
- type WorkerInfo
Constants ¶
const ( DefaultWebServiceURL = "http://localhost:8080" DefaultHTTPTimeOutDuration = 5 * time.Minute )
const ( JavaRuntime = "JAVA" PythonRuntime = "PYTHON" GoRuntime = "GO" )
const ( FirstBoundary string = "0x00000000" LastBoundary string = "0xffffffff" )
const ( PublishTimeHeader = "X-Pulsar-Publish-Time" BatchHeader = "X-Pulsar-Num-Batch-Message" PropertyPrefix = "X-Pulsar-PROPERTY-" )
const ( PUBLICTENANT = "public" DEFAULTNAMESPACE = "default" PARTITIONEDTOPICSUFFIX = "-partition-" )
const DefaultAPIVersion = "v2"
const PATTEN = "^[-=:.\\w]*$"
Allowed characters for property, namespace, cluster and topic names are alphanumeric (a-zA-Z0-9) plus the special characters - = : and . ; the % character is also allowed as part of valid URL encoding.
const WindowConfigKey = "__WINDOWCONFIGS__"
Variables ¶
var EXAMPLES = "EXAMPLES:"
var Earliest = MessageID{-1, -1, -1, -1}
var Latest = MessageID{0x7fffffffffffffff, 0x7fffffffffffffff, -1, -1}
var OUTPUT = "OUTPUT:"
var PERMISSION = "REQUIRED PERMISSION:"
var ReleaseVersion = "None"
var SCOPE = "SCOPE:"
var SPACES = " "
var USEDFOR = "USED FOR:"
Functions ¶
func IsAdminError ¶
Types ¶
type APIVersion ¶
type APIVersion int
const ( V1 APIVersion = iota V2 V3 )
func (APIVersion) String ¶
func (v APIVersion) String() string
type AllocatorStats ¶
type AllocatorStats struct { NumDirectArenas int `json:"numDirectArenas"` NumHeapArenas int `json:"numHeapArenas"` NumThreadLocalCaches int `json:"numThreadLocalCaches"` NormalCacheSize int `json:"normalCacheSize"` SmallCacheSize int `json:"smallCacheSize"` TinyCacheSize int `json:"tinyCacheSize"` DirectArenas []PoolArenaStats `json:"directArenas"` HeapArenas []PoolArenaStats `json:"heapArenas"` }
type AuthAction ¶
type AuthAction string
func ParseAuthAction ¶
func ParseAuthAction(action string) (AuthAction, error)
func (AuthAction) String ¶
func (a AuthAction) String() string
type AuthPolicies ¶
type AuthPolicies struct { NamespaceAuth map[string]AuthAction `json:"namespace_auth"` DestinationAuth map[string]map[string]AuthAction `json:"destination_auth"` SubscriptionAuthRoles map[string][]string `json:"subscription_auth_roles"` }
func NewAuthPolicies ¶
func NewAuthPolicies() *AuthPolicies
type AutoFailoverPolicyData ¶
type AutoFailoverPolicyData struct { PolicyType AutoFailoverPolicyType `json:"policy_type"` Parameters map[string]string `json:"parameters"` }
type AutoFailoverPolicyType ¶
type AutoFailoverPolicyType string
const (
MinAvailable AutoFailoverPolicyType = "min_available"
)
type BacklogQuota ¶
type BacklogQuota struct { Limit int64 `json:"limit"` Policy RetentionPolicy `json:"policy"` }
func NewBacklogQuota ¶
func NewBacklogQuota(limit int64, policy RetentionPolicy) BacklogQuota
type BacklogQuotaType ¶
type BacklogQuotaType string
const DestinationStorage BacklogQuotaType = "destination_storage"
type BookieAffinityGroupData ¶
type BrokerAssignment ¶
type BrokerAssignment string
const ( Primary BrokerAssignment = "primary" Secondary BrokerAssignment = "secondary" )
type BrokerData ¶
type BrokerStats ¶
type BrokerStats interface { // GetMetrics returns Monitoring metrics GetMetrics() ([]Metrics, error) // GetMBeans requests JSON string server mbean dump GetMBeans() ([]Metrics, error) // GetTopics returns JSON string topics stats GetTopics() (string, error) // GetLoadReport returns load report of broker GetLoadReport() (*LocalBrokerData, error) // GetAllocatorStats returns stats from broker GetAllocatorStats(allocatorName string) (*AllocatorStats, error) }
BrokerStats is the admin interface for broker stats management.
type BrokerStatsData ¶
type BrokerStatsData struct {
Indent bool `json:"indent"`
}
type Brokers ¶
type Brokers interface { // GetActiveBrokers returns the list of active brokers in the cluster. GetActiveBrokers(cluster string) ([]string, error) // GetDynamicConfigurationNames returns list of updatable configuration name GetDynamicConfigurationNames() ([]string, error) // GetOwnedNamespaces returns the map of owned namespaces and their status from a single broker in the cluster GetOwnedNamespaces(cluster, brokerURL string) (map[string]NamespaceOwnershipStatus, error) // UpdateDynamicConfiguration updates dynamic configuration value in to Zk that triggers watch on // brokers and all brokers can update {@link ServiceConfiguration} value locally UpdateDynamicConfiguration(configName, configValue string) error // DeleteDynamicConfiguration deletes dynamic configuration value in to Zk. It will not impact current value // in broker but next time when broker restarts, it applies value from configuration file only. DeleteDynamicConfiguration(configName string) error // GetRuntimeConfigurations returns values of runtime configuration GetRuntimeConfigurations() (map[string]string, error) // GetInternalConfigurationData returns the internal configuration data GetInternalConfigurationData() (*InternalConfigurationData, error) // GetAllDynamicConfigurations returns values of all overridden dynamic-configs GetAllDynamicConfigurations() (map[string]string, error) // HealthCheck run a health check on the broker HealthCheck() error }
Brokers is the admin interface for broker management.
type BundlesData ¶
type BundlesData struct { Boundaries []string `json:"boundaries"` NumBundles int `json:"numBundles"` }
func NewBundlesData ¶
func NewBundlesData(boundaries []string) BundlesData
func NewBundlesDataWithNumBundles ¶
func NewBundlesDataWithNumBundles(numBundles int) *BundlesData
func NewDefaultBoundle ¶
func NewDefaultBoundle() *BundlesData
type Client ¶
type Client interface { Clusters() Clusters Functions() Functions Tenants() Tenants Topics() Topics Subscriptions() Subscriptions Sources() Sources Sinks() Sinks Namespaces() Namespaces Schemas() Schema NsIsolationPolicy() NsIsolationPolicy Brokers() Brokers BrokerStats() BrokerStats ResourceQuotas() ResourceQuotas FunctionsWorker() FunctionsWorker }
Client provides a client to the Pulsar RESTful API.
type ClusterData ¶
type ClusterData struct { Name string `json:"-"` ServiceURL string `json:"serviceUrl"` ServiceURLTls string `json:"serviceUrlTls"` BrokerServiceURL string `json:"brokerServiceUrl"` BrokerServiceURLTls string `json:"brokerServiceUrlTls"` PeerClusterNames []string `json:"peerClusterNames"` }
ClusterData holds information on a cluster.
type Clusters ¶
type Clusters interface { // List returns the list of clusters List() ([]string, error) // Get the configuration data for the specified cluster Get(string) (ClusterData, error) // Create a new cluster Create(ClusterData) error // Delete an existing cluster Delete(string) error // Update the configuration for a cluster Update(ClusterData) error // UpdatePeerClusters updates peer cluster names. UpdatePeerClusters(string, []string) error // GetPeerClusters returns peer-cluster names GetPeerClusters(string) ([]string, error) // CreateFailureDomain creates a domain into cluster CreateFailureDomain(FailureDomainData) error // GetFailureDomain returns the domain registered into a cluster GetFailureDomain(clusterName, domainName string) (FailureDomainData, error) // ListFailureDomains returns all registered domains in cluster ListFailureDomains(string) (FailureDomainMap, error) // DeleteFailureDomain deletes a domain in cluster DeleteFailureDomain(FailureDomainData) error // UpdateFailureDomain updates a domain into cluster UpdateFailureDomain(FailureDomainData) error }
Clusters is the admin interface for cluster management.
type Config ¶
type Config struct { WebServiceURL string HTTPTimeout time.Duration HTTPClient *http.Client APIVersion APIVersion Auth *auth.TLSAuthProvider AuthParams string TLSOptions *TLSOptions TokenAuth *auth.TokenAuthProvider }
Config is used to configure the admin client
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig returns a default configuration for the pulsar admin client
type ConnectorDefinition ¶
type ConnectorDefinition struct { // The name of the connector type Name string `json:"name"` // Description to be used for user help Description string `json:"description"` // The class name for the connector source implementation // <p>If not defined, it will be assumed this connector cannot act as a data source SourceClass string `json:"sourceClass"` // The class name for the connector sink implementation // <p>If not defined, it will be assumed this connector cannot act as a data sink SinkClass string `json:"sinkClass"` }
ConnectorDefinition holds basic information about a Pulsar connector.
type ConsumerConfig ¶
type ConsumerStats ¶
type ConsumerStats struct { BlockedConsumerOnUnAckedMsgs bool `json:"blockedConsumerOnUnackedMsgs"` AvailablePermits int `json:"availablePermits"` UnAckedMessages int `json:"unackedMessages"` MsgRateOut float64 `json:"msgRateOut"` MsgThroughputOut float64 `json:"msgThroughputOut"` MsgRateRedeliver float64 `json:"msgRateRedeliver"` ConsumerName string `json:"consumerName"` Metadata map[string]string `json:"metadata"` }
type CursorInfo ¶
type CursorInfo struct { Version int `json:"version"` CreationDate string `json:"creationDate"` ModificationDate string `json:"modificationDate"` CursorsLedgerID int64 `json:"cursorsLedgerId"` MarkDelete PositionInfo `json:"markDelete"` IndividualDeletedMessages []MessageRangeInfo `json:"individualDeletedMessages"` Properties map[string]int64 }
type CursorStats ¶
type CursorStats struct { MarkDeletePosition string `json:"markDeletePosition"` ReadPosition string `json:"readPosition"` WaitingReadOp bool `json:"waitingReadOp"` PendingReadOps int `json:"pendingReadOps"` MessagesConsumedCounter int64 `json:"messagesConsumedCounter"` CursorLedger int64 `json:"cursorLedger"` CursorLedgerLastEntry int64 `json:"cursorLedgerLastEntry"` IndividuallyDeletedMessages string `json:"individuallyDeletedMessages"` LastLedgerWitchTimestamp string `json:"lastLedgerWitchTimestamp"` State string `json:"state"` NumberOfEntriesSinceFirstNotAckedMessage int64 `json:"numberOfEntriesSinceFirstNotAckedMessage"` TotalNonContiguousDeletedMessagesRange int `json:"totalNonContiguousDeletedMessagesRange"` Properties map[string]int64 `json:"properties"` }
type DispatchRate ¶
type DispatchRate struct { DispatchThrottlingRateInMsg int `json:"dispatchThrottlingRateInMsg"` DispatchThrottlingRateInByte int64 `json:"dispatchThrottlingRateInByte"` RatePeriodInSecond int `json:"ratePeriodInSecond"` }
func NewDispatchRate ¶
func NewDispatchRate() *DispatchRate
type ExceptionInformation ¶
type FailureDomainData ¶
type FailureDomainData struct { ClusterName string `json:"-"` DomainName string `json:"-"` BrokerList []string `json:"brokers"` }
FailureDomainData holds failure domain information.
type FailureDomainMap ¶
type FailureDomainMap map[string]FailureDomainData
type FunctionConfig ¶
type FunctionConfig struct { TimeoutMs *int64 `json:"timeoutMs" yaml:"timeoutMs"` TopicsPattern *string `json:"topicsPattern" yaml:"topicsPattern"` // Whether the subscriptions the functions created/used should be deleted when the functions is deleted CleanupSubscription bool `json:"cleanupSubscription" yaml:"cleanupSubscription"` RetainOrdering bool `json:"retainOrdering" yaml:"retainOrdering"` AutoAck bool `json:"autoAck" yaml:"autoAck"` Parallelism int `json:"parallelism" yaml:"parallelism"` MaxMessageRetries int `json:"maxMessageRetries" yaml:"maxMessageRetries"` Output string `json:"output" yaml:"output"` OutputSerdeClassName string `json:"outputSerdeClassName" yaml:"outputSerdeClassName"` LogTopic string `json:"logTopic" yaml:"logTopic"` ProcessingGuarantees string `json:"processingGuarantees" yaml:"processingGuarantees"` // Represents either a builtin schema type (eg: 'avro', 'json', etc) or the class name for a Schema implementation OutputSchemaType string `json:"outputSchemaType" yaml:"outputSchemaType"` Runtime string `json:"runtime" yaml:"runtime"` DeadLetterTopic string `json:"deadLetterTopic" yaml:"deadLetterTopic"` SubName string `json:"subName" yaml:"subName"` FQFN string `json:"fqfn" yaml:"fqfn"` Jar string `json:"jar" yaml:"jar"` Py string `json:"py" yaml:"py"` Go string `json:"go" yaml:"go"` // Any flags that you want to pass to the runtime. 
// note that in thread mode, these flags will have no impact RuntimeFlags string `json:"runtimeFlags" yaml:"runtimeFlags"` Tenant string `json:"tenant" yaml:"tenant"` Namespace string `json:"namespace" yaml:"namespace"` Name string `json:"name" yaml:"name"` ClassName string `json:"className" yaml:"className"` Resources *Resources `json:"resources" yaml:"resources"` WindowConfig *WindowConfig `json:"windowConfig" yaml:"windowConfig"` Inputs []string `json:"inputs" yaml:"inputs"` UserConfig map[string]interface{} `json:"userConfig" yaml:"userConfig"` CustomSerdeInputs map[string]string `json:"customSerdeInputs" yaml:"customSerdeInputs"` CustomSchemaInputs map[string]string `json:"customSchemaInputs" yaml:"customSchemaInputs"` // A generalized way of specifying inputs InputSpecs map[string]ConsumerConfig `json:"inputSpecs" yaml:"inputSpecs"` // This is a map of secretName(aka how the secret is going to be // accessed in the function via context) to an object that // encapsulates how the secret is fetched by the underlying // secrets provider. The type of an value here can be found by the // SecretProviderConfigurator.getSecretObjectType() method. Secrets map[string]interface{} `json:"secrets" yaml:"secrets"` }
type FunctionData ¶
type FunctionData struct { UpdateAuthData bool `json:"updateAuthData"` RetainOrdering bool `json:"retainOrdering"` Watch bool `json:"watch"` AutoAck bool `json:"autoAck"` Parallelism int `json:"parallelism"` WindowLengthCount int `json:"windowLengthCount"` SlidingIntervalCount int `json:"slidingIntervalCount"` MaxMessageRetries int `json:"maxMessageRetries"` TimeoutMs int64 `json:"timeoutMs"` SlidingIntervalDurationMs int64 `json:"slidingIntervalDurationMs"` WindowLengthDurationMs int64 `json:"windowLengthDurationMs"` RAM int64 `json:"ram"` Disk int64 `json:"disk"` CPU float64 `json:"cpu"` SubsName string `json:"subsName"` DeadLetterTopic string `json:"deadLetterTopic"` Key string `json:"key"` State string `json:"state"` TriggerValue string `json:"triggerValue"` TriggerFile string `json:"triggerFile"` Topic string `json:"topic"` UserCodeFile string `json:"-"` FQFN string `json:"fqfn"` Tenant string `json:"tenant"` Namespace string `json:"namespace"` FuncName string `json:"functionName"` InstanceID string `json:"instance_id"` ClassName string `json:"className"` Jar string `json:"jarFile"` Py string `json:"pyFile"` Go string `json:"goFile"` Inputs string `json:"inputs"` TopicsPattern string `json:"topicsPattern"` Output string `json:"output"` LogTopic string `json:"logTopic"` SchemaType string `json:"schemaType"` CustomSerDeInputs string `json:"customSerdeInputString"` CustomSchemaInput string `json:"customSchemaInputString"` OutputSerDeClassName string `json:"outputSerdeClassName"` FunctionConfigFile string `json:"fnConfigFile"` ProcessingGuarantees string `json:"processingGuarantees"` UserConfig string `json:"userConfigString"` FuncConf *FunctionConfig `json:"-"` }
FunctionData holds information for a Pulsar Function.
type FunctionInstanceStats ¶
type FunctionInstanceStats struct { FunctionInstanceStatsDataBase InstanceID int64 `json:"instanceId"` Metrics FunctionInstanceStatsData `json:"metrics"` }
type FunctionInstanceStatsData ¶
type FunctionInstanceStatsData struct { OneMin FunctionInstanceStatsDataBase `json:"oneMin"` // Timestamp of when the function was last invoked for instance LastInvocation int64 `json:"lastInvocation"` // Map of user defined metrics UserMetrics map[string]float64 `json:"userMetrics"` FunctionInstanceStatsDataBase }
type FunctionInstanceStatsDataBase ¶
type FunctionInstanceStatsDataBase struct { // Total number of records function received from source for instance ReceivedTotal int64 `json:"receivedTotal"` // Total number of records successfully processed by user function for instance ProcessedSuccessfullyTotal int64 `json:"processedSuccessfullyTotal"` // Total number of system exceptions thrown for instance SystemExceptionsTotal int64 `json:"systemExceptionsTotal"` // Total number of user exceptions thrown for instance UserExceptionsTotal int64 `json:"userExceptionsTotal"` // Average process latency for function for instance AvgProcessLatency float64 `json:"avgProcessLatency"` }
type FunctionInstanceStatus ¶
type FunctionInstanceStatus struct { InstanceID int `json:"instanceId"` Status FunctionInstanceStatusData `json:"status"` }
type FunctionInstanceStatusData ¶
type FunctionInstanceStatusData struct { Running bool `json:"running"` Err string `json:"error"` NumRestarts int64 `json:"numRestarts"` NumReceived int64 `json:"numReceived"` NumSuccessfullyProcessed int64 `json:"numSuccessfullyProcessed"` NumUserExceptions int64 `json:"numUserExceptions"` LatestUserExceptions []ExceptionInformation `json:"latestUserExceptions"` NumSystemExceptions int64 `json:"numSystemExceptions"` LatestSystemExceptions []ExceptionInformation `json:"latestSystemExceptions"` AverageLatency float64 `json:"averageLatency"` LastInvocationTime int64 `json:"lastInvocationTime"` WorkerID string `json:"workerId"` }
type FunctionState ¶
type FunctionStats ¶
type FunctionStats struct { // Overall total number of records function received from source ReceivedTotal int64 `json:"receivedTotal"` // Overall total number of records successfully processed by user function ProcessedSuccessfullyTotal int64 `json:"processedSuccessfullyTotal"` // Overall total number of system exceptions thrown SystemExceptionsTotal int64 `json:"systemExceptionsTotal"` // Overall total number of user exceptions thrown UserExceptionsTotal int64 `json:"userExceptionsTotal"` // Average process latency for function AvgProcessLatency float64 `json:"avgProcessLatency"` // Timestamp of when the function was last invoked by any instance LastInvocation int64 `json:"lastInvocation"` OneMin FunctionInstanceStatsDataBase `json:"oneMin"` Instances []FunctionInstanceStats `json:"instances"` FunctionInstanceStats }
func (*FunctionStats) AddInstance ¶
func (fs *FunctionStats) AddInstance(functionInstanceStats FunctionInstanceStats)
func (*FunctionStats) CalculateOverall ¶
func (fs *FunctionStats) CalculateOverall() *FunctionStats
type FunctionStatus ¶
type FunctionStatus struct { NumInstances int `json:"numInstances"` NumRunning int `json:"numRunning"` Instances []FunctionInstanceStatus `json:"instances"` }
type Functions ¶
type Functions interface { // CreateFunc create a new function. CreateFunc(data *FunctionConfig, fileName string) error // CreateFuncWithURL create a new function by providing url from which fun-pkg can be downloaded. // supported url: http/file // eg: // File: file:/dir/fileName.jar // Http: http://www.repo.com/fileName.jar // // @param functionConfig // the function configuration object // @param pkgURL // url from which pkg can be downloaded CreateFuncWithURL(data *FunctionConfig, pkgURL string) error // StopFunction stop all function instances StopFunction(tenant, namespace, name string) error // StopFunctionWithID stop function instance StopFunctionWithID(tenant, namespace, name string, instanceID int) error // DeleteFunction delete an existing function DeleteFunction(tenant, namespace, name string) error // StartFunction start all function instances StartFunction(tenant, namespace, name string) error // StartFunctionWithID start function instance StartFunctionWithID(tenant, namespace, name string, instanceID int) error // RestartFunction restart all function instances RestartFunction(tenant, namespace, name string) error // RestartFunctionWithID restart function instance RestartFunctionWithID(tenant, namespace, name string, instanceID int) error // GetFunctions returns the list of functions GetFunctions(tenant, namespace string) ([]string, error) // GetFunction returns the configuration for the specified function GetFunction(tenant, namespace, name string) (FunctionConfig, error) // GetFunctionStatus returns the current status of a function GetFunctionStatus(tenant, namespace, name string) (FunctionStatus, error) // GetFunctionStatusWithInstanceID returns the current status of a function instance GetFunctionStatusWithInstanceID(tenant, namespace, name string, instanceID int) (FunctionInstanceStatusData, error) // GetFunctionStats returns the current stats of a function GetFunctionStats(tenant, namespace, name string) (FunctionStats, error) // 
GetFunctionStatsWithInstanceID gets the current stats of a function instance GetFunctionStatsWithInstanceID(tenant, namespace, name string, instanceID int) (FunctionInstanceStatsData, error) // GetFunctionState fetch the current state associated with a Pulsar Function // // Response Example: // { "value : 12, version : 2"} GetFunctionState(tenant, namespace, name, key string) (FunctionState, error) // PutFunctionState puts the given state associated with a Pulsar Function PutFunctionState(tenant, namespace, name string, state FunctionState) error // TriggerFunction triggers the function by writing to the input topic TriggerFunction(tenant, namespace, name, topic, triggerValue, triggerFile string) (string, error) // UpdateFunction updates the configuration for a function. UpdateFunction(functionConfig *FunctionConfig, fileName string, updateOptions *UpdateOptions) error // UpdateFunctionWithURL updates the configuration for a function. // // Update a function by providing url from which fun-pkg can be downloaded. supported url: http/file // eg: // File: file:/dir/fileName.jar // Http: http://www.repo.com/fileName.jar UpdateFunctionWithURL(functionConfig *FunctionConfig, pkgURL string, updateOptions *UpdateOptions) error }
Functions is the admin interface for function management.
type FunctionsWorker ¶
type FunctionsWorker interface { // Get all functions stats on a worker GetFunctionsStats() ([]*WorkerFunctionInstanceStats, error) // Get worker metrics GetMetrics() ([]*Metrics, error) // Get List of all workers belonging to this cluster GetCluster() ([]*WorkerInfo, error) // Get the worker who is the leader of the cluster GetClusterLeader() (*WorkerInfo, error) // Get the function assignment among the cluster GetAssignments() (map[string][]string, error) }
type GetSchemaResponse ¶
type KeyValue ¶
type KeyValue struct { Key *string `protobuf:"bytes,1,req,name=key" json:"key,omitempty"` Value *string `protobuf:"bytes,2,req,name=value" json:"value,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
nolint
func (*KeyValue) ProtoMessage ¶
func (*KeyValue) ProtoMessage()
type LedgerInfo ¶
type LocalBrokerData ¶
type LocalBrokerData struct { // URLs to satisfy contract of ServiceLookupData (used by NamespaceService). WebServiceURL string `json:"webServiceUrl"` WebServiceURLTLS string `json:"webServiceUrlTls"` PulsarServiceURL string `json:"pulsarServiceUrl"` PulsarServiceURLTLS string `json:"pulsarServiceUrlTls"` PersistentTopicsEnabled bool `json:"persistentTopicsEnabled"` NonPersistentTopicsEnabled bool `json:"nonPersistentTopicsEnabled"` // Most recently available system resource usage. CPU ResourceUsage `json:"cpu"` Memory ResourceUsage `json:"memory"` DirectMemory ResourceUsage `json:"directMemory"` BandwidthIn ResourceUsage `json:"bandwidthIn"` BandwidthOut ResourceUsage `json:"bandwidthOut"` // Message data from the most recent namespace bundle stats. MsgThroughputIn float64 `json:"msgThroughputIn"` MsgThroughputOut float64 `json:"msgThroughputOut"` MsgRateIn float64 `json:"msgRateIn"` MsgRateOut float64 `json:"msgRateOut"` // Timestamp of last update. LastUpdate int64 `json:"lastUpdate"` // The stats given in the most recent invocation of update. LastStats map[string]*NamespaceBundleStats `json:"lastStats"` NumTopics int `json:"numTopics"` NumBundles int `json:"numBundles"` NumConsumers int `json:"numConsumers"` NumProducers int `json:"numProducers"` // All bundles belonging to this broker. Bundles []string `json:"bundles"` // The bundles gained since the last invocation of update. LastBundleGains []string `json:"lastBundleGains"` // The bundles lost since the last invocation of update. LastBundleLosses []string `json:"lastBundleLosses"` // The version string that this broker is running, obtained from the Maven build artifact in the POM BrokerVersionString string `json:"brokerVersionString"` // This place-holder requires to identify correct LoadManagerReport type while deserializing LoadReportType string `json:"loadReportType"` // the external protocol data advertised by protocol handlers. Protocols map[string]string `json:"protocols"` }
func NewLocalBrokerData ¶
func NewLocalBrokerData() LocalBrokerData
type LongDescription ¶
type LongDescription struct { CommandUsedFor string CommandPermission string CommandExamples []Example CommandOutput []Output CommandScope string }
func (*LongDescription) ExampleToString ¶
func (desc *LongDescription) ExampleToString() string
func (*LongDescription) ToString ¶
func (desc *LongDescription) ToString() string
type LookupData ¶
type ManagedLedgerInfo ¶
type ManagedLedgerInfo struct { Version int `json:"version"` CreationDate string `json:"creationDate"` ModificationData string `json:"modificationData"` Ledgers []LedgerInfo `json:"ledgers"` TerminatedPosition PositionInfo `json:"terminatedPosition"` Cursors map[string]CursorInfo `json:"cursors"` }
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
func NewMessage ¶
func (*Message) GetMessageID ¶
func (*Message) GetPayload ¶
func (*Message) GetProperties ¶
type MessageID ¶
type MessageID struct { LedgerID int64 `json:"ledgerId"` EntryID int64 `json:"entryId"` PartitionedIndex int `json:"partitionedIndex"` BatchIndex int `json:"-"` }
func ParseMessageID ¶
type MessageRangeInfo ¶
type MessageRangeInfo struct { From PositionInfo `json:"from"` To PositionInfo `json:"to"` Offloaded bool `json:"offloaded"` }
type Metrics ¶
type Metrics struct { Metrics map[string]interface{} `json:"metrics"` Dimensions map[string]string `json:"dimensions"` }
func NewMetrics ¶
type NameSpaceName ¶
type NameSpaceName struct {
// contains filtered or unexported fields
}
func GetNameSpaceName ¶
func GetNameSpaceName(tenant, namespace string) (*NameSpaceName, error)
func GetNamespaceName ¶
func GetNamespaceName(completeName string) (*NameSpaceName, error)
func (*NameSpaceName) String ¶
func (n *NameSpaceName) String() string
type NamespaceBundleStats ¶
type NamespaceBundleStats struct { MsgRateIn float64 `json:"msgRateIn"` MsgThroughputIn float64 `json:"msgThroughputIn"` MsgRateOut float64 `json:"msgRateOut"` MsgThroughputOut float64 `json:"msgThroughputOut"` ConsumerCount int `json:"consumerCount"` ProducerCount int `json:"producerCount"` TopicsNum int64 `json:"topics"` CacheSize int64 `json:"cacheSize"` // Consider the throughput equal if difference is less than 100 KB/s ThroughputDifferenceThreshold float64 `json:"throughputDifferenceThreshold"` // Consider the msgRate equal if the difference is less than 100 MsgRateDifferenceThreshold float64 `json:"msgRateDifferenceThreshold"` // Consider the total topics/producers/consumers equal if the difference is less than 500 TopicConnectionDifferenceThreshold int64 `json:"topicConnectionDifferenceThreshold"` // Consider the cache size equal if the difference is less than 100 kb CacheSizeDifferenceThreshold int64 `json:"cacheSizeDifferenceThreshold"` }
func NewNamespaceBundleStats ¶
func NewNamespaceBundleStats() *NamespaceBundleStats
type NamespaceIsolationData ¶
type NamespaceIsolationData struct { Namespaces []string `json:"namespaces"` Primary []string `json:"primary"` Secondary []string `json:"secondary"` AutoFailoverPolicy AutoFailoverPolicyData `json:"auto_failover_policy"` }
type NamespaceOwnershipStatus ¶
type NamespaceOwnershipStatus struct { BrokerAssignment BrokerAssignment `json:"broker_assignment"` IsControlled bool `json:"is_controlled"` IsActive bool `json:"is_active"` }
type Namespaces ¶
// Namespaces is the admin interface for namespace management.
//
// Some methods identify the namespace with a plain string and others with a
// NameSpaceName value; the parameter type of each method is authoritative.
type Namespaces interface {
	// GetNamespaces returns the list of all the namespaces for a certain tenant
	GetNamespaces(tenant string) ([]string, error)

	// GetTopics returns the list of all the topics under a certain namespace
	GetTopics(namespace string) ([]string, error)

	// GetPolicies returns a dump of all the policies specified for a namespace
	GetPolicies(namespace string) (*Policies, error)

	// CreateNamespace creates a new empty namespace with no policies attached
	CreateNamespace(namespace string) error

	// CreateNsWithNumBundles creates a new empty namespace with no policies attached,
	// using the given number of bundles
	CreateNsWithNumBundles(namespace string, numBundles int) error

	// CreateNsWithPolices creates a new namespace with the specified policies
	CreateNsWithPolices(namespace string, polices Policies) error

	// CreateNsWithBundlesData creates a new empty namespace with no policies attached,
	// using the given bundle data
	CreateNsWithBundlesData(namespace string, bundleData *BundlesData) error

	// DeleteNamespace deletes an existing namespace
	DeleteNamespace(namespace string) error

	// DeleteNamespaceBundle deletes an existing bundle in a namespace
	DeleteNamespaceBundle(namespace string, bundleRange string) error

	// SetNamespaceMessageTTL sets the messages Time to Live for all the topics within a namespace
	SetNamespaceMessageTTL(namespace string, ttlInSeconds int) error

	// GetNamespaceMessageTTL returns the message TTL for a namespace
	GetNamespaceMessageTTL(namespace string) (int, error)

	// GetRetention returns the retention configuration for a namespace
	GetRetention(namespace string) (*RetentionPolicies, error)

	// SetRetention sets the retention configuration for all the topics on a namespace
	SetRetention(namespace string, policy RetentionPolicies) error

	// GetBacklogQuotaMap returns backlog quota map on a namespace
	GetBacklogQuotaMap(namespace string) (map[BacklogQuotaType]BacklogQuota, error)

	// SetBacklogQuota sets a backlog quota for all the topics on a namespace
	SetBacklogQuota(namespace string, backlogQuota BacklogQuota) error

	// RemoveBacklogQuota removes a backlog quota policy from a namespace
	RemoveBacklogQuota(namespace string) error

	// SetSchemaValidationEnforced sets schema validation enforced for namespace
	SetSchemaValidationEnforced(namespace NameSpaceName, schemaValidationEnforced bool) error

	// GetSchemaValidationEnforced returns schema validation enforced for namespace
	GetSchemaValidationEnforced(namespace NameSpaceName) (bool, error)

	// SetSchemaAutoUpdateCompatibilityStrategy sets the strategy used to check that a new schema provided
	// by a producer is compatible with the current schema before it is installed
	SetSchemaAutoUpdateCompatibilityStrategy(namespace NameSpaceName, strategy SchemaCompatibilityStrategy) error

	// GetSchemaAutoUpdateCompatibilityStrategy returns the strategy used to check that a new schema provided
	// by a producer is compatible with the current schema before it is installed
	GetSchemaAutoUpdateCompatibilityStrategy(namespace NameSpaceName) (SchemaCompatibilityStrategy, error)

	// ClearOffloadDeleteLag clears the offload deletion lag for a namespace.
	ClearOffloadDeleteLag(namespace NameSpaceName) error

	// SetOffloadDeleteLag sets the offload deletion lag for a namespace
	SetOffloadDeleteLag(namespace NameSpaceName, timeMs int64) error

	// GetOffloadDeleteLag returns the offload deletion lag for a namespace, in milliseconds
	GetOffloadDeleteLag(namespace NameSpaceName) (int64, error)

	// SetOffloadThreshold sets the offloadThreshold for a namespace
	SetOffloadThreshold(namespace NameSpaceName, threshold int64) error

	// GetOffloadThreshold returns the offloadThreshold for a namespace
	GetOffloadThreshold(namespace NameSpaceName) (int64, error)

	// SetCompactionThreshold sets the compactionThreshold for a namespace
	SetCompactionThreshold(namespace NameSpaceName, threshold int64) error

	// GetCompactionThreshold returns the compactionThreshold for a namespace
	GetCompactionThreshold(namespace NameSpaceName) (int64, error)

	// SetMaxConsumersPerSubscription sets maxConsumersPerSubscription for a namespace.
	SetMaxConsumersPerSubscription(namespace NameSpaceName, max int) error

	// GetMaxConsumersPerSubscription returns the maxConsumersPerSubscription for a namespace.
	GetMaxConsumersPerSubscription(namespace NameSpaceName) (int, error)

	// SetMaxConsumersPerTopic sets maxConsumersPerTopic for a namespace.
	SetMaxConsumersPerTopic(namespace NameSpaceName, max int) error

	// GetMaxConsumersPerTopic returns the maxConsumersPerTopic for a namespace.
	GetMaxConsumersPerTopic(namespace NameSpaceName) (int, error)

	// SetMaxProducersPerTopic sets maxProducersPerTopic for a namespace.
	SetMaxProducersPerTopic(namespace NameSpaceName, max int) error

	// GetMaxProducersPerTopic returns the maxProducersPerTopic for a namespace.
	GetMaxProducersPerTopic(namespace NameSpaceName) (int, error)

	// GetNamespaceReplicationClusters returns the replication clusters for a namespace
	GetNamespaceReplicationClusters(namespace string) ([]string, error)

	// SetNamespaceReplicationClusters sets the replication clusters for a namespace
	SetNamespaceReplicationClusters(namespace string, clusterIds []string) error

	// SetNamespaceAntiAffinityGroup sets anti-affinity group name for a namespace
	SetNamespaceAntiAffinityGroup(namespace string, namespaceAntiAffinityGroup string) error

	// GetAntiAffinityNamespaces returns all namespaces that are grouped with the given anti-affinity group
	GetAntiAffinityNamespaces(tenant, cluster, namespaceAntiAffinityGroup string) ([]string, error)

	// GetNamespaceAntiAffinityGroup returns anti-affinity group name for a namespace
	GetNamespaceAntiAffinityGroup(namespace string) (string, error)

	// DeleteNamespaceAntiAffinityGroup deletes anti-affinity group name for a namespace
	DeleteNamespaceAntiAffinityGroup(namespace string) error

	// SetDeduplicationStatus sets the deduplication status for all topics within a namespace.
	// When deduplication is enabled, the broker will prevent the same Message from being stored multiple times
	SetDeduplicationStatus(namespace string, enableDeduplication bool) error

	// SetPersistence sets the persistence configuration for all the topics on a namespace
	SetPersistence(namespace string, persistence PersistencePolicies) error

	// GetPersistence returns the persistence configuration for a namespace
	GetPersistence(namespace string) (*PersistencePolicies, error)

	// SetBookieAffinityGroup sets bookie affinity group for a namespace to isolate namespace writes to bookies that are
	// part of the given affinity group
	SetBookieAffinityGroup(namespace string, bookieAffinityGroup BookieAffinityGroupData) error

	// DeleteBookieAffinityGroup deletes bookie affinity group configured for a namespace
	DeleteBookieAffinityGroup(namespace string) error

	// GetBookieAffinityGroup returns bookie affinity group configured for a namespace
	GetBookieAffinityGroup(namespace string) (*BookieAffinityGroupData, error)

	// Unload unloads a namespace from the current serving broker
	Unload(namespace string) error

	// UnloadNamespaceBundle unloads namespace bundle
	UnloadNamespaceBundle(namespace, bundle string) error

	// SplitNamespaceBundle splits namespace bundle
	SplitNamespaceBundle(namespace, bundle string, unloadSplitBundles bool) error

	// GetNamespacePermissions returns permissions on a namespace
	GetNamespacePermissions(namespace NameSpaceName) (map[string][]AuthAction, error)

	// GrantNamespacePermission grants permission on a namespace.
	GrantNamespacePermission(namespace NameSpaceName, role string, action []AuthAction) error

	// RevokeNamespacePermission revokes permissions on a namespace.
	RevokeNamespacePermission(namespace NameSpaceName, role string) error

	// GrantSubPermission grants permission to role to access subscription's admin-api
	GrantSubPermission(namespace NameSpaceName, sName string, roles []string) error

	// RevokeSubPermission revokes permissions on a subscription's admin-api access
	RevokeSubPermission(namespace NameSpaceName, sName, role string) error

	// SetSubscriptionAuthMode sets the given subscription auth mode on all topics on a namespace
	SetSubscriptionAuthMode(namespace NameSpaceName, mode SubscriptionAuthMode) error

	// SetEncryptionRequiredStatus sets the encryption required status for all topics within a namespace
	SetEncryptionRequiredStatus(namespace NameSpaceName, encrypt bool) error

	// UnsubscribeNamespace unsubscribes the given subscription on all topics on a namespace
	UnsubscribeNamespace(namespace NameSpaceName, sName string) error

	// UnsubscribeNamespaceBundle unsubscribes the given subscription on all topics on a namespace bundle
	UnsubscribeNamespaceBundle(namespace NameSpaceName, bundle, sName string) error

	// ClearNamespaceBundleBacklogForSubscription clears backlog for a given subscription on all
	// topics on a namespace bundle
	ClearNamespaceBundleBacklogForSubscription(namespace NameSpaceName, bundle, sName string) error

	// ClearNamespaceBundleBacklog clears backlog for all topics on a namespace bundle
	ClearNamespaceBundleBacklog(namespace NameSpaceName, bundle string) error

	// ClearNamespaceBacklogForSubscription clears backlog for a given subscription on all topics on a namespace
	ClearNamespaceBacklogForSubscription(namespace NameSpaceName, sName string) error

	// ClearNamespaceBacklog clears backlog for all topics on a namespace
	ClearNamespaceBacklog(namespace NameSpaceName) error

	// SetReplicatorDispatchRate sets replicator-Message-dispatch-rate (Replicators under this namespace
	// can dispatch this many messages per second)
	SetReplicatorDispatchRate(namespace NameSpaceName, rate DispatchRate) error

	// GetReplicatorDispatchRate returns replicator-Message-dispatch-rate (Replicators under this namespace
	// can dispatch this many messages per second)
	GetReplicatorDispatchRate(namespace NameSpaceName) (DispatchRate, error)

	// SetSubscriptionDispatchRate sets subscription-Message-dispatch-rate (subscriptions under this namespace
	// can dispatch this many messages per second)
	SetSubscriptionDispatchRate(namespace NameSpaceName, rate DispatchRate) error

	// GetSubscriptionDispatchRate returns subscription-Message-dispatch-rate (subscriptions under this namespace
	// can dispatch this many messages per second)
	GetSubscriptionDispatchRate(namespace NameSpaceName) (DispatchRate, error)

	// SetSubscribeRate sets namespace-subscribe-rate (topics under this namespace will be limited by subscribeRate)
	SetSubscribeRate(namespace NameSpaceName, rate SubscribeRate) error

	// GetSubscribeRate returns namespace-subscribe-rate (topics under this namespace allow subscribe
	// times per consumer in a period)
	GetSubscribeRate(namespace NameSpaceName) (SubscribeRate, error)

	// SetDispatchRate sets Message-dispatch-rate (topics under this namespace can dispatch
	// this many messages per second)
	SetDispatchRate(namespace NameSpaceName, rate DispatchRate) error

	// GetDispatchRate returns Message-dispatch-rate (topics under this namespace can dispatch
	// this many messages per second)
	GetDispatchRate(namespace NameSpaceName) (DispatchRate, error)
}
Namespaces is the admin interface for namespace management.
type NamespacesData ¶
// NamespacesData is a flat bag of namespace-related values (flags, bookkeeper
// settings, TTL, and several string-encoded options such as ClusterIds,
// RetentionTimeStr, LimitStr and PolicyStr).
// NOTE(review): the *Str fields look like raw, unparsed user input (e.g. from
// command-line flags); confirm against the callers that populate them.
type NamespacesData struct {
	Enable bool `json:"enable"`
	Unload bool `json:"unload"`
	NumBundles int `json:"numBundles"`
	BookkeeperEnsemble int `json:"bookkeeperEnsemble"`
	BookkeeperWriteQuorum int `json:"bookkeeperWriteQuorum"`
	MessageTTL int `json:"messageTTL"`
	BookkeeperAckQuorum int `json:"bookkeeperAckQuorum"`
	ManagedLedgerMaxMarkDeleteRate float64 `json:"managedLedgerMaxMarkDeleteRate"`
	// ClusterIds is a single string, unlike Clusters below which is a slice.
	ClusterIds string `json:"clusterIds"`
	RetentionTimeStr string `json:"retentionTimeStr"`
	LimitStr string `json:"limitStr"`
	PolicyStr string `json:"policyStr"`
	AntiAffinityGroup string `json:"antiAffinityGroup"`
	Tenant string `json:"tenant"`
	Cluster string `json:"cluster"`
	Bundle string `json:"bundle"`
	Clusters []string `json:"clusters"`
}
type NsIsolationPoliciesData ¶
type NsIsolationPolicy ¶
// NsIsolationPolicy is the admin interface for the namespace isolation
// policies of a cluster.
type NsIsolationPolicy interface {
	// CreateNamespaceIsolationPolicy creates a namespace isolation policy for a cluster
	CreateNamespaceIsolationPolicy(cluster, policyName string, namespaceIsolationData NamespaceIsolationData) error

	// DeleteNamespaceIsolationPolicy deletes a namespace isolation policy for a cluster
	DeleteNamespaceIsolationPolicy(cluster, policyName string) error

	// GetNamespaceIsolationPolicy returns a single namespace isolation policy for a cluster
	GetNamespaceIsolationPolicy(cluster, policyName string) (*NamespaceIsolationData, error)

	// GetNamespaceIsolationPolicies returns the namespace isolation policies of a cluster,
	// as a map with string keys
	GetNamespaceIsolationPolicies(cluster string) (map[string]NamespaceIsolationData, error)

	// GetBrokersWithNamespaceIsolationPolicy returns the list of active brokers with
	// namespace-isolation policies attached to them.
	GetBrokersWithNamespaceIsolationPolicy(cluster string) ([]BrokerNamespaceIsolationData, error)

	// GetBrokerWithNamespaceIsolationPolicy returns the active broker with
	// namespace-isolation policies attached to it.
	GetBrokerWithNamespaceIsolationPolicy(cluster, broker string) (*BrokerNamespaceIsolationData, error)
}
type OffloadProcessStatus ¶
type PartitionedTopicMetadata ¶
// PartitionedTopicMetadata holds topic data: a single partition count,
// exchanged under the JSON key "partitions".
type PartitionedTopicMetadata struct {
	Partitions int `json:"partitions"`
}
PartitionedTopicMetadata holds topic data: the partition count of a topic.
type PartitionedTopicStats ¶
// PartitionedTopicStats carries topic-level rate, throughput and size
// figures, the publisher/subscription/replication breakdowns, the topic's
// PartitionedTopicMetadata, and a map of TopicStats under "partitions".
type PartitionedTopicStats struct {
	MsgRateIn float64 `json:"msgRateIn"`
	MsgRateOut float64 `json:"msgRateOut"`
	MsgThroughputIn float64 `json:"msgThroughputIn"`
	MsgThroughputOut float64 `json:"msgThroughputOut"`
	AverageMsgSize float64 `json:"averageMsgSize"`
	StorageSize int64 `json:"storageSize"`
	Publishers []PublisherStats `json:"publishers"`
	Subscriptions map[string]SubscriptionStats `json:"subscriptions"`
	Replication map[string]ReplicatorStats `json:"replication"`
	// The Go field is spelled DeDuplicationStatus but the JSON key is
	// "deduplicationStatus".
	DeDuplicationStatus string `json:"deduplicationStatus"`
	Metadata PartitionedTopicMetadata `json:"metadata"`
	Partitions map[string]TopicStats `json:"partitions"`
}
type PersistencePolicies ¶
// PersistencePolicies is the persistence configuration value used by
// Namespaces.SetPersistence / GetPersistence: three bookkeeper integers
// (ensemble, write quorum, ack quorum) and the managed-ledger max
// mark-delete rate.
type PersistencePolicies struct {
	BookkeeperEnsemble int `json:"bookkeeperEnsemble"`
	BookkeeperWriteQuorum int `json:"bookkeeperWriteQuorum"`
	BookkeeperAckQuorum int `json:"bookkeeperAckQuorum"`
	ManagedLedgerMaxMarkDeleteRate float64 `json:"managedLedgerMaxMarkDeleteRate"`
}
func NewPersistencePolicies ¶
func NewPersistencePolicies(bookkeeperEnsemble, bookkeeperWriteQuorum, bookkeeperAckQuorum int, managedLedgerMaxMarkDeleteRate float64) PersistencePolicies
type PersistentTopicInternalStats ¶
// PersistentTopicInternalStats carries internal counters and sizes for a
// persistent topic, along with its ledger list and a map of cursor stats.
// The timestamp and position fields are plain strings; this declaration
// does not fix their format.
type PersistentTopicInternalStats struct {
	WaitingCursorsCount int `json:"waitingCursorsCount"`
	PendingAddEntriesCount int `json:"pendingAddEntriesCount"`
	EntriesAddedCounter int64 `json:"entriesAddedCounter"`
	NumberOfEntries int64 `json:"numberOfEntries"`
	TotalSize int64 `json:"totalSize"`
	CurrentLedgerEntries int64 `json:"currentLedgerEntries"`
	CurrentLedgerSize int64 `json:"currentLedgerSize"`
	LastLedgerCreatedTimestamp string `json:"lastLedgerCreatedTimestamp"`
	LastLedgerCreationFailureTimestamp string `json:"lastLedgerCreationFailureTimestamp"`
	LastConfirmedEntry string `json:"lastConfirmedEntry"`
	State string `json:"state"`
	Ledgers []LedgerInfo `json:"ledgers"`
	Cursors map[string]CursorStats `json:"cursors"`
}
type Policies ¶
// Policies is the full policy set of a namespace, as returned by
// Namespaces.GetPolicies and accepted by Namespaces.CreateNsWithPolices.
//
// The JSON keys mix snake_case and camelCase. The tags are the wire format
// and must be kept exactly as written.
type Policies struct {
	// Pointer-typed sections.
	Bundles *BundlesData `json:"bundles"`
	Persistence *PersistencePolicies `json:"persistence"`
	RetentionPolicies *RetentionPolicies `json:"retention_policies"`

	// Boolean switches.
	SchemaValidationEnforced bool `json:"schema_validation_enforced"`
	DeduplicationEnabled bool `json:"deduplicationEnabled"`
	Deleted bool `json:"deleted"`
	EncryptionRequired bool `json:"encryption_required"`

	// Scalar limits and thresholds.
	MessageTTLInSeconds int `json:"message_ttl_in_seconds"`
	MaxProducersPerTopic int `json:"max_producers_per_topic"`
	MaxConsumersPerTopic int `json:"max_consumers_per_topic"`
	MaxConsumersPerSubscription int `json:"max_consumers_per_subscription"`
	CompactionThreshold int64 `json:"compaction_threshold"`
	OffloadThreshold int64 `json:"offload_threshold"`
	OffloadDeletionLagMs int64 `json:"offload_deletion_lag_ms"`
	AntiAffinityGroup string `json:"antiAffinityGroup"`
	ReplicationClusters []string `json:"replication_clusters"`

	// Keyed settings.
	// NOTE(review): the string keys of the rate maps are presumably cluster
	// names (cf. the ClusterSubscribeRate field name); confirm.
	LatencyStatsSampleRate map[string]int `json:"latency_stats_sample_rate"`
	BacklogQuotaMap map[BacklogQuotaType]BacklogQuota `json:"backlog_quota_map"`
	TopicDispatchRate map[string]DispatchRate `json:"topicDispatchRate"`
	SubscriptionDispatchRate map[string]DispatchRate `json:"subscriptionDispatchRate"`
	ReplicatorDispatchRate map[string]DispatchRate `json:"replicatorDispatchRate"`
	ClusterSubscribeRate map[string]SubscribeRate `json:"clusterSubscribeRate"`

	SchemaCompatibilityStrategy SchemaCompatibilityStrategy `json:"schema_auto_update_compatibility_strategy"`
	AuthPolicies AuthPolicies `json:"auth_policies"`
	SubscriptionAuthMode SubscriptionAuthMode `json:"subscription_auth_mode"`
}
func NewDefaultPolicies ¶
func NewDefaultPolicies() *Policies
type PoolArenaStats ¶
// PoolArenaStats carries the subpage and chunk-list listings of a pool arena
// together with its allocation, deallocation and active-allocation counters,
// each broken down into tiny/small/normal/huge where applicable.
type PoolArenaStats struct {
	// Listing sizes and the listings themselves.
	NumTinySubpages int `json:"numTinySubpages"`
	NumSmallSubpages int `json:"numSmallSubpages"`
	NumChunkLists int `json:"numChunkLists"`
	TinySubpages []PoolSubpageStats `json:"tinySubpages"`
	SmallSubpages []PoolSubpageStats `json:"smallSubpages"`
	ChunkLists []PoolChunkListStats `json:"chunkLists"`

	// Allocation counters.
	NumAllocations int64 `json:"numAllocations"`
	NumTinyAllocations int64 `json:"numTinyAllocations"`
	NumSmallAllocations int64 `json:"numSmallAllocations"`
	NumNormalAllocations int64 `json:"numNormalAllocations"`
	NumHugeAllocations int64 `json:"numHugeAllocations"`

	// Deallocation counters.
	NumDeallocations int64 `json:"numDeallocations"`
	NumTinyDeallocations int64 `json:"numTinyDeallocations"`
	NumSmallDeallocations int64 `json:"numSmallDeallocations"`
	NumNormalDeallocations int64 `json:"numNormalDeallocations"`
	NumHugeDeallocations int64 `json:"numHugeDeallocations"`

	// Active-allocation counters.
	NumActiveAllocations int64 `json:"numActiveAllocations"`
	NumActiveTinyAllocations int64 `json:"numActiveTinyAllocations"`
	NumActiveSmallAllocations int64 `json:"numActiveSmallAllocations"`
	NumActiveNormalAllocations int64 `json:"numActiveNormalAllocations"`
	NumActiveHugeAllocations int64 `json:"numActiveHugeAllocations"`
}
type PoolChunkListStats ¶
// PoolChunkListStats describes one chunk list of a pool arena: its minimum
// and maximum usage values and the chunks it contains.
// NOTE(review): MinUsage/MaxUsage are presumably percentages; confirm.
type PoolChunkListStats struct {
	MinUsage int `json:"minUsage"`
	MaxUsage int `json:"maxUsage"`
	Chunks []PoolChunkStats `json:"chunks"`
}
type PoolChunkStats ¶
type PoolSubpageStats ¶
type PositionInfo ¶
type PostSchemaPayload ¶
// PostSchemaPayload is the payload with information about a schema that
// Schema.CreateSchemaByPayload accepts: the schema type, the schema itself
// as a string, and a string-to-string property map.
type PostSchemaPayload struct {
	// SchemaType is exchanged under the JSON key "type".
	SchemaType string `json:"type"`
	Schema string `json:"schema"`
	Properties map[string]string `json:"properties"`
}
PostSchemaPayload is the payload with information about a schema.
type PublisherStats ¶
type ReplicatorStats ¶
// ReplicatorStats carries the connection flag, message rates and throughput,
// backlog and delay figures, and the inbound/outbound connection strings of
// a replicator. It is the value type of PartitionedTopicStats.Replication.
type ReplicatorStats struct {
	Connected bool `json:"connected"`
	MsgRateIn float64 `json:"msgRateIn"`
	MsgRateOut float64 `json:"msgRateOut"`
	MsgThroughputIn float64 `json:"msgThroughputIn"`
	MsgThroughputOut float64 `json:"msgThroughputOut"`
	MsgRateExpired float64 `json:"msgRateExpired"`
	ReplicationBacklog int64 `json:"replicationBacklog"`
	ReplicationDelayInSeconds int64 `json:"replicationDelayInSeconds"`
	InboundConnection string `json:"inboundConnection"`
	InboundConnectedSince string `json:"inboundConnectedSince"`
	OutboundConnection string `json:"outboundConnection"`
	OutboundConnectedSince string `json:"outboundConnectedSince"`
}
type ResourceQuota ¶
// ResourceQuota is the resource quota value read and written through the
// ResourceQuotas interface.
type ResourceQuota struct {
	// messages published per second
	MsgRateIn float64 `json:"msgRateIn"`
	// messages consumed per second
	MsgRateOut float64 `json:"msgRateOut"`
	// incoming bytes per second
	BandwidthIn float64 `json:"bandwidthIn"`
	// outgoing bytes per second
	BandwidthOut float64 `json:"bandwidthOut"`
	// used memory in Mbytes
	Memory float64 `json:"memory"`
	// allow the quota to be dynamically re-calculated according to real traffic
	Dynamic bool `json:"dynamic"`
}
func NewResourceQuota ¶
func NewResourceQuota() *ResourceQuota
type ResourceQuotaData ¶
// ResourceQuotaData mirrors the fields of ResourceQuota with int64 numeric
// values instead of float64, and adds the Names and Bundle strings.
// NOTE(review): Names presumably identifies the namespace the bundle
// belongs to; confirm against the code that fills it.
type ResourceQuotaData struct {
	Names string `json:"names"`
	Bundle string `json:"bundle"`
	MsgRateIn int64 `json:"msgRateIn"`
	MsgRateOut int64 `json:"msgRateOut"`
	BandwidthIn int64 `json:"bandwidthIn"`
	BandwidthOut int64 `json:"bandwidthOut"`
	Memory int64 `json:"memory"`
	Dynamic bool `json:"dynamic"`
}
type ResourceQuotas ¶
// ResourceQuotas is the admin interface for the default resource quota and
// for per-namespace-bundle resource quotas.
type ResourceQuotas interface {
	// GetDefaultResourceQuota returns the default resource quota for new resource bundles.
	GetDefaultResourceQuota() (*ResourceQuota, error)

	// SetDefaultResourceQuota sets the default resource quota for new namespace bundles.
	SetDefaultResourceQuota(quota ResourceQuota) error

	// GetNamespaceBundleResourceQuota returns the resource quota of a namespace bundle.
	GetNamespaceBundleResourceQuota(namespace, bundle string) (*ResourceQuota, error)

	// SetNamespaceBundleResourceQuota sets the resource quota for a namespace bundle.
	SetNamespaceBundleResourceQuota(namespace, bundle string, quota ResourceQuota) error

	// ResetNamespaceBundleResourceQuota resets the resource quota for a namespace bundle to the default value.
	ResetNamespaceBundleResourceQuota(namespace, bundle string) error
}
type ResourceUsage ¶
func (*ResourceUsage) CompareTo ¶
func (ru *ResourceUsage) CompareTo(o *ResourceUsage) int
func (*ResourceUsage) PercentUsage ¶
func (ru *ResourceUsage) PercentUsage() float32
func (*ResourceUsage) Reset ¶
func (ru *ResourceUsage) Reset()
type Resources ¶
func NewDefaultResources ¶
func NewDefaultResources() *Resources
type RetentionPolicies ¶
// RetentionPolicies is the retention configuration value used by
// Namespaces.GetRetention / SetRetention: a retention time in minutes and a
// retention size in MB.
type RetentionPolicies struct {
	RetentionTimeInMinutes int `json:"retentionTimeInMinutes"`
	// RetentionSizeInMB is an int64, unlike RetentionTimeInMinutes.
	RetentionSizeInMB int64 `json:"retentionSizeInMB"`
}
func NewRetentionPolicies ¶
func NewRetentionPolicies(retentionTimeInMinutes int, retentionSizeInMB int) RetentionPolicies
type RetentionPolicy ¶
// RetentionPolicy is a string-typed enumeration; its known values are the
// constants declared below.
type RetentionPolicy string

// Known RetentionPolicy values. The string on the right of each constant is
// the literal value carried by the type and must not be altered.
const (
	ProducerRequestHold RetentionPolicy = "producer_request_hold"
	ProducerException RetentionPolicy = "producer_exception"
	ConsumerBacklogEviction RetentionPolicy = "consumer_backlog_eviction"
)
type Schema ¶
// Schema is the admin interface for schema management.
type Schema interface {
	// GetSchemaInfo retrieves the latest schema of a topic
	GetSchemaInfo(topic string) (*SchemaInfo, error)

	// GetSchemaInfoWithVersion retrieves the latest schema of a topic together with its version
	GetSchemaInfoWithVersion(topic string) (*SchemaInfoWithVersion, error)

	// GetSchemaInfoByVersion retrieves the schema of a topic at a given version
	GetSchemaInfoByVersion(topic string, version int64) (*SchemaInfo, error)

	// DeleteSchema deletes the schema associated with a given topic
	DeleteSchema(topic string) error

	// CreateSchemaByPayload creates a schema for a given topic
	CreateSchemaByPayload(topic string, schemaPayload PostSchemaPayload) error
}
Schema is the admin interface for schema management.
type SchemaCompatibilityStrategy ¶
// SchemaCompatibilityStrategy is a string-typed enumeration of schema
// compatibility strategies; its known values are the constants declared
// below. It is the strategy type used by
// Namespaces.SetSchemaAutoUpdateCompatibilityStrategy and its getter.
type SchemaCompatibilityStrategy string

// Known SchemaCompatibilityStrategy values. Each constant's string value is
// spelled exactly like its Go identifier.
const (
	AutoUpdateDisabled SchemaCompatibilityStrategy = "AutoUpdateDisabled"
	Backward SchemaCompatibilityStrategy = "Backward"
	Forward SchemaCompatibilityStrategy = "Forward"
	Full SchemaCompatibilityStrategy = "Full"
	AlwaysCompatible SchemaCompatibilityStrategy = "AlwaysCompatible"
	BackwardTransitive SchemaCompatibilityStrategy = "BackwardTransitive"
	ForwardTransitive SchemaCompatibilityStrategy = "ForwardTransitive"
	FullTransitive SchemaCompatibilityStrategy = "FullTransitive"
)
func ParseSchemaAutoUpdateCompatibilityStrategy ¶
func ParseSchemaAutoUpdateCompatibilityStrategy(str string) (SchemaCompatibilityStrategy, error)
func (SchemaCompatibilityStrategy) String ¶
func (s SchemaCompatibilityStrategy) String() string
type SchemaData ¶
type SchemaInfo ¶
type SchemaInfoWithVersion ¶
// SchemaInfoWithVersion pairs a schema version number with a pointer to the
// corresponding SchemaInfo. It is the result type of
// Schema.GetSchemaInfoWithVersion.
type SchemaInfoWithVersion struct {
	Version int64 `json:"version"`
	SchemaInfo *SchemaInfo `json:"schemaInfo"`
}
type SingleMessageMetadata ¶
type SingleMessageMetadata struct { Properties []*KeyValue `protobuf:"bytes,1,rep,name=properties" json:"properties,omitempty"` PartitionKey *string `protobuf:"bytes,2,opt,name=partition_key,json=partitionKey" json:"partition_key,omitempty"` PayloadSize *int32 `protobuf:"varint,3,req,name=payload_size,json=payloadSize" json:"payload_size,omitempty"` CompactedOut *bool `protobuf:"varint,4,opt,name=compacted_out,json=compactedOut,def=0" json:"compacted_out,omitempty"` // the timestamp that this event occurs. it is typically set by applications. // if this field is omitted, `publish_time` can be used for the purpose of `event_time`. EventTime *uint64 `protobuf:"varint,5,opt,name=event_time,json=eventTime,def=0" json:"event_time,omitempty"` PartitionKeyB64Encoded *bool `` /* 131-byte string literal not displayed */ // Specific a key to overwrite the message key which used for ordering dispatch in Key_Shared mode. OrderingKey []byte `protobuf:"bytes,7,opt,name=ordering_key,json=orderingKey" json:"ordering_key,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
nolint
func (*SingleMessageMetadata) GetPayloadSize ¶
func (m *SingleMessageMetadata) GetPayloadSize() int32
func (*SingleMessageMetadata) ProtoMessage ¶
func (*SingleMessageMetadata) ProtoMessage()
func (*SingleMessageMetadata) Reset ¶
func (m *SingleMessageMetadata) Reset()
func (*SingleMessageMetadata) String ¶
func (m *SingleMessageMetadata) String() string
type SinkConfig ¶
// SinkConfig is the configuration of a sink, as accepted by
// Sinks.CreateSink / UpdateSink and returned by Sinks.GetSink. Every field
// carries the same key in its json and yaml tags.
type SinkConfig struct {
	// Pointer-typed fields; they can be nil.
	TopicsPattern *string `json:"topicsPattern" yaml:"topicsPattern"`
	Resources *Resources `json:"resources" yaml:"resources"`
	TimeoutMs *int64 `json:"timeoutMs" yaml:"timeoutMs"`

	// Whether the subscriptions the functions created/used should be deleted when the function is deleted
	CleanupSubscription bool `json:"cleanupSubscription" yaml:"cleanupSubscription"`

	RetainOrdering bool `json:"retainOrdering" yaml:"retainOrdering"`
	AutoAck bool `json:"autoAck" yaml:"autoAck"`
	Parallelism int `json:"parallelism" yaml:"parallelism"`
	Tenant string `json:"tenant" yaml:"tenant"`
	Namespace string `json:"namespace" yaml:"namespace"`
	Name string `json:"name" yaml:"name"`
	ClassName string `json:"className" yaml:"className"`

	Archive string `json:"archive" yaml:"archive"`
	ProcessingGuarantees string `json:"processingGuarantees" yaml:"processingGuarantees"`
	SourceSubscriptionName string `json:"sourceSubscriptionName" yaml:"sourceSubscriptionName"`
	RuntimeFlags string `json:"runtimeFlags" yaml:"runtimeFlags"`

	Inputs []string `json:"inputs" yaml:"inputs"`

	TopicToSerdeClassName map[string]string `json:"topicToSerdeClassName" yaml:"topicToSerdeClassName"`
	TopicToSchemaType map[string]string `json:"topicToSchemaType" yaml:"topicToSchemaType"`
	InputSpecs map[string]ConsumerConfig `json:"inputSpecs" yaml:"inputSpecs"`
	Configs map[string]interface{} `json:"configs" yaml:"configs"`

	// This is a map of secretName (aka how the secret is going to be
	// accessed in the function via context) to an object that
	// encapsulates how the secret is fetched by the underlying
	// secrets provider. The type of a value here can be found by the
	// SecretProviderConfigurator.getSecretObjectType() method.
	Secrets map[string]interface{} `json:"secrets" yaml:"secrets"`
}
type SinkData ¶
// SinkData is a flat collection of sink-related values plus a pointer to a
// SinkConfig.
// NOTE(review): the string-typed Inputs, CustomSerdeInputString,
// CustomSchemaInputString and SinkConfigString fields look like raw,
// unparsed user input (e.g. command-line flags); confirm against callers.
type SinkData struct {
	UpdateAuthData bool `json:"updateAuthData"`
	RetainOrdering bool `json:"retainOrdering"`
	AutoAck bool `json:"autoAck"`
	Parallelism int `json:"parallelism"`
	RAM int64 `json:"ram"`
	Disk int64 `json:"disk"`
	TimeoutMs int64 `json:"timeoutMs"`
	CPU float64 `json:"cpu"`
	Tenant string `json:"tenant"`
	Namespace string `json:"namespace"`
	Name string `json:"name"`
	SinkType string `json:"sinkType"`
	// Inputs is a single string here, whereas SinkConfig.Inputs is a []string.
	Inputs string `json:"inputs"`
	TopicsPattern string `json:"topicsPattern"`
	SubsName string `json:"subsName"`
	CustomSerdeInputString string `json:"customSerdeInputString"`
	CustomSchemaInputString string `json:"customSchemaInputString"`
	ProcessingGuarantees string `json:"processingGuarantees"`
	Archive string `json:"archive"`
	ClassName string `json:"className"`
	SinkConfigFile string `json:"sinkConfigFile"`
	SinkConfigString string `json:"sinkConfigString"`
	InstanceID string `json:"instanceId"`

	// SinkConf is excluded from JSON encoding and decoding (tag "-").
	SinkConf *SinkConfig `json:"-"`
}
type SinkInstanceStatus ¶
type SinkInstanceStatus struct { InstanceID int `json:"instanceId"` Status SourceInstanceStatusData `json:"status"` }
type SinkInstanceStatusData ¶
// SinkInstanceStatusData is the status of a single sink instance, as
// returned by Sinks.GetSinkStatusWithID.
type SinkInstanceStatusData struct {
	// Is this instance running?
	Running bool `json:"running"`

	// Do we have any error while running this instance
	// (exchanged under the JSON key "error")
	Err string `json:"error"`

	// Number of times this instance has restarted
	NumRestarts int64 `json:"numRestarts"`

	// Number of messages read from Pulsar
	NumReadFromPulsar int64 `json:"numReadFromPulsar"`

	// Number of times there was a system exception handling messages
	NumSystemExceptions int64 `json:"numSystemExceptions"`

	// A list of the most recent system exceptions
	LatestSystemExceptions []ExceptionInformation `json:"latestSystemExceptions"`

	// Number of times there was a sink exception
	NumSinkExceptions int64 `json:"numSinkExceptions"`

	// A list of the most recent sink exceptions
	LatestSinkExceptions []ExceptionInformation `json:"latestSinkExceptions"`

	// Number of messages written to sink
	NumWrittenToSink int64 `json:"numWrittenToSink"`

	// When was the last time we received a Message from Pulsar
	LastReceivedTime int64 `json:"lastReceivedTime"`

	WorkerID string `json:"workerId"`
}
type SinkStatus ¶
// SinkStatus is the overall status of a sink, as returned by
// Sinks.GetSinkStatus: instance counts plus one entry per instance.
type SinkStatus struct {
	// The total number of sink instances that ought to be running
	NumInstances int `json:"numInstances"`

	// The number of sink instances that are actually running
	NumRunning int `json:"numRunning"`

	Instances []*SinkInstanceStatus `json:"instances"`
}
type Sinks ¶
type Sinks interface { // ListSinks returns the list of all the Pulsar Sinks. ListSinks(tenant, namespace string) ([]string, error) // GetSink returns the configuration for the specified sink GetSink(tenant, namespace, Sink string) (SinkConfig, error) // CreateSink creates a new sink CreateSink(config *SinkConfig, fileName string) error // CreateSinkWithURL creates a new sink by providing url from which fun-pkg can be downloaded. supported url: http/file CreateSinkWithURL(config *SinkConfig, pkgURL string) error // UpdateSink updates the configuration for a sink. UpdateSink(config *SinkConfig, fileName string, options *UpdateOptions) error // UpdateSinkWithURL updates a sink by providing url from which fun-pkg can be downloaded. supported url: http/file UpdateSinkWithURL(config *SinkConfig, pkgURL string, options *UpdateOptions) error // DeleteSink deletes an existing sink DeleteSink(tenant, namespace, Sink string) error // GetSinkStatus returns the current status of a sink. GetSinkStatus(tenant, namespace, Sink string) (SinkStatus, error) // GetSinkStatusWithID returns the current status of a sink instance. 
GetSinkStatusWithID(tenant, namespace, Sink string, id int) (SinkInstanceStatusData, error) // RestartSink restarts all sink instances RestartSink(tenant, namespace, Sink string) error // RestartSinkWithID restarts sink instance RestartSinkWithID(tenant, namespace, Sink string, id int) error // StopSink stops all sink instances StopSink(tenant, namespace, Sink string) error // StopSinkWithID stops sink instance StopSinkWithID(tenant, namespace, Sink string, id int) error // StartSink starts all sink instances StartSink(tenant, namespace, Sink string) error // StartSinkWithID starts sink instance StartSinkWithID(tenant, namespace, Sink string, id int) error // GetBuiltInSinks fetches a list of supported Pulsar IO sinks currently running in cluster mode GetBuiltInSinks() ([]*ConnectorDefinition, error) // ReloadBuiltInSinks reload the available built-in connectors, include Source and Sink ReloadBuiltInSinks() error }
Sinks is the admin interface for sink management.
type SourceConfig ¶
// SourceConfig is the configuration of a source, as accepted by
// Sources.CreateSource / UpdateSource and returned by Sources.GetSource.
// Every field carries the same key in its json and yaml tags.
type SourceConfig struct {
	Tenant string `json:"tenant" yaml:"tenant"`
	Namespace string `json:"namespace" yaml:"namespace"`
	Name string `json:"name" yaml:"name"`
	ClassName string `json:"className" yaml:"className"`

	TopicName string `json:"topicName" yaml:"topicName"`
	SerdeClassName string `json:"serdeClassName" yaml:"serdeClassName"`
	SchemaType string `json:"schemaType" yaml:"schemaType"`

	Configs map[string]interface{} `json:"configs" yaml:"configs"`

	// This is a map of secretName (aka how the secret is going to be
	// accessed in the function via context) to an object that
	// encapsulates how the secret is fetched by the underlying
	// secrets provider. The type of a value here can be found by the
	// SecretProviderConfigurator.getSecretObjectType() method.
	Secrets map[string]interface{} `json:"secrets" yaml:"secrets"`

	Parallelism int `json:"parallelism" yaml:"parallelism"`
	ProcessingGuarantees string `json:"processingGuarantees" yaml:"processingGuarantees"`
	// Resources is a pointer and can be nil.
	Resources *Resources `json:"resources" yaml:"resources"`
	Archive string `json:"archive" yaml:"archive"`

	// Any flags that you want to pass to the runtime.
	RuntimeFlags string `json:"runtimeFlags" yaml:"runtimeFlags"`
}
type SourceData ¶
// SourceData is a flat, JSON-tagged collection of source parameters plus an
// optional pointer to a full SourceConfig.
// NOTE(review): presumably the argument bag filled in by a CLI or request
// layer before a SourceConfig is built — confirm against the callers.
type SourceData struct {
	Tenant                   string  `json:"tenant"`
	Namespace                string  `json:"namespace"`
	Name                     string  `json:"name"`
	SourceType               string  `json:"sourceType"`
	ProcessingGuarantees     string  `json:"processingGuarantees"`
	DestinationTopicName     string  `json:"destinationTopicName"`
	DeserializationClassName string  `json:"deserializationClassName"`
	SchemaType               string  `json:"schemaType"`
	Parallelism              int     `json:"parallelism"`
	Archive                  string  `json:"archive"`
	ClassName                string  `json:"className"`
	SourceConfigFile         string  `json:"sourceConfigFile"`
	CPU                      float64 `json:"cpu"`
	RAM                      int64   `json:"ram"`
	Disk                     int64   `json:"disk"`
	SourceConfigString       string  `json:"sourceConfigString"`

	// SourceConf is skipped by encoding/json in both directions (tag "-").
	SourceConf *SourceConfig `json:"-"`

	InstanceID     string `json:"instanceId"`
	UpdateAuthData bool   `json:"updateAuthData"`
}
type SourceInstanceStatus ¶
// SourceInstanceStatus pairs the ID of one source instance with that
// instance's status data. SourceStatus.Instances holds pointers to values
// of this type.
type SourceInstanceStatus struct {
	InstanceID int                      `json:"instanceId"`
	Status     SourceInstanceStatusData `json:"status"`
}
type SourceInstanceStatusData ¶
// SourceInstanceStatusData is the status of a single source instance. It is
// the result type of Sources.GetSourceStatusWithID and the Status field of
// SourceInstanceStatus.
type SourceInstanceStatusData struct {
	Running bool `json:"running"`
	// Err is carried under the JSON key "error", not "err".
	Err                    string                 `json:"error"`
	NumRestarts            int64                  `json:"numRestarts"`
	NumReceivedFromSource  int64                  `json:"numReceivedFromSource"`
	NumSystemExceptions    int64                  `json:"numSystemExceptions"`
	LatestSystemExceptions []ExceptionInformation `json:"latestSystemExceptions"`
	NumSourceExceptions    int64                  `json:"numSourceExceptions"`
	LatestSourceExceptions []ExceptionInformation `json:"latestSourceExceptions"`
	NumWritten             int64                  `json:"numWritten"`
	// NOTE(review): the unit and epoch of LastReceivedTime are not visible
	// here — confirm against the server API before interpreting it.
	LastReceivedTime int64  `json:"lastReceivedTime"`
	WorkerID         string `json:"workerId"`
}
type SourceStatus ¶
// SourceStatus is the aggregate status of a source, as returned by
// Sources.GetSourceStatus: two instance counters plus the per-instance
// status entries.
type SourceStatus struct {
	NumInstances int                     `json:"numInstances"`
	NumRunning   int                     `json:"numRunning"`
	Instances    []*SourceInstanceStatus `json:"instances"`
}
type Sources ¶
// Sources is the admin interface for managing Pulsar sources. A source is
// addressed by the (tenant, namespace, source) triple; the *WithID variants
// additionally take the id of a single instance.
type Sources interface {
	// ListSources returns the list of all the Pulsar sources.
	ListSources(tenant, namespace string) ([]string, error)

	// GetSource returns the configuration for the specified source.
	GetSource(tenant, namespace, source string) (SourceConfig, error)

	// CreateSource creates a new source.
	CreateSource(config *SourceConfig, fileName string) error

	// CreateSourceWithURL creates a new source by providing a URL from which
	// the package (fun-pkg) can be downloaded.
	// Supported URL schemes: http and file.
	CreateSourceWithURL(config *SourceConfig, pkgURL string) error

	// UpdateSource updates the configuration for a source.
	UpdateSource(config *SourceConfig, fileName string, options *UpdateOptions) error

	// UpdateSourceWithURL updates a source by providing a URL from which
	// the package (fun-pkg) can be downloaded.
	// Supported URL schemes: http and file.
	UpdateSourceWithURL(config *SourceConfig, pkgURL string, options *UpdateOptions) error

	// DeleteSource deletes an existing source.
	DeleteSource(tenant, namespace, source string) error

	// GetSourceStatus returns the current status of a source.
	GetSourceStatus(tenant, namespace, source string) (SourceStatus, error)

	// GetSourceStatusWithID returns the current status of a source instance.
	GetSourceStatusWithID(tenant, namespace, source string, id int) (SourceInstanceStatusData, error)

	// RestartSource restarts all source instances.
	RestartSource(tenant, namespace, source string) error

	// RestartSourceWithID restarts a single source instance.
	RestartSourceWithID(tenant, namespace, source string, id int) error

	// StopSource stops all source instances.
	StopSource(tenant, namespace, source string) error

	// StopSourceWithID stops a single source instance.
	StopSourceWithID(tenant, namespace, source string, id int) error

	// StartSource starts all source instances.
	StartSource(tenant, namespace, source string) error

	// StartSourceWithID starts a single source instance.
	StartSourceWithID(tenant, namespace, source string, id int) error

	// GetBuiltInSources fetches a list of supported Pulsar IO sources
	// currently running in cluster mode.
	GetBuiltInSources() ([]*ConnectorDefinition, error)

	// ReloadBuiltInSources reloads the available built-in connectors,
	// including both sources and sinks.
	ReloadBuiltInSources() error
}
Sources is the admin interface for sources management.
type SubscribeRate ¶
// SubscribeRate holds a per-consumer subscribe throttling rate together with
// a rate period.
// NOTE(review): the period's unit (seconds) is implied only by the field
// name RatePeriodInSecond, and the defaults set by NewSubscribeRate are not
// visible here — confirm both.
type SubscribeRate struct {
	SubscribeThrottlingRatePerConsumer int `json:"subscribeThrottlingRatePerConsumer"`
	RatePeriodInSecond                 int `json:"ratePeriodInSecond"`
}
func NewSubscribeRate ¶
func NewSubscribeRate() *SubscribeRate
type SubscriptionAuthMode ¶
// SubscriptionAuthMode is a string-backed enumeration; the package declares
// the values None and Prefix for it, and ParseSubscriptionAuthMode converts
// a plain string into this type.
type SubscriptionAuthMode string
// Declared SubscriptionAuthMode values. Each constant's string value is
// spelled exactly like its identifier.
const (
	None   SubscriptionAuthMode = "None"
	Prefix SubscriptionAuthMode = "Prefix"
)
func ParseSubscriptionAuthMode ¶
func ParseSubscriptionAuthMode(s string) (SubscriptionAuthMode, error)
func (SubscriptionAuthMode) String ¶
func (s SubscriptionAuthMode) String() string
type SubscriptionStats ¶
// SubscriptionStats holds the statistics of one subscription. TopicStats
// stores these in its Subscriptions map, keyed by a string.
// NOTE(review): units and measurement windows of the rate/throughput fields
// are not stated here — confirm against the broker documentation.
type SubscriptionStats struct {
	BlockedSubscriptionOnUnackedMsgs bool    `json:"blockedSubscriptionOnUnackedMsgs"`
	IsReplicated                     bool    `json:"isReplicated"`
	MsgRateOut                       float64 `json:"msgRateOut"`
	MsgThroughputOut                 float64 `json:"msgThroughputOut"`
	MsgRateRedeliver                 float64 `json:"msgRateRedeliver"`
	MsgRateExpired                   float64 `json:"msgRateExpired"`
	MsgBacklog                       int64   `json:"msgBacklog"`
	MsgDelayed                       int64   `json:"msgDelayed"`
	// UnAckedMessages uses the JSON key "unackedMessages" (lower-case "a").
	UnAckedMessages int64 `json:"unackedMessages"`
	// SubType uses the JSON key "type".
	SubType            string          `json:"type"`
	ActiveConsumerName string          `json:"activeConsumerName"`
	Consumers          []ConsumerStats `json:"consumers"`
}
type Subscriptions ¶
// Subscriptions is the admin interface for managing subscriptions on a topic.
// NOTE(review): the method parameters are unnamed; the string parameter is
// presumably the subscription name — confirm against the implementation.
type Subscriptions interface {
	// Create creates a new subscription on a topic.
	Create(TopicName, string, MessageID) error

	// Delete deletes a persistent subscription from a topic. There should
	// not be any active consumers on the subscription.
	Delete(TopicName, string) error

	// List returns the list of subscriptions.
	List(TopicName) ([]string, error)

	// ResetCursorToMessageID resets the cursor position on a topic
	// subscription. The MessageID argument is the position to reset the
	// subscription to (or the previous nearest message ID if the given one
	// is not valid).
	ResetCursorToMessageID(TopicName, string, MessageID) error

	// ResetCursorToTimestamp resets the cursor position on a topic
	// subscription. The int64 argument is a time in milliseconds since the
	// epoch; the subscription is reset to the position closest to it.
	ResetCursorToTimestamp(TopicName, string, int64) error

	// ClearBacklog skips all messages on a topic subscription.
	ClearBacklog(TopicName, string) error

	// SkipMessages skips messages on a topic subscription.
	SkipMessages(TopicName, string, int64) error

	// ExpireMessages expires all messages older than the given N
	// (expireTimeInSeconds) seconds for a given subscription.
	ExpireMessages(TopicName, string, int64) error

	// ExpireAllMessages expires all messages older than the given N
	// (expireTimeInSeconds) seconds for all subscriptions of the
	// persistent topic.
	ExpireAllMessages(TopicName, int64) error

	// PeekMessages peeks messages from a topic subscription.
	PeekMessages(TopicName, string, int) ([]*Message, error)
}
Subscriptions is the admin interface for subscriptions management.
type TLSOptions ¶
type TenantData ¶
// TenantData describes a tenant. It is the argument of Tenants.Create and
// Tenants.Update and the result of Tenants.Get.
type TenantData struct {
	// Name is skipped by encoding/json in both directions (tag "-"); only
	// the role and cluster lists travel in the JSON body.
	Name            string   `json:"-"`
	AdminRoles      []string `json:"adminRoles"`
	AllowedClusters []string `json:"allowedClusters"`
}
TenantData holds the arguments that describe a tenant.
type Tenants ¶
// Tenants is the admin interface for managing tenants.
// NOTE(review): the string parameter of Delete and Get is presumably the
// tenant name — confirm against the implementation.
type Tenants interface {
	// Create creates a new tenant.
	Create(TenantData) error

	// Delete deletes an existing tenant.
	Delete(string) error

	// Update updates the admins for a tenant.
	Update(TenantData) error

	// List returns the list of tenants.
	List() ([]string, error)

	// Get returns the config of the tenant.
	Get(string) (TenantData, error)
}
Tenants is the admin interface for tenants management.
type TopicDomain ¶
// TopicDomain is the string-backed domain of a topic. Values are produced by
// ParseTopicDomain and returned by (*TopicName).GetDomain.
type TopicDomain string
func ParseTopicDomain ¶
func ParseTopicDomain(domain string) (TopicDomain, error)
func (TopicDomain) String ¶
func (t TopicDomain) String() string
type TopicName ¶
// TopicName represents a topic name. Its state is not exported; read it
// through the methods declared on *TopicName (GetDomain, GetEncodedTopic,
// GetLocalName, GetRestPath, IsPersistent).
type TopicName struct {
	// contains filtered or unexported fields
}
func GetTopicName ¶
The topic name can be given in one of two forms: a fully qualified topic name or a short topic name.
func (*TopicName) GetDomain ¶
func (t *TopicName) GetDomain() TopicDomain
func (*TopicName) GetEncodedTopic ¶
func (*TopicName) GetLocalName ¶
func (*TopicName) GetRestPath ¶
func (*TopicName) IsPersistent ¶
type TopicStats ¶
// TopicStats holds the statistics of a topic, as returned by Topics.GetStats.
// NOTE(review): the keys of the Subscriptions and Replication maps are
// presumably subscription names and remote cluster names respectively —
// confirm against the broker API.
type TopicStats struct {
	MsgRateIn        float64                      `json:"msgRateIn"`
	MsgRateOut       float64                      `json:"msgRateOut"`
	MsgThroughputIn  float64                      `json:"msgThroughputIn"`
	MsgThroughputOut float64                      `json:"msgThroughputOut"`
	AverageMsgSize   float64                      `json:"averageMsgSize"`
	StorageSize      int64                        `json:"storageSize"`
	Publishers       []PublisherStats             `json:"publishers"`
	Subscriptions    map[string]SubscriptionStats `json:"subscriptions"`
	Replication      map[string]ReplicatorStats   `json:"replication"`
	// DeDuplicationStatus uses the JSON key "deduplicationStatus"
	// (lower-case "d" after "de").
	DeDuplicationStatus string `json:"deduplicationStatus"`
}
type TopicStatsStream ¶
// TopicStatsStream wraps a three-level nested map of TopicStats.
type TopicStatsStream struct {
	// TopicsMap is read from and written to the JSON key "topicStatsBuf".
	// NOTE(review): the meaning of the three string key levels is not
	// visible here — confirm against the broker's stats-stream format.
	TopicsMap map[string]map[string]map[string]TopicStats `json:"topicStatsBuf"`
}
type Topics ¶
// Topics is the admin interface for managing topics.
// NOTE(review): the method parameters are unnamed, so the meaning of the
// int, bool and string arguments below must be confirmed against the
// implementation.
type Topics interface {
	// Create creates a topic.
	Create(TopicName, int) error

	// Delete deletes a topic.
	Delete(TopicName, bool, bool) error

	// Update updates the number of partitions of a non-global partitioned
	// topic. The partitioned topic must already exist, and the new number
	// of partitions must be greater than the existing number. Decrementing
	// the number of partitions requires deleting the topic, which is not
	// supported.
	Update(TopicName, int) error

	// GetMetadata returns the metadata of a partitioned topic.
	GetMetadata(TopicName) (PartitionedTopicMetadata, error)

	// List returns the list of topics under a namespace.
	// NOTE(review): which of the two returned slices holds which kind of
	// topic is not visible here — confirm.
	List(NameSpaceName) ([]string, []string, error)

	// GetInternalInfo returns the internal metadata info for the topic.
	GetInternalInfo(TopicName) (ManagedLedgerInfo, error)

	// GetPermissions returns the effective permissions for a topic. These
	// permissions are defined by the permissions set at the namespace level
	// combined (union) with any specific permissions set on the topic.
	GetPermissions(TopicName) (map[string][]AuthAction, error)

	// GrantPermission grants a new permission to a client role on a single
	// topic.
	GrantPermission(TopicName, string, []AuthAction) error

	// RevokePermission revokes permissions from a client role on a single
	// topic. If the permission was not set at the topic level, but rather
	// at the namespace level, this operation will return an error (HTTP
	// status code 412).
	RevokePermission(TopicName, string) error

	// Lookup looks up a topic and returns the broker URL that serves it.
	Lookup(TopicName) (LookupData, error)

	// GetBundleRange returns the bundle range of a topic.
	GetBundleRange(TopicName) (string, error)

	// GetLastMessageID returns the last committed message ID of a topic.
	GetLastMessageID(TopicName) (MessageID, error)

	// GetStats returns the stats for the topic. All the rates are computed
	// over a 1 minute window and are relative to the last completed
	// 1 minute period.
	GetStats(TopicName) (TopicStats, error)

	// GetInternalStats returns the internal stats for the topic.
	GetInternalStats(TopicName) (PersistentTopicInternalStats, error)

	// GetPartitionedStats returns the stats for the partitioned topic. All
	// the rates are computed over a 1 minute window and are relative to the
	// last completed 1 minute period.
	GetPartitionedStats(TopicName, bool) (PartitionedTopicStats, error)

	// Terminate terminates the topic and prevents any more messages from
	// being published on it.
	Terminate(TopicName) (MessageID, error)

	// Offload triggers offloading of messages in the topic to long-term
	// storage.
	Offload(TopicName, MessageID) error

	// OffloadStatus checks the status of an ongoing offloading operation
	// for a topic.
	OffloadStatus(TopicName) (OffloadProcessStatus, error)

	// Unload unloads a topic.
	Unload(TopicName) error

	// Compact triggers compaction to run for a topic. A single topic can
	// only have one instance of compaction running at any time. Any attempt
	// to trigger another will be met with a ConflictException.
	Compact(TopicName) error

	// CompactStatus checks the status of an ongoing compaction for a topic.
	CompactStatus(TopicName) (LongRunningProcessStatus, error)
}
Topics is the admin interface for topics management.
type UpdateOptions ¶
// UpdateOptions carries the options passed to update calls such as
// Sources.UpdateSource and Sources.UpdateSourceWithURL.
type UpdateOptions struct {
	// NOTE(review): presumably asks the server to refresh the stored auth
	// data as part of the update — confirm against the server API. The
	// field has no struct tag.
	UpdateAuthData bool
}
UpdateOptions holds the options applied when updating a sink or a source.
func NewUpdateOptions ¶
func NewUpdateOptions() *UpdateOptions
type WindowConfig ¶
// WindowConfig holds windowing parameters. The struct declares no field
// tags, so tag-driven encoders such as encoding/json use the Go field names
// as keys.
// NOTE(review): the "Ms" suffix suggests the int64 duration fields are in
// milliseconds, and the values set by NewDefaultWindowConfing are not
// visible here — confirm both.
type WindowConfig struct {
	WindowLengthCount             int
	WindowLengthDurationMs        int64
	SlidingIntervalCount          int
	SlidingIntervalDurationMs     int64
	LateDataTopic                 string
	MaxLagMs                      int64
	WatermarkEmitIntervalMs       int64
	TimestampExtractorClassName   string
	ActualWindowFunctionClassName string
}
func NewDefaultWindowConfing ¶
func NewDefaultWindowConfing() *WindowConfig
type WorkerFunctionInstanceStats ¶
// WorkerFunctionInstanceStats pairs a name with the metrics of a function
// instance.
// NOTE(review): Name is presumably the fully qualified function instance
// name reported by a worker — confirm.
type WorkerFunctionInstanceStats struct {
	Name    string                    `json:"name"`
	Metrics FunctionInstanceStatsData `json:"metrics"`
}
type WorkerInfo ¶
Source Files ¶
- admin.go
- allocator_stats.go
- api_version.go
- auth_action.go
- auth_polices.go
- backlog_quota.go
- broker_ns_isolation_data.go
- broker_stats.go
- brokers.go
- bundles_data.go
- cluster.go
- connector_definition.go
- consumer_config.go
- data.go
- descriptions.go
- dispatch_rate.go
- errors.go
- function_confg.go
- function_state.go
- function_status.go
- functions.go
- functions_stats.go
- functions_worker.go
- internal_configuration_data.go
- load_manage_report.go
- long_running_process_status.go
- message.go
- message_id.go
- metrics.go
- namespace.go
- namespace_name.go
- ns_isolation_data.go
- ns_isolation_policy.go
- ns_ownership_status.go
- persistence_policies.go
- policies.go
- resource_quota.go
- resource_quotas.go
- resources.go
- retention_policies.go
- schema.go
- schema_strategy.go
- schema_util.go
- sinkConfig.go
- sink_status.go
- sinks.go
- source_config.go
- source_status.go
- sources.go
- subscription.go
- subscription_auth_mode.go
- tenant.go
- topic.go
- topic_domain.go
- topic_name.go
- topics_stats_stream.go
- update_options.go
- utils.go
- window_confing.go
- worker_info.go