consumer

package
v0.0.0-...-2416616 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 15, 2024 License: Apache-2.0 Imports: 39 Imported by: 0

Documentation

Index

Constants

View Source
const (
	Unused3 int8
	Unused4
)
View Source
const (
	AggChanSizeMultiplier = 8
)

Note: Should be a multiple of number of dcpFeeds which we might not know during initialising consumer Hence, assuming 8 KV dcpFeeds for an average of 8 KV nodes.

View Source
const (

	// ClusterChangeNotifChBufSize limits buffer size for cluster change notif from producer
	ClusterChangeNotifChBufSize = 10
)

Variables

View Source
var (
	StateInit      = state("StreamInit")
	StateStreamEnd = state("StreamEnd")

	// Caller should provide the request that its making and not this value
	StateStreamRequest = state("StreamRequest")
)

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Consumer is responsible interacting with c++ v8 worker over local tcp port

func NewConsumer

func NewConsumer(hConfig *common.HandlerConfig, pConfig *common.ProcessConfig, rConfig *common.RebalanceConfig,
	index int, uuid, nsServerPort string, eventingNodeUUIDs []string, vbnos []uint16, app *common.AppConfig,
	dcpConfig map[string]interface{}, p common.EventingProducer, s common.EventingSuperSup,
	numVbuckets int, retryCount *int64, vbEventingNodeAssignMap map[uint16]string,
	workerVbucketMap map[string][]uint16, featureMatrix uint32) *Consumer

NewConsumer called by producer to create consumer handle

func (*Consumer) BootstrapStatus

func (c *Consumer) BootstrapStatus() bool

BootstrapStatus returns state of bootstrap for consumer instance

func (*Consumer) CheckIfQueuesAreDrained

func (c *Consumer) CheckIfQueuesAreDrained() error

CheckIfQueuesAreDrained looks at all queues to make sure no events are left, to avoid potential loss of events - especially during rebalance out and pausing of execution of a function

func (*Consumer) ClearEventStats

func (c *Consumer) ClearEventStats()

ClearEventStats flushes event processing stats

func (*Consumer) CloseAllRunningDcpFeeds

func (c *Consumer) CloseAllRunningDcpFeeds()

CloseAllRunningDcpFeeds drops all socket connections to DCP producer

func (*Consumer) ConsumerName

func (c *Consumer) ConsumerName() string

ConsumerName returns consumer name e.q <event_handler_name>_worker_1

func (*Consumer) DcpEventsRemainingToProcess

func (c *Consumer) DcpEventsRemainingToProcess() uint64

DcpEventsRemainingToProcess reports cached value for dcp events remaining to producer

func (*Consumer) EventingNodeUUIDs

func (c *Consumer) EventingNodeUUIDs() []string

EventingNodeUUIDs return list of known eventing node uuids

func (*Consumer) EventsProcessedPSec

func (c *Consumer) EventsProcessedPSec() *cm.EventProcessingStats

EventsProcessedPSec reports dcp + timer events triggered per sec

func (*Consumer) GetAssignedVbs

func (c *Consumer) GetAssignedVbs(workerName string) ([]uint16, error)

func (*Consumer) GetEventProcessingStats

func (c *Consumer) GetEventProcessingStats() map[string]uint64

GetEventProcessingStats exposes dcp/timer processing stats

func (*Consumer) GetExecutionStats

func (c *Consumer) GetExecutionStats() map[string]interface{}

GetExecutionStats returns OnUpdate/OnDelete success/failure stats for event handlers from cpp world

func (*Consumer) GetFailureStats

func (c *Consumer) GetFailureStats() map[string]interface{}

GetFailureStats returns failure stats for event handlers from cpp world

func (*Consumer) GetInsight

func (c *Consumer) GetInsight() *common.Insight

func (*Consumer) GetLcbExceptionsStats

func (c *Consumer) GetLcbExceptionsStats() map[string]uint64

GetLcbExceptionsStats returns libcouchbase exception stats from CPP workers

func (*Consumer) GetMetaStoreStats

func (c *Consumer) GetMetaStoreStats() map[string]uint64

GetMetaStoreStats exposes timer store related stat counters

func (*Consumer) GetOwner

func (c *Consumer) GetOwner() *common.Owner

func (*Consumer) GetPrevRebalanceInCompleteStatus

func (c *Consumer) GetPrevRebalanceInCompleteStatus() bool

GetPrevRebalanceInCompleteStatus returns rebalance status for consumer instance

func (*Consumer) GetRebalanceStatus

func (c *Consumer) GetRebalanceStatus() bool

GetRebalanceStatus returns rebalance status for consumer instance

func (*Consumer) HandleV8Worker

func (c *Consumer) HandleV8Worker() error

HandleV8Worker sets up CPP V8 worker post its bootstrap

func (*Consumer) HostPortAddr

func (c *Consumer) HostPortAddr() string

HostPortAddr returns the HostPortAddr combination of current eventing node e.g. 127.0.0.1:25000

func (*Consumer) Index

func (c *Consumer) Index() int

Index returns the index of consumer among all consumers designated for specific handler on an eventing node

func (*Consumer) InternalVbDistributionStats

func (c *Consumer) InternalVbDistributionStats() []uint16

InternalVbDistributionStats returns internal state of vbucket ownership distribution on local eventing node

func (*Consumer) NodeUUID

func (c *Consumer) NodeUUID() string

NodeUUID returns UUID that's supplied by ns_server from command line

func (*Consumer) NotifyClusterChange

func (c *Consumer) NotifyClusterChange()

NotifyClusterChange is called by producer handle to signify each consumer instance about StartTopologyChange rpc call from cbauth service.Manager

func (*Consumer) NotifyPrepareTopologyChange

func (c *Consumer) NotifyPrepareTopologyChange(keepNodes, ejectNodes []string)

NotifyPrepareTopologyChange is called by producer instance to notify about updated list of node uuids

func (*Consumer) NotifyRebalanceStop

func (c *Consumer) NotifyRebalanceStop()

NotifyRebalanceStop is called by producer to signal stopping of rebalance operation

func (*Consumer) NotifySettingsChange

func (c *Consumer) NotifySettingsChange()

NotifySettingsChange signals consumer instance of settings update

func (*Consumer) NotifyWorker

func (c *Consumer) NotifyWorker()

func (*Consumer) PauseConsumer

func (c *Consumer) PauseConsumer()

This, being the very first consumer level function involved in pause, also holds the responsibility to try and refresh its metadata handle in case of a encryption level change

func (*Consumer) Pid

func (c *Consumer) Pid() int

Pid returns the process id of CPP V8 worker

func (*Consumer) RebalanceStatus

func (c *Consumer) RebalanceStatus() bool

RebalanceStatus returns state of rebalance for consumer instance

func (*Consumer) RebalanceTaskProgress

func (c *Consumer) RebalanceTaskProgress() *cm.RebalanceProgress

RebalanceTaskProgress reports progress to producer

func (*Consumer) RemoveSupervisorToken

func (c *Consumer) RemoveSupervisorToken() error

func (*Consumer) ResetBootstrapDone

func (c *Consumer) ResetBootstrapDone()

ResetBootstrapDone to unset bootstrap flag

func (*Consumer) ResetCounters

func (c *Consumer) ResetCounters()

func (*Consumer) ResolveHostname

func (c *Consumer) ResolveHostname(instance common.DebuggerInstance) string

ResolveHostname returns external IP address of this node. Looks through the alternate addresses as well. Returns alt addr if found. In-case of failure returns 127.0.0.1

func (*Consumer) SendAssignedVbs

func (c *Consumer) SendAssignedVbs()

func (*Consumer) SendDeleteCidMsg

func (c *Consumer) SendDeleteCidMsg(cid uint32, partition uint16, seqNo uint64)

func (*Consumer) SendNoOp

func (c *Consumer) SendNoOp(seqNo uint64, partition uint16)

func (*Consumer) Serve

func (c *Consumer) Serve()

Serve acts as init routine for consumer handle

func (*Consumer) SetBootstrapStatus

func (c *Consumer) SetBootstrapStatus(status bool)

SetBootstrapStatus updates bootstrapping status for consumer instance

func (*Consumer) SetConnHandle

func (c *Consumer) SetConnHandle(conn net.Conn)

SetConnHandle sets the tcp connection handle for CPP V8 worker

func (*Consumer) SetFeatureMatrix

func (c *Consumer) SetFeatureMatrix(featureMatrix uint32)

func (*Consumer) SetFeedbackConnHandle

func (c *Consumer) SetFeedbackConnHandle(conn net.Conn)

SetFeedbackConnHandle initialised the socket connect for data channel from eventing-consumer

func (*Consumer) SetRebalanceStatus

func (c *Consumer) SetRebalanceStatus(status bool)

SetRebalanceStatus update rebalance status for consumer instance

func (*Consumer) SignalBootstrapFinish

