indexer

package
v0.0.0-...-d8c7374 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 24, 2017 License: Apache-2.0 Imports: 61 Imported by: 0

Documentation

Overview

@author Couchbase <info@couchbase.com> @copyright 2015 Couchbase, Inc.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

@copyright 2016 Couchbase, Inc.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

View Source
const (
	VBS_INIT = iota
	VBS_STREAM_BEGIN
	VBS_STREAM_END
	VBS_CONN_ERROR
	VBS_REPAIR
)
View Source
const (
	ERROR_PANIC errCode = iota

	//Slab Manager
	ERROR_SLAB_INIT
	ERROR_SLAB_BAD_ALLOC_REQUEST
	ERROR_SLAB_INTERNAL_ALLOC_ERROR
	ERROR_SLAB_MEM_LIMIT_EXCEED
	ERROR_SLAB_INTERNAL_ERROR

	//Stream Reader
	ERROR_STREAM_INIT
	ERROR_STREAM_READER_UNKNOWN_COMMAND
	ERROR_STREAM_READER_UNKNOWN_ERROR
	ERROR_STREAM_READER_PANIC
	ERROR_STREAM_READER_STREAM_SHUTDOWN

	//Mutation Manager
	ERROR_MUT_MGR_INTERNAL_ERROR
	ERROR_MUT_MGR_STREAM_ALREADY_OPEN
	ERROR_MUT_MGR_STREAM_ALREADY_CLOSED
	ERROR_MUT_MGR_UNKNOWN_COMMAND
	ERROR_MUT_MGR_UNCLEAN_SHUTDOWN
	ERROR_MUT_MGR_PANIC

	//Mutation Queue
	ERROR_MUTATION_QUEUE_INIT

	//Timekeeper
	ERROR_TK_UNKNOWN_STREAM

	//KVSender
	ERROR_KVSENDER_UNKNOWN_INDEX
	ERROR_KVSENDER_STREAM_ALREADY_OPEN
	ERROR_KVSENDER_STREAM_REQUEST_ERROR
	ERROR_KV_SENDER_UNKNOWN_STREAM
	ERROR_KV_SENDER_UNKNOWN_BUCKET
	ERROR_KVSENDER_STREAM_ALREADY_CLOSED

	//ScanCoordinator
	ERROR_SCAN_COORD_UNKNOWN_COMMAND
	ERROR_SCAN_COORD_INTERNAL_ERROR

	//INDEXER
	ERROR_INDEX_ALREADY_EXISTS
	ERROR_INDEXER_INTERNAL_ERROR
	ERROR_INDEX_BUILD_IN_PROGRESS
	ERROR_INDEX_DROP_IN_PROGRESS
	ERROR_INDEXER_UNKNOWN_INDEX
	ERROR_INDEXER_UNKNOWN_BUCKET
	ERROR_INDEXER_IN_RECOVERY
	ERROR_INDEXER_NOT_ACTIVE
	ERROR_INDEXER_REBALANCE_IN_PROGRESS

	//STORAGE_MGR
	ERROR_STORAGE_MGR_ROLLBACK_FAIL

	//CLUSTER_MGR_AGENT
	ERROR_CLUSTER_MGR_AGENT_INIT
	ERROR_CLUSTER_MGR_CREATE_FAIL
	ERROR_CLUSTER_MGR_DROP_FAIL

	ERROR_INDEX_MANAGER_PANIC
	ERROR_INDEX_MANAGER_CHANNEL_CLOSE

	ERROR_SCAN_COORD_QUERYPORT_FAIL
	ERROR_BUCKET_EPHEMERAL
)
View Source
const (
	FATAL errSeverity = iota
	NORMAL
)
View Source
const (
	MESSAGING errCategory = iota
	STORAGE
	MUTATION_QUEUE
	TOPOLOGY
	STREAM_READER
	SLAB_MANAGER
	MUTATION_MANAGER
	TIMEKEEPER
	SCAN_COORD
	INDEXER
	STORAGE_MGR
	CLUSTER_MGR
)
View Source
const (
	Unsorted SortOrder = "none"
	Asc                = "asc"
	Desc               = "desc"
)
View Source
const (
	HTTP_PREFIX             string = "http://"
	MAX_KV_REQUEST_RETRY    int    = 0
	BACKOFF_FACTOR          int    = 2
	MAX_CLUSTER_FETCH_RETRY int    = 600
)
View Source
const (

	//General Messages
	MSG_SUCCESS = iota
	MSG_ERROR
	MSG_TIMESTAMP

	//STREAM_READER
	STREAM_READER_STREAM_DROP_DATA
	STREAM_READER_STREAM_BEGIN
	STREAM_READER_STREAM_END
	STREAM_READER_SYNC
	STREAM_READER_SNAPSHOT_MARKER
	STREAM_READER_UPDATE_QUEUE_MAP
	STREAM_READER_ERROR
	STREAM_READER_SHUTDOWN
	STREAM_READER_CONN_ERROR
	STREAM_READER_HWT

	//MUTATION_MANAGER
	MUT_MGR_PERSIST_MUTATION_QUEUE
	MUT_MGR_ABORT_PERSIST
	MUT_MGR_DRAIN_MUTATION_QUEUE
	MUT_MGR_GET_MUTATION_QUEUE_HWT
	MUT_MGR_GET_MUTATION_QUEUE_LWT
	MUT_MGR_SHUTDOWN
	MUT_MGR_FLUSH_DONE
	MUT_MGR_ABORT_DONE

	//TIMEKEEPER
	TK_SHUTDOWN
	TK_STABILITY_TIMESTAMP
	TK_INIT_BUILD_DONE
	TK_INIT_BUILD_DONE_ACK
	TK_ENABLE_FLUSH
	TK_DISABLE_FLUSH
	TK_MERGE_STREAM
	TK_MERGE_STREAM_ACK
	TK_GET_BUCKET_HWT

	//STORAGE_MANAGER
	STORAGE_MGR_SHUTDOWN
	STORAGE_INDEX_SNAP_REQUEST
	STORAGE_INDEX_STORAGE_STATS
	STORAGE_INDEX_COMPACT
	STORAGE_SNAP_DONE

	//KVSender
	KV_SENDER_SHUTDOWN
	KV_SENDER_GET_CURR_KV_TS
	KV_SENDER_RESTART_VBUCKETS
	KV_SENDER_REPAIR_ENDPOINTS
	KV_STREAM_REPAIR
	MSG_SUCCESS_OPEN_STREAM

	//ADMIN_MGR
	ADMIN_MGR_SHUTDOWN

	//CLUSTER_MGR
	CLUST_MGR_AGENT_SHUTDOWN
	CLUST_MGR_CREATE_INDEX_DDL
	CLUST_MGR_BUILD_INDEX_DDL
	CLUST_MGR_BUILD_INDEX_DDL_RESPONSE
	CLUST_MGR_DROP_INDEX_DDL
	CLUST_MGR_UPDATE_TOPOLOGY_FOR_INDEX
	CLUST_MGR_RESET_INDEX
	CLUST_MGR_GET_GLOBAL_TOPOLOGY
	CLUST_MGR_GET_LOCAL
	CLUST_MGR_SET_LOCAL
	CLUST_MGR_DEL_LOCAL
	CLUST_MGR_DEL_BUCKET
	CLUST_MGR_INDEXER_READY
	CLUST_MGR_CLEANUP_INDEX

	//CBQ_BRIDGE_SHUTDOWN
	CBQ_BRIDGE_SHUTDOWN
	CBQ_CREATE_INDEX_DDL
	CBQ_DROP_INDEX_DDL

	//INDEXER
	INDEXER_INIT_PREP_RECOVERY
	INDEXER_PREPARE_RECOVERY
	INDEXER_PREPARE_DONE
	INDEXER_INITIATE_RECOVERY
	INDEXER_RECOVERY_DONE
	INDEXER_BUCKET_NOT_FOUND
	INDEXER_ROLLBACK
	STREAM_REQUEST_DONE
	INDEXER_PAUSE
	INDEXER_RESUME
	INDEXER_PREPARE_UNPAUSE
	INDEXER_UNPAUSE
	INDEXER_BOOTSTRAP
	INDEXER_SET_LOCAL_META
	INDEXER_GET_LOCAL_META
	INDEXER_DEL_LOCAL_META
	INDEXER_CHECK_DDL_IN_PROGRESS
	INDEXER_UPDATE_RSTATE

	//SCAN COORDINATOR
	SCAN_COORD_SHUTDOWN

	COMPACTION_MGR_SHUTDOWN

	//COMMON
	UPDATE_INDEX_INSTANCE_MAP
	UPDATE_INDEX_PARTITION_MAP

	OPEN_STREAM
	ADD_INDEX_LIST_TO_STREAM
	REMOVE_INDEX_LIST_FROM_STREAM
	REMOVE_BUCKET_FROM_STREAM
	CLOSE_STREAM
	CLEANUP_STREAM

	CONFIG_SETTINGS_UPDATE

	STORAGE_STATS
	SCAN_STATS
	INDEX_PROGRESS_STATS
	INDEXER_STATS
	INDEX_STATS_DONE

	STATS_RESET
	REPAIR_ABORT
)
View Source
const (
	StatsReq          ScanReqType = "stats"
	CountReq                      = "count"
	ScanReq                       = "scan"
	ScanAllReq                    = "scanAll"
	HeloReq                       = "helo"
	MultiScanCountReq             = "multiscancount"
)
View Source
const (
	AllReq         ScanFilterType = "scanAll"
	LookupReq                     = "lookup"
	RangeReq                      = "range"       // Range with no filtering
	FilterRangeReq                = "filterRange" // Range with filtering
)

RangeReq is a span which is Range on the entire index without composite index filtering FilterRangeReq is a span request which needs composite index filtering

View Source
const CATCHUP_TOPIC = "CATCHUP_STREAM_TOPIC"

Catchup Topic Name

View Source
const DEFAULT_CLUSTER_ENDPOINT = "127.0.0.1:9000"

Default cluster address

View Source
const DEFAULT_GROWTH_FACTOR float64 = 2.0
View Source
const DEFAULT_MAX_ARRAY_KEY_SIZE = 10240
View Source
const DEFAULT_MAX_SEC_KEY_LEN = 4608

Constants for unlimiting entry size

View Source
const DEFAULT_MAX_SLAB_MEMORY = DEFAULT_SLAB_SIZE * 1024
View Source
const DEFAULT_POOL = "default"

Default Pool Name

View Source
const DEFAULT_RELEASE_BUFFER int = 10000
View Source
const DEFAULT_SLAB_SIZE = DEFAULT_START_CHUNK_SIZE * 1024
View Source
const DEFAULT_START_CHUNK_SIZE = 256

Slab Manager Specific constants

View Source
const ENCODE_BUF_SAFE_PAD = 512
View Source
const INDEXER_ID_KEY = "IndexerId"
View Source
const INDEXER_NODE_UUID = "IndexerNodeUUID"
View Source
const INDEXER_STATE_KEY = "IndexerState"
View Source
const INIT_TOPIC = "INIT_STREAM_TOPIC"

Initial Stream Topic Name

View Source
const INST_MAP_KEY_NAME = "IndexInstMap"
View Source
const KV_RETRY_INTERVAL = 5000
View Source
const LOCALHOST = "127.0.0.1"
View Source
const MAINT_TOPIC = "MAINT_STREAM_TOPIC"

Maintenance Topic Name

View Source
const MAX_DOCID_LEN = 256
View Source
const (
	MAX_GETSEQS_RETRIES = 10
)
View Source
const MAX_KEY_EXTRABYTES_LEN = MAX_DOCID_LEN + 2
View Source
const MAX_KVWARMUP_RETRIES = 120
View Source
const MAX_METAKV_RETRIES = 100
View Source
const MAX_NUM_VBUCKETS = 1024

Max number of vbuckets supported in the system

View Source
const MAX_SNAPSHOTS_PER_INDEX = 5

Max number of snapshot to be retained per index. Older snapshots are deleted.

View Source
const MoveIndexTokenPath = RebalanceMetakvDir + MoveIndexTokenTag
View Source
const MoveIndexTokenTag = "MoveIndexToken"
View Source
const PLASMA_MEMQUOTA_FRAC = 0.9
View Source
const REPAIR_BATCH_TIMEOUT = 1000

timeout in milliseconds to batch the vbuckets together for repair message

View Source
const REPAIR_RETRY_BEFORE_SHUTDOWN = 5

const REPAIR_RETRY_INTERVAL = 5000

View Source
const RESIZE_PAD = 1024
View Source
const RebalanceMetakvDir = c.IndexingMetaDir + "rebalance/"
View Source
const RebalanceRunning = "RebalanceRunning"
View Source
const RebalanceTokenPath = RebalanceMetakvDir + RebalanceTokenTag
View Source
const RebalanceTokenTag = "RebalanceToken"
View Source
const SCAN_ROLLBACK_ERROR_BATCHSIZE = 1000
View Source
const SLICE_COMMAND_BUFFER_SIZE = 20000

Internal Buffer Size for Each Slice to store incoming requests

View Source
const TransferTokenTag = "TransferToken"
View Source
const UnboundedLiteral = "~[]{}UnboundedTruenilNA~"
View Source
const WORKER_MSG_QUEUE_LEN = 1000000

Supervisor's channel capacity to buffer requests from workers

View Source
const WORKER_RECV_QUEUE_LEN = 10000

Intermediate message buffer channel size

Variables

View Source
var (
	ErrArrayItemKeyTooLong = errors.New("Array item key too long")
	ErrArrayKeyTooLong     = errors.New("Array to be indexed too long")
)
View Source
var (
	ErrSecKeyNil     = errors.New("Secondary key array is empty")
	ErrSecKeyTooLong = errors.New(fmt.Sprintf("Secondary key is too long (> %d)", maxSecKeyLen))
	ErrDocIdTooLong  = errors.New(fmt.Sprintf("DocID is too long (>%d)", MAX_DOCID_LEN))
)
View Source
var (
	MinIndexKey = &NilIndexKey{cmp: -1, pcmp: -1}
	MaxIndexKey = &NilIndexKey{cmp: 1, pcmp: 1}
	NilJsonKey  = []byte("[]")
)

Special index keys

View Source
var (
	ErrFatalComm                = errors.New("Fatal Internal Communication Error")
	ErrInconsistentState        = errors.New("Inconsistent Internal State")
	ErrKVRollbackForInitRequest = errors.New("KV Rollback Received For Initial Build Request")
	ErrMaintStreamMissingBucket = errors.New("Bucket Missing in Maint Stream")
	ErrInvalidStream            = errors.New("Invalid Stream")
	ErrIndexerInRecovery        = errors.New("Indexer In Recovery")
	ErrKVConnect                = errors.New("Error Connecting KV")
	ErrUnknownBucket            = errors.New("Unknown Bucket")
	ErrIndexerNotActive         = errors.New("Indexer Not Active")
	ErrInvalidMetadata          = errors.New("Invalid Metadata")
	ErrBucketEphemeral          = errors.New("Ephemeral Buckets Must Use MOI Storage")
)

Errors

View Source
var (
	ErrNotMyIndex         = errors.New("Not my index")
	ErrInternal           = errors.New("Internal server error occured")
	ErrSnapNotAvailable   = errors.New("No snapshot available for scan")
	ErrUnsupportedRequest = errors.New("Unsupported query request")
	ErrVbuuidMismatch     = errors.New("Mismatch in session vbuuids")
)

Errors

View Source
var (
	ErrIndexRollback            = errors.New("Indexer rollback")
	ErrIndexRollbackOrBootstrap = errors.New("Indexer rollback or warmup")
)
View Source
var (
	ErrLimitReached = errors.New("Row limit reached")
)
View Source
var (
	ErrUnsupportedInclusion = errors.New("Unsupported range inclusion option")
)
View Source
var FORESTDB_INMEMSEQ = forestdb.SeqNum(math.MaxUint64)
View Source
var ServiceAddrMap map[string]string
View Source
var StreamTopicName map[common.StreamId]string

Functions

func ArrayIndexItems

func ArrayIndexItems(bs []byte, arrPos int, buf []byte,
	isDistinct, checkSize bool) ([][]byte, []int, int, error)

func BytesToPrimaryIndexEntry

func BytesToPrimaryIndexEntry(b []byte) (*primaryIndexEntry, error)

func BytesToSecondaryIndexEntry

func BytesToSecondaryIndexEntry(b []byte) (*secondaryIndexEntry, error)

func CompareArrayEntriesWithCount

func CompareArrayEntriesWithCount(newKey, oldKey [][]byte, newKeyCount, oldKeyCount []int) ([][]byte, [][]byte)

Compare two arrays of byte arrays and find out diff of which byte entry needs to be deleted and which needs to be inserted

func CreateMutationStreamReader

func CreateMutationStreamReader(streamId common.StreamId, bucketQueueMap BucketQueueMap,
	bucketFilter map[string]*common.TsVbuuid, supvCmdch MsgChannel, supvRespch MsgChannel,
	numWorkers int, stats *IndexerStats, config common.Config, is common.IndexerState) (MutationStreamReader, Message)

CreateMutationStreamReader creates a new mutation stream and starts a reader to listen and process the mutations. In case returned MutationStreamReader is nil, Message will have the error msg.

func DecodeRev

func DecodeRev(ext service.Revision) uint64

func DestroyIndexSnapshot

func DestroyIndexSnapshot(is IndexSnapshot) error

func EncodeRev

func EncodeRev(rev uint64) service.Revision

func FlipBits

func FlipBits(code []byte)

flip bits in-place for a given byte slice

func GetBucketUUID

func GetBucketUUID(cluster, bucket string) string

func GetIndexEntryBytes

func GetIndexEntryBytes(key []byte, docid []byte,
	isPrimary bool, isArray bool, count int, desc []bool) (entry []byte, err error)

func GetIndexEntryBytes2

func GetIndexEntryBytes2(key []byte, docid []byte,
	isPrimary bool, isArray bool, count int, desc []bool, buf []byte) (bs []byte, err error)

func GetIndexEntryBytes3

func GetIndexEntryBytes3(key []byte, docid []byte,
	isPrimary bool, isArray bool, count int, desc []bool, buf []byte) (bs []byte, err error)

Return encoded key with docid without size check

func GetLocalIP

func GetLocalIP() (net.IP, error)

func IndexEntrySize

func IndexEntrySize(key []byte, docid []byte) int

func IndexKeyLessThan

func IndexKeyLessThan(a, b IndexKey) bool

Return true if a < b

func IndexPath

func IndexPath(inst *common.IndexInst, sliceId SliceId) string

func IndexPointLessThan

func IndexPointLessThan(x, y IndexPoint) bool

Return true if x < y

func IsEphemeral

func IsEphemeral(cluster, bucket string) (bool, error)

func IsIPLocal

func IsIPLocal(ip string) bool

func MetakvDel

func MetakvDel(path string) error

func MetakvGet

func MetakvGet(path string, v interface{}) (bool, error)

func MetakvRecurciveDel

func MetakvRecurciveDel(dirpath string) error

func MetakvSet

func MetakvSet(path string, v interface{}) error

func NewAdminManager

func NewAdminManager(supvCmdch MsgChannel, supvRespch MsgChannel) (
	AdminManager, Message)

func NewAtomicMutationQueue

func NewAtomicMutationQueue(bucket string, numVbuckets uint16, maxMemory *int64,
	memUsed *int64, config common.Config) *atomicMutationQueue

NewAtomicMutationQueue allocates a new Atomic Mutation Queue and initializes it

func NewClustMgrAgent

func NewClustMgrAgent(supvCmdch MsgChannel, supvRespch MsgChannel, cfg common.Config) (
	ClustMgrAgent, Message)

func NewCompactionManager

func NewCompactionManager(supvCmdCh MsgChannel, supvMsgCh MsgChannel,
	config common.Config) (CompactionManager, Message)

func NewDDLServiceMgr

func NewDDLServiceMgr(supvCmdch MsgChannel, supvMsgch MsgChannel, config common.Config) (*DDLServiceMgr, Message)

Constructor

func NewFlusher

func NewFlusher(config common.Config, stats *IndexerStats) *flusher

NewFlusher returns new instance of flusher

func NewForestDBSlice

func NewForestDBSlice(path string, sliceId SliceId, idxDefn common.IndexDefn,
	idxInstId common.IndexInstId, isPrimary bool,
	sysconf common.Config, idxStats *IndexStats) (*fdbSlice, error)

NewForestDBSlice initiailizes a new slice with forestdb backend. Both main and back index gets initialized with default config. Slice methods are not thread-safe and application needs to handle the synchronization. The only exception being Insert and Delete can be called concurrently. Returns error in case slice cannot be initialized.

func NewIndexer

func NewIndexer(config common.Config) (Indexer, Message)

func NewKVSender

func NewKVSender(supvCmdch MsgChannel, supvRespch MsgChannel,
	config c.Config) (KVSender, Message)

func NewMemDBSlice

func NewMemDBSlice(path string, sliceId SliceId, idxDefn common.IndexDefn,
	idxInstId common.IndexInstId, isPrimary bool, hasPersistance bool,
	sysconf common.Config, idxStats *IndexStats) (*memdbSlice, error)

func NewMetaNotifier

func NewMetaNotifier(adminCh MsgChannel, config common.Config, mgr *clustMgrAgent) *metaNotifier

func NewMutationManager

func NewMutationManager(supvCmdch MsgChannel, supvRespch MsgChannel,
	config common.Config) (MutationManager, Message)

NewMutationManager creates a new Mutation Manager which listens for commands from Indexer. In case returned MutationManager is nil, Message will have the error msg. supvCmdch is a synchronous channel and every request on this channel is followed by a response on the same channel. Supervisor is expected to wait for the response before issuing a new request on this channel. supvRespch will be used by Mutation Manager to send any async error/info messages that may happen due to any downstream error or its own processing. Additionally, for Flush commands, a sync response is sent on supvCmdch to indicate flush has been initiated and once flush completes, another message is sent on supvRespch to indicate its completion or any error that may have happened. If supvRespch or supvCmdch is closed, mutation manager will termiate its loop.

func NewPlasmaSlice

func NewPlasmaSlice(path string, sliceId SliceId, idxDefn common.IndexDefn,
	idxInstId common.IndexInstId, isPrimary bool,
	sysconf common.Config, idxStats *IndexStats) (*plasmaSlice, error)

func NewPrimaryIndexEntry

func NewPrimaryIndexEntry(docid []byte) (primaryIndexEntry, error)

func NewProtoWriter

func NewProtoWriter(t ScanReqType, conn net.Conn) *protoResponseWriter

func NewRebalanceMgr

func NewRebalanceMgr(supvCmdch MsgChannel, supvMsgch MsgChannel, config c.Config,
	rebalanceRunning bool, rebalanceToken *RebalanceToken) (RebalanceMgr, Message)

func NewScanCoordinator

func NewScanCoordinator(supvCmdch MsgChannel, supvMsgch MsgChannel,
	config common.Config, snapshotNotifych chan IndexSnapshot) (ScanCoordinator, Message)

NewScanCoordinator returns an instance of scanCoordinator or err message It listens on supvCmdch for command and every command is followed by a synchronous response on the supvCmdch. Any async message to supervisor is sent to supvMsgch. If supvCmdch get closed, ScanCoordinator will shut itself down.

func NewSecondaryIndexEntry

func NewSecondaryIndexEntry(key []byte, docid []byte, isArray bool, count int, desc []bool, buf []byte) (secondaryIndexEntry, error)

func NewSecondaryIndexEntry2

func NewSecondaryIndexEntry2(key []byte, docid []byte, isArray bool,
	count int, desc []bool, buf []byte, validateSize bool) (secondaryIndexEntry, error)

func NewSlabManager

func NewSlabManager(startChunkSize int, slabSize int, maxMemAlloc uint64) (SlabManager, Message)

NewSlabManager returns a slabManager struct instance with an initialized Arena

func NewSnapshotInfoContainer

func NewSnapshotInfoContainer(infos []SnapshotInfo) *snapshotInfoContainer

func NewStorageManager

func NewStorageManager(supvCmdch MsgChannel, supvRespch MsgChannel,
	indexPartnMap IndexPartnMap, config common.Config, snapshotNotifych chan IndexSnapshot) (
	StorageManager, Message)

NewStorageManager returns an instance of storageMgr or err message It listens on supvCmdch for command and every command is followed by a synchronous response of the supvCmdch. Any async response to supervisor is sent to supvRespch. If supvCmdch get closed, storageMgr will shut itself down.

func NewTimekeeper

func NewTimekeeper(supvCmdch MsgChannel, supvRespch MsgChannel,
	config common.Config) (Timekeeper, Message)

NewTimekeeper returns an instance of timekeeper or err message. It listens on supvCmdch for command and every command is followed by a synchronous response of the supvCmdch. Any async response to supervisor is sent to supvRespch. If supvCmdch get closed, storageMgr will shut itself down.

func ScanTStoString

func ScanTStoString(ts *common.TsVbuuid) string

Helper method to pretty print timestamp

func StartCpuCollector

func StartCpuCollector() error

Start Cpu collection

func ValidateBucket

func ValidateBucket(cluster, bucket string, uuids []string) bool

Types

type AdminManager

type AdminManager interface {
}

AdminManager listens to the admin port messages and relays it back to Indexer

type BucketAbortInProgressMap

type BucketAbortInProgressMap map[string]bool

type BucketDrainEnabledMap

type BucketDrainEnabledMap map[string]bool

type BucketFlushEnabledMap

type BucketFlushEnabledMap map[string]bool

type BucketFlushInProgressMap

type BucketFlushInProgressMap map[string]bool

type BucketFlushInProgressTsMap

type BucketFlushInProgressTsMap map[string]*common.TsVbuuid

type BucketHWTMap

type BucketHWTMap map[string]*common.TsVbuuid

type BucketHasBuildCompTSMap

type BucketHasBuildCompTSMap map[string]bool

type BucketIndexCountMap

type BucketIndexCountMap map[string]int

type BucketLastFlushedTsMap

type BucketLastFlushedTsMap map[string]*common.TsVbuuid

type BucketLastPersistTime

type BucketLastPersistTime map[string]time.Time

type BucketLastSnapMarker

type BucketLastSnapMarker map[string]*common.TsVbuuid

type BucketNeedsCommitMap

type BucketNeedsCommitMap map[string]bool

type BucketNewTsReqdMap

type BucketNewTsReqdMap map[string]bool

type BucketObserveFlushDoneMap

type BucketObserveFlushDoneMap map[string]MsgChannel

type BucketOpenTsMap

type BucketOpenTsMap map[string]*common.TsVbuuid

type BucketQueueMap

type BucketQueueMap map[string]IndexerMutationQueue

Map from bucket name to mutation queue

func CopyBucketQueueMap

func CopyBucketQueueMap(inMap BucketQueueMap) BucketQueueMap

type BucketRepairStopCh

type BucketRepairStopCh map[string]StopChannel

type BucketRequestStopCh

type BucketRequestStopCh map[string]StopChannel

type BucketRestartTsMap

type BucketRestartTsMap map[string]*common.TsVbuuid

type BucketRestartVbErrMap

type BucketRestartVbErrMap map[string]bool

type BucketRestartVbRetryMap

type BucketRestartVbRetryMap map[string]Timestamp

type BucketRestartVbTsMap

type BucketRestartVbTsMap map[string]*common.TsVbuuid

type BucketRollbackTs

type BucketRollbackTs map[string]*common.TsVbuuid

type BucketSkippedInMemTs

type BucketSkippedInMemTs map[string]uint64

type BucketStartTimeMap

type BucketStartTimeMap map[string]uint64

type BucketStats

type BucketStats struct {
	// contains filtered or unexported fields
}

func (*BucketStats) Init

func (s *BucketStats) Init()

type BucketStatus

type BucketStatus map[string]StreamStatus

type BucketStopChMap

type BucketStopChMap map[string]StopChannel

Map from bucket name to flusher stop channel

type BucketTimerStopCh

type BucketTimerStopCh map[string]StopChannel

type BucketTsListMap

type BucketTsListMap map[string]*list.List

type BucketVbRefCountMap

type BucketVbRefCountMap map[string]Timestamp

type BucketVbStatusMap

type BucketVbStatusMap map[string]Timestamp

type Callbacks

type Callbacks struct {
	// contains filtered or unexported fields
}

type CancelCb

type CancelCb struct {
	// contains filtered or unexported fields
}

func NewCancelCallback

func NewCancelCallback(req *ScanRequest, callb func(error)) *CancelCb

func (*CancelCb) Done

func (c *CancelCb) Done()

func (*CancelCb) Run

func (c *CancelCb) Run()

type Cleanup

type Cleanup struct {
	// contains filtered or unexported fields
}

func NewCleanup

func NewCleanup(f func()) *Cleanup

func (*Cleanup) Cancel

func (c *Cleanup) Cancel()

func (*Cleanup) Run

func (c *Cleanup) Run()

type ClustMgrAgent

type ClustMgrAgent interface {
}

ClustMgrAgent provides the mechanism to talk to Index Coordinator

type CmpEntry

type CmpEntry func(IndexKey, IndexEntry) int

type CompactionManager

type CompactionManager interface {
}

type CompositeElementFilter

type CompositeElementFilter struct {
	Low       IndexKey
	High      IndexKey
	Inclusion Inclusion
}

Range for a single field in composite index

type Counter

type Counter interface {
	CountTotal(ctx IndexReaderContext, stopch StopChannel) (uint64, error)
	// Approximate count
	StatCountTotal() (uint64, error)
}

Counter is a class of algorithms that return total node count efficiently

type DDLServiceMgr

type DDLServiceMgr struct {
	// contains filtered or unexported fields
}

DDLServiceMgr Definition

type DoneCallback

type DoneCallback func(err error, cancel <-chan struct{})

type DoneChannel

type DoneChannel chan bool

a generic channel which can be closed when you want to indicate the caller that you are done

type EntryCallback

type EntryCallback func([]byte) error

type Error

type Error struct {
	// contains filtered or unexported fields
}

func (Error) String

func (e Error) String() string

type Exister

type Exister interface {
	Exists(ctx IndexReaderContext, Indexkey IndexKey, stopch StopChannel) (bool, error)
}

Exister is a class of algorithms that allow testing if a key exists in the index

type Filter

type Filter struct {
	// If composite index has n keys,
	// it will have <= n CompositeElementFilters
	CompositeFilters []CompositeElementFilter
	Low              IndexKey
	High             IndexKey
	Inclusion        Inclusion
	ScanType         ScanFilterType
}

type Flusher

type Flusher interface {

	//PersistUptoTS will flush the mutation queue upto Timestamp provided.
	//Can be stopped anytime by closing StopChannel.
	//Sends SUCCESS on the MsgChannel when its done flushing till TS.
	//Any error condition is reported back on the MsgChannel.
	//Caller can wait on MsgChannel after closing StopChannel
	//to get notified about shutdown completion.
	PersistUptoTS(q MutationQueue, streamId common.StreamId, bucket string, indexInstMap common.IndexInstMap,
		indexPartnMap IndexPartnMap, ts Timestamp, changeVec []bool, stopch StopChannel) MsgChannel

	//DrainUptoTS will flush the mutation queue upto Timestamp
	//provided without actually persisting it.
	//Can be stopped anytime by closing the StopChannel.
	//Sends SUCCESS on the MsgChannel when its done flushing till timestamp.
	//Any error condition is reported back on the MsgChannel.
	//Caller can wait on MsgChannel after closing StopChannel
	//to get notified about shutdown completion.
	DrainUptoTS(q MutationQueue, streamId common.StreamId, bucket string, ts Timestamp,
		changeVec []bool, stopch StopChannel) MsgChannel

	//Persist will keep flushing the mutation queue till caller closes
	//the stop channel.Can be stopped anytime by closing the StopChannel.
	//Any error condition is reported back on the MsgChannel.
	//Caller can wait on MsgChannel after closing StopChannel to get
	//notified about shutdown completion.
	Persist(q MutationQueue, streamId common.StreamId, bucket string, indexInstMap common.IndexInstMap,
		indexPartnMap IndexPartnMap, stopch StopChannel) MsgChannel

	//Drain will keep flushing the mutation queue till caller closes
	//the stop channel without actually persisting the mutations.
	//Can be stopped anytime by closing the StopChannel.
	//Any error condition is reported back on the MsgChannel.
	//Caller can wait on MsgChannel after closing StopChannel to get
	//notified about shutdown completion.
	Drain(q MutationQueue, streamId common.StreamId, bucket string, stopch StopChannel) MsgChannel

	//IsTimestampGreaterThanQueueLWT checks if each Vbucket in the Queue
	//has mutation with Seqno lower than the corresponding Seqno present
	//in the specified timestamp.
	IsQueueLWTLowerThanTimestamp(q MutationQueue, ts Timestamp) bool

	//GetQueueLWT returns the lowest seqno for each vbucket in the queue
	GetQueueLWT(q MutationQueue) Timestamp

	//GetQueueHWT returns the highest seqno for each vbucket in the queue
	GetQueueHWT(q MutationQueue) Timestamp
}

Flusher is the only component which does read/dequeue from a MutationQueue. As MutationQueue has a restriction of only single reader and writer per vbucket, flusher should not be invoked concurrently for a single MutationQueue.

type ForestDBIterator

type ForestDBIterator struct {
	// contains filtered or unexported fields
}

ForestDBIterator taken from https://github.com/couchbaselabs/bleve/blob/master/index/store/goforestdb/iterator.go

func (*ForestDBIterator) Close

func (f *ForestDBIterator) Close() error

func (*ForestDBIterator) Get

func (f *ForestDBIterator) Get()

func (*ForestDBIterator) Key

func (f *ForestDBIterator) Key() []byte

func (*ForestDBIterator) Next

func (f *ForestDBIterator) Next()

func (*ForestDBIterator) Seek

func (f *ForestDBIterator) Seek(key []byte)

func (*ForestDBIterator) SeekFirst

func (f *ForestDBIterator) SeekFirst()

func (*ForestDBIterator) Valid

func (f *ForestDBIterator) Valid() bool

func (*ForestDBIterator) Value

func (f *ForestDBIterator) Value() []byte

type HashedSliceContainer

type HashedSliceContainer struct {
	SliceMap  map[SliceId]Slice
	NumSlices int
}

hashedSliceContainer provides a hash based implementation for SliceContainer. Each IndexKey is hashed to determine which slice it belongs to.

func NewHashedSliceContainer

func NewHashedSliceContainer() *HashedSliceContainer

NewHashedSliceContainer initializes a new HashedSliceContainer and returns

func (*HashedSliceContainer) AddSlice

func (sc *HashedSliceContainer) AddSlice(id SliceId, s Slice)

AddSlice adds a slice to the container

func (*HashedSliceContainer) GetAllSlices

func (sc *HashedSliceContainer) GetAllSlices() []Slice

GetAllSlices returns all slices from the container

func (*HashedSliceContainer) GetSliceById

func (sc *HashedSliceContainer) GetSliceById(id SliceId) Slice

GetSliceById returns Slice for the given SliceId

func (*HashedSliceContainer) GetSliceByIndexKey

func (sc *HashedSliceContainer) GetSliceByIndexKey(key common.IndexKey) Slice

GetSliceByIndexKey returns Slice for the given IndexKey This is a convenience method which calls other interface methods to first determine the sliceId from IndexKey and then the slice from sliceId

func (*HashedSliceContainer) GetSliceIdByIndexKey

func (sc *HashedSliceContainer) GetSliceIdByIndexKey(key common.IndexKey) SliceId

GetSliceIdByIndexKey returns SliceId for the given IndexKey

func (*HashedSliceContainer) RemoveSlice

func (sc *HashedSliceContainer) RemoveSlice(id SliceId)

RemoveSlice removes a slice from the container

func (*HashedSliceContainer) UpdateSlice

func (sc *HashedSliceContainer) UpdateSlice(id SliceId, s Slice)

UpdateSlice updates an existing slice to the container

type Inclusion

type Inclusion int

Inclusion controls how the boundaries values of a range are treated

const (
	Neither Inclusion = iota
	Low
	High
	Both
)

type IndexEntry

type IndexEntry interface {
	ReadDocId([]byte) ([]byte, error)
	ReadSecKey([]byte) ([]byte, error)
	Count() int
	Bytes() []byte
	String() string
}

Generic index entry abstraction (primary or secondary) Represents a row in the index

type IndexKey

type IndexKey interface {
	Compare(IndexEntry) int
	ComparePrefixFields(IndexEntry) int
	CompareIndexKey(IndexKey) int
	ComparePrefixIndexKey(IndexKey) int
	Bytes() []byte
	String() string
}

Generic index key abstraction (primary or secondary) Represents a key supplied by the user for scan operation

func NewPrimaryKey

func NewPrimaryKey(docid []byte) (IndexKey, error)

func NewSecondaryKey

func NewSecondaryKey(key []byte, buf []byte) (IndexKey, error)

type IndexPartnMap

type IndexPartnMap map[common.IndexInstId]PartitionInstMap

IndexPartnMap maps a IndexInstId to PartitionInstMap

func CopyIndexPartnMap

func CopyIndexPartnMap(inMap IndexPartnMap) IndexPartnMap

func (IndexPartnMap) String

func (pm IndexPartnMap) String() string

type IndexPoint

type IndexPoint struct {
	Value    IndexKey
	FilterId int
	Type     string
}

A point in index and the corresponding filter the point belongs to either as high or low

type IndexPoints

type IndexPoints []IndexPoint

Implements sort Interface

func (IndexPoints) Len

func (ip IndexPoints) Len() int

func (IndexPoints) Less

func (ip IndexPoints) Less(i, j int) bool

func (IndexPoints) Swap

func (ip IndexPoints) Swap(i, j int)

type IndexQueueMap

type IndexQueueMap map[common.IndexInstId]IndexerMutationQueue

IndexQueueMap is a map between IndexId and IndexerMutationQueue

type IndexReader

type IndexReader interface {
	Counter
	Ranger
	RangeCounter
}

type IndexReaderContext

type IndexReaderContext interface {
	Init()
	Done()
	SetCursorKey(cur *[]byte)
	GetCursorKey() *[]byte
}

Abstract context implemented by storage subsystem

type IndexScanDecoder

type IndexScanDecoder struct {
	p.ItemReadWriter
	// contains filtered or unexported fields
}

func (*IndexScanDecoder) Routine

func (d *IndexScanDecoder) Routine() error

type IndexScanSource

type IndexScanSource struct {
	p.ItemWriter
	// contains filtered or unexported fields
}

func (*IndexScanSource) Routine

func (s *IndexScanSource) Routine() error

type IndexScanWriter

type IndexScanWriter struct {
	p.ItemReader
	// contains filtered or unexported fields
}

func (*IndexScanWriter) Routine

func (d *IndexScanWriter) Routine() error

type IndexSnapMap

type IndexSnapMap map[common.IndexInstId]IndexSnapshot

type IndexSnapshot

type IndexSnapshot interface {
	IndexInstId() common.IndexInstId
	Timestamp() *common.TsVbuuid
	IsEpoch() bool
	Partitions() map[common.PartitionId]PartitionSnapshot
}

IndexSnapshot is an immutable data structure that provides point-in-time snapshot of an index instance held by an indexer. A consumer receiving a snapshot object can use it for scanning index entries Once the consumer has finished using this object, DestroyIndexSnapshot() method should be called to deallocate resources held by this object. Otherwise, it is consumer's responsibility to deallocate resources. A copy of the snapshot object can be made using CloneIndexSnapshot() method. A snapshot object should not be shared across multiple go routines unless they are serialized. CloneIndexSnapshot() should be used to create a copy of the object if the snapshot needs to be concurrently shared to multiple go routines.

func CloneIndexSnapshot

func CloneIndexSnapshot(is IndexSnapshot) IndexSnapshot

type IndexStats

type IndexStats struct {
	Timings IndexTimingStats
	// contains filtered or unexported fields
}

func (*IndexStats) Init

func (s *IndexStats) Init()

type IndexStorageStats

type IndexStorageStats struct {
	InstId common.IndexInstId
	Name   string
	Bucket string
	Stats  StorageStatistics
}

Represents storage stats for an index instance

func (IndexStorageStats) GetFragmentation

func (s IndexStorageStats) GetFragmentation() float64

func (IndexStorageStats) GetInternalData

func (s IndexStorageStats) GetInternalData() []string

func (IndexStorageStats) String

func (s IndexStorageStats) String() string

type IndexTimingStats

type IndexTimingStats struct {
	// contains filtered or unexported fields
}

func (*IndexTimingStats) Init

func (it *IndexTimingStats) Init()

type IndexWriter

type IndexWriter interface {

	//Persist a key/value pair
	Insert(key []byte, docid []byte, meta *MutationMeta) error

	//Delete a key/value pair by docId
	Delete(docid []byte, meta *MutationMeta) error

	// Create commited commited snapshot or inmemory snapshot
	NewSnapshot(*common.TsVbuuid, bool) (SnapshotInfo, error)

	// Get the list of commited snapshots
	GetSnapshots() ([]SnapshotInfo, error)

	// Create open snapshot handle
	OpenSnapshot(SnapshotInfo) (Snapshot, error)

	//Rollback to given snapshot
	Rollback(s SnapshotInfo) error

	//Rollback to initial state
	RollbackToZero() error

	// Statistics used for compaction trigger
	Statistics() (StorageStatistics, error)

	// Perform file compaction
	Compact(abortTime time.Time) error

	// Dealloc resources
	Close()

	// Reference counting operators
	IncrRef()
	DecrRef()

	//Destroy/Wipe the index completely
	Destroy()
}

type Indexer

type Indexer interface {
	Shutdown() Message
}

type IndexerMutationQueue

type IndexerMutationQueue struct {
	// contains filtered or unexported fields
}

IndexMutationQueue comprising of a mutation queue and a slab manager

type IndexerStats

type IndexerStats struct {
	// contains filtered or unexported fields
}

func NewIndexerStats

func NewIndexerStats() *IndexerStats

func (*IndexerStats) AddIndex

func (s *IndexerStats) AddIndex(id common.IndexInstId, bucket string, name string, replicaId int)

func (IndexerStats) Clone

func (s IndexerStats) Clone() *IndexerStats

func (IndexerStats) GetStats

func (is IndexerStats) GetStats() common.Statistics

func (*IndexerStats) Init

func (s *IndexerStats) Init()

func (IndexerStats) MarshalJSON

func (is IndexerStats) MarshalJSON() ([]byte, error)

func (*IndexerStats) RemoveIndex

func (s *IndexerStats) RemoveIndex(id common.IndexInstId)

func (*IndexerStats) Reset

func (s *IndexerStats) Reset()

type IndexerStatsHolder

type IndexerStatsHolder struct {
	// contains filtered or unexported fields
}

func (IndexerStatsHolder) Get

func (*IndexerStatsHolder) Set

func (h *IndexerStatsHolder) Set(s *IndexerStats)

type InitialBuildInfo

type InitialBuildInfo struct {
	// contains filtered or unexported fields
}

type KVSender

type KVSender interface {
}

KVSender provides the mechanism to talk to KV(projector, router etc)

type Looker

Looker is a class of algorithms that allow looking up a key in an index. Usually, being able to look up a key means we can iterate through all keys too, and so that is introduced here as well.

type Message

type Message interface {
	GetMsgType() MsgType
}

func NewRestServer

func NewRestServer(cluster string) (*restServer, Message)

func NewSettingsManager

func NewSettingsManager(supvCmdch MsgChannel,
	supvMsgch MsgChannel, config common.Config) (settingsManager, common.Config, Message)

func NewStatsManager

func NewStatsManager(supvCmdch MsgChannel,
	supvMsgch MsgChannel, config common.Config) (*statsManager, Message)

type MetaUpdateFields

type MetaUpdateFields struct {
	// contains filtered or unexported fields
}

type MsgBucketHWT

type MsgBucketHWT struct {
	// contains filtered or unexported fields
}

TK_GET_BUCKET_HWT STREAM_READER_HWT

func (*MsgBucketHWT) GetBucket

func (m *MsgBucketHWT) GetBucket() string

func (*MsgBucketHWT) GetHWT

func (m *MsgBucketHWT) GetHWT() *common.TsVbuuid

func (*MsgBucketHWT) GetMsgType

func (m *MsgBucketHWT) GetMsgType() MsgType

func (*MsgBucketHWT) GetPrevSnap

func (m *MsgBucketHWT) GetPrevSnap() *common.TsVbuuid

func (*MsgBucketHWT) GetStreamId

func (m *MsgBucketHWT) GetStreamId() common.StreamId

func (*MsgBucketHWT) String

func (m *MsgBucketHWT) String() string

type MsgBuildIndex

type MsgBuildIndex struct {
	// contains filtered or unexported fields
}

CLUST_MGR_BUILD_INDEX_DDL

func (*MsgBuildIndex) GetBucketList

func (m *MsgBuildIndex) GetBucketList() []string

func (*MsgBuildIndex) GetIndexList

func (m *MsgBuildIndex) GetIndexList() []common.IndexInstId

func (*MsgBuildIndex) GetMsgType

func (m *MsgBuildIndex) GetMsgType() MsgType

func (*MsgBuildIndex) GetRequestCtx

func (m *MsgBuildIndex) GetRequestCtx() *common.MetadataRequestContext

func (*MsgBuildIndex) GetRespCh

func (m *MsgBuildIndex) GetRespCh() MsgChannel

func (*MsgBuildIndex) GetString

func (m *MsgBuildIndex) GetString() string

type MsgBuildIndexResponse

type MsgBuildIndexResponse struct {
	// contains filtered or unexported fields
}

CLUST_MGR_BUILD_INDEX_DDL_RESPONSE

func (*MsgBuildIndexResponse) GetErrorMap

func (m *MsgBuildIndexResponse) GetErrorMap() map[common.IndexInstId]error

func (*MsgBuildIndexResponse) GetMsgType

func (m *MsgBuildIndexResponse) GetMsgType() MsgType

func (*MsgBuildIndexResponse) GetString

func (m *MsgBuildIndexResponse) GetString() string

type MsgChannel

type MsgChannel chan Message

type MsgCheckDDLInProgress

type MsgCheckDDLInProgress struct {
	// contains filtered or unexported fields
}

func (*MsgCheckDDLInProgress) GetMsgType

func (m *MsgCheckDDLInProgress) GetMsgType() MsgType

func (*MsgCheckDDLInProgress) GetRespCh

func (m *MsgCheckDDLInProgress) GetRespCh() chan bool

type MsgClustMgrLocal

type MsgClustMgrLocal struct {
	// contains filtered or unexported fields
}

CLUST_MGR_GET_LOCAL CLUST_MGR_SET_LOCAL CLUST_MGR_DEL_LOCAL

func (*MsgClustMgrLocal) GetCheckDDL

func (m *MsgClustMgrLocal) GetCheckDDL() bool

func (*MsgClustMgrLocal) GetError

func (m *MsgClustMgrLocal) GetError() error

func (*MsgClustMgrLocal) GetKey

func (m *MsgClustMgrLocal) GetKey() string

func (*MsgClustMgrLocal) GetMsgType

func (m *MsgClustMgrLocal) GetMsgType() MsgType

func (*MsgClustMgrLocal) GetRespCh

func (m *MsgClustMgrLocal) GetRespCh() MsgChannel

func (*MsgClustMgrLocal) GetValue

func (m *MsgClustMgrLocal) GetValue() string

type MsgClustMgrResetIndex

type MsgClustMgrResetIndex struct {
	// contains filtered or unexported fields
}

CLUST_MGR_RESET_INDEX

func (*MsgClustMgrResetIndex) GetIndex

func (m *MsgClustMgrResetIndex) GetIndex() common.IndexDefn

func (*MsgClustMgrResetIndex) GetMsgType

func (m *MsgClustMgrResetIndex) GetMsgType() MsgType

type MsgClustMgrTopology

type MsgClustMgrTopology struct {
	// contains filtered or unexported fields
}

CLUST_MGR_GET_GLOBAL_TOPOLOGY

func (*MsgClustMgrTopology) GetInstMap

func (m *MsgClustMgrTopology) GetInstMap() common.IndexInstMap

func (*MsgClustMgrTopology) GetMsgType

func (m *MsgClustMgrTopology) GetMsgType() MsgType

type MsgClustMgrUpdate

type MsgClustMgrUpdate struct {
	// contains filtered or unexported fields
}

CLUST_MGR_UPDATE_TOPOLOGY_FOR_INDEX

func (*MsgClustMgrUpdate) GetBucket

func (m *MsgClustMgrUpdate) GetBucket() string

func (*MsgClustMgrUpdate) GetIndexList

func (m *MsgClustMgrUpdate) GetIndexList() []common.IndexInst

func (*MsgClustMgrUpdate) GetIsSyncUpdate

func (m *MsgClustMgrUpdate) GetIsSyncUpdate() bool

func (*MsgClustMgrUpdate) GetMsgType

func (m *MsgClustMgrUpdate) GetMsgType() MsgType

func (*MsgClustMgrUpdate) GetRespCh

func (m *MsgClustMgrUpdate) GetRespCh() chan error

func (*MsgClustMgrUpdate) GetStreamId

func (m *MsgClustMgrUpdate) GetStreamId() common.StreamId

func (*MsgClustMgrUpdate) GetUpdatedFields

func (m *MsgClustMgrUpdate) GetUpdatedFields() MetaUpdateFields

type MsgConfigUpdate

type MsgConfigUpdate struct {
	// contains filtered or unexported fields
}

func (*MsgConfigUpdate) GetConfig

func (m *MsgConfigUpdate) GetConfig() common.Config

func (*MsgConfigUpdate) GetMsgType

func (m *MsgConfigUpdate) GetMsgType() MsgType

type MsgCreateIndex

type MsgCreateIndex struct {
	// contains filtered or unexported fields
}

CBQ_CREATE_INDEX_DDL CLUST_MGR_CREATE_INDEX_DDL

func (*MsgCreateIndex) GetIndexInst

func (m *MsgCreateIndex) GetIndexInst() common.IndexInst

func (*MsgCreateIndex) GetMsgType

func (m *MsgCreateIndex) GetMsgType() MsgType

func (*MsgCreateIndex) GetRequestCtx

func (m *MsgCreateIndex) GetRequestCtx() *common.MetadataRequestContext

func (*MsgCreateIndex) GetResponseChannel

func (m *MsgCreateIndex) GetResponseChannel() MsgChannel

func (*MsgCreateIndex) GetString

func (m *MsgCreateIndex) GetString() string

type MsgDropIndex

type MsgDropIndex struct {
	// contains filtered or unexported fields
}

CBQ_DROP_INDEX_DDL CLUST_MGR_DROP_INDEX_DDL

func (*MsgDropIndex) GetBucket

func (m *MsgDropIndex) GetBucket() string

func (*MsgDropIndex) GetIndexInstId

func (m *MsgDropIndex) GetIndexInstId() common.IndexInstId

func (*MsgDropIndex) GetMsgType

func (m *MsgDropIndex) GetMsgType() MsgType

func (*MsgDropIndex) GetRequestCtx

func (m *MsgDropIndex) GetRequestCtx() *common.MetadataRequestContext

func (*MsgDropIndex) GetResponseChannel

func (m *MsgDropIndex) GetResponseChannel() MsgChannel

func (*MsgDropIndex) GetString

func (m *MsgDropIndex) GetString() string

type MsgError

type MsgError struct {
	// contains filtered or unexported fields
}

Error Message

func (*MsgError) GetError

func (m *MsgError) GetError() Error

func (*MsgError) GetMsgType

func (m *MsgError) GetMsgType() MsgType

func (*MsgError) String

func (m *MsgError) String() string

type MsgGeneral

type MsgGeneral struct {
	// contains filtered or unexported fields
}

Generic Message

func (*MsgGeneral) GetMsgType

func (m *MsgGeneral) GetMsgType() MsgType

type MsgIndexCompact

type MsgIndexCompact struct {
	// contains filtered or unexported fields
}

func (*MsgIndexCompact) GetAbortTime

func (m *MsgIndexCompact) GetAbortTime() time.Time

func (*MsgIndexCompact) GetErrorChannel

func (m *MsgIndexCompact) GetErrorChannel() chan error

func (*MsgIndexCompact) GetInstId

func (m *MsgIndexCompact) GetInstId() common.IndexInstId

func (*MsgIndexCompact) GetMsgType

func (m *MsgIndexCompact) GetMsgType() MsgType

type MsgIndexSnapRequest

type MsgIndexSnapRequest struct {
	// contains filtered or unexported fields
}

func (*MsgIndexSnapRequest) GetConsistency

func (m *MsgIndexSnapRequest) GetConsistency() common.Consistency

func (*MsgIndexSnapRequest) GetExpiredTime

func (m *MsgIndexSnapRequest) GetExpiredTime() time.Time

func (*MsgIndexSnapRequest) GetIndexId

func (m *MsgIndexSnapRequest) GetIndexId() common.IndexInstId

func (*MsgIndexSnapRequest) GetMsgType

func (m *MsgIndexSnapRequest) GetMsgType() MsgType

func (*MsgIndexSnapRequest) GetReplyChannel

func (m *MsgIndexSnapRequest) GetReplyChannel() chan interface{}

func (*MsgIndexSnapRequest) GetTS

func (m *MsgIndexSnapRequest) GetTS() *common.TsVbuuid

type MsgIndexStorageStats

type MsgIndexStorageStats struct {
	// contains filtered or unexported fields
}

func (*MsgIndexStorageStats) GetMsgType

func (m *MsgIndexStorageStats) GetMsgType() MsgType

func (*MsgIndexStorageStats) GetReplyChannel

func (m *MsgIndexStorageStats) GetReplyChannel() chan []IndexStorageStats

type MsgIndexerState

type MsgIndexerState struct {
	// contains filtered or unexported fields
}

INDEXER_PAUSE INDEXER_RESUME INDEXER_PREPARE_UNPAUSE INDEXER_UNPAUSE INDEXER_BOOTSTRAP

func (*MsgIndexerState) GetMsgType

func (m *MsgIndexerState) GetMsgType() MsgType

func (*MsgIndexerState) GetRollbackTimes

func (m *MsgIndexerState) GetRollbackTimes() map[string]int64

type MsgKVStreamRepair

type MsgKVStreamRepair struct {
	// contains filtered or unexported fields
}

KV_STREAM_REPAIR

func (*MsgKVStreamRepair) GetBucket

func (m *MsgKVStreamRepair) GetBucket() string

func (*MsgKVStreamRepair) GetMsgType

func (m *MsgKVStreamRepair) GetMsgType() MsgType

func (*MsgKVStreamRepair) GetRestartTs

func (m *MsgKVStreamRepair) GetRestartTs() *common.TsVbuuid

func (*MsgKVStreamRepair) GetStreamId

func (m *MsgKVStreamRepair) GetStreamId() common.StreamId

type MsgMutMgrFlushDone

type MsgMutMgrFlushDone struct {
	// contains filtered or unexported fields
}

MUT_MGR_FLUSH_DONE MUT_MGR_ABORT_DONE STORAGE_SNAP_DONE

func (*MsgMutMgrFlushDone) GetAborted

func (m *MsgMutMgrFlushDone) GetAborted() bool

func (*MsgMutMgrFlushDone) GetBucket

func (m *MsgMutMgrFlushDone) GetBucket() string

func (*MsgMutMgrFlushDone) GetMsgType

func (m *MsgMutMgrFlushDone) GetMsgType() MsgType

func (*MsgMutMgrFlushDone) GetStreamId

func (m *MsgMutMgrFlushDone) GetStreamId() common.StreamId

func (*MsgMutMgrFlushDone) GetTS

func (m *MsgMutMgrFlushDone) GetTS() *common.TsVbuuid

func (*MsgMutMgrFlushDone) String

func (m *MsgMutMgrFlushDone) String() string

type MsgMutMgrFlushMutationQueue

type MsgMutMgrFlushMutationQueue struct {
	// contains filtered or unexported fields
}

MUT_MGR_PERSIST_MUTATION_QUEUE MUT_MGR_ABORT_PERSIST MUT_MGR_DRAIN_MUTATION_QUEUE

func (*MsgMutMgrFlushMutationQueue) GetBucket

func (m *MsgMutMgrFlushMutationQueue) GetBucket() string

func (*MsgMutMgrFlushMutationQueue) GetChangeVector

func (m *MsgMutMgrFlushMutationQueue) GetChangeVector() []bool

func (*MsgMutMgrFlushMutationQueue) GetMsgType

func (m *MsgMutMgrFlushMutationQueue) GetMsgType() MsgType

func (*MsgMutMgrFlushMutationQueue) GetStreamId

func (m *MsgMutMgrFlushMutationQueue) GetStreamId() common.StreamId

func (*MsgMutMgrFlushMutationQueue) GetTimestamp

func (m *MsgMutMgrFlushMutationQueue) GetTimestamp() *common.TsVbuuid

func (*MsgMutMgrFlushMutationQueue) String

func (m *MsgMutMgrFlushMutationQueue) String() string

type MsgMutMgrGetTimestamp

type MsgMutMgrGetTimestamp struct {
	// contains filtered or unexported fields
}

MUT_MGR_GET_MUTATION_QUEUE_HWT MUT_MGR_GET_MUTATION_QUEUE_LWT

func (*MsgMutMgrGetTimestamp) GetBucket

func (m *MsgMutMgrGetTimestamp) GetBucket() string

func (*MsgMutMgrGetTimestamp) GetMsgType

func (m *MsgMutMgrGetTimestamp) GetMsgType() MsgType

func (*MsgMutMgrGetTimestamp) GetStreamId

func (m *MsgMutMgrGetTimestamp) GetStreamId() common.StreamId

type MsgRecovery

type MsgRecovery struct {
	// contains filtered or unexported fields
}

INDEXER_INIT_PREP_RECOVERY INDEXER_PREPARE_RECOVERY INDEXER_PREPARE_DONE INDEXER_INITIATE_RECOVERY INDEXER_RECOVERY_DONE INDEXER_BUCKET_NOT_FOUND

func (*MsgRecovery) GetActiveTs

func (m *MsgRecovery) GetActiveTs() *common.TsVbuuid

func (*MsgRecovery) GetBucket

func (m *MsgRecovery) GetBucket() string

func (*MsgRecovery) GetBuildTs

func (m *MsgRecovery) GetBuildTs() Timestamp

func (*MsgRecovery) GetMsgType

func (m *MsgRecovery) GetMsgType() MsgType

func (*MsgRecovery) GetRestartTs

func (m *MsgRecovery) GetRestartTs() *common.TsVbuuid

func (*MsgRecovery) GetStreamId

func (m *MsgRecovery) GetStreamId() common.StreamId

type MsgRepairAbort

type MsgRepairAbort struct {
	// contains filtered or unexported fields
}

func (*MsgRepairAbort) GetBucket

func (m *MsgRepairAbort) GetBucket() string

func (*MsgRepairAbort) GetMsgType

func (m *MsgRepairAbort) GetMsgType() MsgType

func (*MsgRepairAbort) GetStreamId

func (m *MsgRepairAbort) GetStreamId() common.StreamId

type MsgRepairEndpoints

type MsgRepairEndpoints struct {
	// contains filtered or unexported fields
}

KV_SENDER_REPAIR_ENDPOINTS

func (*MsgRepairEndpoints) GetEndpoints

func (m *MsgRepairEndpoints) GetEndpoints() []string

func (*MsgRepairEndpoints) GetMsgType

func (m *MsgRepairEndpoints) GetMsgType() MsgType

func (*MsgRepairEndpoints) GetStreamId

func (m *MsgRepairEndpoints) GetStreamId() common.StreamId

func (*MsgRepairEndpoints) String

func (m *MsgRepairEndpoints) String() string

type MsgResetStats

type MsgResetStats struct {
}

func (*MsgResetStats) GetMsgType

func (m *MsgResetStats) GetMsgType() MsgType

type MsgRestartVbuckets

type MsgRestartVbuckets struct {
	// contains filtered or unexported fields
}

KV_SENDER_RESTART_VBUCKETS

func (*MsgRestartVbuckets) ConnErrVbs

func (m *MsgRestartVbuckets) ConnErrVbs() []Vbucket

func (*MsgRestartVbuckets) GetBucket

func (m *MsgRestartVbuckets) GetBucket() string

func (*MsgRestartVbuckets) GetMsgType

func (m *MsgRestartVbuckets) GetMsgType() MsgType

func (*MsgRestartVbuckets) GetResponseCh

func (m *MsgRestartVbuckets) GetResponseCh() MsgChannel

func (*MsgRestartVbuckets) GetRestartTs

func (m *MsgRestartVbuckets) GetRestartTs() *common.TsVbuuid

func (*MsgRestartVbuckets) GetStopChannel

func (m *MsgRestartVbuckets) GetStopChannel() StopChannel

func (*MsgRestartVbuckets) GetStreamId

func (m *MsgRestartVbuckets) GetStreamId() common.StreamId

func (*MsgRestartVbuckets) String

func (m *MsgRestartVbuckets) String() string

type MsgRollback

type MsgRollback struct {
	// contains filtered or unexported fields
}

func (*MsgRollback) GetBucket

func (m *MsgRollback) GetBucket() string

func (*MsgRollback) GetMsgType

func (m *MsgRollback) GetMsgType() MsgType

func (*MsgRollback) GetRollbackTime

func (m *MsgRollback) GetRollbackTime() int64

func (*MsgRollback) GetRollbackTs

func (m *MsgRollback) GetRollbackTs() *common.TsVbuuid

func (*MsgRollback) GetStreamId

func (m *MsgRollback) GetStreamId() common.StreamId

type MsgStatsRequest

type MsgStatsRequest struct {
	// contains filtered or unexported fields
}

func (*MsgStatsRequest) FetchDcp

func (m *MsgStatsRequest) FetchDcp() bool

func (*MsgStatsRequest) GetMsgType

func (m *MsgStatsRequest) GetMsgType() MsgType

func (*MsgStatsRequest) GetReplyChannel

func (m *MsgStatsRequest) GetReplyChannel() chan bool

type MsgStream

type MsgStream struct {
	// contains filtered or unexported fields
}

Stream Reader Message

func (*MsgStream) GetMsgType

func (m *MsgStream) GetMsgType() MsgType

func (*MsgStream) GetMutationMeta

func (m *MsgStream) GetMutationMeta() *MutationMeta

func (*MsgStream) GetSnapshot

func (m *MsgStream) GetSnapshot() *MutationSnapshot

func (*MsgStream) GetStreamId

func (m *MsgStream) GetStreamId() common.StreamId

func (*MsgStream) String

func (m *MsgStream) String() string

type MsgStreamError

type MsgStreamError struct {
	// contains filtered or unexported fields
}

Stream Error Message

func (*MsgStreamError) GetError

func (m *MsgStreamError) GetError() Error

func (*MsgStreamError) GetMsgType

func (m *MsgStreamError) GetMsgType() MsgType

func (*MsgStreamError) GetStreamId

func (m *MsgStreamError) GetStreamId() common.StreamId

type MsgStreamInfo

type MsgStreamInfo struct {
	// contains filtered or unexported fields
}

STREAM_READER_CONN_ERROR STREAM_REQUEST_DONE

func (*MsgStreamInfo) GetActiveTs

func (m *MsgStreamInfo) GetActiveTs() *common.TsVbuuid

func (*MsgStreamInfo) GetBucket

func (m *MsgStreamInfo) GetBucket() string

func (*MsgStreamInfo) GetBuildTs

func (m *MsgStreamInfo) GetBuildTs() Timestamp

func (*MsgStreamInfo) GetMsgType

func (m *MsgStreamInfo) GetMsgType() MsgType

func (*MsgStreamInfo) GetStreamId

func (m *MsgStreamInfo) GetStreamId() common.StreamId

func (*MsgStreamInfo) GetVbList

func (m *MsgStreamInfo) GetVbList() []Vbucket

func (*MsgStreamInfo) String

func (m *MsgStreamInfo) String() string

type MsgStreamUpdate

type MsgStreamUpdate struct {
	// contains filtered or unexported fields
}

OPEN_STREAM ADD_INDEX_LIST_TO_STREAM REMOVE_BUCKET_FROM_STREAM REMOVE_INDEX_LIST_FROM_STREAM CLOSE_STREAM CLEANUP_STREAM

func (*MsgStreamUpdate) GetBucket

func (m *MsgStreamUpdate) GetBucket() string

func (*MsgStreamUpdate) GetIndexList

func (m *MsgStreamUpdate) GetIndexList() []common.IndexInst

func (*MsgStreamUpdate) GetMsgType

func (m *MsgStreamUpdate) GetMsgType() MsgType

func (*MsgStreamUpdate) GetResponseChannel

func (m *MsgStreamUpdate) GetResponseChannel() MsgChannel

func (*MsgStreamUpdate) GetRestartTs

func (m *MsgStreamUpdate) GetRestartTs() *common.TsVbuuid

func (*MsgStreamUpdate) GetRollbackTime

func (m *MsgStreamUpdate) GetRollbackTime() int64

func (*MsgStreamUpdate) GetStopChannel

func (m *MsgStreamUpdate) GetStopChannel() StopChannel

func (*MsgStreamUpdate) GetStreamId

func (m *MsgStreamUpdate) GetStreamId() common.StreamId

func (*MsgStreamUpdate) GetTimestamp

func (m *MsgStreamUpdate) GetTimestamp() Timestamp

func (*MsgStreamUpdate) String

func (m *MsgStreamUpdate) String() string

type MsgSuccess

type MsgSuccess struct {
}

Success Message

func (*MsgSuccess) GetMsgType

func (m *MsgSuccess) GetMsgType() MsgType

type MsgSuccessOpenStream

type MsgSuccessOpenStream struct {
	// contains filtered or unexported fields
}

Success Message

func (*MsgSuccessOpenStream) GetActiveTs

func (m *MsgSuccessOpenStream) GetActiveTs() *common.TsVbuuid

func (*MsgSuccessOpenStream) GetMsgType

func (m *MsgSuccessOpenStream) GetMsgType() MsgType

type MsgTKInitBuildDone

type MsgTKInitBuildDone struct {
	// contains filtered or unexported fields
}

TK_INIT_BUILD_DONE TK_INIT_BUILD_DONE_ACK

func (*MsgTKInitBuildDone) GetBucket

func (m *MsgTKInitBuildDone) GetBucket() string

func (*MsgTKInitBuildDone) GetMergeTs

func (m *MsgTKInitBuildDone) GetMergeTs() *common.TsVbuuid

func (*MsgTKInitBuildDone) GetMsgType

func (m *MsgTKInitBuildDone) GetMsgType() MsgType

func (*MsgTKInitBuildDone) GetStreamId

func (m *MsgTKInitBuildDone) GetStreamId() common.StreamId

func (*MsgTKInitBuildDone) GetTimestamp

func (m *MsgTKInitBuildDone) GetTimestamp() Timestamp

type MsgTKMergeStream

type MsgTKMergeStream struct {
	// contains filtered or unexported fields
}

TK_MERGE_STREAM TK_MERGE_STREAM_ACK

func (*MsgTKMergeStream) GetBucket

func (m *MsgTKMergeStream) GetBucket() string

func (*MsgTKMergeStream) GetMergeList

func (m *MsgTKMergeStream) GetMergeList() []common.IndexInst

func (*MsgTKMergeStream) GetMergeTS

func (m *MsgTKMergeStream) GetMergeTS() Timestamp

func (*MsgTKMergeStream) GetMsgType

func (m *MsgTKMergeStream) GetMsgType() MsgType

func (*MsgTKMergeStream) GetStreamId

func (m *MsgTKMergeStream) GetStreamId() common.StreamId

type MsgTKStabilityTS

type MsgTKStabilityTS struct {
	// contains filtered or unexported fields
}

TK_STABILITY_TIMESTAMP

func (*MsgTKStabilityTS) GetBucket

func (m *MsgTKStabilityTS) GetBucket() string

func (*MsgTKStabilityTS) GetChangeVector

func (m *MsgTKStabilityTS) GetChangeVector() []bool

func (*MsgTKStabilityTS) GetMsgType

func (m *MsgTKStabilityTS) GetMsgType() MsgType

func (*MsgTKStabilityTS) GetStreamId

func (m *MsgTKStabilityTS) GetStreamId() common.StreamId

func (*MsgTKStabilityTS) GetTimestamp

func (m *MsgTKStabilityTS) GetTimestamp() *common.TsVbuuid

func (*MsgTKStabilityTS) String

func (m *MsgTKStabilityTS) String() string

type MsgTKToggleFlush

type MsgTKToggleFlush struct {
	// contains filtered or unexported fields
}

TK_ENABLE_FLUSH TK_DISABLE_FLUSH

func (*MsgTKToggleFlush) GetBucket

func (m *MsgTKToggleFlush) GetBucket() string

func (*MsgTKToggleFlush) GetMsgType

func (m *MsgTKToggleFlush) GetMsgType() MsgType

func (*MsgTKToggleFlush) GetStreamId

func (m *MsgTKToggleFlush) GetStreamId() common.StreamId

type MsgTimestamp

type MsgTimestamp struct {
	// contains filtered or unexported fields
}

Timestamp Message

func (*MsgTimestamp) GetMsgType

func (m *MsgTimestamp) GetMsgType() MsgType

func (*MsgTimestamp) GetTimestamp

func (m *MsgTimestamp) GetTimestamp() Timestamp

type MsgType

type MsgType int16

func (MsgType) String

func (m MsgType) String() string

type MsgUpdateBucketQueue

type MsgUpdateBucketQueue struct {
	// contains filtered or unexported fields
}

STREAM_READER_UPDATE_QUEUE_MAP

func (*MsgUpdateBucketQueue) GetBucketFilter

func (m *MsgUpdateBucketQueue) GetBucketFilter() map[string]*common.TsVbuuid

func (*MsgUpdateBucketQueue) GetBucketQueueMap

func (m *MsgUpdateBucketQueue) GetBucketQueueMap() BucketQueueMap

func (*MsgUpdateBucketQueue) GetMsgType

func (m *MsgUpdateBucketQueue) GetMsgType() MsgType

func (*MsgUpdateBucketQueue) GetStatsObject

func (m *MsgUpdateBucketQueue) GetStatsObject() *IndexerStats

func (*MsgUpdateBucketQueue) String

func (m *MsgUpdateBucketQueue) String() string

type MsgUpdateIndexRState

type MsgUpdateIndexRState struct {
	// contains filtered or unexported fields
}

func (*MsgUpdateIndexRState) GetDefnId

func (m *MsgUpdateIndexRState) GetDefnId() common.IndexDefnId

func (*MsgUpdateIndexRState) GetMsgType

func (m *MsgUpdateIndexRState) GetMsgType() MsgType

func (*MsgUpdateIndexRState) GetRState

func (*MsgUpdateIndexRState) GetRespCh

func (m *MsgUpdateIndexRState) GetRespCh() chan error

type MsgUpdateInstMap

type MsgUpdateInstMap struct {
	// contains filtered or unexported fields
}

UPDATE_INSTANCE_MAP

func (*MsgUpdateInstMap) GetIndexInstMap

func (m *MsgUpdateInstMap) GetIndexInstMap() common.IndexInstMap

func (*MsgUpdateInstMap) GetMsgType

func (m *MsgUpdateInstMap) GetMsgType() MsgType

func (*MsgUpdateInstMap) GetRollbackTimes

func (m *MsgUpdateInstMap) GetRollbackTimes() map[string]int64

func (*MsgUpdateInstMap) GetStatsObject

func (m *MsgUpdateInstMap) GetStatsObject() *IndexerStats

func (*MsgUpdateInstMap) String

func (m *MsgUpdateInstMap) String() string

type MsgUpdatePartnMap

type MsgUpdatePartnMap struct {
	// contains filtered or unexported fields
}

UPDATE_PARTITION_MAP

func (*MsgUpdatePartnMap) GetIndexPartnMap

func (m *MsgUpdatePartnMap) GetIndexPartnMap() IndexPartnMap

func (*MsgUpdatePartnMap) GetMsgType

func (m *MsgUpdatePartnMap) GetMsgType() MsgType

func (*MsgUpdatePartnMap) String

func (m *MsgUpdatePartnMap) String() string

type Mutation

type Mutation struct {
	// contains filtered or unexported fields
}

func NewMutation

func NewMutation() *Mutation

func (*Mutation) Free

func (m *Mutation) Free()

func (*Mutation) Size

func (m *Mutation) Size() int64

type MutationChannel

type MutationChannel chan *MutationKeys

type MutationKeys

type MutationKeys struct {
	// contains filtered or unexported fields
}

MutationKeys holds the Secondary Keys from a single KV Mutation

func NewMutationKeys

func NewMutationKeys() *MutationKeys

func (*MutationKeys) Free

func (mk *MutationKeys) Free()

func (*MutationKeys) Size

func (mk *MutationKeys) Size() int64

type MutationManager

type MutationManager interface {
}

MutationManager handles messages from Indexer to manage Mutation Streams and flush mutations from mutation queues.

type MutationMeta

type MutationMeta struct {
	// contains filtered or unexported fields
}

MutationMeta represents meta information for a KV Mutation

func NewMutationMeta

func NewMutationMeta() *MutationMeta

func (*MutationMeta) Clone

func (m *MutationMeta) Clone() *MutationMeta

func (*MutationMeta) Free

func (m *MutationMeta) Free()

func (*MutationMeta) SetVBId

func (m *MutationMeta) SetVBId(vbid int)

func (*MutationMeta) Size

func (m *MutationMeta) Size() int64

func (MutationMeta) String

func (m MutationMeta) String() string

type MutationQueue

type MutationQueue interface {

	//enqueue a mutation reference based on vbucket. This is a blocking call which
	//will wait in case there is no free slot available for allocation.
	//caller can close the appch to force this call to return.
	Enqueue(mutation *MutationKeys, vbucket Vbucket, appch StopChannel) error

	//dequeue a vbucket's mutation and keep sending on a channel until stop signal
	Dequeue(vbucket Vbucket) (<-chan *MutationKeys, chan<- bool, error)
	//dequeue a vbucket's mutation upto seqno(wait if not available)
	DequeueUptoSeqno(vbucket Vbucket, seqno Seqno) (<-chan *MutationKeys, chan bool, error)
	//dequeue single element for a vbucket and return
	DequeueSingleElement(vbucket Vbucket) *MutationKeys

	//return reference to a vbucket's mutation at Tail of queue without dequeue
	PeekTail(vbucket Vbucket) *MutationKeys
	//return reference to a vbucket's mutation at Head of queue without dequeue
	PeekHead(vbucket Vbucket) *MutationKeys

	//return size of queue per vbucket
	GetSize(vbucket Vbucket) int64

	//returns the numbers of vbuckets for the queue
	GetNumVbuckets() uint16

	//destroy the resources
	Destroy()
}

MutationQueue interface specifies methods which a mutation queue for indexer needs to implement

type MutationSnapshot

type MutationSnapshot struct {
	// contains filtered or unexported fields
}

MutationSnapshot represents snapshot information of KV

func (MutationSnapshot) CanProcess

func (m MutationSnapshot) CanProcess() bool

func (MutationSnapshot) String

func (m MutationSnapshot) String() string

type MutationStreamReader

type MutationStreamReader interface {
	Shutdown()
}

MutationStreamReader reads a Dataport and stores the incoming mutations in mutation queue. This is the only component writing to a mutation queue.

type NilIndexKey

type NilIndexKey struct {
	// contains filtered or unexported fields
}

func (*NilIndexKey) Bytes

func (k *NilIndexKey) Bytes() []byte

func (*NilIndexKey) Compare

func (k *NilIndexKey) Compare(entry IndexEntry) int

func (*NilIndexKey) CompareIndexKey

func (k *NilIndexKey) CompareIndexKey(k1 IndexKey) int

func (*NilIndexKey) ComparePrefixFields

func (k *NilIndexKey) ComparePrefixFields(entry IndexEntry) int

func (*NilIndexKey) ComparePrefixIndexKey

func (k *NilIndexKey) ComparePrefixIndexKey(k1 IndexKey) int

func (*NilIndexKey) String

func (k *NilIndexKey) String() string

type PartitionInst

type PartitionInst struct {
	Defn common.PartitionDefn
	Sc   SliceContainer
}

PartitionInst contains the partition definition and a SliceContainer to manage all the slices storing the partition's data

func (PartitionInst) String

func (pi PartitionInst) String() string

type PartitionInstMap

type PartitionInstMap map[common.PartitionId]PartitionInst

PartitionInstMap maps a PartitionId to PartitionInst

type PartitionSnapshot

type PartitionSnapshot interface {
	PartitionId() common.PartitionId
	Slices() map[SliceId]SliceSnapshot
}

type ProgressCallback

type ProgressCallback func(progress float64, cancel <-chan struct{})

type Projection

type Projection struct {
	// contains filtered or unexported fields
}

type RangeCounter

type RangeCounter interface {
	CountRange(ctx IndexReaderContext, low, high IndexKey, inclusion Inclusion, stopch StopChannel) (
		uint64, error)
	CountLookup(ctx IndexReaderContext, keys []IndexKey, stopch StopChannel) (uint64, error)
	MultiScanCount(ctx IndexReaderContext, low, high IndexKey, inclusion Inclusion,
		scan Scan, distinct bool, stopch StopChannel) (
		uint64, error)
}

RangeCounter is a class of algorithms that can count a range efficiently

type Ranger

type Ranger interface {
	Looker
	Range(IndexReaderContext, IndexKey, IndexKey, Inclusion, EntryCallback) error
}

Ranger is a class of algorithms that can extract a range of keys from the index.

type RebalSource

type RebalSource byte
const (
	RebalSourceClusterOp RebalSource = iota
	RebalSourceMoveIndex
)

func (RebalSource) String

func (rs RebalSource) String() string

type RebalTokens

type RebalTokens struct {
	RT *RebalanceToken             `json:"rebalancetoken,omitempty"`
	MT *RebalanceToken             `json:"moveindextoken,omitempty"`
	TT map[string]*c.TransferToken `json:"transfertokens,omitempty"`
}

type RebalanceMgr

type RebalanceMgr interface {
}

RebalanceMgr manages the integration with ns-server and execution of all cluster wide operations like rebalance/failover

type RebalanceToken

type RebalanceToken struct {
	MasterId string
	RebalId  string
	Source   RebalSource
	Error    string
}

type Rebalancer

type Rebalancer struct {
	// contains filtered or unexported fields
}

func NewRebalancer

func NewRebalancer(transferTokens map[string]*c.TransferToken, rebalToken *RebalanceToken,
	nodeId string, master bool, progress ProgressCallback, done DoneCallback,
	supvMsgch MsgChannel, localaddr string, config c.Config) *Rebalancer

func (*Rebalancer) Cancel

func (r *Rebalancer) Cancel()

type Scan

type Scan struct {
	Low      IndexKey  // Overall Low for a Span. Computed from composite filters (Ranges)
	High     IndexKey  // Overall High for a Span. Computed from composite filters (Ranges)
	Incl     Inclusion // Overall Inclusion for a Span
	ScanType ScanFilterType
	Filters  []Filter // A collection qualifying filters
	Equals   IndexKey // TODO: Remove Equals
}

type ScanCoordinator

type ScanCoordinator interface {
}

type ScanFilterType

type ScanFilterType string

type ScanPipeline

type ScanPipeline struct {
	// contains filtered or unexported fields
}

func (ScanPipeline) BytesRead

func (p ScanPipeline) BytesRead() uint64

func (*ScanPipeline) Cancel

func (p *ScanPipeline) Cancel(err error)

func (*ScanPipeline) Execute

func (p *ScanPipeline) Execute() error

func (ScanPipeline) RowsReturned

func (p ScanPipeline) RowsReturned() uint64

type ScanReqType

type ScanReqType string

type ScanRequest

type ScanRequest struct {
	ScanType    ScanReqType
	DefnID      uint64
	IndexInstId common.IndexInstId
	IndexName   string
	Bucket      string
	Ts          *common.TsVbuuid
	Low         IndexKey
	High        IndexKey
	Keys        []IndexKey
	Consistency *common.Consistency
	Stats       *IndexStats
	IndexInst   common.IndexInst

	Ctx IndexReaderContext

	// user supplied
	LowBytes, HighBytes []byte
	KeysBytes           [][]byte

	Incl  Inclusion
	Limit int64

	// New parameters for spock
	Scans           []Scan
	Indexprojection *Projection
	Reverse         bool
	Distinct        bool
	Offset          int64

	ScanId      uint64
	ExpiredTime time.Time
	Timeout     *time.Timer
	CancelCh    <-chan bool

	RequestId string
	LogPrefix string
	// contains filtered or unexported fields
}

func (*ScanRequest) Done

func (r *ScanRequest) Done()

func (ScanRequest) String

func (r ScanRequest) String() string

type ScanResponseWriter

type ScanResponseWriter interface {
	Error(err error) error
	Stats(rows, unique uint64, min, max []byte) error
	Count(count uint64) error
	RawBytes([]byte) error
	Row(pk, sk []byte) error
	Done() error
	Helo() error
}

type Seqno

type Seqno uint64

type ServiceMgr

type ServiceMgr struct {
	// contains filtered or unexported fields
}

func (*ServiceMgr) CancelTask

func (m *ServiceMgr) CancelTask(id string, rev service.Revision) error

func (*ServiceMgr) GetCurrentTopology

func (m *ServiceMgr) GetCurrentTopology(rev service.Revision,
	cancel service.Cancel) (*service.Topology, error)

func (*ServiceMgr) GetNodeInfo

func (m *ServiceMgr) GetNodeInfo() (*service.NodeInfo, error)

func (*ServiceMgr) GetTaskList

func (m *ServiceMgr) GetTaskList(rev service.Revision,
	cancel service.Cancel) (*service.TaskList, error)

func (*ServiceMgr) PrepareTopologyChange

func (m *ServiceMgr) PrepareTopologyChange(change service.TopologyChange) error

All errors need to be reported as return value. Status of prepared task is not considered for failure reporting.

func (*ServiceMgr) Shutdown

func (m *ServiceMgr) Shutdown() error

func (*ServiceMgr) StartTopologyChange

func (m *ServiceMgr) StartTopologyChange(change service.TopologyChange) error

type SimplePlanner

type SimplePlanner struct {
	// contains filtered or unexported fields
}

func NewSimplePlanner

func NewSimplePlanner(topology *manager.ClusterIndexMetadata,
	change service.TopologyChange, nodeId string) *SimplePlanner

func (*SimplePlanner) PlanIndexMoves

func (p *SimplePlanner) PlanIndexMoves() map[string]*c.TransferToken

type SlabManager

type SlabManager interface {
	//AllocBuf allocates a buffer of given size. If returned buffer is nil,
	//Message will have error message
	AllocBuf(bufSize int) ([]byte, Message)

	//ReleaseBuf releases the buffer back to free pool
	ReleaseBuf(buf []byte) bool

	//SetMaxMemoryLimit sets the maximum memory that can be allocated
	SetMaxMemoryLimit(maxMemAlloc uint64) bool

	//GetMaxMemoryLimit returns the maximum memory that can be allocated
	GetMaxMemoryLimit() uint64
}

type Slice

type Slice interface {
	Id() SliceId
	Path() string
	Status() SliceStatus
	IndexInstId() common.IndexInstId
	IndexDefnId() common.IndexDefnId
	IsActive() bool
	IsDirty() bool

	SetActive(bool)
	SetStatus(SliceStatus)

	UpdateConfig(common.Config)

	IndexWriter
	GetReaderContext() IndexReaderContext
}

Slice represents the unit of physical storage for index

func NewSlice

func NewSlice(id SliceId, indInst *common.IndexInst,
	conf common.Config, stats *IndexerStats) (slice Slice, err error)

type SliceContainer

type SliceContainer interface {
	//Add Slice to container
	AddSlice(SliceId, Slice)

	//Update existing slice
	UpdateSlice(SliceId, Slice)

	//Remove existing slice
	RemoveSlice(SliceId)

	//Return Slice for the given IndexKey
	GetSliceByIndexKey(common.IndexKey) Slice

	//Return SliceId for the given IndexKey
	GetSliceIdByIndexKey(common.IndexKey) SliceId

	//Return Slice for the given SliceId
	GetSliceById(SliceId) Slice

	//Return all Slices
	GetAllSlices() []Slice
}

SliceContainer contains all slices for an index partition and provides methods to determine how data is distributed in multiple slices for a single partition

type SliceId

type SliceId uint64

type SliceSnapshot

type SliceSnapshot interface {
	SliceId() SliceId
	Snapshot() Snapshot
}

func GetSliceSnapshots

func GetSliceSnapshots(is IndexSnapshot) (s []SliceSnapshot)

type SliceStatus

type SliceStatus int16
const (
	//Slice is warming up(open db files etc), not ready for operations
	SLICE_STATUS_PREPARING SliceStatus = iota
	//Ready for operations
	SLICE_STATUS_ACTIVE
	//Marked for deletion
	SLICE_STATUS_TERMINATE
)

type Snapshot

type Snapshot interface {
	IndexReader

	Open() error
	Close() error
	IsOpen() bool

	Id() SliceId
	IndexInstId() common.IndexInstId
	IndexDefnId() common.IndexDefnId

	Timestamp() *common.TsVbuuid

	Info() SnapshotInfo
}

Snapshot interface

type SnapshotInfo

type SnapshotInfo interface {
	Timestamp() *common.TsVbuuid
	IsCommitted() bool
}

type SnapshotInfoContainer

type SnapshotInfoContainer interface {
	List() []SnapshotInfo

	Add(SnapshotInfo)
	Len() int

	GetLatest() SnapshotInfo
	GetOldest() SnapshotInfo
	GetEqualToTS(*common.TsVbuuid) SnapshotInfo
	GetOlderThanTS(*common.TsVbuuid) SnapshotInfo

	RemoveOldest() error
	RemoveRecentThanTS(*common.TsVbuuid) error
	RemoveAll() error
}

A helper data stucture for in-memory snapshot info list

type SortOrder

type SortOrder string

SortOrder characterizes if the algorithm emits keys in a predictable order

type StabilityTimestamp

type StabilityTimestamp Timestamp

Stability Timestamp

type StopChannel

type StopChannel chan bool

a generic channel which can be closed when you want someone to stop doing something

type StorageManager

type StorageManager interface {
}

type StorageStatistics

type StorageStatistics struct {
	DataSize          int64
	DiskSize          int64
	ExtraSnapDataSize int64

	GetBytes    int64
	InsertBytes int64
	DeleteBytes int64

	NeedUpgrade bool

	InternalData []string
}

type StreamAddressMap

type StreamAddressMap map[common.StreamId]common.Endpoint
var StreamAddrMap StreamAddressMap

type StreamState

type StreamState struct {
	// contains filtered or unexported fields
}

func InitStreamState

func InitStreamState(config common.Config) *StreamState

func (*StreamState) UpdateConfig

func (ss *StreamState) UpdateConfig(cfg common.Config)

type StreamStatus

type StreamStatus byte
const (
	//Stream is inactive i.e. not processing mutations
	STREAM_INACTIVE StreamStatus = iota
	//Stream is active i.e. processing mutations
	STREAM_ACTIVE
	//Stream is preparing for recovery(i.e. it has received
	//a control or error message and it is doing a cleanup
	//before initiating Catchup
	STREAM_PREPARE_RECOVERY
	//Prepare is done before recovery
	STREAM_PREPARE_DONE
	//Stream is using a Catchup to recover
	STREAM_RECOVERY
)

func (StreamStatus) String

func (s StreamStatus) String() string

type Timekeeper

type Timekeeper interface {
}

Timekeeper manages the Stability Timestamp Generation and also keeps track of the HWTimestamp for each bucket

type Timestamp

type Timestamp []Seqno

list of seqno per vbucket

func CopyTimestamp

func CopyTimestamp(ts Timestamp) Timestamp

func GetCurrentKVTs

func GetCurrentKVTs(cluster, pooln, bucketn string, numVbs int) (Timestamp, error)

func NewTimestamp

func NewTimestamp(numVbuckets int) Timestamp

func (Timestamp) Equals

func (ts Timestamp) Equals(ts1 Timestamp) bool

Equals returns true if both timestamps match, false otherwise

func (Timestamp) GreaterThan

func (ts Timestamp) GreaterThan(ts1 Timestamp) bool

GreaterThan returns true if the timestamp is greater than given timestamp

func (Timestamp) GreaterThanEqual

func (ts Timestamp) GreaterThanEqual(ts1 Timestamp) bool

GreaterThanEqual returns true if the given timestamp is matching or greater

func (Timestamp) IsZeroTs

func (ts Timestamp) IsZeroTs() bool

IsZeroTs return true if all seqno in TS are zero

func (Timestamp) String

func (ts Timestamp) String() string

type VbStatus

type VbStatus Seqno

func (VbStatus) String

func (v VbStatus) String() string

type Vbucket

type Vbucket uint32

type Vbuuid

type Vbuuid uint64

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL