common

package
v0.0.0-...-2416616 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 15, 2024 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
// Named DcpStreamBoundary values; each wraps the string literal shown.
const (
	DcpEverything = DcpStreamBoundary("everything")
	DcpFromNow    = DcpStreamBoundary("from_now")
	DcpFromPrior  = DcpStreamBoundary("from_prior")
)
View Source
// Untyped string constants holding tag names.
// NOTE(review): presumably used as keys in request/response or log payloads —
// confirm against callers.
const (
	AppLocationTag  = "appLocation"
	AppLocationsTag = "appLocations"
	ReasonTag       = "reason"
)
View Source
// Named ChangeType values. TopologyChangeMsg.CType carries a ChangeType.
const (
	StartRebalanceCType = ChangeType("start-rebalance")
	StopRebalanceCType  = ChangeType("stop-rebalance")
	StartFailoverCType  = ChangeType("start-failover")
)
View Source
// int8 state values assigned by iota, starting at 0:
// AppState = 0, AppStateUndeployed = 1, AppStateEnabled = 2,
// AppStatePaused = 3, AppStateUnexpected = 4.
// NOTE(review): GetCompositeState and GetAppCompositeState also return int8,
// presumably one of these values — confirm against their implementations.
const (
	AppState int8 = iota
	AppStateUndeployed
	AppStateEnabled
	AppStatePaused
	AppStateUnexpected
)
View Source
// Debugger status strings, the debugger token key, and metakv paths.
// Every Metakv*Path below is built by appending to MetakvEventingPath.
const (
	WaitingForMutation    = "WaitingForMutation" // Debugger has been started and consumers are waiting to trap a mutation
	MutationTrapped       = "MutationTrapped"    // One of the consumers has trapped the mutation
	DebuggerTokenKey      = "debugger"
	MetakvEventingPath    = "/eventing/"
	MetakvDebuggerPath    = MetakvEventingPath + "debugger/"
	MetakvTempAppsPath    = MetakvEventingPath + "tempApps/"
	MetakvCredentialsPath = MetakvEventingPath + "credentials/"
	MetakvConfigPath      = MetakvEventingPath + "settings/config"
)
View Source
// checkpointType values assigned by iota: DebuggerCheckpoint = 0,
// Checkpoint = 1. GetCheckpointKey takes one of these as its cType argument.
const (
	DebuggerCheckpoint checkpointType = iota
	Checkpoint
)
View Source
// Feature flags declared as uint32 values of the form 1 << iota.
// CurlFeature is the first entry, so its value is 1 (1 << 0).
// NOTE(review): presumably a bit in the featureMatrix passed to
// SetFeatureMatrix — confirm.
const (
	CurlFeature uint32 = 1 << iota
)
View Source
// SystemScopeName is the scope name "_system".
const (
	SystemScopeName = "_system"
)

Variables

View Source
// Named StorageEngine values.
// NOTE(review): these are declared with var, not const, so they can be
// reassigned at runtime; treat them as read-only.
var (
	Couchstore = StorageEngine("couchstore")
	Magma      = StorageEngine("magma")
)
View Source
// Sentinel errors exported by the package. Callers should compare against
// these values (for example with errors.Is) rather than match message text.
var (
	// ErrRetryTimeout is the sentinel with message "retry timeout".
	ErrRetryTimeout           = errors.New("retry timeout")
	// ErrEncryptionLevelChanged signals that the encryption level changed
	// during bootstrap. The message previously misspelled "bootstrap".
	ErrEncryptionLevelChanged = errors.New("Encryption Level changed during bootstrap")
	// ErrHandleEmpty signals that a bucket handle has not been initialized.
	ErrHandleEmpty            = errors.New("Bucket handle not initialized")
)
View Source
// BucketNotWatched is the error value with message "Bucket not being watched".
// NOTE(review): Go convention would name this ErrBucketNotWatched; renaming
// would break existing callers, so the name is left as is.
var BucketNotWatched = errors.New("Bucket not being watched")
View Source
// CouchbaseVerMap maps a release codename ("vulcan", "alice", "mad-hatter",
// "cheshire-cat") or a version string ("6.6.2") to a CouchbaseVer value.
// The CouchbaseVer field values are unexported and not shown here.
var CouchbaseVerMap = map[string]CouchbaseVer{
	"vulcan": CouchbaseVer{
		// contains filtered or unexported fields
	},
	"alice": CouchbaseVer{
		// contains filtered or unexported fields
	},
	"mad-hatter": CouchbaseVer{
		// contains filtered or unexported fields
	},
	"cheshire-cat": CouchbaseVer{
		// contains filtered or unexported fields
	},
	"6.6.2": CouchbaseVer{
		// contains filtered or unexported fields
	},
}
View Source
// DisableCurl holds the string "disable_curl".
// NOTE(review): presumably a settings/config key related to CurlFeature —
// confirm against callers. It is a var, so it can be reassigned.
var (
	DisableCurl = "disable_curl"
)
View Source
// ErrInvalidVersion is the sentinel error with message
// "invalid eventing version".
var ErrInvalidVersion = errors.New("invalid eventing version")
View Source
// LanguageCompatibility lists language-compatibility version strings.
// NOTE(review): the entries are not in version order ("6.0.0" is listed
// before "6.5.0", after "6.6.2"). The package doc says a missing default is
// filled from index 0, so element order matters — confirm before re-sorting.
var LanguageCompatibility = []string{"7.2.0", "6.6.2", "6.0.0", "6.5.0"}

A missing default is filled with the entry at index 0.

View Source
// MetakvMaxRetries is a package-level int64, initialised to 60.
// NOTE(review): presumably the retry limit for metakv operations — confirm
// against callers. It is a var, so it can be changed at runtime.
var MetakvMaxRetries int64 = 60

Functions

func CheckAndReturnDefaultForScopeOrCollection

func CheckAndReturnDefaultForScopeOrCollection(key string) string

func GetCheckpointKey

func GetCheckpointKey(app *AppConfig, vb uint16, cType checkpointType) string

func GetCompositeState

func GetCompositeState(dStatus, pStatus bool) int8

func GetDefaultHandlerHeaders

func GetDefaultHandlerHeaders() []string

func GetLogfileLocationAndName

func GetLogfileLocationAndName(parentDir string, locationString string) (string, string)

func Uint32ToHex

func Uint32ToHex(uint32Val uint32) string

Types

type AppConfig

// AppConfig holds the application/event handler configuration.
// The fields carry no struct tags. GetCheckpointKey takes a *AppConfig.
type AppConfig struct {
	AppCode            string
	ParsedAppCode      string
	AppDeployState     string
	AppName            string
	AppLocation        string
	AppState           string
	AppVersion         string
	FunctionID         uint32
	FunctionInstanceID string
	LastDeploy         string
	Settings           map[string]interface{}
	UserPrefix         string
	FunctionScope      FunctionScope
}

AppConfig holds the application/event handler configuration.

type Application

// Application is the JSON form of a function definition; field tags give the
// JSON key for each field.
type Application struct {
	AppHandlers        string                 `json:"appcode"`
	DeploymentConfig   DepCfg                 `json:"depcfg"`
	EventingVersion    string                 `json:"version"`
	EnforceSchema      bool                   `json:"enforce_schema"`
	FunctionID         uint32                 `json:"handleruuid"`
	FunctionInstanceID string                 `json:"function_instance_id"`
	Name               string                 `json:"appname"`
	Settings           map[string]interface{} `json:"settings"`
	// Metainfo is omitted from JSON output when empty (omitempty).
	Metainfo           map[string]interface{} `json:"metainfo,omitempty"`
	// Owner has no json tag, so encoding/json uses the key "Owner".
	Owner              *Owner
	FunctionScope      FunctionScope `json:"function_scope"`
}

type Bucket

// Bucket is one entry of DepCfg.Buckets: an alias plus a bucket, scope and
// collection name and an access string. Field tags give the JSON keys.
// NOTE(review): the permitted values of Access are not visible here.
type Bucket struct {
	Alias          string `json:"alias"`
	BucketName     string `json:"bucket_name"`
	ScopeName      string `json:"scope_name"`
	CollectionName string `json:"collection_name"`
	Access         string `json:"access"`
}

type ChangeType

// ChangeType is a string type; the *CType constants are its named values.
type ChangeType string

type CompileStatus

// CompileStatus is the result returned (by pointer) from
// EventingConsumer.SpawnCompilationWorker. Field tags give the JSON keys;
// Column and Line serialise as "column_number" and "line_number".
type CompileStatus struct {
	Area           string `json:"area"`
	Column         int    `json:"column_number"`
	CompileSuccess bool   `json:"compile_success"`
	Description    string `json:"description"`
	Index          int    `json:"index"`
	Language       string `json:"language"`
	Line           int    `json:"line_number"`
}

type Config

// Config is a map from string keys to values of any type.
type Config map[string]interface{}

type Constant

// Constant is one entry of DepCfg.Constants. Field tags give the JSON keys.
// NOTE(review): the distinction between Value and Literal is not visible
// here — confirm against callers.
type Constant struct {
	Value   string `json:"value"`
	Literal string `json:"literal"`
}

type CouchbaseVer

// CouchbaseVer represents a Couchbase version. Its fields are unexported;
// values are obtained from the Frame* functions or CouchbaseVerMap and are
// compared with the Compare and Equals methods.
type CouchbaseVer struct {
	// contains filtered or unexported fields
}

func FrameCouchbaseVerFromNsServerStreamingRestApi

func FrameCouchbaseVerFromNsServerStreamingRestApi(ver string) (CouchbaseVer, error)

Expected version format: major.minor.mpVersion-build-type

func FrameCouchbaseVersion

func FrameCouchbaseVersion(ver string) (CouchbaseVer, error)

func FrameCouchbaseVersionShort

func FrameCouchbaseVersionShort(ver string) (CouchbaseVer, error)

For shorthand versions such as x.x.x.

func (CouchbaseVer) Compare

func (e CouchbaseVer) Compare(need CouchbaseVer) bool

Compare reports whether e >= need.

func (CouchbaseVer) Equals

func (e CouchbaseVer) Equals(need CouchbaseVer) bool

Equals reports whether e == need.

func (CouchbaseVer) String

func (e CouchbaseVer) String() string

type Credential

// Credential holds a username, password and bearer key. Field tags give the
// JSON keys.
// NOTE(review): these fields hold secrets — avoid logging values of this type.
type Credential struct {
	Username  string `json:"username"`
	Password  string `json:"password"`
	BearerKey string `json:"bearer_key"`
}

type Curl

// Curl is one entry of DepCfg.Curl. Field tags give the JSON keys.
// NOTE(review): Username, Password and BearerKey hold secrets — avoid
// logging values of this type. Permitted AuthType values are not visible here.
type Curl struct {
	Hostname               string `json:"hostname"`
	Value                  string `json:"value"`
	AuthType               string `json:"auth_type"`
	Username               string `json:"username"`
	Password               string `json:"password"`
	BearerKey              string `json:"bearer_key"`
	AllowCookies           bool   `json:"allow_cookies"`
	ValidateSSLCertificate bool   `json:"validate_ssl_certificate"`
}

type DcpStreamBoundary

// DcpStreamBoundary is a string type; DcpEverything, DcpFromNow and
// DcpFromPrior are its named values.
type DcpStreamBoundary string

func StreamBoundary

func StreamBoundary(boundary string) DcpStreamBoundary

type DebuggerInstance

// DebuggerInstance describes a debugging session. Field tags give the JSON
// keys.
type DebuggerInstance struct {
	Token           string   `json:"token"`             // An ID for a debugging session
	Host            string   `json:"host"`              // The node where debugger has been spawned
	Status          string   `json:"status"`            // Possible values are WaitingForMutation, MutationTrapped
	URL             string   `json:"url"`               // Chrome-Devtools URL for debugging
	NodesExternalIP []string `json:"nodes_external_ip"` // List of external IP address of the nodes in the cluster
}

type DepCfg

// DepCfg is the deployment configuration embedded in Application under the
// JSON key "depcfg". It holds bucket, curl and constant entries plus the
// source and metadata bucket/scope/collection names.
// Buckets, Curl and Constants are omitted from JSON output when empty.
type DepCfg struct {
	Buckets            []Bucket   `json:"buckets,omitempty"`
	Curl               []Curl     `json:"curl,omitempty"`
	Constants          []Constant `json:"constants,omitempty"`
	SourceBucket       string     `json:"source_bucket"`
	SourceScope        string     `json:"source_scope"`
	SourceCollection   string     `json:"source_collection"`
	MetadataBucket     string     `json:"metadata_bucket"`
	MetadataScope      string     `json:"metadata_scope"`
	MetadataCollection string     `json:"metadata_collection"`
}

type EventProcessingStats

// EventProcessingStats is the value returned (by pointer) from
// EventingConsumer.EventsProcessedPSec. Field tags give the JSON keys.
// NOTE(review): the Timestamp string format is not visible here.
type EventProcessingStats struct {
	DcpEventsProcessedPSec   int    `json:"dcp_events_processed_psec"`
	TimerEventsProcessedPSec int    `json:"timer_events_processed_psec"`
	Timestamp                string `json:"timestamp"`
}

type EventingConsumer

// EventingConsumer is the interface that exports functions from
// eventing_consumer. EventingProducer.KillAndRespawnEventingConsumer takes a
// value of this type.
// NOTE(review): both RebalanceStatus and GetRebalanceStatus are declared and
// return bool; how they differ is not visible here.
type EventingConsumer interface {
	BootstrapStatus() bool
	CheckIfQueuesAreDrained() error
	ClearEventStats()
	CloseAllRunningDcpFeeds()
	ConsumerName() string
	DcpEventsRemainingToProcess() uint64
	EventingNodeUUIDs() []string
	EventsProcessedPSec() *EventProcessingStats
	GetEventProcessingStats() map[string]uint64
	GetExecutionStats() map[string]interface{}
	GetFailureStats() map[string]interface{}
	GetInsight() *Insight
	GetLcbExceptionsStats() map[string]uint64
	GetMetaStoreStats() map[string]uint64
	HandleV8Worker() error
	HostPortAddr() string
	Index() int
	InternalVbDistributionStats() []uint16
	NodeUUID() string
	NotifyClusterChange()
	NotifyRebalanceStop()
	NotifySettingsChange()
	Pid() int
	RebalanceStatus() bool
	RebalanceTaskProgress() *RebalanceProgress
	RemoveSupervisorToken() error
	ResetBootstrapDone()
	ResetCounters()
	Serve()
	SetConnHandle(net.Conn)
	SetFeedbackConnHandle(net.Conn)
	SetRebalanceStatus(status bool)
	GetRebalanceStatus() bool
	GetPrevRebalanceInCompleteStatus() bool
	SignalBootstrapFinish()
	SignalConnected()
	SignalFeedbackConnected()
	SignalStopDebugger() error
	SpawnCompilationWorker(appCode, appContent, appName, eventingPort string, handlerHeaders, handlerFooters []string) (*CompileStatus, error)
	Stop(context string)
	String() string
	TimerDebugStats() map[int]map[string]interface{}
	// NOTE(review): parameter order here is (keepNodes, ejectNodes), whereas
	// EventingProducer and EventingSuperSup declare (ejectNodes, keepNodes, ...).
	NotifyPrepareTopologyChange(keepNodes, ejectNodes []string)
	UpdateEncryptionLevel(enforceTLS, encryptOn bool)
	UpdateWorkerQueueMemCap(quota int64)
	VbDcpEventsRemainingToProcess() map[int]int64
	VbEventingNodeAssignMapUpdate(map[uint16]string)
	VbProcessingStats() map[uint16]map[string]interface{}
	VbSeqnoStats() map[int]map[string]interface{}
	WorkerVbMapUpdate(map[string][]uint16)

	SendAssignedVbs()
	PauseConsumer()
	GetAssignedVbs(workerName string) ([]uint16, error)
	NotifyWorker()
	GetOwner() *Owner

	SetFeatureMatrix(featureMatrix uint32)
}

EventingConsumer is the interface that exports functions from eventing_consumer.

type EventingProducer

// EventingProducer is the interface that exports functions from
// eventing_producer.
type EventingProducer interface {
	AddMetadataPrefix(key string) Key
	AppendCurlLatencyStats(deltas StatsData)
	AppendLatencyStats(deltas StatsData)
	BootstrapStatus() bool
	CfgData() string
	CheckpointBlobDump() map[string]interface{}
	CleanupMetadataBucket(skipCheckpointBlobs bool) error
	CleanupUDSs()
	ClearEventStats()
	DcpFeedBoundary() string
	GetAppCode() string
	GetAppLog(sz int64) []string
	GetDcpEventsRemainingToProcess() uint64
	GetDebuggerURL() (string, error)
	GetEventingConsumerPids() map[string]int
	GetEventProcessingStats() map[string]uint64
	GetExecutionStats() map[string]interface{}
	GetFailureStats() map[string]interface{}
	GetLatencyStats() StatsData
	GetCurlLatencyStats() StatsData
	GetInsight() *Insight
	GetLcbExceptionsStats() map[string]uint64
	GetMetaStoreStats() map[string]uint64
	GetMetadataPrefix() string
	GetNsServerPort() string
	GetVbOwner(vb uint16) (string, string, error)
	GetSeqsProcessed() map[int]int64
	GetDebuggerToken() string
	InternalVbDistributionStats() map[string]string
	IsEventingNodeAlive(eventingHostPortAddr, nodeUUID string) bool
	IsPlannerRunning() bool
	IsTrapEvent() bool
	KillAllConsumers()
	KillAndRespawnEventingConsumer(consumer EventingConsumer)
	KvHostPorts() []string
	LenRunningConsumers() int
	MetadataBucket() string
	MetadataScope() string
	MetadataCollection() string
	NotifyInit()
	NotifyPrepareTopologyChange(ejectNodes, keepNodes []string, changeType service.TopologyChangeType)
	NotifySettingsChange()
	NotifySupervisor()
	NotifyTopologyChange(msg *TopologyChangeMsg)
	NsServerHostPort() string
	NsServerNodeCount() int
	PauseProducer()
	PlannerStats() []*PlannerNodeVbMapping
	ResumeProducer()
	RebalanceStatus() bool
	RebalanceTaskProgress() *RebalanceProgress
	RemoveConsumerToken(workerName string)
	ResetCounters()
	SignalBootstrapFinish()
	SignalStartDebugger(token string) error
	SignalStopDebugger() error
	SetRetryCount(retryCount int64)
	SpanBlobDump() map[string]interface{}
	Serve()
	SourceBucket() string
	SourceScope() string
	SourceCollection() string
	GetSourceKeyspaceID() (KeyspaceID, bool)
	GetMetadataKeyspaceID() (KeyspaceID, bool)
	SrcMutation() bool
	Stop(context string)
	StopRunningConsumers()
	String() string
	SetTrapEvent(value bool)
	TimerDebugStats() map[int]map[string]interface{}
	UndeployHandler(msg UndeployAction)
	UpdateEncryptionLevel(enforceTLS, encryptOn bool)
	UpdateMemoryQuota(quota int64)
	UsingTimer() bool
	VbDcpEventsRemainingToProcess() map[int]int64
	VbDistributionStatsFromMetadata() map[string]map[string]string
	VbSeqnoStats() map[int][]map[string]interface{}
	WriteAppLog(log string)
	WriteDebuggerURL(url string)
	WriteDebuggerToken(token string, hostnames []string) error
	GetOwner() *Owner
	GetFuncScopeDetails() (string, uint32)
	FunctionManageBucket() string
	FunctionManageScope() string
	SetFeatureMatrix(featureMatrix uint32)
}

EventingProducer is the interface that exports functions from eventing_producer.

type EventingServiceMgr

// EventingServiceMgr is the interface exposed by the eventing service
// manager: bucket-graph refresh from metakv, failover status, a
// lifecycle-during-rebalance check, and function ID lookup by Identity.
type EventingServiceMgr interface {
	UpdateBucketGraphFromMetakv(functionName string) error
	ResetFailoverStatus()
	GetFailoverStatus() (failoverNotifTs int64, changeId string)
	CheckLifeCycleOpsDuringRebalance() bool
	NotifySupervisorWaitCh()
	// TODO: Replace it with getting back the whole application.
	GetFunctionId(id Identity) (uint32, error)
}

type EventingSuperSup

// EventingSuperSup is the supervisor-level interface. Most methods take an
// appName and mirror a same-named per-app method on EventingProducer; the
// rest cover bucket/keyspace handles, security settings, topology-change
// notifications and bucket watching.
type EventingSuperSup interface {
	PausingAppList() map[string]string
	BootstrapAppList() map[string]string
	BootstrapAppStatus(appName string) bool
	BootstrapStatus() bool
	CheckAndSwitchgocbBucket(bucketName, appName string, setting *SecuritySetting) error
	CheckpointBlobDump(appName string) (interface{}, error)
	ClearEventStats() []string
	DcpFeedBoundary(fnName string) (string, error)
	DeployedAppList() []string
	GetEventProcessingStats(appName string) map[string]uint64
	GetAppCode(appName string) string
	GetAppLog(appName string, sz int64) []string
	GetAppCompositeState(appName string) int8
	GetDcpEventsRemainingToProcess(appName string) uint64
	GetDebuggerURL(appName string) (string, error)
	GetDeployedApps() map[string]string
	GetEventingConsumerPids(appName string) map[string]int
	GetExecutionStats(appName string) map[string]interface{}
	GetFailureStats(appName string) map[string]interface{}
	GetLatencyStats(appName string) StatsData
	GetCurlLatencyStats(appName string) StatsData
	GetInsight(appName string) *Insight
	GetLcbExceptionsStats(appName string) map[string]uint64
	GetLocallyDeployedApps() map[string]string
	GetMetaStoreStats(appName string) map[string]uint64
	GetBucket(bucketName, appName string) (*couchbase.Bucket, error)
	GetMetadataHandle(bucketName, scopeName, collectionName, appName string) (*gocb.Collection, error)
	GetKeyspaceID(bucketName, scopeName, collectionName string) (keyspaceID KeyspaceID, err error)
	GetCurrentManifestId(bucketName string) (string, error)
	GetRegisteredPool() string
	GetSeqsProcessed(appName string) map[int]int64
	GetNumVbucketsForBucket(bucketName string) int
	InternalVbDistributionStats(appName string) map[string]string
	KillAllConsumers()
	NotifyPrepareTopologyChange(ejectNodes, keepNodes []string, changeType service.TopologyChangeType)
	TopologyChangeNotifCallback(kve metakv.KVEntry) error
	PlannerStats(appName string) []*PlannerNodeVbMapping
	RebalanceStatus() bool
	RebalanceTaskProgress(appName string) (*RebalanceProgress, error)
	RemoveProducerToken(appName string)
	RestPort() string
	ResetCounters(appName string) error
	SetSecuritySetting(setting *SecuritySetting) bool
	GetSecuritySetting() *SecuritySetting
	EncryptionChangedDuringLifecycle() bool
	GetGocbSubscribedApps(encryptionEnabled bool) map[string]struct{}
	SignalStopDebugger(appName string) error
	SpanBlobDump(appName string) (interface{}, error)
	StopProducer(appName string, msg UndeployAction)
	TimerDebugStats(appName string) (map[int]map[string]interface{}, error)
	UpdateEncryptionLevel(enforceTLS, encryptOn bool)
	VbDcpEventsRemainingToProcess(appName string) map[int]int64
	VbDistributionStatsFromMetadata(appName string) map[string]map[string]string
	VbSeqnoStats(appName string) (map[int][]map[string]interface{}, error)
	WriteDebuggerURL(appName, url string)
	WriteDebuggerToken(appName, token string, hostnames []string)
	IncWorkerRespawnedCount()
	WorkerRespawnedCount() uint32
	CheckLifeCycleOpsDuringRebalance() bool
	GetBSCSnapshot() (map[string]map[string][]string, error)
	GetSystemMemoryQuota() float64

	WatchBucket(keyspace Keyspace, appName string, mType MonitorType) error
	UnwatchBucket(keyspace Keyspace, appName string)
}

type FunctionScope

// FunctionScope is a bucket name and scope name pair, serialised under the
// JSON keys "bucket" and "scope". ToKeyspace and Keyspace.ToFunctionScope
// convert between FunctionScope and Keyspace.
type FunctionScope struct {
	BucketName string `json:"bucket"`
	ScopeName  string `json:"scope"`
}

Needed only during the first creation of the function.

func GetFunctionScope

func GetFunctionScope(identity Identity) FunctionScope

func (FunctionScope) String

func (fs FunctionScope) String() string

func (*FunctionScope) ToKeyspace

func (fs *FunctionScope) ToKeyspace() *Keyspace

type HandlerConfig

// HandlerConfig groups the settings of a single handler. The fields carry no
// struct tags.
// NOTE(review): units for the interval, timeout, size and capacity fields are
// not visible here — confirm against the code that populates this struct.
type HandlerConfig struct {
	N1qlPrepareAll            bool
	LanguageCompatibility     string
	AllowTransactionMutations bool
	AggDCPFeedMemCap          int64
	CheckpointInterval        int
	IdleCheckpointInterval    int
	CPPWorkerThrCount         int
	ExecuteTimerRoutineCount  int
	ExecutionTimeout          int
	FeedbackBatchSize         int
	FeedbackQueueCap          int64
	FeedbackReadBufferSize    int
	HandlerHeaders            []string
	HandlerFooters            []string
	LcbInstCapacity           int
	N1qlConsistency           string
	LogLevel                  string
	SocketWriteBatchSize      int
	SourceKeyspace            *Keyspace
	StatsLogInterval          int
	StreamBoundary            DcpStreamBoundary
	TimerContextSize          int64
	TimerQueueMemCap          uint64
	TimerQueueSize            uint64
	UndeployRoutineCount      int
	WorkerCount               int
	WorkerQueueCap            int64
	WorkerQueueMemCap         int64
	WorkerResponseTimeout     int
	LcbRetryCount             int
	LcbTimeout                int
	BucketCacheSize           int64
	BucketCacheAge            int64
	NumTimerPartitions        int
	CurlMaxAllowedRespSize    int
}

type Identity

// Identity carries an app name together with a bucket and scope.
// GetIdentityFromLocation builds one from a location string, ToLocation
// converts back, and EventingServiceMgr.GetFunctionId takes one as its key.
type Identity struct {
	AppName string
	Bucket  string
	Scope   string
}

func GetIdentityFromLocation

func GetIdentityFromLocation(locationString string) (Identity, error)

func (Identity) String

func (id Identity) String() string

func (Identity) ToLocation

func (id Identity) ToLocation() string

type Insight

// Insight pairs a script string with per-line InsightLine records keyed by an
// int. Field tags give the JSON keys.
// NOTE(review): the Lines key is presumably a source line number — confirm.
type Insight struct {
	Script string              `json:"script"`
	Lines  map[int]InsightLine `json:"lines"`
}

func NewInsight

func NewInsight() *Insight

func (*Insight) Accumulate

func (dst *Insight) Accumulate(src *Insight)

type InsightLine

// InsightLine is the value type of Insight.Lines. Field tags give the JSON
// keys; note that ExceptionCount and LastException serialise as "error_count"
// and "error_msg" respectively.
// NOTE(review): the unit of CallTime is not visible here.
type InsightLine struct {
	CallCount      int64   `json:"call_count"`
	CallTime       float64 `json:"call_time"`
	ExceptionCount int64   `json:"error_count"`
	LastException  string  `json:"error_msg"`
	LastLog        string  `json:"last_log"`
}

type Insights

// Insights maps a string key to an *Insight.
// NOTE(review): the key is presumably the app name — confirm against callers.
type Insights map[string]*Insight

func NewInsights

func NewInsights() *Insights

func (*Insights) Accumulate

func (dst *Insights) Accumulate(src *Insights)

type Key

// Key is an opaque key value; its fields are unexported. It is built with
// NewKey(userPrefix, clusterPrefix, key) and read through GetPrefix and Raw.
type Key struct {
	// contains filtered or unexported fields
}

func NewKey

func NewKey(userPrefix, clusterPrefix, key string) Key

func (Key) GetPrefix

func (k Key) GetPrefix() string

func (Key) Raw

func (k Key) Raw() string

type Keyspace

// Keyspace identifies a location by bucket, scope and collection name.
// The fields carry no struct tags; KeyspaceName is the JSON-tagged
// counterpart with the same three components.
type Keyspace struct {
	BucketName     string
	ScopeName      string
	CollectionName string
}

func (Keyspace) Equals

func (k1 Keyspace) Equals(k2 Keyspace) bool

func (Keyspace) IsWildcard

func (k Keyspace) IsWildcard() bool

func (Keyspace) String

func (k Keyspace) String() string

func (Keyspace) ToFunctionScope

func (k Keyspace) ToFunctionScope() *FunctionScope

type KeyspaceID

// KeyspaceID identifies a keyspace by IDs rather than names, plus a
// StreamType.
// NOTE(review): Bid, Cid and Sid are presumably the bucket, collection and
// scope IDs — confirm against the code that fills them in.
type KeyspaceID struct {
	Bid        string
	Cid        uint32
	Sid        uint32
	StreamType StreamType
}

func (KeyspaceID) Equals

func (keyspaceID KeyspaceID) Equals(keyspaceID2 KeyspaceID) bool

type KeyspaceName

// KeyspaceName is a bucket/scope/collection name triple serialised under the
// JSON keys "bucket_name", "scope_name" and "collection_name".
type KeyspaceName struct {
	Bucket     string `json:"bucket_name"`
	Scope      string `json:"scope_name"`
	Collection string `json:"collection_name"`
}

type MonitorType

// MonitorType is an int8 enumeration passed to EventingSuperSup.WatchBucket
// as mType.
type MonitorType int8
// Values assigned by iota: SrcWatch = 0, MetaWatch = 1,
// FunctionScopeWatch = 2.
const (
	SrcWatch MonitorType = iota
	MetaWatch
	FunctionScopeWatch
)

type Owner

// Owner holds a UUID, user and domain. It is referenced by Application.Owner
// and returned by the GetOwner methods of EventingConsumer and
// EventingProducer. The fields carry no struct tags.
type Owner struct {
	UUID   string
	User   string
	Domain string
}

func (*Owner) String

func (o *Owner) String() string

type PlannerNodeVbMapping

// PlannerNodeVbMapping captures the vbucket distribution across all eventing
// nodes as per planner: a host name, a starting vbucket and a vbucket count.
// Field tags give the JSON keys.
type PlannerNodeVbMapping struct {
	Hostname string `json:"host_name"`
	StartVb  int    `json:"start_vb"`
	VbsCount int    `json:"vb_count"`
}

PlannerNodeVbMapping captures the vbucket distribution across all eventing nodes as per planner

type ProcessConfig

// ProcessConfig groups process-level settings: ports, directories and IPC
// socket identifiers. Ports are held as strings.
// NOTE(review): the permitted values of IPCType are not visible here.
type ProcessConfig struct {
	BreakpadOn             bool
	DebuggerPort           string
	DiagDir                string
	EventingDir            string
	EventingPort           string
	EventingSSLPort        string
	FeedbackSockIdentifier string
	IPCType                string
	SockIdentifier         string
}

type RebalanceConfig

// RebalanceConfig holds two routine counts used for vbucket ownership
// give-up and takeover.
// NOTE(review): presumably goroutine counts — confirm against callers.
type RebalanceConfig struct {
	VBOwnershipGiveUpRoutineCount   int
	VBOwnershipTakeoverRoutineCount int
}

type RebalanceProgress

// RebalanceProgress is the value returned (by pointer) from the
// RebalanceTaskProgress methods. The fields carry no struct tags.
// NOTE(review): the concrete type stored in NodeLevelStats is not visible
// here.
type RebalanceProgress struct {
	CloseStreamVbsLen     int
	StreamReqVbsLen       int
	VbsRemainingToShuffle int
	VbsOwnedPerPlan       int
	NodeLevelStats        interface{}
}

type SecuritySetting

// SecuritySetting holds encryption flags, certificate/key file strings and an
// x509 root CA pool. It is exchanged through
// EventingSuperSup.SetSecuritySetting and GetSecuritySetting.
type SecuritySetting struct {
	EncryptData        bool
	DisableNonSSLPorts bool
	CAFile             string
	CertFile           string
	KeyFile            string
	RootCAs            *x509.CertPool
}

type StatsData

// StatsData maps a string key to a uint64 value; it is the type used for
// latency stats by the Get*/Append*LatencyStats methods.
type StatsData map[string]uint64

type StorageEngine

// StorageEngine is a string type; Couchstore and Magma are its named values.
type StorageEngine string

type StreamType

// StreamType is a uint8 enumeration stored in KeyspaceID.StreamType.
type StreamType uint8
// Values assigned by iota: STREAM_BUCKET = 0, STREAM_SCOPE = 1,
// STREAM_COLLECTION = 2, STREAM_UNKNOWN = 3.
// NOTE(review): Go convention prefers MixedCaps names; renaming would break
// callers, so the names are left as is.
const (
	STREAM_BUCKET StreamType = iota
	STREAM_SCOPE
	STREAM_COLLECTION
	STREAM_UNKNOWN
)

type TopologyChangeMsg

// TopologyChangeMsg is the message passed (by pointer) to
// EventingProducer.NotifyTopologyChange: a ChangeType and a source string.
type TopologyChangeMsg struct {
	CType     ChangeType
	MsgSource string
}

type UndeployAction

// UndeployAction is the message passed to EventingProducer.UndeployHandler
// and EventingSuperSup.StopProducer: three boolean options plus a reason
// string. DefaultUndeployAction returns a value of this type.
// NOTE(review): the defaults set by DefaultUndeployAction are not visible
// here.
type UndeployAction struct {
	UpdateMetakv        bool
	SkipMetadataCleanup bool
	DeleteFunction      bool

	Reason string
}

func DefaultUndeployAction

func DefaultUndeployAction() UndeployAction

func (UndeployAction) String

func (msg UndeployAction) String() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL