manager

package
v0.0.0-...-d8c7374 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 24, 2017 License: Apache-2.0 Imports: 34 Imported by: 0

Documentation

Overview

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright (c) 2014 Couchbase, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

View Source
// OpCode values of type common.OpCode, numbered by iota from 0.
// Per their names they identify the "add index definition" and
// "delete index definition" operations.
const (
	OPCODE_ADD_IDX_DEFN common.OpCode = iota
	OPCODE_DEL_IDX_DEFN
)
View Source
// Error codes, partitioned into numeric ranges of 50 per component
// (the range of each component is given in the group comments below).
//
// NOTE(review): only ERROR_PANIC is declared with the type errCode. Every
// other constant in this block has its own explicit "= N" expression and
// no type, so it is an untyped integer constant rather than an errCode;
// it converts implicitly wherever an errCode is expected, but does not
// carry the type by itself.
const (
	// generic (0 - 50)
	ERROR_PANIC     errCode = 0
	ERROR_ARGUMENTS         = 1

	// MetadataRepo (51-100)
	ERROR_META_WRONG_KEY          = 51
	ERROR_META_IDX_DEFN_EXIST     = 52
	ERROR_META_IDX_DEFN_NOT_EXIST = 53
	ERROR_META_FAIL_TO_PARSE_INT  = 54

	// Event Manager (101-150)
	ERROR_EVT_DUPLICATE_NOTIFIER = 101

	// Index Manager (151-200)
	ERROR_MGR_DDL_CREATE_IDX = 151
	ERROR_MGR_DDL_DROP_IDX   = 152

	// Coordinator (201-250)
	ERROR_COOR_LISTENER_FAIL = 201
	ERROR_COOR_ELECTION_FAIL = 202

	// Watcher (251 - 300)
	ERROR_WATCH_NO_ADDR_AVAIL = 251

	// Stream (301-350)
	ERROR_STREAM_INVALID_ARGUMENT   = 301
	ERROR_STREAM_NOT_OPEN           = 302
	ERROR_STREAM_REQUEST_ERROR      = 303
	ERROR_STREAM_WRONG_VBUCKET      = 304
	ERROR_STREAM_INVALID_TIMESTAMP  = 305
	ERROR_STREAM_PROJECTOR_TIMEOUT  = 306
	ERROR_STREAM_INVALID_KVADDRS    = 307
	ERROR_STREAM_STREAM_END         = 308
	ERROR_STREAM_FEEDER             = 309
	ERROR_STREAM_INCONSISTENT_VBMAP = 310
)
View Source
// Error severities of type errSeverity, numbered by iota:
// FATAL is 0 and NORMAL is 1.
const (
	FATAL errSeverity = iota
	NORMAL
)
View Source
// Error categories of type errCategory, numbered by iota from
// GENERIC (0) through STREAM (7). The names mirror the component
// groups used for the error-code ranges.
const (
	GENERIC errCategory = iota
	COORDINATOR
	INDEX_MANAGER
	METADATA_REPO
	REQUEST_HANDLER
	EVENT_MANAGER
	WATCHER
	STREAM
)
View Source
// String constants "success" and "error".
// NOTE(review): presumably the values placed in the Code field of the
// *Response types in this package — confirm against the handlers.
const (
	RESP_SUCCESS string = "success"
	RESP_ERROR   string = "error"
)
View Source
const CATCHUP_TOPIC = "CATCHUP_STREAM_TOPIC"
View Source
const COORDINATOR_CONFIG_STORE = "IndexCoordinatorConfigStore"

Coordinator

View Source
const COORD_INIT_STREAM_PORT = ":9335"
View Source
const COORD_MAINT_STREAM_PORT = ":9334"

Coordinator

View Source
const COUCHBASE_INTERNAL_BUCKET_URL = "http://localhost:11209/"

Stream Manager

View Source
const DEFAULT_BUCKET_NAME = "default"
View Source
const DEFAULT_EVT_QUEUE_SIZE = 20

Event Manager

View Source
const DEFAULT_NOTIFIER_QUEUE_SIZE = 5
View Source
const DEFAULT_POOL_NAME = "default"
View Source
const HTTP_PREFIX = "http://"

Common

View Source
const INDEX_INSTANCE_ID = "IndexInstanceId"

Index Definition

View Source
const INDEX_PARTITION_ID = "IndexPartitionId"
View Source
const INIT_TOPIC = "INIT_STREAM_TOPIC"
View Source
const KV_DCP_PORT = "11210"
View Source
const KV_DCP_PORT_CLUSTER_RUN = "12000"
View Source
const LOCALHOST = "127.0.0.1"
View Source
const MAINT_TOPIC = "MAINT_STREAM_TOPIC"

Stream Manager

View Source
// Five minutes expressed as an int64 nanosecond count: time.Minute is a
// time.Duration, whose unit is the nanosecond, so this is NOT a
// time.Duration and must be compared against nanosecond values.
const MAX_PROJECTOR_RETRY_ELAPSED_TIME = int64(time.Minute) * 5
View Source
const PROJECTOR_PORT = "9999"
View Source
const TESTING = true

/////////////////////////////////////////// Constant for Testing ///////////////////////////////////////////

View Source
const TIMESTAMP_CHANNEL_SIZE = 30
View Source
const TIMESTAMP_HISTORY_COUNT = 10

Timer

View Source
const TIMESTAMP_NOTIFY_CH_SIZE = 100
View Source
// One minute expressed as a uint64 nanosecond count (time.Minute is a
// time.Duration, whose unit is the nanosecond).
const TIMESTAMP_PERSIST_INTERVAL = uint64(time.Minute)

Variables

View Source
var MONITOR_INTERVAL = time.Duration(120000) * time.Millisecond

Stream Monitor (2m)

View Source
var NUM_VB = 1024

Common

View Source
var TIME_INTERVAL = time.Duration(2000) * time.Millisecond

Timer (2s)

View Source
var USE_MASTER_REPO = false

Functions

func GetAllDeletedIndexInstancesId

func GetAllDeletedIndexInstancesId(mgr *IndexManager, buckets []string) ([]uint64, error)

Get all deleted index instance Id's

func GetChangeRecordAsProtoMsg

func GetChangeRecordAsProtoMsg(mgr *IndexManager, changes []*changeRecord, port string) ([]*protobuf.Instance, error)

This function creates a protobuf message for the index instance in the list of change record.

func GetIndexInstanceAsProtoMsg

func GetIndexInstanceAsProtoMsg(mgr *IndexManager,
	bucket string,
	defnId common.IndexDefnId,
	port string) ([]*protobuf.Instance, error)

Get all index instances for a specific definition as protobuf message

func GetIndexInstancesIdByDefn

func GetIndexInstancesIdByDefn(mgr *IndexManager, bucket string, defnId common.IndexDefnId) ([]uint64, error)

Get all index instance Id's for a specific definition

func MarshallIndexTopology

func MarshallIndexTopology(topology *IndexTopology) ([]byte, error)

func NewLocalMetadataRepo

func NewLocalMetadataRepo(msgAddr string,
	eventMgr *eventManager,
	reqHandler protocol.CustomRequestHandler,
	repoName string,
	quota uint64) (*MetadataRepo, RequestServer, error)

Types

type BackupResponse

// BackupResponse is a JSON-serializable response carrying a version, a
// code, an error string and a ClusterIndexMetadata result.
type BackupResponse struct {
	Version uint64               `json:"version,omitempty"`
	Code    string               `json:"code,omitempty"`
	Error   string               `json:"error,omitempty"`
	// NOTE(review): Result is a struct value; encoding/json never treats a
	// struct as empty, so omitempty has no effect here and "result" is
	// always emitted.
	Result  ClusterIndexMetadata `json:"result,omitempty"`
}

type ClusterIndexMetadata

// ClusterIndexMetadata wraps a list of LocalIndexMetadata entries,
// serialized under the JSON key "metadata".
type ClusterIndexMetadata struct {
	Metadata []LocalIndexMetadata `json:"metadata,omitempty"`
}

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

func NewCoordinator

func NewCoordinator(repo *MetadataRepo, idxMgr *IndexManager, basepath string) *Coordinator

func (*Coordinator) Abort

func (c *Coordinator) Abort(fid string, reqId uint64, err string) error

func (*Coordinator) AddPendingRequest

func (c *Coordinator) AddPendingRequest(handle *protocol.RequestHandle)

This is called when the leader has de-queued the request for processing.

func (*Coordinator) CleanupOnError

func (c *Coordinator) CleanupOnError()

func (*Coordinator) Commit

func (c *Coordinator) Commit(txid common.Txnid) error

func (*Coordinator) GetAcceptedEpoch

func (c *Coordinator) GetAcceptedEpoch() (uint32, error)

func (*Coordinator) GetCommitedEntries

func (c *Coordinator) GetCommitedEntries(txid1, txid2 common.Txnid) (<-chan protocol.LogEntryMsg, <-chan error, chan<- bool, error)

func (*Coordinator) GetCurrentEpoch

func (c *Coordinator) GetCurrentEpoch() (uint32, error)

func (*Coordinator) GetEnsembleSize

func (c *Coordinator) GetEnsembleSize() uint64

func (*Coordinator) GetFollowerId

func (c *Coordinator) GetFollowerId() string

func (*Coordinator) GetLastCommittedTxid

func (c *Coordinator) GetLastCommittedTxid() (common.Txnid, error)

func (*Coordinator) GetLastLoggedTxid

func (c *Coordinator) GetLastLoggedTxid() (common.Txnid, error)

func (*Coordinator) GetNextTxnId

func (c *Coordinator) GetNextTxnId() common.Txnid

func (*Coordinator) GetQuorumVerifier

func (c *Coordinator) GetQuorumVerifier() protocol.QuorumVerifier

func (*Coordinator) GetRequestChannel

func (c *Coordinator) GetRequestChannel() <-chan *protocol.RequestHandle

Return a channel of request for the leader to process on.

func (*Coordinator) GetStatus

func (c *Coordinator) GetStatus() protocol.PeerStatus

func (*Coordinator) HasQuorum

func (c *Coordinator) HasQuorum(count int) bool

TODO : Quorum should be based on active participants

func (*Coordinator) IsDone

func (s *Coordinator) IsDone() bool

Check if server is terminated

func (*Coordinator) LogAndCommit

func (c *Coordinator) LogAndCommit(txid common.Txnid, op uint32, key string, content []byte, toCommit bool) error

func (*Coordinator) LogProposal

func (c *Coordinator) LogProposal(proposal protocol.ProposalMsg) error

TODO : what to do if createIndex returns error

func (*Coordinator) NewRequest

func (s *Coordinator) NewRequest(opCode uint32, key string, content []byte) bool

Handle a new request. This function will block until the request is being processed (by returning true) or until the request is being interrupted (by returning false). If request is interrupted, then the request may still be processed by some other nodes. So the outcome of the request is unknown when this function returns false.

func (*Coordinator) NotifyNewAcceptedEpoch

func (c *Coordinator) NotifyNewAcceptedEpoch(epoch uint32) error

func (*Coordinator) NotifyNewCurrentEpoch

func (c *Coordinator) NotifyNewCurrentEpoch(epoch uint32) error

func (*Coordinator) Respond

func (c *Coordinator) Respond(fid string, reqId uint64, err string, content []byte) error

func (*Coordinator) Run

func (s *Coordinator) Run(config string)

Run Coordinator

func (*Coordinator) Terminate

func (s *Coordinator) Terminate()

Terminate the Coordinator

type CoordinatorState

type CoordinatorState struct {
	// contains filtered or unexported fields
}

type Error

type Error struct {
	// contains filtered or unexported fields
}

func NewError

func NewError(code errCode, severity errSeverity, category errCategory, cause error, msg string) Error

func NewError2

func NewError2(code errCode, category errCategory) Error

func NewError3

func NewError3(code errCode, severity errSeverity, category errCategory) Error

func NewError4

func NewError4(code errCode, severity errSeverity, category errCategory, msg string) Error

func (Error) Error

func (e Error) Error() string

type EventType

// EventType is a byte-sized enumeration of event kinds.
type EventType byte
// EventType values, numbered by iota: EVENT_NONE is 0 (the zero value),
// followed by create-index, drop-index and update-topology.
const (
	EVENT_NONE EventType = iota
	EVENT_CREATE_INDEX
	EVENT_DROP_INDEX
	EVENT_UPDATE_TOPOLOGY
)

type GlobalTopology

// GlobalTopology holds a list of topology keys, serialized under the JSON
// key "topologyKeys".
type GlobalTopology struct {
	TopologyKeys []string `json:"topologyKeys,omitempty"`
}

func (*GlobalTopology) AddTopologyKeyIfNecessary

func (g *GlobalTopology) AddTopologyKeyIfNecessary(key string) bool

Add a topology key

func (*GlobalTopology) RemoveTopologyKey

func (g *GlobalTopology) RemoveTopologyKey(key string)

Remove a topology key

type IndexDefnDistribution

// IndexDefnDistribution is the JSON-serializable record for one index
// definition: its bucket, name, numeric definition id and the list of
// instances (IndexInstDistribution) recorded for it.
type IndexDefnDistribution struct {
	Bucket    string                  `json:"bucket,omitempty"`
	Name      string                  `json:"name,omitempty"`
	DefnId    uint64                  `json:"defnId,omitempty"`
	Instances []IndexInstDistribution `json:"instances,omitempty"`
}

type IndexInstDistribution

// IndexInstDistribution is the JSON-serializable record for one index
// instance: ids, state values, error text, partitions, version, replica
// id, scheduled flag and current/previous storage mode.
//
// NOTE(review): two JSON keys look misspelled — "steamId" (field StreamId)
// and "rRtate" (field RState). They are the serialized key names, so
// renaming them would change the encoded form of existing data; do not
// "fix" them without a compatibility plan.
type IndexInstDistribution struct {
	InstId         uint64                  `json:"instId,omitempty"`
	State          uint32                  `json:"state,omitempty"`
	StreamId       uint32                  `json:"steamId,omitempty"`
	Error          string                  `json:"error,omitempty"`
	Partitions     []IndexPartDistribution `json:"partitions,omitempty"`
	RState         uint32                  `json:"rRtate,omitempty"`
	Version        uint64                  `json:"version,omitempty"`
	ReplicaId      uint64                  `json:"replicaId,omitempty"`
	Scheduled      bool                    `json:"scheduled,omitempty"`
	StorageMode    string                  `json:"storageMode,omitempty"`
	OldStorageMode string                  `json:"oldStorageMode,omitempty"`
}

type IndexKeyPartDistribution

// IndexKeyPartDistribution pairs a list of keys with a list of
// IndexSinglePartDistribution values.
// NOTE(review): whether Keys[i] corresponds to SinglePartitions[i] is not
// visible here — confirm against the code that populates it.
type IndexKeyPartDistribution struct {
	Keys             []string                      `json:"keys,omitempty"`
	SinglePartitions []IndexSinglePartDistribution `json:"singlePartitions,omitempty"`
}

type IndexManager

type IndexManager struct {
	// contains filtered or unexported fields
}

func NewIndexManager

func NewIndexManager(config common.Config) (mgr *IndexManager, err error)

Create a new IndexManager

func NewIndexManagerInternal

func NewIndexManagerInternal(config common.Config) (mgr *IndexManager, err error)

Create a new IndexManager

func (*IndexManager) CleanupIndex

func (m *IndexManager) CleanupIndex(defnId common.IndexDefnId) error

func (*IndexManager) Close

func (m *IndexManager) Close()

Clean up the IndexManager

func (*IndexManager) DeleteIndexForBucket

func (m *IndexManager) DeleteIndexForBucket(bucket string, streamId common.StreamId) error

func (*IndexManager) DeleteLocalValue

func (m *IndexManager) DeleteLocalValue(key string) error

func (*IndexManager) FetchNewClusterInfoCache

func (m *IndexManager) FetchNewClusterInfoCache() (*common.ClusterInfoCache, error)

func (*IndexManager) GetGlobalTopology

func (m *IndexManager) GetGlobalTopology() (*GlobalTopology, error)

Get the global topology

func (*IndexManager) GetIndexDefnById

func (m *IndexManager) GetIndexDefnById(id common.IndexDefnId) (*common.IndexDefn, error)

Get an index definition by id

func (*IndexManager) GetLocalValue

func (m *IndexManager) GetLocalValue(key string) (string, error)

func (*IndexManager) GetMemoryQuota

func (m *IndexManager) GetMemoryQuota() uint64

func (*IndexManager) GetTopologyByBucket

func (m *IndexManager) GetTopologyByBucket(bucket string) (*IndexTopology, error)

Get Topology from dictionary

func (*IndexManager) HandleBuildIndexDDL

func (m *IndexManager) HandleBuildIndexDDL(indexIds client.IndexIdList) error

func (*IndexManager) HandleCreateIndexDDL

func (m *IndexManager) HandleCreateIndexDDL(defn *common.IndexDefn, isRebalReq bool) error

Handle Create Index DDL. This function will block until

  1. The index defn is persisted durably in the dictionary
  2. The index defn is applied locally to each "active" indexer node. An active node is a running node that is in the same network partition as the leader. A leader is always in the majority partition.

This function will return an error if the outcome of the request is not known (e.g. the node is partitioned from the network). It may still mean that the request is able to go through (processed by some other nodes).

A Index DDL can be processed by any node. If this node is a leader, then the DDL request will be processed by the leader. If it is a follower, it will forward the request to the leader.

This function will not be processed until the index manager is either a leader or follower. Therefore, if (1) the node is in the minority partition after network partition or (2) the leader dies, this node will unblock any in-flight request initiated by this node (by returning error). The node will run leader election again. Until this node has become a leader or follower, it will not be able to handle another request.

If this node is partitioned from its leader, it can still receive updates from the dictionary if this node still connects to it.

func (*IndexManager) HandleDeleteIndexDDL

func (m *IndexManager) HandleDeleteIndexDDL(defnId common.IndexDefnId) error

func (*IndexManager) IsClose

func (m *IndexManager) IsClose() bool

func (*IndexManager) NewIndexDefnIterator

func (m *IndexManager) NewIndexDefnIterator() (*MetaIterator, error)

Get Metadata Iterator for index definition

func (*IndexManager) NotifyConfigUpdate

func (m *IndexManager) NotifyConfigUpdate(config common.Config) error

func (*IndexManager) NotifyIndexerReady

func (m *IndexManager) NotifyIndexerReady() error

func (*IndexManager) NotifyStats

func (m *IndexManager) NotifyStats(stats common.Statistics) error

func (*IndexManager) RegisterNotifier

func (m *IndexManager) RegisterNotifier(notifier MetadataNotifier)

func (*IndexManager) ResetIndex

func (m *IndexManager) ResetIndex(index common.IndexDefn) error

func (*IndexManager) SetLocalValue

func (m *IndexManager) SetLocalValue(key string, value string) error

func (*IndexManager) SetTopologyByBucket

func (m *IndexManager) SetTopologyByBucket(bucket string, topology *IndexTopology) error

Set Topology to dictionary

func (*IndexManager) StartCoordinator

func (mgr *IndexManager) StartCoordinator(config string)

func (*IndexManager) StartListenIndexCreate

func (m *IndexManager) StartListenIndexCreate(id string) (<-chan interface{}, error)

Listen to create Index Request

func (*IndexManager) StartListenIndexDelete

func (m *IndexManager) StartListenIndexDelete(id string) (<-chan interface{}, error)

Listen to delete Index Request

func (*IndexManager) StartListenTopologyUpdate

func (m *IndexManager) StartListenTopologyUpdate(id string) (<-chan interface{}, error)

Listen to update Topology Request

func (*IndexManager) StopListenIndexCreate

func (m *IndexManager) StopListenIndexCreate(id string)

Stop Listen to create Index Request

func (*IndexManager) StopListenIndexDelete

func (m *IndexManager) StopListenIndexDelete(id string)

Stop Listen to delete Index Request

func (*IndexManager) StopListenTopologyUpdate

func (m *IndexManager) StopListenTopologyUpdate(id string)

Stop Listen to update Topology Request

func (*IndexManager) UpdateIndexInstance

func (m *IndexManager) UpdateIndexInstance(bucket string, defnId common.IndexDefnId, state common.IndexState,
	streamId common.StreamId, err string, buildTime []uint64, rState common.RebalanceState) error

func (*IndexManager) UpdateIndexInstanceSync

func (m *IndexManager) UpdateIndexInstanceSync(bucket string, defnId common.IndexDefnId, state common.IndexState,
	streamId common.StreamId, err string, buildTime []uint64, rState common.RebalanceState) error

type IndexPartDistribution

// IndexPartDistribution is the JSON-serializable record for one partition:
// a partition id plus a single-partition and a key-partition description.
//
// NOTE(review): SinglePartition and KeyPartition are struct values;
// encoding/json never treats a struct as empty, so omitempty has no effect
// on them and both keys are always emitted.
type IndexPartDistribution struct {
	PartId          uint64                      `json:"partId,omitempty"`
	SinglePartition IndexSinglePartDistribution `json:"singlePartition,omitempty"`
	KeyPartition    IndexKeyPartDistribution    `json:"keyPartition,omitempty"`
}

type IndexRequest

// IndexRequest is the JSON-serializable request message: a version, a
// RequestType, an index definition, a list of index ids and a free-form
// plan map.
//
// The IndexIds and Plan tags previously lacked the opening quote after
// "json:" (`json:indexIds,omitempty"`), which makes the tag malformed:
// reflect.StructTag cannot parse it, so encoding/json ignored it and
// encoded the fields as "IndexIds" / "Plan" with no omitempty. The tags are
// now well-formed. Decoding is unaffected for Go peers (encoding/json
// matches keys case-insensitively); encoded output now uses "indexIds" /
// "plan" and omits them when empty.
type IndexRequest struct {
	Version  uint64                 `json:"version,omitempty"`
	Type     RequestType            `json:"type,omitempty"`
	Index    common.IndexDefn       `json:"index,omitempty"`
	IndexIds client.IndexIdList     `json:"indexIds,omitempty"`
	Plan     map[string]interface{} `json:"plan,omitempty"`
}

type IndexResponse

// IndexResponse is a JSON-serializable response carrying a version, a code
// and an error string; each field is omitted from the output when zero.
type IndexResponse struct {
	Version uint64 `json:"version,omitempty"`
	Code    string `json:"code,omitempty"`
	Error   string `json:"error,omitempty"`
}

type IndexSinglePartDistribution

// IndexSinglePartDistribution holds the list of slice locators for a
// partition, serialized under the JSON key "slices".
type IndexSinglePartDistribution struct {
	Slices []IndexSliceLocator `json:"slices,omitempty"`
}

type IndexSliceLocator

// IndexSliceLocator is the JSON-serializable record for one slice: its
// slice id, a numeric state and the id of an indexer.
// NOTE(review): presumably IndexerId names the indexer node hosting the
// slice — confirm against the code that populates it.
type IndexSliceLocator struct {
	SliceId   uint64 `json:"sliceId,omitempty"`
	State     uint32 `json:"state,omitempty"`
	IndexerId string `json:"indexerId,omitempty"`
}

type IndexStatus

// IndexStatus is the JSON-serializable status record for one index.
//
// Definition, Completion, Progress and Scheduled have no omitempty option,
// so they are always present in the encoded output even when zero; every
// other field is omitted when it holds its zero value. Note that WhereExpr
// is serialized under the key "where".
type IndexStatus struct {
	DefnId     common.IndexDefnId `json:"defnId,omitempty"`
	Name       string             `json:"name,omitempty"`
	Bucket     string             `json:"bucket,omitempty"`
	IsPrimary  bool               `json:"isPrimary,omitempty"`
	SecExprs   []string           `json:"secExprs,omitempty"`
	WhereExpr  string             `json:"where,omitempty"`
	IndexType  string             `json:"indexType,omitempty"`
	Status     string             `json:"status,omitempty"`
	Definition string             `json:"definition"`
	Hosts      []string           `json:"hosts,omitempty"`
	Error      string             `json:"error,omitempty"`
	Completion int                `json:"completion"`
	Progress   float64            `json:"progress"`
	Scheduled  bool               `json:"scheduled"`
}

type IndexStatusResponse

// IndexStatusResponse is a JSON-serializable response carrying a version,
// a code, an error string, a list of failed node names and a list of
// IndexStatus records.
type IndexStatusResponse struct {
	Version     uint64        `json:"version,omitempty"`
	Code        string        `json:"code,omitempty"`
	Error       string        `json:"error,omitempty"`
	FailedNodes []string      `json:"failedNodes,omitempty"`
	Status      []IndexStatus `json:"status,omitempty"`
}

type IndexTopology

// IndexTopology is the JSON-serializable topology record for one bucket:
// a version, the bucket name and the list of index definitions
// (IndexDefnDistribution) recorded for it.
type IndexTopology struct {
	Version     uint64                  `json:"version,omitempty"`
	Bucket      string                  `json:"bucket,omitempty"`
	Definitions []IndexDefnDistribution `json:"definitions,omitempty"`
}

func GetTopologyAsInstanceProtoMsg

func GetTopologyAsInstanceProtoMsg(mgr *IndexManager,
	bucket string, port string) ([]*protobuf.Instance, *IndexTopology, error)

Get all index instances for the topology as protobuf message

func (*IndexTopology) AddIndexDefinition

func (t *IndexTopology) AddIndexDefinition(bucket string, name string, defnId uint64, instId uint64, state uint32, indexerId string,
	instVersion uint64, rState uint32, replicaId uint64, scheduled bool, storageMode string)

Add an index definition to Topology.

func (*IndexTopology) ChangeStateForIndexInstByDefn

func (t *IndexTopology) ChangeStateForIndexInstByDefn(defnId common.IndexDefnId, fromState, toState common.IndexState)

Update Index Status on instance

func (*IndexTopology) FindIndexDefinition

func (t *IndexTopology) FindIndexDefinition(bucket string, name string) *IndexDefnDistribution

Get all index instance Id's for a specific definition

func (*IndexTopology) FindIndexDefinitionById

func (t *IndexTopology) FindIndexDefinitionById(id common.IndexDefnId) *IndexDefnDistribution

Get all index instance Id's for a specific definition

func (*IndexTopology) GetIndexInstByDefn

func (t *IndexTopology) GetIndexInstByDefn(defnId common.IndexDefnId) *IndexInstDistribution

Update Index Status on instance

func (*IndexTopology) GetIndexInstancesByDefn

func (t *IndexTopology) GetIndexInstancesByDefn(defnId common.IndexDefnId) []IndexInstDistribution

Update Index Status on instance

func (*IndexTopology) GetRStatusByDefn

func (t *IndexTopology) GetRStatusByDefn(defnId common.IndexDefnId) common.RebalanceState

func (*IndexTopology) GetStatusByDefn

func (t *IndexTopology) GetStatusByDefn(defnId common.IndexDefnId) (common.IndexState, string)

Update Index Status on instance

func (*IndexTopology) RemoveIndexDefinition

func (t *IndexTopology) RemoveIndexDefinition(bucket string, name string)

Remove an index definition from Topology.

func (*IndexTopology) RemoveIndexDefinitionById

func (t *IndexTopology) RemoveIndexDefinitionById(id common.IndexDefnId)

func (*IndexTopology) SetErrorForIndexInstByDefn

func (t *IndexTopology) SetErrorForIndexInstByDefn(defnId common.IndexDefnId, errorStr string) bool

Set Error on instance

func (*IndexTopology) UpdateOldStorageModeForIndexInstByDefn

func (t *IndexTopology) UpdateOldStorageModeForIndexInstByDefn(defnId common.IndexDefnId, storageMode string) bool

Update Old Storage Mode on instance

func (*IndexTopology) UpdateRebalanceStateForIndexInstByDefn

func (t *IndexTopology) UpdateRebalanceStateForIndexInstByDefn(defnId common.IndexDefnId, state common.RebalanceState) bool

Update Index Rebalance Status on instance

func (*IndexTopology) UpdateScheduledFlagForIndexInstByDefn

func (t *IndexTopology) UpdateScheduledFlagForIndexInstByDefn(defnId common.IndexDefnId, scheduled bool) bool

Set scheduled flag

func (*IndexTopology) UpdateStateForIndexInstByDefn

func (t *IndexTopology) UpdateStateForIndexInstByDefn(defnId common.IndexDefnId, state common.IndexState) bool

Update Index Status on instance

func (*IndexTopology) UpdateStorageModeForIndexInstByDefn

func (t *IndexTopology) UpdateStorageModeForIndexInstByDefn(defnId common.IndexDefnId, storageMode string) bool

Update Storage Mode on instance

func (*IndexTopology) UpdateStreamForIndexInstByDefn

func (t *IndexTopology) UpdateStreamForIndexInstByDefn(defnId common.IndexDefnId, stream common.StreamId) bool

Update StreamId on instance

type LifecycleMgr

type LifecycleMgr struct {
	// contains filtered or unexported fields
}

func NewLifecycleMgr

func NewLifecycleMgr(notifier MetadataNotifier, clusterURL string) (*LifecycleMgr, error)

func (*LifecycleMgr) BuildIndexes

func (m *LifecycleMgr) BuildIndexes(ids []common.IndexDefnId,
	reqCtx *common.MetadataRequestContext, retry bool) ([]*common.IndexDefn, []common.IndexDefnId, []error)

func (*LifecycleMgr) CreateIndex

func (m *LifecycleMgr) CreateIndex(defn *common.IndexDefn, scheduled bool,
	reqCtx *common.MetadataRequestContext) error

func (*LifecycleMgr) DeleteIndex

func (m *LifecycleMgr) DeleteIndex(id common.IndexDefnId, notify bool,
	reqCtx *common.MetadataRequestContext) error

func (*LifecycleMgr) FindLocalIndexInst

func (m *LifecycleMgr) FindLocalIndexInst(bucket string, defnId common.IndexDefnId) (*IndexInstDistribution, error)

func (*LifecycleMgr) GetResponseChannel

func (m *LifecycleMgr) GetResponseChannel() <-chan c.Packet

func (*LifecycleMgr) OnNewRequest

func (m *LifecycleMgr) OnNewRequest(fid string, request protocol.RequestMsg)

This is the main event processing loop. It is important not to have any blocking call in this function (e.g. mutex). If this function is blocked, it will also block gometa event processing loop.

func (*LifecycleMgr) RegisterNotifier

func (m *LifecycleMgr) RegisterNotifier(notifier MetadataNotifier)

func (*LifecycleMgr) Run

func (m *LifecycleMgr) Run(repo *MetadataRepo, requestServer RequestServer)

func (*LifecycleMgr) SetScheduledFlag

func (m *LifecycleMgr) SetScheduledFlag(bucket string, defnId common.IndexDefnId, scheduled bool) error

func (*LifecycleMgr) Terminate

func (m *LifecycleMgr) Terminate()

func (*LifecycleMgr) UpdateIndexInstance

func (m *LifecycleMgr) UpdateIndexInstance(bucket string, defnId common.IndexDefnId, state common.IndexState,
	streamId common.StreamId, errStr string, buildTime []uint64, rState uint32) error

type LocalIndexMetadata

// LocalIndexMetadata is the JSON-serializable metadata bundle for one
// indexer: its indexer id, node UUID and storage mode, together with its
// index topologies (key "topologies") and index definitions (key
// "definitions").
type LocalIndexMetadata struct {
	IndexerId        string             `json:"indexerId,omitempty"`
	NodeUUID         string             `json:"nodeUUID,omitempty"`
	StorageMode      string             `json:"storageMode,omitempty"`
	IndexTopologies  []IndexTopology    `json:"topologies,omitempty"`
	IndexDefinitions []common.IndexDefn `json:"definitions,omitempty"`
}

type LocalRepoRef

type LocalRepoRef struct {
	// contains filtered or unexported fields
}

type MetaIterator

type MetaIterator struct {
	// contains filtered or unexported fields
}

func (*MetaIterator) Close

func (i *MetaIterator) Close()

close iterator

func (*MetaIterator) Next

func (i *MetaIterator) Next() (string, *common.IndexDefn, error)

Get value from iterator

type MetadataKind

// MetadataKind is a byte-sized enumeration of metadata record kinds.
type MetadataKind byte
// MetadataKind values, numbered by iota: KIND_UNKNOWN is 0 (the zero
// value), followed by index definition, topology, global topology and
// stability timestamp.
const (
	KIND_UNKNOWN MetadataKind = iota
	KIND_INDEX_DEFN
	KIND_TOPOLOGY
	KIND_GLOBAL_TOPOLOGY
	KIND_STABILITY_TIMESTAMP
)

type MetadataNotifier

// MetadataNotifier is the callback interface through which index
// lifecycle events (create, delete, build) and a stats-fetch request are
// delivered to an implementer.
type MetadataNotifier interface {
	// OnIndexCreate receives the index definition, an instance id, an int
	// and the request context; a non-nil error reports failure.
	// NOTE(review): the meaning of the int argument is not visible here —
	// confirm against the implementer.
	OnIndexCreate(*common.IndexDefn, common.IndexInstId, int, *common.MetadataRequestContext) error
	// OnIndexDelete receives an instance id, a string and the request
	// context. NOTE(review): the string is presumably the bucket name —
	// confirm against the caller.
	OnIndexDelete(common.IndexInstId, string, *common.MetadataRequestContext) error
	// OnIndexBuild receives a batch of instance ids and a parallel-looking
	// list of strings, and reports failures per instance id in the
	// returned map.
	OnIndexBuild([]common.IndexInstId, []string, *common.MetadataRequestContext) map[common.IndexInstId]error
	// OnFetchStats takes no arguments and returns only an error.
	OnFetchStats() error
}

Index Lifecycle

  1. Index Creation A) When an index is created, the index definition is assigned to a 64 bits UUID (IndexDefnId). B) IndexManager will persist the index definition. C) IndexManager will persist the index instance with INDEX_STATE_CREATED status. Each instance is assigned a 64 bits IndexInstId. For the first instance of an index, the IndexInstId is equal to the IndexDefnId. D) IndexManager will invoke MetadataNotifier.OnIndexCreate(). E) IndexManager will update instance to status INDEX_STATE_READY. F) If there is any error in (1B) - (1E), IndexManager will cleanup by deleting index definition and index instance. Since there is no atomic transaction, cleanup may not be completed, and the index will be left in an invalid state. See (5) for conditions where the index is considered valid. G) If there is any error in (1E), IndexManager will also invoke OnIndexDelete() H) Any error from (1A) or (1F), the error will be reported back to MetadataProvider.
  2. Immediate Index Build (index definition is persisted successfully and deferred build flag is false) A) MetadataNotifier.OnIndexBuild() is invoked. OnIndexBuild() is responsible for updating the state of the index instance (e.g. from READY to INITIAL). B) If there is an error in (2A), the error will be returned to the MetadataProvider. C) No cleanup will be performed by IndexManager if OnIndexBuild() fails. In other words, the index can be left in INDEX_STATE_READY. The user should be able to kick off index build again using deferred build. D) OnIndexBuild() can be running on a separate goroutine. It can invoke UpdateIndexInstance() at any time during index build. This update will be queued serially and apply to the topology specific for that index instance (will not affect any other index instance). The new index state will be returned to the MetadataProvider asynchronously.
  3. Deferred Index Build A) For Deferred Index Build, it will follow step (2A) - (2D).
  4. Index Deletion A) When an index is deleted, IndexManager will set the index to INDEX_STATE_DELETED. B) If (4A) fails, the error will be returned and the index is considered as NOT deleted. C) IndexManager will then invoke MetadataNotifier.OnIndexDelete(). D) The IndexManager will delete the index definition first before deleting the index instance. Since there is no atomic transaction, the cleanup may not be completed, and index can be in inconsistent state. See (5) for valid index state. E) Any error returned from (4C) to (4D) will not be returned to the client (since these are cleanup steps)
  5. Valid Index States A) Both index definition and index instance exist. B) Index Instance is not in INDEX_STATE_CREATED or INDEX_STATE_DELETED.

type MetadataRepo

type MetadataRepo struct {
	// contains filtered or unexported fields
}

func NewMetadataRepo

func NewMetadataRepo(requestAddr string,
	leaderAddr string,
	config string,
	mgr *IndexManager) (*MetadataRepo, error)

func (*MetadataRepo) BroadcastIndexStats

func (c *MetadataRepo) BroadcastIndexStats(stats *client.IndexStats) error

func (*MetadataRepo) BroadcastServiceMap

func (c *MetadataRepo) BroadcastServiceMap(serviceMap *client.ServiceMap) error

func (*MetadataRepo) Close

func (c *MetadataRepo) Close()

func (*MetadataRepo) CreateIndex

func (c *MetadataRepo) CreateIndex(defn *common.IndexDefn) error

TODO: This function is not transactional.

func (*MetadataRepo) DeleteLocalValue

func (c *MetadataRepo) DeleteLocalValue(key string) error

func (*MetadataRepo) DropIndexById

func (c *MetadataRepo) DropIndexById(id common.IndexDefnId) error

func (*MetadataRepo) GetGlobalTopology

func (c *MetadataRepo) GetGlobalTopology() (*GlobalTopology, error)

func (*MetadataRepo) GetIndexDefnById

func (c *MetadataRepo) GetIndexDefnById(id common.IndexDefnId) (*common.IndexDefn, error)

func (*MetadataRepo) GetIndexDefnByName

func (c *MetadataRepo) GetIndexDefnByName(bucket string, name string) (*common.IndexDefn, error)

func (*MetadataRepo) GetLocalIndexerId

func (c *MetadataRepo) GetLocalIndexerId() (common.IndexerId, error)

func (*MetadataRepo) GetLocalNodeUUID

func (c *MetadataRepo) GetLocalNodeUUID() (string, error)

func (*MetadataRepo) GetLocalValue

func (c *MetadataRepo) GetLocalValue(key string) (string, error)

func (*MetadataRepo) GetNextIndexInstId

func (c *MetadataRepo) GetNextIndexInstId() (common.IndexInstId, error)

func (*MetadataRepo) GetNextPartitionId

func (c *MetadataRepo) GetNextPartitionId() (common.PartitionId, error)

func (*MetadataRepo) GetTopologyByBucket

func (c *MetadataRepo) GetTopologyByBucket(bucket string) (*IndexTopology, error)

func (*MetadataRepo) NewIterator

func (c *MetadataRepo) NewIterator() (*MetaIterator, error)

Create a new iterator

func (*MetadataRepo) NewTopologyIterator

func (c *MetadataRepo) NewTopologyIterator() (*TopologyIterator, error)

Create a new topology iterator

func (*MetadataRepo) RegisterNotifier

func (c *MetadataRepo) RegisterNotifier(notifier MetadataNotifier)

func (*MetadataRepo) SetGlobalTopology

func (c *MetadataRepo) SetGlobalTopology(topology *GlobalTopology) error

func (*MetadataRepo) SetLocalValue

func (c *MetadataRepo) SetLocalValue(key string, value string) error

func (*MetadataRepo) SetTopologyByBucket

func (c *MetadataRepo) SetTopologyByBucket(bucket string, topology *IndexTopology) error

func (*MetadataRepo) UpdateIndex

func (c *MetadataRepo) UpdateIndex(defn *common.IndexDefn) error

type RemoteRepoRef

type RemoteRepoRef struct {
	// contains filtered or unexported fields
}

type Reply

// Reply carries a result as a raw byte slice. It has no JSON tags.
type Reply struct {
	Result []byte
}

type RepoRef

type RepoRef interface {
	// contains filtered or unexported methods
}

type Request

// Request carries an operation code (as a string), a key and a raw byte
// value. It has no JSON tags.
type Request struct {
	OpCode string
	Key    string
	Value  []byte
}

type RequestServer

// RequestServer submits a request made of an op code, a key and a raw
// byte value. Both methods take the same arguments and return an error.
// NOTE(review): presumably MakeRequest waits for the request to complete
// while MakeAsyncRequest does not — confirm against the implementation.
type RequestServer interface {
	MakeRequest(opCode gometaC.OpCode, key string, value []byte) error
	MakeAsyncRequest(opCode gometaC.OpCode, key string, value []byte) error
}

type RequestType

// RequestType is a string-valued enumeration of request kinds.
type RequestType string
// RequestType values; the string is the serialized form
// ("create", "drop", "build").
const (
	CREATE RequestType = "create"
	DROP   RequestType = "drop"
	BUILD  RequestType = "build"
)

type RestoreContext

type RestoreContext struct {
	// contains filtered or unexported fields
}

type RestoreResponse

// RestoreResponse is a JSON-serializable response carrying a version, a
// code and an error string; each field is omitted from the output when
// zero.
type RestoreResponse struct {
	Version uint64 `json:"version,omitempty"`
	Code    string `json:"code,omitempty"`
	Error   string `json:"error,omitempty"`
}

type TopologyIterator

type TopologyIterator struct {
	// contains filtered or unexported fields
}

func (*TopologyIterator) Close

func (i *TopologyIterator) Close()

close iterator

func (*TopologyIterator) Next

func (i *TopologyIterator) Next() (*IndexTopology, error)

Get value from iterator

Directories

Path Synopsis
test

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL