Documentation ¶
Index ¶
- Constants
- Variables
- func CheckAndReturnDefaultForScopeOrCollection(key string) string
- func GetCheckpointKey(app *AppConfig, vb uint16, cType checkpointType) string
- func GetCompositeState(dStatus, pStatus bool) int8
- func GetDefaultHandlerHeaders() []string
- func GetLogfileLocationAndName(parentDir string, locationString string) (string, string)
- func Uint32ToHex(uint32Val uint32) string
- type AppConfig
- type Application
- type Bucket
- type ChangeType
- type CompileStatus
- type Config
- type Constant
- type CouchbaseVer
- type Credential
- type Curl
- type DcpStreamBoundary
- type DebuggerInstance
- type DepCfg
- type EventProcessingStats
- type EventingConsumer
- type EventingProducer
- type EventingServiceMgr
- type EventingSuperSup
- type FunctionScope
- type HandlerConfig
- type Identity
- type Insight
- type InsightLine
- type Insights
- type Key
- type Keyspace
- type KeyspaceID
- type KeyspaceName
- type MonitorType
- type Owner
- type PlannerNodeVbMapping
- type ProcessConfig
- type RebalanceConfig
- type RebalanceProgress
- type SecuritySetting
- type StatsData
- type StorageEngine
- type StreamType
- type TopologyChangeMsg
- type UndeployAction
Constants ¶
View Source
const ( DcpEverything = DcpStreamBoundary("everything") DcpFromNow = DcpStreamBoundary("from_now") DcpFromPrior = DcpStreamBoundary("from_prior") )
View Source
const ( AppLocationTag = "appLocation" AppLocationsTag = "appLocations" ReasonTag = "reason" )
View Source
const ( StartRebalanceCType = ChangeType("start-rebalance") StopRebalanceCType = ChangeType("stop-rebalance") StartFailoverCType = ChangeType("start-failover") )
View Source
const ( AppState int8 = iota AppStateUndeployed AppStateEnabled AppStatePaused AppStateUnexpected )
View Source
const ( WaitingForMutation = "WaitingForMutation" // Debugger has been started and consumers are waiting to trap MutationTrapped = "MutationTrapped" // One of the consumers has trapped the mutation DebuggerTokenKey = "debugger" MetakvEventingPath = "/eventing/" MetakvDebuggerPath = MetakvEventingPath + "debugger/" MetakvTempAppsPath = MetakvEventingPath + "tempApps/" MetakvCredentialsPath = MetakvEventingPath + "credentials/" MetakvConfigPath = MetakvEventingPath + "settings/config" )
View Source
const ( DebuggerCheckpoint checkpointType = iota Checkpoint )
View Source
const (
CurlFeature uint32 = 1 << iota
)
View Source
const (
SystemScopeName = "_system"
)
Variables ¶
View Source
var ( Couchstore = StorageEngine("couchstore") Magma = StorageEngine("magma") )
View Source
var ( ErrRetryTimeout = errors.New("retry timeout") ErrEncryptionLevelChanged = errors.New("Encryption Level changed during boostrap") ErrHandleEmpty = errors.New("Bucket handle not initialized") )
View Source
var BucketNotWatched = errors.New("Bucket not being watched")
View Source
var CouchbaseVerMap = map[string]CouchbaseVer{ "vulcan": CouchbaseVer{ // contains filtered or unexported fields }, "alice": CouchbaseVer{ // contains filtered or unexported fields }, "mad-hatter": CouchbaseVer{ // contains filtered or unexported fields }, "cheshire-cat": CouchbaseVer{ // contains filtered or unexported fields }, "6.6.2": CouchbaseVer{ // contains filtered or unexported fields }, }
View Source
var (
DisableCurl = "disable_curl"
)
View Source
var ErrInvalidVersion = errors.New("invalid eventing version")
View Source
var LanguageCompatibility = []string{"7.2.0", "6.6.2", "6.0.0", "6.5.0"}
A missing value defaults to the entry at index 0.
View Source
var MetakvMaxRetries int64 = 60
Functions ¶
func GetCheckpointKey ¶
func GetCompositeState ¶
func GetDefaultHandlerHeaders ¶
func GetDefaultHandlerHeaders() []string
func Uint32ToHex ¶
Types ¶
type AppConfig ¶
type AppConfig struct { AppCode string ParsedAppCode string AppDeployState string AppName string AppLocation string AppState string AppVersion string FunctionID uint32 FunctionInstanceID string LastDeploy string Settings map[string]interface{} UserPrefix string FunctionScope FunctionScope }
AppConfig holds the application/event handler configuration.
type Application ¶
type Application struct { AppHandlers string `json:"appcode"` DeploymentConfig DepCfg `json:"depcfg"` EventingVersion string `json:"version"` EnforceSchema bool `json:"enforce_schema"` FunctionID uint32 `json:"handleruuid"` FunctionInstanceID string `json:"function_instance_id"` Name string `json:"appname"` Settings map[string]interface{} `json:"settings"` Metainfo map[string]interface{} `json:"metainfo,omitempty"` Owner *Owner FunctionScope FunctionScope `json:"function_scope"` }
type ChangeType ¶
type ChangeType string
type CompileStatus ¶
type CouchbaseVer ¶
type CouchbaseVer struct {
// contains filtered or unexported fields
}
func FrameCouchbaseVerFromNsServerStreamingRestApi ¶
func FrameCouchbaseVerFromNsServerStreamingRestApi(ver string) (CouchbaseVer, error)
Version format: major.minor.mpVersion-build-type
func FrameCouchbaseVersion ¶
func FrameCouchbaseVersion(ver string) (CouchbaseVer, error)
func FrameCouchbaseVersionShort ¶
func FrameCouchbaseVersionShort(ver string) (CouchbaseVer, error)
For shorthand versions like x.x.x.
func (CouchbaseVer) Compare ¶
func (e CouchbaseVer) Compare(need CouchbaseVer) bool
Compare reports whether e >= need.
func (CouchbaseVer) String ¶
func (e CouchbaseVer) String() string
type Credential ¶
type Curl ¶
type Curl struct { Hostname string `json:"hostname"` Value string `json:"value"` AuthType string `json:"auth_type"` Username string `json:"username"` Password string `json:"password"` BearerKey string `json:"bearer_key"` AllowCookies bool `json:"allow_cookies"` ValidateSSLCertificate bool `json:"validate_ssl_certificate"` }
type DcpStreamBoundary ¶
type DcpStreamBoundary string
func StreamBoundary ¶
func StreamBoundary(boundary string) DcpStreamBoundary
type DebuggerInstance ¶
type DebuggerInstance struct { Token string `json:"token"` // An ID for a debugging session Host string `json:"host"` // The node where debugger has been spawned Status string `json:"status"` // Possible values are WaitingForMutation, MutationTrapped URL string `json:"url"` // Chrome-Devtools URL for debugging NodesExternalIP []string `json:"nodes_external_ip"` // List of external IP address of the nodes in the cluster }
type DepCfg ¶
type DepCfg struct { Buckets []Bucket `json:"buckets,omitempty"` Curl []Curl `json:"curl,omitempty"` Constants []Constant `json:"constants,omitempty"` SourceBucket string `json:"source_bucket"` SourceScope string `json:"source_scope"` SourceCollection string `json:"source_collection"` MetadataBucket string `json:"metadata_bucket"` MetadataScope string `json:"metadata_scope"` MetadataCollection string `json:"metadata_collection"` }
type EventProcessingStats ¶
type EventingConsumer ¶
type EventingConsumer interface { BootstrapStatus() bool CheckIfQueuesAreDrained() error ClearEventStats() CloseAllRunningDcpFeeds() ConsumerName() string DcpEventsRemainingToProcess() uint64 EventingNodeUUIDs() []string EventsProcessedPSec() *EventProcessingStats GetEventProcessingStats() map[string]uint64 GetExecutionStats() map[string]interface{} GetFailureStats() map[string]interface{} GetInsight() *Insight GetLcbExceptionsStats() map[string]uint64 GetMetaStoreStats() map[string]uint64 HandleV8Worker() error HostPortAddr() string Index() int InternalVbDistributionStats() []uint16 NodeUUID() string NotifyClusterChange() NotifyRebalanceStop() NotifySettingsChange() Pid() int RebalanceStatus() bool RebalanceTaskProgress() *RebalanceProgress RemoveSupervisorToken() error ResetBootstrapDone() ResetCounters() Serve() SetConnHandle(net.Conn) SetFeedbackConnHandle(net.Conn) SetRebalanceStatus(status bool) GetRebalanceStatus() bool GetPrevRebalanceInCompleteStatus() bool SignalBootstrapFinish() SignalConnected() SignalFeedbackConnected() SignalStopDebugger() error SpawnCompilationWorker(appCode, appContent, appName, eventingPort string, handlerHeaders, handlerFooters []string) (*CompileStatus, error) Stop(context string) String() string TimerDebugStats() map[int]map[string]interface{} NotifyPrepareTopologyChange(keepNodes, ejectNodes []string) UpdateEncryptionLevel(enforceTLS, encryptOn bool) UpdateWorkerQueueMemCap(quota int64) VbDcpEventsRemainingToProcess() map[int]int64 VbEventingNodeAssignMapUpdate(map[uint16]string) VbProcessingStats() map[uint16]map[string]interface{} VbSeqnoStats() map[int]map[string]interface{} WorkerVbMapUpdate(map[string][]uint16) SendAssignedVbs() PauseConsumer() GetAssignedVbs(workerName string) ([]uint16, error) NotifyWorker() GetOwner() *Owner SetFeatureMatrix(featureMatrix uint32) }
EventingConsumer is the interface that exports functions from eventing_consumer.
type EventingProducer ¶
type EventingProducer interface { AddMetadataPrefix(key string) Key AppendCurlLatencyStats(deltas StatsData) AppendLatencyStats(deltas StatsData) BootstrapStatus() bool CfgData() string CheckpointBlobDump() map[string]interface{} CleanupMetadataBucket(skipCheckpointBlobs bool) error CleanupUDSs() ClearEventStats() DcpFeedBoundary() string GetAppCode() string GetAppLog(sz int64) []string GetDcpEventsRemainingToProcess() uint64 GetDebuggerURL() (string, error) GetEventingConsumerPids() map[string]int GetEventProcessingStats() map[string]uint64 GetExecutionStats() map[string]interface{} GetFailureStats() map[string]interface{} GetLatencyStats() StatsData GetCurlLatencyStats() StatsData GetInsight() *Insight GetLcbExceptionsStats() map[string]uint64 GetMetaStoreStats() map[string]uint64 GetMetadataPrefix() string GetNsServerPort() string GetVbOwner(vb uint16) (string, string, error) GetSeqsProcessed() map[int]int64 GetDebuggerToken() string InternalVbDistributionStats() map[string]string IsEventingNodeAlive(eventingHostPortAddr, nodeUUID string) bool IsPlannerRunning() bool IsTrapEvent() bool KillAllConsumers() KillAndRespawnEventingConsumer(consumer EventingConsumer) KvHostPorts() []string LenRunningConsumers() int MetadataBucket() string MetadataScope() string MetadataCollection() string NotifyInit() NotifyPrepareTopologyChange(ejectNodes, keepNodes []string, changeType service.TopologyChangeType) NotifySettingsChange() NotifySupervisor() NotifyTopologyChange(msg *TopologyChangeMsg) NsServerHostPort() string NsServerNodeCount() int PauseProducer() PlannerStats() []*PlannerNodeVbMapping ResumeProducer() RebalanceStatus() bool RebalanceTaskProgress() *RebalanceProgress RemoveConsumerToken(workerName string) ResetCounters() SignalBootstrapFinish() SignalStartDebugger(token string) error SignalStopDebugger() error SetRetryCount(retryCount int64) SpanBlobDump() map[string]interface{} Serve() SourceBucket() string SourceScope() string SourceCollection() string 
GetSourceKeyspaceID() (KeyspaceID, bool) GetMetadataKeyspaceID() (KeyspaceID, bool) SrcMutation() bool Stop(context string) StopRunningConsumers() String() string SetTrapEvent(value bool) TimerDebugStats() map[int]map[string]interface{} UndeployHandler(msg UndeployAction) UpdateEncryptionLevel(enforceTLS, encryptOn bool) UpdateMemoryQuota(quota int64) UsingTimer() bool VbDcpEventsRemainingToProcess() map[int]int64 VbDistributionStatsFromMetadata() map[string]map[string]string VbSeqnoStats() map[int][]map[string]interface{} WriteAppLog(log string) WriteDebuggerURL(url string) WriteDebuggerToken(token string, hostnames []string) error GetOwner() *Owner GetFuncScopeDetails() (string, uint32) FunctionManageBucket() string FunctionManageScope() string SetFeatureMatrix(featureMatrix uint32) }
EventingProducer is the interface that exports functions from eventing_producer.
type EventingServiceMgr ¶
type EventingServiceMgr interface { UpdateBucketGraphFromMetakv(functionName string) error ResetFailoverStatus() GetFailoverStatus() (failoverNotifTs int64, changeId string) CheckLifeCycleOpsDuringRebalance() bool NotifySupervisorWaitCh() // TODO: Replace it with getting back the whole application. GetFunctionId(id Identity) (uint32, error) }
type EventingSuperSup ¶
type EventingSuperSup interface { PausingAppList() map[string]string BootstrapAppList() map[string]string BootstrapAppStatus(appName string) bool BootstrapStatus() bool CheckAndSwitchgocbBucket(bucketName, appName string, setting *SecuritySetting) error CheckpointBlobDump(appName string) (interface{}, error) ClearEventStats() []string DcpFeedBoundary(fnName string) (string, error) DeployedAppList() []string GetEventProcessingStats(appName string) map[string]uint64 GetAppCode(appName string) string GetAppLog(appName string, sz int64) []string GetAppCompositeState(appName string) int8 GetDcpEventsRemainingToProcess(appName string) uint64 GetDebuggerURL(appName string) (string, error) GetDeployedApps() map[string]string GetEventingConsumerPids(appName string) map[string]int GetExecutionStats(appName string) map[string]interface{} GetFailureStats(appName string) map[string]interface{} GetLatencyStats(appName string) StatsData GetCurlLatencyStats(appName string) StatsData GetInsight(appName string) *Insight GetLcbExceptionsStats(appName string) map[string]uint64 GetLocallyDeployedApps() map[string]string GetMetaStoreStats(appName string) map[string]uint64 GetBucket(bucketName, appName string) (*couchbase.Bucket, error) GetMetadataHandle(bucketName, scopeName, collectionName, appName string) (*gocb.Collection, error) GetKeyspaceID(bucketName, scopeName, collectionName string) (keyspaceID KeyspaceID, err error) GetCurrentManifestId(bucketName string) (string, error) GetRegisteredPool() string GetSeqsProcessed(appName string) map[int]int64 GetNumVbucketsForBucket(bucketName string) int InternalVbDistributionStats(appName string) map[string]string KillAllConsumers() NotifyPrepareTopologyChange(ejectNodes, keepNodes []string, changeType service.TopologyChangeType) TopologyChangeNotifCallback(kve metakv.KVEntry) error PlannerStats(appName string) []*PlannerNodeVbMapping RebalanceStatus() bool RebalanceTaskProgress(appName string) (*RebalanceProgress, error) 
RemoveProducerToken(appName string) RestPort() string ResetCounters(appName string) error SetSecuritySetting(setting *SecuritySetting) bool GetSecuritySetting() *SecuritySetting EncryptionChangedDuringLifecycle() bool GetGocbSubscribedApps(encryptionEnabled bool) map[string]struct{} SignalStopDebugger(appName string) error SpanBlobDump(appName string) (interface{}, error) StopProducer(appName string, msg UndeployAction) TimerDebugStats(appName string) (map[int]map[string]interface{}, error) UpdateEncryptionLevel(enforceTLS, encryptOn bool) VbDcpEventsRemainingToProcess(appName string) map[int]int64 VbDistributionStatsFromMetadata(appName string) map[string]map[string]string VbSeqnoStats(appName string) (map[int][]map[string]interface{}, error) WriteDebuggerURL(appName, url string) WriteDebuggerToken(appName, token string, hostnames []string) IncWorkerRespawnedCount() WorkerRespawnedCount() uint32 CheckLifeCycleOpsDuringRebalance() bool GetBSCSnapshot() (map[string]map[string][]string, error) GetSystemMemoryQuota() float64 WatchBucket(keyspace Keyspace, appName string, mType MonitorType) error UnwatchBucket(keyspace Keyspace, appName string) }
type FunctionScope ¶
Needed only during the first creation of the function.
func GetFunctionScope ¶
func GetFunctionScope(identity Identity) FunctionScope
func (FunctionScope) String ¶
func (fs FunctionScope) String() string
func (*FunctionScope) ToKeyspace ¶
func (fs *FunctionScope) ToKeyspace() *Keyspace
type HandlerConfig ¶
type HandlerConfig struct { N1qlPrepareAll bool LanguageCompatibility string AllowTransactionMutations bool AggDCPFeedMemCap int64 CheckpointInterval int IdleCheckpointInterval int CPPWorkerThrCount int ExecuteTimerRoutineCount int ExecutionTimeout int FeedbackBatchSize int FeedbackQueueCap int64 FeedbackReadBufferSize int HandlerHeaders []string LcbInstCapacity int N1qlConsistency string LogLevel string SocketWriteBatchSize int SourceKeyspace *Keyspace StatsLogInterval int StreamBoundary DcpStreamBoundary TimerContextSize int64 TimerQueueMemCap uint64 TimerQueueSize uint64 UndeployRoutineCount int WorkerCount int WorkerQueueCap int64 WorkerQueueMemCap int64 WorkerResponseTimeout int LcbRetryCount int LcbTimeout int BucketCacheSize int64 BucketCacheAge int64 NumTimerPartitions int CurlMaxAllowedRespSize int }
type Insight ¶
type Insight struct { Script string `json:"script"` Lines map[int]InsightLine `json:"lines"` }
func NewInsight ¶
func NewInsight() *Insight
func (*Insight) Accumulate ¶
type InsightLine ¶
type Keyspace ¶
func (Keyspace) IsWildcard ¶
func (Keyspace) ToFunctionScope ¶
func (k Keyspace) ToFunctionScope() *FunctionScope
type KeyspaceID ¶
type KeyspaceID struct { Bid string Cid uint32 Sid uint32 StreamType StreamType }
func (KeyspaceID) Equals ¶
func (keyspaceID KeyspaceID) Equals(keyspaceID2 KeyspaceID) bool
type KeyspaceName ¶
type MonitorType ¶
type MonitorType int8
const ( SrcWatch MonitorType = iota MetaWatch FunctionScopeWatch )
type PlannerNodeVbMapping ¶
type PlannerNodeVbMapping struct { Hostname string `json:"host_name"` StartVb int `json:"start_vb"` VbsCount int `json:"vb_count"` }
PlannerNodeVbMapping captures the vbucket distribution across all eventing nodes as per the planner.
type ProcessConfig ¶
type RebalanceConfig ¶
type RebalanceProgress ¶
type SecuritySetting ¶
type StorageEngine ¶
type StorageEngine string
type StreamType ¶
type StreamType uint8
const ( STREAM_BUCKET StreamType = iota STREAM_SCOPE STREAM_COLLECTION STREAM_UNKNOWN )
type TopologyChangeMsg ¶
type TopologyChangeMsg struct { CType ChangeType MsgSource string }
type UndeployAction ¶
type UndeployAction struct { UpdateMetakv bool SkipMetadataCleanup bool DeleteFunction bool Reason string }
func DefaultUndeployAction ¶
func DefaultUndeployAction() UndeployAction
func (UndeployAction) String ¶
func (msg UndeployAction) String() string
Click to show internal directories.
Click to hide internal directories.