func (c *Consumer) SignalBootstrapFinish()

SignalBootstrapFinish is leveraged by Eventing.Producer instance to know if corresponding Eventing.Consumer instance has finished bootstrap

func (*Consumer) SignalConnected

func (c *Consumer) SignalConnected()

SignalConnected notifies consumer routine when CPP V8 worker has connected to tcp listener instance

func (*Consumer) SignalFeedbackConnected

func (c *Consumer) SignalFeedbackConnected()

SignalFeedbackConnected notifies consumer routine when CPP V8 worker has connected to data channel

func (*Consumer) SignalStopDebugger

func (c *Consumer) SignalStopDebugger() error

SignalStopDebugger signal C++ consumer to stop debugger

func (*Consumer) SpawnCompilationWorker

func (c *Consumer) SpawnCompilationWorker(appCode, appContent, appName, eventingPort string, handlerHeaders, handlerFooters []string) (*common.CompileStatus, error)

SpawnCompilationWorker bring up a CPP worker to compile the user supplied handler code

func (*Consumer) Stop

func (c *Consumer) Stop(context string)

Stop acts terminate routine for consumer handle

func (*Consumer) String

func (c *Consumer) String() string

func (*Consumer) TimerDebugStats

func (c *Consumer) TimerDebugStats() map[int]map[string]interface{}

TimerDebugStats captures timer related stats to assist in debugging mismatches during rebalance

func (*Consumer) UpdateEncryptionLevel

func (c *Consumer) UpdateEncryptionLevel(enforceTLS, encryptOn bool)

func (*Consumer) UpdateWorkerQueueMemCap

func (c *Consumer) UpdateWorkerQueueMemCap(quota int64)

UpdateWorkerQueueMemCap revises the memory cap for cpp worker, dcp and timer queues

func (*Consumer) VbDcpEventsRemainingToProcess

func (c *Consumer) VbDcpEventsRemainingToProcess() map[int]int64

VbDcpEventsRemainingToProcess reports cached dcp events remaining broken down to vbucket level

func (*Consumer) VbEventingNodeAssignMapUpdate

func (c *Consumer) VbEventingNodeAssignMapUpdate(vbEventingNodeAssignMap map[uint16]string)

VbEventingNodeAssignMapUpdate captures updated node to vbucket assignment

func (*Consumer) VbProcessingStats

func (c *Consumer) VbProcessingStats() map[uint16]map[string]interface{}

VbProcessingStats exposes consumer vb metadata to producer

func (*Consumer) VbSeqnoStats

func (c *Consumer) VbSeqnoStats() map[int]map[string]interface{}

VbSeqnoStats returns seq no stats, which can be useful in figuring out missed events during rebalance VbSeqnoStats returns seq no stats, which can be useful in figuring out missed events during rebalance

func (*Consumer) WorkerVbMapUpdate

func (c *Consumer) WorkerVbMapUpdate(workerVbucketMap map[string][]uint16)

WorkerVbMapUpdate captures updated mapping of active consumers to vbuckets they should handle as per static planner

type DcpStatsLog

type DcpStatsLog interface {
	AddDcpLog(vb uint16, key LogKey, value string)

	DeletePartition(vb uint16)
}

func NewDcpStatsLog

func NewDcpStatsLog(pollingInterval time.Duration, logSuffix string, stopChannel chan struct{}) DcpStatsLog

type DcpVbStats

type DcpVbStats struct {
	CurrentState   state                    `json:"state"`
	Ts             string                   `json:"ts"`
	LastRequest    string                   `json:"last_request"`
	StreamResponse map[string]*ResponseStat `json:"response,omitempty"`
	// contains filtered or unexported fields
}

func NewDcpVbStat

func NewDcpVbStat() *DcpVbStats

func (*DcpVbStats) AddStat

func (stat *DcpVbStats) AddStat(key LogKey, value string)

type LogKey

type LogKey int8
const (
	LogState LogKey = iota
	StreamResponse
)

type OwnershipEntry

type OwnershipEntry struct {
	AssignedWorker string `json:"assigned_worker"`
	CurrentVBOwner string `json:"current_vb_owner"`
	Operation      string `json:"operation"`
	SeqNo          uint64 `json:"seq_no"`
	Timestamp      string `json:"timestamp"`
}

OwnershipEntry captures the state of vbucket within the metadata blob

type ResponseStat

type ResponseStat struct {
	Ts    string `json:"response_ts"`
	Count int    `json:"count"`
}

func NewResponseStat

func NewResponseStat() *ResponseStat

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL