cbgt

Published: Mar 23, 2017 License: Apache-2.0 Imports: 33 Imported by: 0

README

cbgt

The cbgt project provides a golang library that helps manage distributed partitions (or data shards) across an elastic cluster of servers.

Documentation
  • GoDoc
  • REST API Reference - these REST API Reference docs come from cbft, which uses the cbgt library.
  • UI Screenshots - these screenshots come from cbft, which uses the cbgt library.

NOTE: This library initializes the math/rand package's global random seed (rand.Seed(time.Now().UTC().UnixNano())) for unique id generation.

Documentation

Index

Constants

const (
	CFG_CAS_FORCE = math.MaxUint64
)
const DEST_EXTRAS_TYPE_DCP = DestExtrasType(0x0002)

DEST_EXTRAS_TYPE_DCP represents the extras that come from the DCP protocol.

const DEST_EXTRAS_TYPE_NIL = DestExtrasType(0)

DEST_EXTRAS_TYPE_NIL means there are no extras as part of a Dest.DataUpdate/DataDelete invocation.

const DEST_EXTRAS_TYPE_TAP = DestExtrasType(0x0001)

DEST_EXTRAS_TYPE_TAP represents the extras that come from the TAP protocol.

const FEED_BACKOFF_FACTOR = 1.5
const FEED_SLEEP_INIT_MS = 100
const FEED_SLEEP_MAX_MS = 10000

Default values for feed parameters.

const FILES_FEED_BACKOFF_FACTOR = 1.5
const FILES_FEED_MAX_SLEEP_MS = 1000 * 60 * 5 // 5 minutes.
const FILES_FEED_SLEEP_START_MS = 5000
const FeedAllotmentOnePerPIndex = "oneFeedPerPIndex"

FeedAllotmentOnePerPIndex specifies that there should be only a single feed per pindex.

const FeedAllotmentOption = "feedAllotment"

FeedAllotmentOption is the manager option key used to specify how feeds should be allotted or assigned.

const INDEX_DEFS_KEY = "indexDefs"

INDEX_DEFS_KEY is the key used for Cfg access.

const INDEX_NAME_REGEXP = `^[A-Za-z][0-9A-Za-z_\-]*$`

INDEX_NAME_REGEXP is used to validate index definition names.

const JANITOR_CLOSE_PINDEX = "janitor_close_pindex"
const JANITOR_LOAD_DATA_DIR = "janitor_load_data_dir"
const JANITOR_REMOVE_PINDEX = "janitor_remove_pindex"
const MANAGER_MAX_EVENTS = 10

MANAGER_MAX_EVENTS limits the number of events tracked by a Manager for diagnosis/debugging.

const NODE_DEFS_KEY = "nodeDefs" // NODE_DEFS_KEY is used for Cfg access.
const NODE_DEFS_KNOWN = "known" // NODE_DEFS_KNOWN is used for Cfg access.
const NODE_DEFS_WANTED = "wanted" // NODE_DEFS_WANTED is used for Cfg access.
const PINDEX_META_FILENAME string = "PINDEX_META"
const PLAN_PINDEXES_KEY = "planPIndexes"

PLAN_PINDEXES_KEY is used for Cfg access.

const QUERY_CTL_DEFAULT_TIMEOUT_MS = int64(10000)

QUERY_CTL_DEFAULT_TIMEOUT_MS is the default query timeout.

const SOURCE_TYPE_COUCHBASE = "couchbase"
const SOURCE_TYPE_DCP = "dcp"
const VERSION = "5.0.0"

The cbgt.VERSION tracks persistence versioning (schema/format of persisted data and configuration). The main.VERSION from "git describe" that's part of an executable command, in contrast, is an overall "product" version. For example, we might introduce new UI-only features or fix a UI typo, in which case we'd bump the main.VERSION number; but, if the persisted data/config format was unchanged, then the cbgt.VERSION number should remain unchanged.

NOTE: You *must* update cbgt.VERSION if you change what's stored in the Cfg (such as the JSON/struct definitions or the planning algorithms).

const VERSION_KEY = "version"
const WORK_KICK = "kick"
const WORK_NOOP = ""

Variables

var CfgMetaKvPrefix = "/cbgt/cfg/"

Prefix of paths stored in metakv, and should be immutable after process init()'ialization.

var DCPFeedPrefix string

DCPFeedPrefix should be immutable after process init()'ialization.

var EMPTY_BYTES = []byte{}
var ErrCouchbaseMismatchedBucketUUID = fmt.Errorf("mismatched-couchbase-bucket-UUID")
var ErrPIndexQueryTimeout = errors.New("pindex query timeout")

ErrPIndexQueryTimeout may be returned for queries that took too long and timed out.

var FeedTypes = make(map[string]*FeedType) // Key is sourceType.

FeedTypes is a global registry of available feed types and is initialized on startup. It should be immutable after startup time.

var JsonCloseBrace = []byte("}")
var JsonCloseBraceComma = []byte("},")
var JsonComma = []byte(",")
var JsonNULL = []byte("null")
var JsonOpenBrace = []byte("{")
var MsgRingMaxBufPoolSize = 8

MsgRingMaxBufPoolSize is the max pool size for reused buf's.

var MsgRingMaxSmallBufSize = 1024

MsgRingMaxSmallBufSize is the cutoff point, in bytes, at which a msg ring categorizes a buf as small versus large for reuse.

var PINDEX_STORE_MAX_ERRORS = 40

PINDEX_STORE_MAX_ERRORS is the max number of errors that a PIndexStoreStats will track.

var PIndexImplTypes = make(map[string]*PIndexImplType)

PIndexImplTypes is a global registry of pindex type backends or implementations. It is keyed by indexType and should be treated as immutable/read-only after process init/startup.

var PlanPIndexFilters = map[string]PlanPIndexFilter{
	"ok":      PlanPIndexNodeOk,
	"canRead": PlanPIndexNodeCanRead,
}

PlanPIndexFilters represents the registered PlanPIndexFilter func's, and should only be modified during process init()'ialization.

var PlannerHooks = map[string]PlannerHook{}

PlannerHooks allows advanced applications to register callbacks into the planning computation, in order to adjust the planning outcome. For example, an advanced application might adjust node weights more dynamically in order to achieve an improved balancing of pindexes across a cluster. It should be modified only during the init()'ialization phase of process startup. See CalcPlan() implementation for more information.

Functions

func AtomicCopyMetrics

func AtomicCopyMetrics(s, r interface{},
	fn func(sv uint64, rv uint64) uint64)

AtomicCopyMetrics copies uint64 metrics from s to r (from source to result), and also applies an optional fn function to each metric. The fn is invoked with metrics from s and r, and can be used to compute additions, subtractions, etc. When fn is nil, AtomicCopyMetrics defaults to just a straight copy.

func BlanceMap

func BlanceMap(
	planPIndexesForIndex map[string]*PlanPIndex,
	planPIndexes *PlanPIndexes,
) blance.PartitionMap

BlanceMap reconstructs a blance map from an existing plan.

func BlancePartitionModel

func BlancePartitionModel(indexDef *IndexDef) (
	model blance.PartitionModel,
	modelConstraints map[string]int,
)

BlancePartitionModel returns a blance library PartitionModel and model constraints based on an input index definition.

func BlancePlanPIndexes

func BlancePlanPIndexes(mode string,
	indexDef *IndexDef,
	planPIndexesForIndex map[string]*PlanPIndex,
	planPIndexesPrev *PlanPIndexes,
	nodeUUIDsAll []string,
	nodeUUIDsToAdd []string,
	nodeUUIDsToRemove []string,
	nodeWeights map[string]int,
	nodeHierarchy map[string]string) []string

BlancePlanPIndexes invokes the blance library's generic PlanNextMap() algorithm to create a new pindex layout plan.

func CBAuth

func CBAuth(sourceName, sourceParams string, options map[string]string) (
	auth couchbase.AuthHandler, err error)

func CBAuthHttpGet

func CBAuthHttpGet(urlStrIn string) (resp *http.Response, err error)

CBAuthHttpGet is a couchbase-specific http.Get(), for use in a cbauth'ed environment.

func CBAuthURL

func CBAuthURL(urlStr string) (string, error)

CBAuthURL rewrites a URL with credentials, for use in a cbauth'ed environment.

func CalcNodesLayout

func CalcNodesLayout(indexDefs *IndexDefs, nodeDefs *NodeDefs,
	planPIndexesPrev *PlanPIndexes) (
	nodeUUIDsAll []string,
	nodeUUIDsToAdd []string,
	nodeUUIDsToRemove []string,
	nodeWeights map[string]int,
	nodeHierarchy map[string]string,
)

CalcNodesLayout computes information about the nodes based on the index definitions, node definitions, and the current plan.

func CalcPIndexesDelta

func CalcPIndexesDelta(mgrUUID string,
	currPIndexes map[string]*PIndex,
	wantedPlanPIndexes *PlanPIndexes) (
	addPlanPIndexes []*PlanPIndex,
	removePIndexes []*PIndex)

Functionally determine the delta of which pindexes need creation and which should be shut down on our local node (mgrUUID).

func CasePlanFrozen

func CasePlanFrozen(indexDef *IndexDef,
	begPlanPIndexes, endPlanPIndexes *PlanPIndexes) bool

CasePlanFrozen returns true if the plan for the indexDef is frozen, in which case it also populates endPlanPIndexes with a clone of the indexDef's plans from begPlanPIndexes.

func CfgNodeDefsKey

func CfgNodeDefsKey(kind string) string

CfgNodeDefsKey returns the Cfg access key for a NodeDef kind.

func CfgRemoveNodeDef

func CfgRemoveNodeDef(cfg Cfg, kind, uuid, version string) error

CfgRemoveNodeDef removes a NodeDef with the given uuid from the Cfg.

func CfgSetIndexDefs

func CfgSetIndexDefs(cfg Cfg, indexDefs *IndexDefs, cas uint64) (uint64, error)

Updates index definitions on a Cfg provider.

func CfgSetNodeDefs

func CfgSetNodeDefs(cfg Cfg, kind string, nodeDefs *NodeDefs,
	cas uint64) (uint64, error)

Updates node definitions on a Cfg provider.

func CfgSetPlanPIndexes

func CfgSetPlanPIndexes(cfg Cfg, planPIndexes *PlanPIndexes, cas uint64) (
	uint64, error)

Updates PlanPIndexes on a Cfg provider.

func CheckVersion

func CheckVersion(cfg Cfg, myVersion string) (bool, error)

Returns true if a given version is modern enough to modify the Cfg. Older versions (which are running with older JSON/struct definitions or planning algorithms) will see false from their CheckVersion()'s.

func ConsistencyWaitDone

func ConsistencyWaitDone(partition string,
	cancelCh <-chan bool,
	doneCh chan error,
	currSeq func() uint64) error

ConsistencyWaitDone() waits for either the cancelCh or doneCh to finish, and provides the partition's seq if it was the cancelCh.

func ConsistencyWaitGroup

func ConsistencyWaitGroup(indexName string,
	consistencyParams *ConsistencyParams, cancelCh <-chan bool,
	localPIndexes []*PIndex,
	addLocalPIndex func(*PIndex) error) error

ConsistencyWaitGroup waits for all the partitions from a group of pindexes to reach a required consistency level.

func ConsistencyWaitPIndex

func ConsistencyWaitPIndex(pindex *PIndex, t ConsistencyWaiter,
	consistencyParams *ConsistencyParams, cancelCh <-chan bool) error

ConsistencyWaitPIndex waits for all the partitions in a pindex to reach the required consistency level.

func ConsistencyWaitPartitions

func ConsistencyWaitPartitions(
	t ConsistencyWaiter,
	partitions map[string]bool,
	consistencyLevel string,
	consistencyVector map[string]uint64,
	cancelCh <-chan bool) error

ConsistencyWaitPartitions waits for the given partitions to reach the required consistency level.

func CouchbaseBucket

func CouchbaseBucket(sourceName, sourceUUID, sourceParams, serverIn string,
	options map[string]string) (*couchbase.Bucket, error)

CouchbaseBucket is a helper function to connect to a couchbase bucket.

func CouchbaseParseSourceName

func CouchbaseParseSourceName(
	serverURLDefault, poolNameDefault, sourceName string) (
	string, string, string)

CouchbaseParseSourceName parses a sourceName, if it's a couchbase REST/HTTP URL, into a server URL, poolName and bucketName. Otherwise, it returns the serverURLDefault and poolNameDefault, and treats the sourceName as a bucketName.

func CouchbasePartitionSeqs

func CouchbasePartitionSeqs(sourceType, sourceName, sourceUUID,
	sourceParams, serverIn string,
	options map[string]string) (
	map[string]UUIDSeq, error)

CouchbasePartitionSeqs returns a map keyed by partition/vbucket ID with values of each vbucket's UUID / high_seqno. It implements the FeedPartitionsFunc func signature.

func CouchbasePartitions

func CouchbasePartitions(sourceType, sourceName, sourceUUID, sourceParams,
	serverIn string, options map[string]string) (
	partitions []string, err error)

CouchbasePartitions parses a sourceParams for a couchbase data-source/feed.

func CouchbaseSourceVBucketLookUp

func CouchbaseSourceVBucketLookUp(docID, serverIn string,
	sourceDetails *IndexDef, req *http.Request) (string, error)

CouchbaseSourceVBucketLookUp looks up the source vBucketID for a given document ID and index.

func CouchbaseStats

func CouchbaseStats(sourceType, sourceName, sourceUUID,
	sourceParams, serverIn string,
	options map[string]string, statsKind string) (
	map[string]interface{}, error)

CouchbaseStats returns a map of aggregated ("aggStats") and per-node stats ("nodesStats"). It implements the FeedStatsFunc func signature.

func DataSourcePartitions

func DataSourcePartitions(sourceType, sourceName, sourceUUID, sourceParams,
	server string, options map[string]string) ([]string, error)

DataSourcePartitions is a helper function that returns the data source partitions for a named data source or feed type.

func DataSourcePrepParams

func DataSourcePrepParams(sourceType, sourceName, sourceUUID, sourceParams,
	server string, options map[string]string) (string, error)

DataSourcePrepParams parses and validates the sourceParams, possibly transforming it. One transform is if the "markPartitionSeqs" field in the sourceParams has a string value of "currentPartitionSeqs", then the markPartitionSeqs will be transformed into a map[string]UUIDSeq. DataSourcePrepParams returns the transformed sourceParams.

func DefaultMaxPartitionsPerPIndex

func DefaultMaxPartitionsPerPIndex(mgr *Manager) int

DefaultMaxPartitionsPerPIndex retrieves the defaultMaxPartitionsPerPIndex from the manager options, if available.

func ErrorToString

func ErrorToString(e error) string

ErrorToString is a helper func that returns e.Error(), but also returns "" for nil error.

func ExponentialBackoffLoop

func ExponentialBackoffLoop(name string,
	f func() int,
	startSleepMS int,
	backoffFactor float32,
	maxSleepMS int)

Calls f() in a loop, sleeping in an exponential backoff if needed. The provided f() function should return < 0 to stop the loop; >= 0 to continue the loop, where > 0 means there was progress which allows an immediate retry of f() with no sleeping. A return of < 0 is useful when f() will never make any future progress.
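
For illustration, here is a minimal usage sketch (not part of the package docs), assuming the cbgt import and a hypothetical tryConnect() helper; the return-value convention follows the description above, and the FEED_* constants documented earlier give the default sleep shape.

cbgt.ExponentialBackoffLoop("example-connect",
	func() int {
		if err := tryConnect(); err != nil { // tryConnect() is hypothetical.
			return 0 // no progress: sleep with backoff, then retry.
		}
		return -1 // done: stop the loop.
	},
	cbgt.FEED_SLEEP_INIT_MS,  // initial sleep of 100ms.
	cbgt.FEED_BACKOFF_FACTOR, // grow the sleep by 1.5x per retry.
	cbgt.FEED_SLEEP_MAX_MS)   // cap the sleep at 10 seconds.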

func FeedNameForPIndex

func FeedNameForPIndex(pindex *PIndex, feedAllotment string) string

FeedNameForPIndex functionally computes the name of a feed given a pindex.

func FilesFeedPartitions

func FilesFeedPartitions(sourceType, sourceName, sourceUUID, sourceParams,
	server string, options map[string]string) ([]string, error)

FilesFeedPartitions returns the partitions, controlled by FilesFeedParams.NumPartitions, for a FilesFeed instance.

func FilesFindMatches

func FilesFindMatches(dataDir, sourceName string,
	regExps []string, modTimeGTE time.Time, maxSize int64) (
	[]string, error)

FilesFindMatches finds all leaf file paths in a subdirectory tree that match any in an optional array of regExps (regular expression strings). If regExps is nil, though, then all leaf file paths are considered as a potential candidate match. The regExps are with respect to a path from filepath.Walk().

Additionally, a candidate file must have been modified since a modTimeGTE and (if maxSize is > 0) should have size that's <= maxSize.

func FilesPathToPartition

func FilesPathToPartition(h hash.Hash32,
	partitions []string, path string) string

FilesPathToPartition hashes a file path to a partition.

func GetIndexDef

func GetIndexDef(cfg Cfg, indexName string) (
	*IndexDef, *PIndexImplType, error)

GetIndexDef retrieves the IndexDef and PIndexImplType for an index.

func IndentJSON

func IndentJSON(x interface{}, prefix, indent string) string

IndentJSON is a helper func that returns indented JSON for its interface{} x parameter.

func NewBlackHolePIndexImpl

func NewBlackHolePIndexImpl(indexType, indexParams,
	path string, restart func()) (PIndexImpl, Dest, error)

func NewPIndexImpl

func NewPIndexImpl(indexType, indexParams, path string, restart func()) (
	PIndexImpl, Dest, error)

NewPIndexImpl creates an index partition of the given, registered index type.

func NewUUID

func NewUUID() string

func OpenBlackHolePIndexImpl

func OpenBlackHolePIndexImpl(indexType, path string, restart func()) (
	PIndexImpl, Dest, error)

func OpenPIndexImpl

func OpenPIndexImpl(indexType, path string, restart func()) (
	PIndexImpl, Dest, error)

OpenPIndexImpl loads an index partition of the given, registered index type from a given path.

func PIndexMatchesPlan

func PIndexMatchesPlan(pindex *PIndex, planPIndex *PlanPIndex) bool

Returns true if the PIndex matches the PlanPIndex, ignoring UUID.

func PIndexPath

func PIndexPath(dataDir, pindexName string) string

Computes the storage path for a pindex.

func ParseOpaqueToUUID

func ParseOpaqueToUUID(b []byte) string

func ParsePIndexPath

func ParsePIndexPath(dataDir, pindexPath string) (string, bool)

Retrieves a pindex name from a pindex path.

func ParsePartitionsToVBucketIds

func ParsePartitionsToVBucketIds(dests map[string]Dest) ([]uint16, error)

ParsePartitionsToVBucketIds is specific to couchbase data-sources/feeds, converting a set of partition strings from a dests map to vbucketId numbers.

func Plan

func Plan(cfg Cfg, version, uuid, server string, options map[string]string,
	plannerFilter PlannerFilter) (bool, error)

Plan runs the planner once.

func PlanPIndexName

func PlanPIndexName(indexDef *IndexDef, sourcePartitions string) string

NOTE: PlanPIndex.Name must be unique across the cluster and ideally functionally based off of the indexDef so that the SamePlanPIndex() comparison works even if concurrent planners are racing to calculate plans.

NOTE: We can't use sourcePartitions directly as part of a PlanPIndex.Name suffix because in vbucket/hash partitioning the string would be too long -- since PIndexes might use PlanPIndex.Name for filesystem paths.

func PlanPIndexNodeCanRead

func PlanPIndexNodeCanRead(p *PlanPIndexNode) bool

PlanPIndexNodeCanRead returns true if PlanPIndexNode.CanRead is true; it's useful as a filter arg for Manager.CoveringPIndexes().

func PlanPIndexNodeCanWrite

func PlanPIndexNodeCanWrite(p *PlanPIndexNode) bool

PlanPIndexNodeCanWrite returns true if PlanPIndexNode.CanWrite is true; it's useful as a filter arg for Manager.CoveringPIndexes().

func PlanPIndexNodeOk

func PlanPIndexNodeOk(p *PlanPIndexNode) bool

PlanPIndexNodeOk always returns true; it's useful as a filter arg for Manager.CoveringPIndexes().

func PlannerCheckVersion

func PlannerCheckVersion(cfg Cfg, version string) error

PlannerCheckVersion errors if a version string is too low.

func PlannerGetPlan

func PlannerGetPlan(cfg Cfg, version string, uuid string) (
	indexDefs *IndexDefs,
	nodeDefs *NodeDefs,
	planPIndexes *PlanPIndexes,
	planPIndexesCAS uint64,
	err error)

PlannerGetPlan retrieves plan related info from the Cfg.

func PrimaryFeedPartitions

func PrimaryFeedPartitions(sourceType, sourceName, sourceUUID, sourceParams,
	server string, options map[string]string) ([]string, error)

PrimaryFeedPartitions generates partition strings based on a PrimarySourceParams.NumPartitions parameter.

func RegisterFeedType

func RegisterFeedType(sourceType string, f *FeedType)

RegisterFeedType is invoked at init/startup time to register a FeedType.

func RegisterPIndexImplType

func RegisterPIndexImplType(indexType string, t *PIndexImplType)

RegisterPIndexImplType registers an index type into the system.

func SamePlanPIndex

func SamePlanPIndex(a, b *PlanPIndex) bool

Returns true if both PlanPIndex are the same, ignoring PlanPIndex.UUID.

func SamePlanPIndexes

func SamePlanPIndexes(a, b *PlanPIndexes) bool

Returns true if both PlanPIndexes are the same, where we ignore any differences in UUID or ImplVersion.

func SplitIndexDefIntoPlanPIndexes

func SplitIndexDefIntoPlanPIndexes(indexDef *IndexDef, server string,
	options map[string]string, planPIndexesOut *PlanPIndexes) (
	map[string]*PlanPIndex, error)

Split an IndexDef into 1 or more PlanPIndex'es, assigning data source partitions from the IndexDef to a PlanPIndex based on modulus of MaxPartitionsPerPIndex.

NOTE: If MaxPartitionsPerPIndex isn't a clean divisor of the total number of data source partitions (like 1024 split into clumps of 10), then one PIndex assigned to the remainder will be smaller than the other PIndexes (such as having only a remainder of 4 partitions rather than the usual 10 partitions per PIndex).

func StartDCPFeed

func StartDCPFeed(mgr *Manager, feedName, indexName, indexUUID,
	sourceType, sourceName, bucketUUID, params string,
	dests map[string]Dest) error

StartDCPFeed starts a DCP related feed and is registered at init/startup time with the system via RegisterFeedType().

func StartFilesFeed

func StartFilesFeed(mgr *Manager, feedName, indexName, indexUUID,
	sourceType, sourceName, sourceUUID, params string,
	dests map[string]Dest) error

StartFilesFeed starts a FilesFeed and is the callback function registered at init/startup time.

func StartTAPFeed

func StartTAPFeed(mgr *Manager, feedName, indexName, indexUUID,
	sourceType, sourceName, bucketUUID, params string,
	dests map[string]Dest) error

StartTAPFeed starts a TAP related feed and is registered at init/startup time with the system via RegisterFeedType().

func StringsIntersectStrings

func StringsIntersectStrings(a, b []string) []string

StringsIntersectStrings returns a brand new array that has the intersection of a and b.

func StringsRemoveStrings

func StringsRemoveStrings(stringArr, removeArr []string) []string

StringsRemoveStrings returns a copy of stringArr, but with some strings removed, keeping the same order as stringArr.

func StringsToMap

func StringsToMap(strsArr []string) map[string]bool

StringsToMap converts an array of (perhaps duplicated) strings into a map with keys of those strings and values of true, and is useful for simple set-like operations.

func StructChanges

func StructChanges(a1, a2 interface{}) (rv []string)

StructChanges uses reflection to compare the fields of two structs, which must be of the same type, and returns info on the changes of field values.

func SubsetPlanPIndexes

func SubsetPlanPIndexes(a, b *PlanPIndexes) bool

Returns true if the PlanPIndex children in a are a subset of those in b, using SamePlanPIndex() for sameness comparison.

func Time

func Time(f func() error,
	totalDuration, totalCount, maxDuration *uint64) error

Time invokes a func f and updates the totalDuration, totalCount and maxDuration metrics. See also Timer() for a metrics based alternative.

func TimeoutCancelChan

func TimeoutCancelChan(timeout int64) <-chan bool

TimeoutCancelChan creates a channel that closes after a given timeout in milliseconds.

func Timer

func Timer(f func() error, t metrics.Timer) error

Timer updates a metrics.Timer. Unlike metrics.Timer.Time(), this version also captures any error return value.
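
A usage sketch, assuming the go-metrics package that provides the metrics.Timer type and a hypothetical doWork() func:

t := metrics.NewTimer()
if err := cbgt.Timer(func() error {
	return doWork() // hypothetical unit of work being measured.
}, t); err != nil {
	// handle the error returned by doWork().
}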

func UnregisterNodes

func UnregisterNodes(cfg Cfg, version string, nodeUUIDs []string) error

UnregisterNodes removes the given nodes (by their UUID) from the nodes wanted & known cfg entries.

func UnregisterNodesWithRetries

func UnregisterNodesWithRetries(cfg Cfg, version string, nodeUUIDs []string,
	maxTries int) error

UnregisterNodesWithRetries removes the given nodes (by their UUID) from the nodes wanted & known cfg entries, and performs retries a max number of times if there were CAS conflict errors.

func VersionGTE

func VersionGTE(x, y string) bool

Compares two dotted versioning strings, like "1.0.1" and "1.2.3". Returns true when x >= y.

TODO: Need to handle non-numeric parts?

func WriteTimerJSON

func WriteTimerJSON(w io.Writer, timer metrics.Timer)

WriteTimerJSON writes a metrics.Timer instance as JSON to a io.Writer.

Types

type BlackHole

type BlackHole struct {
	// contains filtered or unexported fields
}

Implements both Dest and PIndexImpl interfaces.

func (*BlackHole) Close

func (t *BlackHole) Close() error

func (*BlackHole) ConsistencyWait

func (t *BlackHole) ConsistencyWait(partition, partitionUUID string,
	consistencyLevel string,
	consistencySeq uint64,
	cancelCh <-chan bool) error

func (*BlackHole) Count

func (t *BlackHole) Count(pindex *PIndex,
	cancelCh <-chan bool) (uint64, error)

func (*BlackHole) DataDelete

func (t *BlackHole) DataDelete(partition string,
	key []byte, seq uint64,
	cas uint64,
	extrasType DestExtrasType, extras []byte) error

func (*BlackHole) DataUpdate

func (t *BlackHole) DataUpdate(partition string,
	key []byte, seq uint64, val []byte,
	cas uint64,
	extrasType DestExtrasType, extras []byte) error

func (*BlackHole) OpaqueGet

func (t *BlackHole) OpaqueGet(partition string) (
	value []byte, lastSeq uint64, err error)

func (*BlackHole) OpaqueSet

func (t *BlackHole) OpaqueSet(partition string, value []byte) error

func (*BlackHole) Query

func (t *BlackHole) Query(pindex *PIndex, req []byte, w io.Writer,
	cancelCh <-chan bool) error

func (*BlackHole) Rollback

func (t *BlackHole) Rollback(partition string, rollbackSeq uint64) error

func (*BlackHole) SnapshotStart

func (t *BlackHole) SnapshotStart(partition string,
	snapStart, snapEnd uint64) error

func (*BlackHole) Stats

func (t *BlackHole) Stats(w io.Writer) error

type CBAuthParams

type CBAuthParams struct {
	AuthUser     string `json:"authUser"` // May be "" for no auth.
	AuthPassword string `json:"authPassword"`

	AuthSaslUser     string `json:"authSaslUser"` // May be "" for no auth.
	AuthSaslPassword string `json:"authSaslPassword"`
}

CBAuthParams are common couchbase data-source/feed specific connection parameters that may be part of a sourceParams JSON.

func (*CBAuthParams) GetCredentials

func (d *CBAuthParams) GetCredentials() (string, string, string)

type CBAuthParamsSasl

type CBAuthParamsSasl struct {
	CBAuthParams
}

CBAuthParamsSasl implements the cbdatasource.ServerCredProvider interface.

func (*CBAuthParamsSasl) GetSaslCredentials

func (d *CBAuthParamsSasl) GetSaslCredentials() (string, string)

type Cfg

type Cfg interface {
	// Get retrieves an entry from the Cfg.  A zero cas means don't do
	// a CAS match on Get(), and a non-zero cas value means the Get()
	// will succeed only if the CAS matches.
	Get(key string, cas uint64) (val []byte, casSuccess uint64, err error)

	// Set creates or updates an entry in the Cfg.  A non-zero cas
	// that does not match will result in an error.  A zero cas means
	// the Set() operation must be an entry creation, where a zero cas
	// Set() will error if the entry already exists.
	Set(key string, val []byte, cas uint64) (casSuccess uint64, err error)

	// Del removes an entry from the Cfg.  A non-zero cas that does
	// not match will result in an error.  A zero cas means a CAS
	// match will be skipped, so that clients can perform a
	// "don't-care, out-of-the-blue" deletion.
	Del(key string, cas uint64) error

	// Subscribe allows clients to receive events on changes to a key.
	// During a deletion event, the CfgEvent.CAS field will be 0.
	Subscribe(key string, ch chan CfgEvent) error

	// Refresh forces the Cfg implementation to reload from its
	// backend-specific data source, clearing any locally cached data.
	// Any subscribers will receive events on a Refresh, where it's up
	// to subscribers to detect if there were actual changes or not.
	Refresh() error
}

Cfg is the interface that configuration providers must implement.
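
For illustration, a minimal sketch of the CAS discipline described above, using the in-memory CfgMem implementation documented below; the key and values are made up:

cfg := cbgt.NewCfgMem()

// Creation: a zero cas means this Set() must create the entry.
cas, err := cfg.Set("exampleKey", []byte(`{"hello":"world"}`), 0)
if err != nil {
	// handle the error.
}

// Update: pass the cas returned by the creation; a concurrent writer
// would surface here as a CAS mismatch (see CfgCASError).
cas, err = cfg.Set("exampleKey", []byte(`{"hello":"again"}`), cas)

// Read: a zero cas on Get() means no CAS match is required.
val, cas, err := cfg.Get("exampleKey", 0)
_ = val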

type CfgCASError

type CfgCASError struct{}

The error used on mismatches of CAS (compare and set/swap) values.

func (*CfgCASError) Error

func (e *CfgCASError) Error() string

type CfgCB

type CfgCB struct {
	// contains filtered or unexported fields
}

CfgCB is an implementation of Cfg that uses a couchbase bucket, and uses DCP to get change notifications.

TODO: This current implementation is race-y! Instead of storing everything as a single uber key/value, we should instead be storing individual key/value's on every get/set/del operation.

func NewCfgCB

func NewCfgCB(urlStr, bucket string) (*CfgCB, error)

NewCfgCB returns a Cfg implementation that reads/writes its entries from/to a couchbase bucket, using DCP streams to subscribe to changes.

func NewCfgCBEx

func NewCfgCBEx(urlStr, bucket string,
	options map[string]interface{}) (*CfgCB, error)

NewCfgCBEx is a more advanced version of NewCfgCB(), with more initialization options via the options map. Allowed options: "keyPrefix" - an optional string prefix that's prepended to any keys that are written to or read from the couchbase bucket.

func (*CfgCB) DataDelete

func (r *CfgCB) DataDelete(vbucketId uint16, key []byte, seq uint64,
	req *gomemcached.MCRequest) error

func (*CfgCB) DataUpdate

func (r *CfgCB) DataUpdate(vbucketId uint16, key []byte, seq uint64,
	req *gomemcached.MCRequest) error

func (*CfgCB) Del

func (c *CfgCB) Del(key string, cas uint64) error

func (*CfgCB) FireEvent

func (c *CfgCB) FireEvent(key string, cas uint64, err error)

func (*CfgCB) Get

func (c *CfgCB) Get(key string, cas uint64) (
	[]byte, uint64, error)

func (*CfgCB) GetCredentials

func (a *CfgCB) GetCredentials() (string, string, string)

func (*CfgCB) GetMetaData

func (r *CfgCB) GetMetaData(vbucketId uint16) (
	value []byte, lastSeq uint64, err error)

func (*CfgCB) OnError

func (r *CfgCB) OnError(err error)

func (*CfgCB) Refresh

func (c *CfgCB) Refresh() error

func (*CfgCB) Rollback

func (r *CfgCB) Rollback(vbucketId uint16, rollbackSeq uint64) error

func (*CfgCB) Set

func (c *CfgCB) Set(key string, val []byte, cas uint64) (
	uint64, error)

func (*CfgCB) SetMetaData

func (r *CfgCB) SetMetaData(vbucketId uint16, value []byte) error

func (*CfgCB) SnapshotStart

func (r *CfgCB) SnapshotStart(vbucketId uint16,
	snapStart, snapEnd uint64, snapType uint32) error

func (*CfgCB) Subscribe

func (c *CfgCB) Subscribe(key string, ch chan CfgEvent) error

type CfgEvent

type CfgEvent struct {
	Key   string
	CAS   uint64
	Error error
}

See the Cfg.Subscribe() method.

type CfgMem

type CfgMem struct {
	CASNext uint64
	Entries map[string]*CfgMemEntry
	// contains filtered or unexported fields
}

CfgMem is a local-only, memory-only implementation of Cfg interface that's useful for development and testing.

func NewCfgMem

func NewCfgMem() *CfgMem

NewCfgMem returns an empty CfgMem instance.

func (*CfgMem) Del

func (c *CfgMem) Del(key string, cas uint64) error

func (*CfgMem) FireEvent

func (c *CfgMem) FireEvent(key string, cas uint64, err error)

func (*CfgMem) Get

func (c *CfgMem) Get(key string, cas uint64) (
	[]byte, uint64, error)

func (*CfgMem) GetRev

func (c *CfgMem) GetRev(key string, cas uint64) (
	interface{}, error)

func (*CfgMem) Refresh

func (c *CfgMem) Refresh() error

func (*CfgMem) Set

func (c *CfgMem) Set(key string, val []byte, cas uint64) (
	uint64, error)

func (*CfgMem) SetRev

func (c *CfgMem) SetRev(key string, cas uint64, rev interface{}) error

func (*CfgMem) Subscribe

func (c *CfgMem) Subscribe(key string, ch chan CfgEvent) error

type CfgMemEntry

type CfgMemEntry struct {
	CAS uint64
	Val []byte
	// contains filtered or unexported fields
}

CfgMemEntry is a CAS-Val pairing tracked by CfgMem.

type CfgMetaKv

type CfgMetaKv struct {
	// contains filtered or unexported fields
}

func NewCfgMetaKv

func NewCfgMetaKv(nodeUUID string) (*CfgMetaKv, error)

NewCfgMetaKv returns a CfgMetaKv that reads and stores its single configuration file in the metakv.

func (*CfgMetaKv) Del

func (c *CfgMetaKv) Del(key string, cas uint64) error

func (*CfgMetaKv) Get

func (c *CfgMetaKv) Get(key string, cas uint64) ([]byte, uint64, error)

func (*CfgMetaKv) Load

func (c *CfgMetaKv) Load() error

func (*CfgMetaKv) Refresh

func (c *CfgMetaKv) Refresh() error

func (*CfgMetaKv) RemoveAllKeys

func (c *CfgMetaKv) RemoveAllKeys()

RemoveAllKeys removes all cfg entries from metakv, where the caller should no longer use this CfgMetaKv instance, but instead create a new instance.

func (*CfgMetaKv) Set

func (c *CfgMetaKv) Set(key string, val []byte, cas uint64) (
	uint64, error)

func (*CfgMetaKv) Subscribe

func (c *CfgMetaKv) Subscribe(key string, ch chan CfgEvent) error

type CfgMetaKvEntry

type CfgMetaKvEntry struct {
	// contains filtered or unexported fields
}

type CfgSimple

type CfgSimple struct {
	// contains filtered or unexported fields
}

CfgSimple is a local-only, persisted (in a single file) implementation of the Cfg interface that's useful for non-clustered, single-node instances for developers.

func NewCfgSimple

func NewCfgSimple(path string) *CfgSimple

NewCfgSimple returns a CfgSimple that reads and stores its single configuration file in the provided file path.

func (*CfgSimple) Del

func (c *CfgSimple) Del(key string, cas uint64) error

func (*CfgSimple) Get

func (c *CfgSimple) Get(key string, cas uint64) (
	[]byte, uint64, error)

func (*CfgSimple) Load

func (c *CfgSimple) Load() error

func (*CfgSimple) Refresh

func (c *CfgSimple) Refresh() error

func (*CfgSimple) Set

func (c *CfgSimple) Set(key string, val []byte, cas uint64) (
	uint64, error)

func (*CfgSimple) Subscribe

func (c *CfgSimple) Subscribe(key string, ch chan CfgEvent) error

type ConsistencyParams

type ConsistencyParams struct {
	// A Level value of "" means stale is ok; "at_plus" means we need
	// consistency at least at or beyond the consistency vector but
	// not before.
	Level string `json:"level"`

	// Keyed by indexName.
	Vectors map[string]ConsistencyVector `json:"vectors"`
}

ConsistencyParams represent the consistency requirements of a client's request.

type ConsistencyVector

type ConsistencyVector map[string]uint64

Key is partition or partition/partitionUUID. Value is seq. For example, a DCP data source might have the key as either "vbucketId" or "vbucketId/vbucketUUID".
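
To illustrate, a sketch of an "at_plus" requirement built from these types; the index name, vbucket ids, and seq numbers are made up:

cp := &cbgt.ConsistencyParams{
	Level: "at_plus",
	Vectors: map[string]cbgt.ConsistencyVector{
		"example-index": {
			"12/9876": 123, // vbucketId "12", vbucketUUID "9876", seq 123.
			"13":      77,  // vbucketId "13", seq 77.
		},
	},
}
_ = cp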

type ConsistencyWaitReq

type ConsistencyWaitReq struct {
	PartitionUUID    string
	ConsistencyLevel string
	ConsistencySeq   uint64
	CancelCh         <-chan bool
	DoneCh           chan error
}

A ConsistencyWaitReq represents a runtime consistency wait request for a partition.

type ConsistencyWaiter

type ConsistencyWaiter interface {
	ConsistencyWait(partition, partitionUUID string,
		consistencyLevel string,
		consistencySeq uint64,
		cancelCh <-chan bool) error
}

ConsistencyWaiter interface represents a service that can wait for consistency.

type CoveringPIndexes

type CoveringPIndexes struct {
	LocalPIndexes      []*PIndex
	RemotePlanPIndexes []*RemotePlanPIndex
	MissingPIndexNames []string
}

CoveringPIndexes represents a non-overlapping, disjoint set of PIndexes that cover all the partitions of an index.

type CoveringPIndexesSpec

type CoveringPIndexesSpec struct {
	IndexName            string
	IndexUUID            string
	PlanPIndexFilterName string // See PlanPIndexFilters.
}

CoveringPIndexesSpec represents the arguments for computing the covering pindexes for an index. See also CoveringPIndexesEx().

type CwrQueue

type CwrQueue []*ConsistencyWaitReq

A CwrQueue is a consistency wait request queue, implementing the heap.Interface for ConsistencyWaitReq's, and is heap ordered by sequence number.

func (CwrQueue) Len

func (pq CwrQueue) Len() int

func (CwrQueue) Less

func (pq CwrQueue) Less(i, j int) bool

func (*CwrQueue) Pop

func (pq *CwrQueue) Pop() interface{}

func (*CwrQueue) Push

func (pq *CwrQueue) Push(x interface{})

func (CwrQueue) Swap

func (pq CwrQueue) Swap(i, j int)

type DCPFeed

type DCPFeed struct {
	// contains filtered or unexported fields
}

A DCPFeed implements both Feed and cbdatasource.Receiver interfaces, and forwards any incoming cbdatasource.Receiver callbacks to the relevant, hooked-up Dest instances.

func NewDCPFeed

func NewDCPFeed(name, indexName, url, poolName,
	bucketName, bucketUUID, paramsStr string,
	pf DestPartitionFunc, dests map[string]Dest,
	disable bool, mgr *Manager) (*DCPFeed, error)

NewDCPFeed creates a new, ready-to-be-started DCP feed.

func (*DCPFeed) Close

func (t *DCPFeed) Close() error

func (*DCPFeed) DataDelete

func (r *DCPFeed) DataDelete(vbucketId uint16, key []byte, seq uint64,
	req *gomemcached.MCRequest) error

func (*DCPFeed) DataUpdate

func (r *DCPFeed) DataUpdate(vbucketId uint16, key []byte, seq uint64,
	req *gomemcached.MCRequest) error

func (*DCPFeed) Dests

func (t *DCPFeed) Dests() map[string]Dest

func (*DCPFeed) GetBucketDetails

func (r *DCPFeed) GetBucketDetails() (name, uuid string)

func (*DCPFeed) GetMetaData

func (r *DCPFeed) GetMetaData(vbucketId uint16) (
	value []byte, lastSeq uint64, err error)

func (*DCPFeed) IndexName

func (t *DCPFeed) IndexName() string

func (*DCPFeed) Name

func (t *DCPFeed) Name() string

func (*DCPFeed) OnError

func (r *DCPFeed) OnError(err error)

func (*DCPFeed) Rollback

func (r *DCPFeed) Rollback(vbucketId uint16, rollbackSeq uint64) error

func (*DCPFeed) SetMetaData

func (r *DCPFeed) SetMetaData(vbucketId uint16, value []byte) error

func (*DCPFeed) SnapshotStart

func (r *DCPFeed) SnapshotStart(vbucketId uint16,
	snapStart, snapEnd uint64, snapType uint32) error

func (*DCPFeed) Start

func (t *DCPFeed) Start() error

func (*DCPFeed) Stats

func (t *DCPFeed) Stats(w io.Writer) error

func (*DCPFeed) VerifyBucketNotExists

func (r *DCPFeed) VerifyBucketNotExists() (bool, error)

VerifyBucketNotExists returns true only if it's sure the bucket does not exist anymore (including if UUID's no longer match). A rejected auth or connection failure, for example, results in false.

type DCPFeedParams

type DCPFeedParams struct {
	AuthUser     string `json:"authUser,omitempty"` // May be "" for no auth.
	AuthPassword string `json:"authPassword,omitempty"`

	AuthSaslUser     string `json:"authSaslUser,omitempty"` // May be "" for no auth.
	AuthSaslPassword string `json:"authSaslPassword,omitempty"`

	// Factor (like 1.5) to increase sleep time between retries
	// in connecting to a cluster manager node.
	ClusterManagerBackoffFactor float32 `json:"clusterManagerBackoffFactor,omitempty"`

	// Initial sleep time (millisecs) before first retry to cluster manager.
	ClusterManagerSleepInitMS int `json:"clusterManagerSleepInitMS,omitempty"`

	// Maximum sleep time (millisecs) between retries to cluster manager.
	ClusterManagerSleepMaxMS int `json:"clusterManagerSleepMaxMS,omitempty"`

	// Factor (like 1.5) to increase sleep time between retries
	// in connecting to a data manager node.
	DataManagerBackoffFactor float32 `json:"dataManagerBackoffFactor,omitempty"`

	// Initial sleep time (millisecs) before first retry to data manager.
	DataManagerSleepInitMS int `json:"dataManagerSleepInitMS,omitempty"`

	// Maximum sleep time (millisecs) between retries to data manager.
	DataManagerSleepMaxMS int `json:"dataManagerSleepMaxMS,omitempty"`

	// Buffer size in bytes provided for UPR flow control.
	FeedBufferSizeBytes uint32 `json:"feedBufferSizeBytes,omitempty"`

	// Used for UPR flow control and buffer-ack messages when this
	// percentage of FeedBufferSizeBytes is reached.
	FeedBufferAckThreshold float32 `json:"feedBufferAckThreshold,omitempty"`

	// Used to specify whether the applications are interested
	// in receiving the xattrs information in a dcp stream.
	IncludeXAttrs bool `json:"includeXAttrs,omitempty"`
}

DCPFeedParams are DCP data-source/feed specific connection parameters that may be part of a sourceParams JSON and is a superset of CBAuthParams. DCPFeedParams holds the information used to populate a cbdatasource.BucketDataSourceOptions on calls to cbdatasource.NewBucketDataSource(). DCPFeedParams also implements the couchbase.AuthHandler interface.

func NewDCPFeedParams

func NewDCPFeedParams() *DCPFeedParams

NewDCPFeedParams returns a DCPFeedParams initialized with default values.

type Dest

type Dest interface {
	// Invoked by PIndex.Close().
	Close() error

	// Invoked when there's a new mutation from a data source for a
	// partition.  Dest implementation is responsible for making its
	// own copies of the key, val and extras data.
	DataUpdate(partition string, key []byte, seq uint64, val []byte,
		cas uint64,
		extrasType DestExtrasType, extras []byte) error

	// Invoked by the data source when there's a data deletion in a
	// partition.  Dest implementation is responsible for making its
	// own copies of the key and extras data.
	DataDelete(partition string, key []byte, seq uint64,
		cas uint64,
		extrasType DestExtrasType, extras []byte) error

	// A callback invoked by the data source when there's a start of
	// a new snapshot for a partition.  The Receiver implementation,
	// for example, might choose to optimize persistence perhaps by
	// preparing a batch write to application-specific storage.
	SnapshotStart(partition string, snapStart, snapEnd uint64) error

	// OpaqueGet() should return the opaque value previously
	// provided by an earlier call to OpaqueSet().  If there was no
	// previous call to OpaqueSet(), such as in the case of a brand
	// new instance of a Dest (as opposed to a restarted or reloaded
	// Dest), the Dest should return (nil, 0, nil) for (value,
	// lastSeq, err), respectively.  The lastSeq should be the last
	// sequence number received and persisted during calls to the
	// Dest's DataUpdate() & DataDelete() methods.
	OpaqueGet(partition string) (value []byte, lastSeq uint64, err error)

	// The Dest implementation should persist the value parameter of
	// OpaqueSet() for retrieval during some future call to
	// OpaqueGet() by the system.  The metadata value should be
	// considered "in-stream", or as part of the sequence history of
	// mutations.  That is, a later Rollback() to some previous
	// sequence number for a particular partition should rollback
	// both persisted metadata and regular data.  The Dest
	// implementation should make its own copy of the value data.
	OpaqueSet(partition string, value []byte) error

	// Invoked when the datasource signals a rollback during dest
	// initialization.  Note that both regular data and opaque data
	// should be rolled back to at a maximum of the rollbackSeq.  Of
	// note, the Dest is allowed to rollback even further, even all
	// the way back to the start or to zero.
	Rollback(partition string, rollbackSeq uint64) error

	// Blocks until the Dest has reached the desired consistency for
	// the partition or until the cancelCh is readable or closed by
	// some goroutine related to the calling goroutine.  The error
	// response might be an ErrorConsistencyWait instance, which has
	// StartEndSeqs information.  The seqStart is the seq number when
	// the operation started waiting and the seqEnd is the seq number
	// at the end of operation (even when cancelled or error), so that
	// the caller might get a rough idea of ingest velocity.
	ConsistencyWait(partition, partitionUUID string,
		consistencyLevel string,
		consistencySeq uint64,
		cancelCh <-chan bool) error

	// Counts the underlying pindex implementation.
	Count(pindex *PIndex, cancelCh <-chan bool) (uint64, error)

	// Queries the underlying pindex implementation, blocking if
	// needed for the Dest to reach the desired consistency.
	Query(pindex *PIndex, req []byte, w io.Writer,
		cancelCh <-chan bool) error

	Stats(io.Writer) error
}

The Dest interface defines the data sink or destination for data that comes from a data-source. In other words, a data-source (or a Feed instance) is hooked up to one or more Dest instances. As a Feed receives incoming data, the Feed will invoke methods on its Dest instances.

func BasicPartitionFunc

func BasicPartitionFunc(partition string, key []byte,
	dests map[string]Dest) (Dest, error)

This basic partition func first tries a direct lookup by partition string, else it tries the "" partition.

func VBucketIdToPartitionDest

func VBucketIdToPartitionDest(pf DestPartitionFunc,
	dests map[string]Dest, vbucketId uint16, key []byte) (
	partition string, dest Dest, err error)

VBucketIdToPartitionDest is specific to couchbase data-sources/feeds, choosing the right Dest based on a vbucketId.

type DestExtrasType

type DestExtrasType uint16

DestExtrasType represents the encoding for the Dest.DataUpdate/DataDelete() extras parameter.

type DestForwarder

type DestForwarder struct {
	DestProvider DestProvider
}

A DestForwarder implements the Dest interface by forwarding method calls to the Dest returned by a DestProvider.

It is useful for pindex backend implementations that have their own level-of-indirection features. One example would be pindex backends that track a separate batch per partition.

func (*DestForwarder) Close

func (t *DestForwarder) Close() error

func (*DestForwarder) ConsistencyWait

func (t *DestForwarder) ConsistencyWait(partition, partitionUUID string,
	consistencyLevel string,
	consistencySeq uint64,
	cancelCh <-chan bool) error

func (*DestForwarder) Count

func (t *DestForwarder) Count(pindex *PIndex, cancelCh <-chan bool) (
	uint64, error)

func (*DestForwarder) DataDelete

func (t *DestForwarder) DataDelete(partition string,
	key []byte, seq uint64,
	cas uint64,
	extrasType DestExtrasType, extras []byte) error

func (*DestForwarder) DataUpdate

func (t *DestForwarder) DataUpdate(partition string,
	key []byte, seq uint64, val []byte,
	cas uint64,
	extrasType DestExtrasType, extras []byte) error

func (*DestForwarder) OpaqueGet

func (t *DestForwarder) OpaqueGet(partition string) (
	value []byte, lastSeq uint64, err error)

func (*DestForwarder) OpaqueSet

func (t *DestForwarder) OpaqueSet(partition string, value []byte) error

func (*DestForwarder) Query

func (t *DestForwarder) Query(pindex *PIndex, req []byte, res io.Writer,
	cancelCh <-chan bool) error

func (*DestForwarder) Rollback

func (t *DestForwarder) Rollback(partition string, rollbackSeq uint64) error

func (*DestForwarder) SnapshotStart

func (t *DestForwarder) SnapshotStart(partition string,
	snapStart, snapEnd uint64) error

func (*DestForwarder) Stats

func (t *DestForwarder) Stats(w io.Writer) error

type DestPartitionFunc

type DestPartitionFunc func(partition string, key []byte,
	dests map[string]Dest) (Dest, error)

A DestPartitionFunc allows a level of indirection/abstraction for the Feed-to-Dest relationship. A Feed is hooked up in a one-to-many relationship with multiple Dest instances. The DestPartitionFunc provided to a Feed instance defines the mapping of which Dest the Feed should invoke when the Feed receives an incoming data item.

The partition parameter is encoded as a string, instead of a uint16 or number, to allow for future range partitioning functionality.
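
As a sketch, a custom DestPartitionFunc that mirrors the behavior described for BasicPartitionFunc above (exact partition first, then the "" catch-all); the fmt import is assumed:

var partitionFunc cbgt.DestPartitionFunc = func(partition string,
	key []byte, dests map[string]cbgt.Dest) (cbgt.Dest, error) {
	if dest, exists := dests[partition]; exists {
		return dest, nil
	}
	if dest, exists := dests[""]; exists {
		return dest, nil // fall back to a catch-all "" Dest.
	}
	return nil, fmt.Errorf("no dest for partition: %s", partition)
}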

type DestProvider

type DestProvider interface {
	Dest(partition string) (Dest, error)

	Count(pindex *PIndex, cancelCh <-chan bool) (uint64, error)

	Query(pindex *PIndex, req []byte, res io.Writer,
		cancelCh <-chan bool) error

	Stats(io.Writer) error

	Close() error
}

A DestProvider returns the Dest to use for different kinds of operations and is used in conjunction with a DestForwarder.
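
A skeletal sketch of a DestProvider that hands back one shared Dest for every partition, wired up through a DestForwarder; the inner Dest value and the io import are assumed:

type singleDestProvider struct {
	inner cbgt.Dest
}

func (p *singleDestProvider) Dest(partition string) (cbgt.Dest, error) {
	return p.inner, nil
}

func (p *singleDestProvider) Count(pindex *cbgt.PIndex,
	cancelCh <-chan bool) (uint64, error) {
	return p.inner.Count(pindex, cancelCh)
}

func (p *singleDestProvider) Query(pindex *cbgt.PIndex, req []byte,
	res io.Writer, cancelCh <-chan bool) error {
	return p.inner.Query(pindex, req, res, cancelCh)
}

func (p *singleDestProvider) Stats(w io.Writer) error {
	return p.inner.Stats(w)
}

func (p *singleDestProvider) Close() error {
	return p.inner.Close()
}

// The DestForwarder then satisfies the full Dest interface on the provider's behalf:
// var dest cbgt.Dest = &cbgt.DestForwarder{DestProvider: &singleDestProvider{inner: someDest}}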

type DestStats

type DestStats struct {
	TotError uint64

	TimerDataUpdate    metrics.Timer
	TimerDataDelete    metrics.Timer
	TimerSnapshotStart metrics.Timer
	TimerOpaqueGet     metrics.Timer
	TimerOpaqueSet     metrics.Timer
	TimerRollback      metrics.Timer
}

DestStats holds the common stats or metrics for a Dest.

func NewDestStats

func NewDestStats() *DestStats

NewDestStats creates a new, ready-to-use DestStats.

func (*DestStats) WriteJSON

func (d *DestStats) WriteJSON(w io.Writer)

type DiagHandler

type DiagHandler struct {
	Name        string
	Handler     http.Handler
	HandlerFunc http.HandlerFunc
}

DiagHandler allows modules to provide their own additions in response to "diag" or diagnostic information requests.

type Documentation

type Documentation struct {
	Text string      // Optional documentation text (markdown).
	JSON interface{} // Optional marshall'able to JSON.
}

Documentation is used for auto-generated documentation.

type ErrorConsistencyWait

type ErrorConsistencyWait struct {
	Err    error  // The underlying, wrapped error.
	Status string // Short status reason, like "timeout", "cancelled", etc.

	// Keyed by partitionId, value is pair of start/end seq's.
	StartEndSeqs map[string][]uint64
}

An ErrorConsistencyWait represents an error or timeout while waiting for a partition to reach some consistency requirements.

func (*ErrorConsistencyWait) Error

func (e *ErrorConsistencyWait) Error() string

type Feed

type Feed interface {
	Name() string
	IndexName() string
	Start() error
	Close() error
	Dests() map[string]Dest // Key is partition identifier.

	// Writes stats as JSON to the given writer.
	Stats(io.Writer) error
}

A Feed interface represents an abstract data source. A Feed instance is hooked up to one-or-more Dest instances. When incoming data is received by a Feed, the Feed will invoke relevant methods on the relevant Dest instances.

In this codebase, the words "index source", "source" and "data source" are often associated with and used roughly as synonyms with "feed".

func CalcFeedsDelta

func CalcFeedsDelta(nodeUUID string, planPIndexes *PlanPIndexes,
	currFeeds map[string]Feed, pindexes map[string]*PIndex,
	feedAllotment string) (addFeeds [][]*PIndex, removeFeeds []Feed)

Functionally determine the delta of which feeds need creation and which should be shut down. An updated feed would appear on both the removeFeeds and addFeeds outputs, which assumes the caller is going to remove feeds before adding feeds.

type FeedPartitionLookUpFunc

type FeedPartitionLookUpFunc func(docID, server string,
	sourceDetails *IndexDef,
	req *http.Request) (string, error)

Performs a lookup of a source partition given a document id.

type FeedPartitionSeqsFunc

type FeedPartitionSeqsFunc func(sourceType, sourceName, sourceUUID,
	sourceParams, server string,
	options map[string]string) (map[string]UUIDSeq, error)

Returns the current partitions and their seq's.

type FeedPartitionsFunc

type FeedPartitionsFunc func(sourceType, sourceName, sourceUUID,
	sourceParams, server string,
	options map[string]string) ([]string, error)

Each Feed or data-source type knows of the data partitions for a data source.

type FeedStartFunc

type FeedStartFunc func(mgr *Manager,
	feedName, indexName, indexUUID string,
	sourceType, sourceName, sourceUUID, sourceParams string,
	dests map[string]Dest) error

A FeedStartFunc is part of a FeedType registration and is invoked by a Manager when a new feed instance needs to be started.

type FeedStatsFunc

type FeedStatsFunc func(sourceType, sourceName, sourceUUID,
	sourceParams, server string,
	options map[string]string,
	statsKind string) (map[string]interface{}, error)

Returns the current stats from a data source, if available, where the result is dependent on the data source / feed type.

type FeedType

type FeedType struct {
	Start           FeedStartFunc
	Partitions      FeedPartitionsFunc
	PartitionSeqs   FeedPartitionSeqsFunc   // Optional.
	Stats           FeedStatsFunc           // Optional.
	PartitionLookUp FeedPartitionLookUpFunc // Optional.
	Public          bool
	Description     string
	StartSample     interface{}
	StartSampleDocs map[string]string
}

A FeedType represents an immutable registration of a single feed type or data source type.
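
As a sketch, registering a hypothetical "exampleFeed" source type at init()'ialization time, using the FeedStartFunc and FeedPartitionsFunc signatures documented above; the feed type name and behavior are made up:

func init() {
	cbgt.RegisterFeedType("exampleFeed", &cbgt.FeedType{
		Start: func(mgr *cbgt.Manager, feedName, indexName, indexUUID string,
			sourceType, sourceName, sourceUUID, sourceParams string,
			dests map[string]cbgt.Dest) error {
			// Create a Feed implementation here and register it with the mgr.
			return nil
		},
		Partitions: func(sourceType, sourceName, sourceUUID,
			sourceParams, server string,
			options map[string]string) ([]string, error) {
			return []string{"0"}, nil // a single partition, for example.
		},
		Public:      true,
		Description: "exampleFeed - an illustrative data source type",
	})
}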

type FileDoc

type FileDoc struct {
	Name     string `json:"name"`
	Path     string `json:"path"` // Path relative to the source name.
	Contents string `json:"contents"`
}

FileDoc represents the JSON for each file/document that will be emitted by a FilesFeed as a data source.

type FilesFeed

type FilesFeed struct {
	// contains filtered or unexported fields
}

FilesFeed is a Feed interface implementation that emits file contents from a local subdirectory tree.

The subdirectory tree lives under the dataDir...

<dataDir>/<sourceName>/**

FilesFeed supports optional regexp patterns to allow you to filter for only the file paths that you want.

Limitations:

- Only a small number of files will work well (hundreds to low thousands, not millions).

- FilesFeed polls for file modification timestamp changes as a poor-man's approach instead of properly tracking sequence numbers. That has implications such as whenever a FilesFeed (re-)starts (e.g., the process restarts), the FilesFeed will re-emit all files and then track the max modification timestamp going forward as it regularly polls for file changes.

func NewFilesFeed

func NewFilesFeed(mgr *Manager, name, indexName, sourceName,
	paramsStr string, dests map[string]Dest, disable bool) (
	*FilesFeed, error)

NewFilesFeed creates a ready-to-be-started FilesFeed.

func (*FilesFeed) Close

func (t *FilesFeed) Close() error

func (*FilesFeed) Dests

func (t *FilesFeed) Dests() map[string]Dest

func (*FilesFeed) IndexName

func (t *FilesFeed) IndexName() string

func (*FilesFeed) Name

func (t *FilesFeed) Name() string

func (*FilesFeed) Start

func (t *FilesFeed) Start() error

func (*FilesFeed) Stats

func (t *FilesFeed) Stats(w io.Writer) error

type FilesFeedParams

type FilesFeedParams struct {
	RegExps       []string `json:"regExps"`
	MaxFileSize   int64    `json:"maxFileSize"`
	NumPartitions int      `json:"numPartitions"`
	SleepStartMS  int      `json:"sleepStartMS"`
	BackoffFactor float32  `json:"backoffFactor"`
	MaxSleepMS    int      `json:"maxSleepMS"`
}

FilesFeedParams represents the JSON expected as the sourceParams for a FilesFeed.
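
For illustration, a sketch of building such sourceParams in Go and marshalling them for use in an index definition; the regexp, size, and partition count are made up, and the encoding/json import is assumed:

params := cbgt.FilesFeedParams{
	RegExps:       []string{`\.json$`},
	MaxFileSize:   1024 * 1024, // skip files larger than 1MB.
	NumPartitions: 4,
	SleepStartMS:  cbgt.FILES_FEED_SLEEP_START_MS,
	BackoffFactor: cbgt.FILES_FEED_BACKOFF_FACTOR,
	MaxSleepMS:    cbgt.FILES_FEED_MAX_SLEEP_MS,
}
sourceParams, _ := json.Marshal(params) // string(sourceParams) can be used as IndexDef.SourceParams.
_ = sourceParams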

type IndexDef

type IndexDef struct {
	Type         string     `json:"type"` // Ex: "blackhole", etc.
	Name         string     `json:"name"`
	UUID         string     `json:"uuid"` // Like a revision id.
	Params       string     `json:"params"`
	SourceType   string     `json:"sourceType"`
	SourceName   string     `json:"sourceName,omitempty"`
	SourceUUID   string     `json:"sourceUUID,omitempty"`
	SourceParams string     `json:"sourceParams,omitempty"` // Optional connection info.
	PlanParams   PlanParams `json:"planParams,omitempty"`
}

An IndexDef is a logical index definition.
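
For illustration, a sketch of an IndexDef literal; the names and params are made up, and "blackhole" is just the example type mentioned in the field comment above:

indexDef := &cbgt.IndexDef{
	Type:       "blackhole",
	Name:       "example-index",
	UUID:       cbgt.NewUUID(),
	Params:     "{}",
	SourceType: cbgt.SOURCE_TYPE_COUCHBASE,
	SourceName: "example-bucket",
}
_ = indexDef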

func (*IndexDef) MarshalJSON

func (def *IndexDef) MarshalJSON() ([]byte, error)

Implementation of the json.Marshaler interface. The IndexDef JSON output format is now the natural, nested JSON format (as opposed to the previous, enveloped format).

func (*IndexDef) UnmarshalJSON

func (def *IndexDef) UnmarshalJSON(b []byte) error

Implementation of the json.Unmarshaler interface, which accepts either the new, natural, nested JSON format or the older, enveloped format.

type IndexDefEnveloped

type IndexDefEnveloped struct {
	Params       string `json:"params"`
	SourceParams string `json:"sourceParams"`
	// contains filtered or unexported fields
}

An IndexDefEnveloped overrides IndexDef with Params and SourceParams fields that are enveloped JSON (JSON encoded as strings), for backwards compatibility.

type IndexDefNested

type IndexDefNested struct {
	Params       map[string]interface{} `json:"params"`
	SourceParams map[string]interface{} `json:"sourceParams"`
	// contains filtered or unexported fields
}

An IndexDefNested overrides IndexDef with Params and SourceParams fields that are JSON nested objects instead of strings, for an easier-to-use API.

type IndexDefs

type IndexDefs struct {
	// IndexDefs.UUID changes whenever any child IndexDef changes.
	UUID        string               `json:"uuid"`        // Like a revision id.
	IndexDefs   map[string]*IndexDef `json:"indexDefs"`   // Key is IndexDef.Name.
	ImplVersion string               `json:"implVersion"` // See VERSION.
}

An IndexDefs is zero or more index definitions.

func CfgGetIndexDefs

func CfgGetIndexDefs(cfg Cfg) (*IndexDefs, uint64, error)

Returns index definitions from a Cfg provider.

func NewIndexDefs

func NewIndexDefs(version string) *IndexDefs

Returns an initialized IndexDefs.

func PlannerGetIndexDefs

func PlannerGetIndexDefs(cfg Cfg, version string) (*IndexDefs, error)

PlannerGetIndexDefs retrieves index definitions from a Cfg.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

A Manager represents a runtime node in a cluster.

Although often used like a singleton, multiple Manager instances can be instantiated in a process to simulate a cluster of nodes.

A Manager has two related child, actor-like goroutines:
  • planner
  • janitor

A planner splits index definitions into index partitions (pindexes) and assigns those pindexes to nodes. A planner wakes up and runs whenever the index definitions change or the set of nodes changes (which are both read from the Cfg system). A planner stores the latest plans into the Cfg system.

A janitor running on each node maintains runtime PIndex and Feed instances, creating, deleting & hooking them up as necessary to try to match to latest plans from the planner. A janitor wakes up and runs whenever it sees that latest plans in the Cfg have changed.

func NewManager

func NewManager(version string, cfg Cfg, uuid string, tags []string,
	container string, weight int, extras, bindHttp, dataDir, server string,
	meh ManagerEventHandlers) *Manager

NewManager returns a new, ready-to-be-started Manager instance.

func NewManagerEx

func NewManagerEx(version string, cfg Cfg, uuid string, tags []string,
	container string, weight int, extras, bindHttp, dataDir, server string,
	meh ManagerEventHandlers, options map[string]string) *Manager

NewManagerEx returns a new, ready-to-be-started Manager instance, with additional options.
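
As a hedged sketch (not a prescribed setup), a node might be constructed and started roughly like this, assuming the caller already has a Cfg implementation and a reachable data source; the UUID, addresses, and paths below are hypothetical.

func startNode(cfg cbgt.Cfg) (*cbgt.Manager, error) {
	mgr := cbgt.NewManager(
		cbgt.VERSION,            // persistence/config versioning
		cfg,                     // Cfg provider supplied by the caller
		"node-uuid-0",           // node UUID (hypothetical)
		nil,                     // tags (nil: no tag restrictions assumed)
		"",                      // container (rack/zone hierarchy), unused here
		1,                       // weight
		"",                      // extras
		"127.0.0.1:8094",        // bindHttp (hypothetical address)
		"/tmp/cbgt-data",        // dataDir (hypothetical path)
		"http://127.0.0.1:8091", // server: data source address (hypothetical)
		nil)                     // ManagerEventHandlers (optional)

	// Start registers the node per the Manager.Register() semantics.
	if err := mgr.Start("wanted"); err != nil {
		return nil, err
	}
	return mgr, nil
}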

func (*Manager) AddEvent

func (mgr *Manager) AddEvent(jsonBytes []byte)

func (*Manager) BindHttp

func (mgr *Manager) BindHttp() string

Returns the configured bindHttp of a Manager.

func (*Manager) BumpIndexDefs

func (mgr *Manager) BumpIndexDefs(indexDefsUUID string) error

BumpIndexDefs bumps the uuid of the index defs, to force planners and other downstream tasks to re-run.

func (*Manager) Cfg

func (mgr *Manager) Cfg() Cfg

Returns the configured Cfg of a Manager.

func (*Manager) ClosePIndex

func (mgr *Manager) ClosePIndex(pindex *PIndex) error

ClosePIndex synchronously has the janitor close a pindex.

func (*Manager) Container

func (mgr *Manager) Container() string

Returns the configured container of a Manager.

func (*Manager) CoveringPIndexes

func (mgr *Manager) CoveringPIndexes(indexName, indexUUID string,
	planPIndexFilter PlanPIndexFilter, wantKind string) (
	localPIndexes []*PIndex,
	remotePlanPIndexes []*RemotePlanPIndex,
	err error)

CoveringPIndexes returns a non-overlapping, disjoint set (or cut) of PIndexes (either local or remote) that cover all the partitions of an index so that the caller can perform scatter/gather queries, etc. Only PlanPIndexes on wanted nodes that pass the planPIndexFilter will be returned.

TODO: Perhaps need a tighter check around indexUUID, as the current implementation might have a race where old pindexes with a matching (but outdated) indexUUID might be chosen.

TODO: This implementation currently always favors the local node's pindex, but should it? Perhaps a remote node is more up-to-date than the local pindex?

TODO: We should favor the most up-to-date node rather than the first one that we run into here? But, perhaps the most up-to-date node is also the most overloaded? Or, perhaps the planner may be trying to rebalance away the most up-to-date node and hitting it with load just makes the rebalance take longer?
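
As a hedged sketch of the scatter/gather set-up described above; the wantKind label here is an illustrative assumption, and the indexUUID is simply passed through from the caller.

func coveringForQuery(mgr *cbgt.Manager, indexName, indexUUID string) (
	[]*cbgt.PIndex, []*cbgt.RemotePlanPIndex, error) {
	// Only consider plan pindex nodes that are marked as readable.
	canRead := func(node *cbgt.PlanPIndexNode) bool {
		return node != nil && node.CanRead
	}
	return mgr.CoveringPIndexes(indexName, indexUUID, canRead, "queries")
}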

func (*Manager) CoveringPIndexesBestEffort

func (mgr *Manager) CoveringPIndexesBestEffort(indexName, indexUUID string,
	planPIndexFilter PlanPIndexFilter, wantKind string) (
	localPIndexes []*PIndex,
	remotePlanPIndexes []*RemotePlanPIndex,
	missingPIndexNames []string,
	err error)

CoveringPIndexesBestEffort is similar to CoveringPIndexes, but does not error if there are missing/disabled nodes for some of the pindexes.

func (*Manager) CoveringPIndexesEx

func (mgr *Manager) CoveringPIndexesEx(spec CoveringPIndexesSpec,
	planPIndexFilter PlanPIndexFilter, noCache bool) (
	[]*PIndex, []*RemotePlanPIndex, []string, error)

CoveringPIndexesEx returns a non-overlapping, disjoint set (or cut) of PIndexes (either local or remote) that cover all the partitions of an index so that the caller can perform scatter/gather queries.

If the planPIndexFilter param is nil, then the spec.PlanPIndexFilterName is used.

func (*Manager) CreateIndex

func (mgr *Manager) CreateIndex(sourceType,
	sourceName, sourceUUID, sourceParams,
	indexType, indexName, indexParams string, planParams PlanParams,
	prevIndexUUID string) error

Creates a logical index definition. A non-"" prevIndexUUID means an update to an existing index.
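
A hedged example of defining a new index over a couchbase source; the source and index names and the choice of index type below are hypothetical.

func createExampleIndex(mgr *cbgt.Manager) error {
	return mgr.CreateIndex(
		"couchbase",             // sourceType (see SOURCE_TYPE_COUCHBASE)
		"beer-sample",           // sourceName: bucket name (hypothetical)
		"",                      // sourceUUID ("" assumed to mean unspecified)
		"",                      // sourceParams: optional connection info
		"blackhole",             // indexType (hypothetical choice)
		"myIndex",               // indexName
		"",                      // indexParams
		cbgt.NewPlanParams(mgr), // planParams with defaults for this manager
		"")                      // prevIndexUUID: "" means create a new index
}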

func (*Manager) CurrentMaps

func (mgr *Manager) CurrentMaps() (map[string]Feed, map[string]*PIndex)

Returns a snapshot copy of the current feeds and pindexes.

func (*Manager) DataDir

func (mgr *Manager) DataDir() string

Returns the configured data dir of a Manager.

func (*Manager) DeleteAllIndexFromSource

func (mgr *Manager) DeleteAllIndexFromSource(
	sourceType, sourceName, sourceUUID string) error

DeleteAllIndexFromSource deletes all indexes with a given sourceType and sourceName.

func (*Manager) DeleteIndex

func (mgr *Manager) DeleteIndex(indexName string) error

DeleteIndex deletes a logical index definition.

func (*Manager) DeleteIndexEx

func (mgr *Manager) DeleteIndexEx(indexName, indexUUID string) error

DeleteIndexEx deletes a logical index definition, with an optional indexUUID ("" means don't care).

func (*Manager) Events

func (mgr *Manager) Events() *list.List

Events must be invoked holding the manager lock.

func (*Manager) Extras

func (mgr *Manager) Extras() string

Returns the configured extras of a Manager.

func (*Manager) GetIndexDef

func (mgr *Manager) GetIndexDef(indexName string, refresh bool) (
	*IndexDef, *PIndexImplType, error)

GetIndexDef retrieves the IndexDef and PIndexImplType for an index. Use refresh of true to force a read from Cfg.

func (*Manager) GetIndexDefs

func (mgr *Manager) GetIndexDefs(refresh bool) (
	*IndexDefs, map[string]*IndexDef, error)

Returns a read-only snapshot of the IndexDefs, also with the IndexDefs organized by name. Use refresh of true to force a read from Cfg.

func (*Manager) GetNodeDefs

func (mgr *Manager) GetNodeDefs(kind string, refresh bool) (
	nodeDefs *NodeDefs, err error)

Returns read-only snapshot of NodeDefs of a given kind (i.e., NODE_DEFS_WANTED). Use refresh of true to force a read from Cfg.

func (*Manager) GetOptions

func (mgr *Manager) GetOptions() map[string]string

GetOptions returns the (read-only) options of a Manager. Callers must not modify the returned map.

func (*Manager) GetPIndex

func (mgr *Manager) GetPIndex(pindexName string) *PIndex

GetPIndex retrieves a named pindex instance.

func (*Manager) GetPlanPIndexes

func (mgr *Manager) GetPlanPIndexes(refresh bool) (
	*PlanPIndexes, map[string][]*PlanPIndex, error)

Returns a read-only snapshot of the PlanPIndexes, also with the PlanPIndexes organized by IndexName. Use refresh of true to force a read from Cfg.

func (*Manager) IndexControl

func (mgr *Manager) IndexControl(indexName, indexUUID, readOp, writeOp,
	planFreezeOp string) error

IndexControl is used to change runtime properties of an index definition.

func (*Manager) JanitorKick

func (mgr *Manager) JanitorKick(msg string)

JanitorKick synchronously kicks the manager's janitor, if any.

func (*Manager) JanitorLoop

func (mgr *Manager) JanitorLoop()

JanitorLoop is the main loop for the janitor.

func (*Manager) JanitorNOOP

func (mgr *Manager) JanitorNOOP(msg string)

JanitorNOOP sends a synchronous NOOP to the manager's janitor, if any.

func (*Manager) JanitorOnce

func (mgr *Manager) JanitorOnce(reason string) error

JanitorOnce is the main body of a JanitorLoop.

func (*Manager) Kick

func (mgr *Manager) Kick(msg string)

Kick schedules kicks of a Manager's planner and janitor.

func (*Manager) LoadDataDir

func (mgr *Manager) LoadDataDir() error

Walk the data dir and register pindexes for a Manager instance.

func (*Manager) Lock

func (mgr *Manager) Lock()

func (*Manager) Options

func (mgr *Manager) Options() map[string]string

Same as GetOptions(), for backwards compatibility.

func (*Manager) PIndexPath

func (mgr *Manager) PIndexPath(pindexName string) string

PIndexPath returns the filesystem path for a given named pindex. See also ParsePIndexPath().

func (*Manager) ParsePIndexPath

func (mgr *Manager) ParsePIndexPath(pindexPath string) (string, bool)

ParsePIndexPath returns the name for a pindex given a filesystem path. See also PIndexPath().

func (*Manager) PlannerKick

func (mgr *Manager) PlannerKick(msg string)

PlannerKick synchronously kicks the manager's planner, if any.

func (*Manager) PlannerLoop

func (mgr *Manager) PlannerLoop()

PlannerLoop is the main loop for the planner.

func (*Manager) PlannerNOOP

func (mgr *Manager) PlannerNOOP(msg string)

PlannerNOOP sends a synchronous NOOP request to the manager's planner, if any.

func (*Manager) PlannerOnce

func (mgr *Manager) PlannerOnce(reason string) (bool, error)

PlannerOnce is the main body of a PlannerLoop.

func (*Manager) Register

func (mgr *Manager) Register(register string) error

Register will register or unregister a Manager with its configured Cfg system, based on the register parameter, which can have these values:

* wanted - register this node as wanted
* wantedForce - same as wanted, but force a Cfg update
* known - register this node as known
* knownForce - same as known, but force a Cfg update
* unwanted - unregister this node as no longer wanted
* unknown - unregister this node as no longer wanted and no longer known
* unchanged - don't change any Cfg registrations for this node

func (*Manager) RemoveNodeDef

func (mgr *Manager) RemoveNodeDef(kind string) error

RemoveNodeDef removes the NodeDef registrations in the Cfg system for this Manager node instance.

func (*Manager) RemovePIndex

func (mgr *Manager) RemovePIndex(pindex *PIndex) error

RemovePIndex synchronously has the janitor remove a pindex.

func (*Manager) SaveNodeDef

func (mgr *Manager) SaveNodeDef(kind string, force bool) error

SaveNodeDef updates the NodeDef registrations in the Cfg system for this Manager node instance.

func (*Manager) Server

func (mgr *Manager) Server() string

Returns the configured server of a Manager.

func (*Manager) SetOptions

func (mgr *Manager) SetOptions(options map[string]string)

SetOptions replaces the options map with the provided map, which should be considered immutable after this call.

func (*Manager) Start

func (mgr *Manager) Start(register string) error

Start will start and register a Manager instance with its configured Cfg system, based on the register parameter. See Manager.Register().

func (*Manager) StartCfg

func (mgr *Manager) StartCfg() error

StartCfg will start Cfg subscriptions.

func (*Manager) StartRegister

func (mgr *Manager) StartRegister(register string) error

StartRegister is deprecated and has been renamed to Register().

func (*Manager) StartTime

func (mgr *Manager) StartTime() time.Time

Returns the start time of a Manager.

func (*Manager) StatsCopyTo

func (mgr *Manager) StatsCopyTo(dst *ManagerStats)

Copies the current manager stats to the dst manager stats.

func (*Manager) Stop

func (mgr *Manager) Stop()

func (*Manager) Tags

func (mgr *Manager) Tags() []string

Returns the configured tags of a Manager, which should be treated as immutable / read-only.

func (*Manager) TagsMap

func (mgr *Manager) TagsMap() map[string]bool

Returns the configured tags map of a Manager, which should be treated as immutable / read-only.

func (*Manager) UUID

func (mgr *Manager) UUID() string

Returns the UUID (the "node UUID") of a Manager.

func (*Manager) Unlock

func (mgr *Manager) Unlock()

func (*Manager) Version

func (mgr *Manager) Version() string

Returns the version of a Manager.

func (*Manager) Weight

func (mgr *Manager) Weight() int

Returns the configured weight of a Manager.

type ManagerEventHandlers

type ManagerEventHandlers interface {
	OnRegisterPIndex(pindex *PIndex)
	OnUnregisterPIndex(pindex *PIndex)
	OnFeedError(srcType string, r Feed, err error)
}

ManagerEventHandlers represents the callback interface where an application can receive important event callbacks from a Manager.
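
A minimal sketch of an application-side implementation that just logs each callback, using the standard log package; an instance could then be passed as the meh argument to NewManager or NewManagerEx.

type loggingEventHandlers struct{}

func (loggingEventHandlers) OnRegisterPIndex(pindex *cbgt.PIndex) {
	log.Printf("registered pindex: %s", pindex.Name)
}

func (loggingEventHandlers) OnUnregisterPIndex(pindex *cbgt.PIndex) {
	log.Printf("unregistered pindex: %s", pindex.Name)
}

func (loggingEventHandlers) OnFeedError(srcType string, f cbgt.Feed, err error) {
	log.Printf("feed error, srcType: %s, feed: %s, err: %v", srcType, f.Name(), err)
}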

type ManagerStats

type ManagerStats struct {
	TotKick uint64

	TotSetOptions uint64

	TotRegisterFeed     uint64
	TotUnregisterFeed   uint64
	TotRegisterPIndex   uint64
	TotUnregisterPIndex uint64

	TotSaveNodeDef       uint64
	TotSaveNodeDefNil    uint64
	TotSaveNodeDefGetErr uint64
	TotSaveNodeDefSetErr uint64
	TotSaveNodeDefRetry  uint64
	TotSaveNodeDefSame   uint64
	TotSaveNodeDefOk     uint64

	TotCreateIndex    uint64
	TotCreateIndexOk  uint64
	TotDeleteIndex    uint64
	TotDeleteIndexOk  uint64
	TotIndexControl   uint64
	TotIndexControlOk uint64

	TotDeleteIndexBySource    uint64
	TotDeleteIndexBySourceErr uint64
	TotDeleteIndexBySourceOk  uint64

	TotPlannerOpStart           uint64
	TotPlannerOpRes             uint64
	TotPlannerOpErr             uint64
	TotPlannerOpDone            uint64
	TotPlannerNOOP              uint64
	TotPlannerNOOPOk            uint64
	TotPlannerKick              uint64
	TotPlannerKickStart         uint64
	TotPlannerKickChanged       uint64
	TotPlannerKickErr           uint64
	TotPlannerKickOk            uint64
	TotPlannerUnknownErr        uint64
	TotPlannerSubscriptionEvent uint64
	TotPlannerStop              uint64

	TotJanitorOpStart           uint64
	TotJanitorOpRes             uint64
	TotJanitorOpErr             uint64
	TotJanitorOpDone            uint64
	TotJanitorNOOP              uint64
	TotJanitorNOOPOk            uint64
	TotJanitorKick              uint64
	TotJanitorKickStart         uint64
	TotJanitorKickErr           uint64
	TotJanitorKickOk            uint64
	TotJanitorClosePIndex       uint64
	TotJanitorRemovePIndex      uint64
	TotJanitorLoadDataDir       uint64
	TotJanitorUnknownErr        uint64
	TotJanitorSubscriptionEvent uint64
	TotJanitorStop              uint64

	TotRefreshLastNodeDefs     uint64
	TotRefreshLastIndexDefs    uint64
	TotRefreshLastPlanPIndexes uint64
}

ManagerStats represents the stats/metrics tracked by a Manager instance.

func (*ManagerStats) AtomicCopyTo

func (s *ManagerStats) AtomicCopyTo(r *ManagerStats)

AtomicCopyTo copies metrics from s to r (from source to result).

type MsgRing

type MsgRing struct {
	Next int      `json:"next"`
	Msgs [][]byte `json:"msgs"`

	SmallBufs [][]byte // Pool of small buffers.
	LargeBufs [][]byte // Pool of large buffers.
	// contains filtered or unexported fields
}

A MsgRing wraps an io.Writer and remembers a ring of previous writes to the io.Writer. It is safe for concurrent use and is useful, for example, for remembering recent log messages.

func NewMsgRing

func NewMsgRing(inner io.Writer, ringSize int) (*MsgRing, error)

NewMsgRing returns a MsgRing of a given ringSize.
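
A hedged example of capturing recent log output in a MsgRing while still writing through to stderr (using the standard os and log packages):

func setupLogRing() (*cbgt.MsgRing, error) {
	// Remember the last 100 log writes while still writing through to stderr.
	msgRing, err := cbgt.NewMsgRing(os.Stderr, 100)
	if err != nil {
		return nil, err
	}
	log.SetOutput(msgRing) // a *MsgRing implements io.Writer
	return msgRing, nil
}

// Later, e.g. in a diagnostics handler, the recent writes can be retrieved:
//   for _, msg := range msgRing.Messages() { w.Write(msg) }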

func (*MsgRing) Messages

func (m *MsgRing) Messages() [][]byte

Retrieves the recent writes to the MsgRing.

func (*MsgRing) Write

func (m *MsgRing) Write(p []byte) (n int, err error)

Implements the io.Writer interface.

type NILFeed

type NILFeed struct {
	// contains filtered or unexported fields
}

A NILFeed implements the Feed interface and never feeds any data to its Dest instances. It's useful for testing and for pindexes that are actually primary data sources.

See also the "blackhole" pindex type for the "opposite equivalent" of a NILFeed.

func NewNILFeed

func NewNILFeed(name, indexName string, dests map[string]Dest) *NILFeed

NewNILFeed creates a ready-to-be-started NILFeed instance.

func (*NILFeed) Close

func (t *NILFeed) Close() error

func (*NILFeed) Dests

func (t *NILFeed) Dests() map[string]Dest

func (*NILFeed) IndexName

func (t *NILFeed) IndexName() string

func (*NILFeed) Name

func (t *NILFeed) Name() string

func (*NILFeed) Start

func (t *NILFeed) Start() error

func (*NILFeed) Stats

func (t *NILFeed) Stats(w io.Writer) error

type NodeDef

type NodeDef struct {
	HostPort    string   `json:"hostPort"`
	UUID        string   `json:"uuid"`
	ImplVersion string   `json:"implVersion"` // See VERSION.
	Tags        []string `json:"tags"`
	Container   string   `json:"container"`
	Weight      int      `json:"weight"`
	Extras      string   `json:"extras"`
}

A NodeDef is a node definition.

type NodeDefs

type NodeDefs struct {
	// NodeDefs.UUID changes whenever any child NodeDef changes.
	UUID        string              `json:"uuid"`        // Like a revision id.
	NodeDefs    map[string]*NodeDef `json:"nodeDefs"`    // Key is NodeDef.UUID.
	ImplVersion string              `json:"implVersion"` // See VERSION.
}

A NodeDefs is comprised of zero or more node definitions.

func CfgGetNodeDefs

func CfgGetNodeDefs(cfg Cfg, kind string) (*NodeDefs, uint64, error)

Retrieves node definitions from a Cfg provider.

func NewNodeDefs

func NewNodeDefs(version string) *NodeDefs

Returns an initialized NodeDefs.

func PlannerGetNodeDefs

func PlannerGetNodeDefs(cfg Cfg, version, uuid string) (
	*NodeDefs, error)

PlannerGetNodeDefs retrieves node definitions from a Cfg.

type NodePlanParam

type NodePlanParam struct {
	CanRead  bool `json:"canRead"`
	CanWrite bool `json:"canWrite"`
}

A NodePlanParam defines whether a particular node can service a particular index definition.

func GetNodePlanParam

func GetNodePlanParam(nodePlanParams map[string]map[string]*NodePlanParam,
	nodeUUID, indexDefName, planPIndexName string) *NodePlanParam

GetNodePlanParam returns a relevant NodePlanParam for a given node from a nodePlanParams, defaulting to a less-specific NodePlanParam if needed.
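
A hedged sketch of the lookup defaulting, where the empty string key acts as a wildcard for any node and/or any index/pindex name (mirroring the NodePlanParams description under PlanParams); the node UUIDs and expected output are illustrative.

func exampleNodePlanParams() {
	nodePlanParams := map[string]map[string]*cbgt.NodePlanParam{
		// Default for any node and any index/pindex: readable and writable.
		"": {
			"": {CanRead: true, CanWrite: true},
		},
		// A specific (hypothetical) node where writes to "myIndex" are paused.
		"node-uuid-1": {
			"myIndex": {CanRead: true, CanWrite: false},
		},
	}

	// Falls back to the less-specific ("") entries when no exact match exists.
	npp := cbgt.GetNodePlanParam(nodePlanParams, "node-uuid-2", "myIndex", "")
	fmt.Println(npp.CanRead, npp.CanWrite) // expected: true true (hedged)
}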

type PIndex

type PIndex struct {
	Name             string     `json:"name"`
	UUID             string     `json:"uuid"`
	IndexType        string     `json:"indexType"`
	IndexName        string     `json:"indexName"`
	IndexUUID        string     `json:"indexUUID"`
	IndexParams      string     `json:"indexParams"`
	SourceType       string     `json:"sourceType"`
	SourceName       string     `json:"sourceName"`
	SourceUUID       string     `json:"sourceUUID"`
	SourceParams     string     `json:"sourceParams"`
	SourcePartitions string     `json:"sourcePartitions"`
	Path             string     `json:"-"` // Transient, not persisted.
	Impl             PIndexImpl `json:"-"` // Transient, not persisted.
	Dest             Dest       `json:"-"` // Transient, not persisted.
	// contains filtered or unexported fields
}

A PIndex represents a partition of an index, or an "index partition". A logical index definition will be split into one or more pindexes.

func NewPIndex

func NewPIndex(mgr *Manager, name, uuid,
	indexType, indexName, indexUUID, indexParams,
	sourceType, sourceName, sourceUUID, sourceParams, sourcePartitions string,
	path string) (*PIndex, error)

Creates a pindex, including its backend implementation structures, and its files.

func OpenPIndex

func OpenPIndex(mgr *Manager, path string) (*PIndex, error)

OpenPIndex reopens a previously created pindex. The path argument must be a directory for the pindex.

func (*PIndex) Close

func (p *PIndex) Close(remove bool) error

Close down a pindex, optionally removing its stored files.

type PIndexImpl

type PIndexImpl interface{}

PIndexImpl represents a runtime pindex implementation instance, whose runtime type depends on the pindex's type.

type PIndexImplType

type PIndexImplType struct {
	// Invoked by the manager when it wants to validate index definition
	// inputs before doing the actual creation.
	Validate func(indexType, indexName, indexParams string) error

	// Invoked by the manager when it wants to create an index
	// partition.  The pindex implementation should persist enough
	// info into the path subdirectory so that it can reconstitute the
	// pindex during restart and Open().
	New func(indexType, indexParams, path string, restart func()) (
		PIndexImpl, Dest, error)

	// Invoked by the manager when it wants a pindex implementation to
	// reconstitute and reload a pindex instance back into the
	// process, such as when the process has re-started.
	Open func(indexType, path string, restart func()) (
		PIndexImpl, Dest, error)

	// Invoked by the manager when it wants a count of documents from
	// an index.  The registered Count() function can be nil.
	Count func(mgr *Manager, indexName, indexUUID string) (
		uint64, error)

	// Invoked by the manager when it wants to query an index.  The
	// registered Query() function can be nil.
	Query func(mgr *Manager, indexName, indexUUID string,
		req []byte, res io.Writer) error

	// Description is used to populate docs, UI, etc, such as index
	// type drop-down control in the web admin UI.  Format of the
	// description string:
	//
	//    $categoryName/$indexType - short descriptive string
	//
	// The $categoryName is something like "advanced", or "general".
	Description string

	// A prototype instance of indexParams JSON that is usable for
	// Validate() and New().
	StartSample interface{}

	// Example instances of JSON that are usable for Query() requests.
	// These are used to help generate API documentation.
	QuerySamples func() []Documentation

	// Displayed in docs, web admin UI, etc, and often might be a link
	// to even further help.
	QueryHelp string

	// Invoked during startup to allow pindex implementation to affect
	// the REST API with its own endpoint.
	InitRouter func(r *mux.Router, phase string, mgr *Manager)

	// Optional, additional handlers a pindex implementation may have
	// for /api/diag output.
	DiagHandlers []DiagHandler

	// Optional, allows pindex implementation to add more information
	// to the REST /api/managerMeta output.
	MetaExtra func(map[string]interface{})

	// Optional, allows pindex implementation to specify advanced UI
	// implementations and information.
	UI map[string]string
}

PIndexImplType defines the functions that every pindex implementation type must register on startup.
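
A heavily hedged sketch of a trivial, do-nothing pindex implementation type; the RegisterPIndexImplType call at the end is assumed to be the package's registration helper and is not part of the API excerpted above.

func init() {
	noopType := &cbgt.PIndexImplType{
		Validate: func(indexType, indexName, indexParams string) error {
			return nil // accept any definition, for illustration only
		},
		New: func(indexType, indexParams, path string, restart func()) (
			cbgt.PIndexImpl, cbgt.Dest, error) {
			// A real implementation would persist enough info under path
			// so that Open() can reconstitute the pindex after a restart.
			return nil, nil, fmt.Errorf("noop: not implemented")
		},
		Open: func(indexType, path string, restart func()) (
			cbgt.PIndexImpl, cbgt.Dest, error) {
			return nil, nil, fmt.Errorf("noop: not implemented")
		},
		Description: "general/noop - an illustrative, do-nothing index type",
	}

	// Registration is typically done at process startup (assumed helper).
	cbgt.RegisterPIndexImplType("noop", noopType)
}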

func PIndexImplTypeForIndex

func PIndexImplTypeForIndex(cfg Cfg, indexName string) (
	*PIndexImplType, error)

PIndexImplTypeForIndex retrieves from the Cfg provider the index type for a given index.

type PIndexStoreStats

type PIndexStoreStats struct {
	TimerBatchStore metrics.Timer
	Errors          *list.List // Capped list of string (json).
}

PIndexStoreStats provides some common stats/metrics and error tracking that some pindex type backends can reuse.

func (*PIndexStoreStats) WriteJSON

func (d *PIndexStoreStats) WriteJSON(w io.Writer)

type PlanPIndex

type PlanPIndex struct {
	Name             string `json:"name,omitempty"` // Stable & unique cluster wide.
	UUID             string `json:"uuid"`
	IndexType        string `json:"indexType"`             // See IndexDef.Type.
	IndexName        string `json:"indexName"`             // See IndexDef.Name.
	IndexUUID        string `json:"indexUUID"`             // See IndexDef.UUID.
	IndexParams      string `json:"indexParams,omitempty"` // See IndexDef.Params.
	SourceType       string `json:"sourceType"`
	SourceName       string `json:"sourceName,omitempty"`
	SourceUUID       string `json:"sourceUUID,omitempty"`
	SourceParams     string `json:"sourceParams,omitempty"` // Optional connection info.
	SourcePartitions string `json:"sourcePartitions"`

	Nodes map[string]*PlanPIndexNode `json:"nodes"` // Keyed by NodeDef.UUID.
}

A PlanPIndex represents the plan for a particular index partition, including on what nodes that the index partition is assigned to. An index partition might be assigned to more than one node if the "plan params" has a replica count > 0.

func (*PlanPIndex) MarshalJSON

func (ppi *PlanPIndex) MarshalJSON() ([]byte, error)

Implementation of the json.Marshaler interface. The PlanPIndex JSON output format is now the natural, nested JSON format (as opposed to the previous, enveloped format).

func (*PlanPIndex) UnmarshalJSON

func (ppi *PlanPIndex) UnmarshalJSON(b []byte) error

Implementation of the json.Unmarshaler interface, which accepts either the new, nested JSON format or the older, enveloped format.

type PlanPIndexEnveloped

type PlanPIndexEnveloped struct {
	IndexParams  string `json:"indexParams,omitempty"`
	SourceParams string `json:"sourceParams,omitempty"`
	// contains filtered or unexported fields
}

A PlanPIndexEnveloped overrides PlanPIndex with IndexParams and SourceParams fields that are enveloped JSON (JSON encoded as strings), for backwards compatibility.

type PlanPIndexFilter

type PlanPIndexFilter func(*PlanPIndexNode) bool

PlanPIndexFilter is used to filter out nodes being considered by CoveringPIndexes().

type PlanPIndexIndexDef

type PlanPIndexIndexDef struct {
	IndexParams string `json:"indexParams"`
}

PlanPIndexIndexDef represents the shared, repeated index definition part of a PlanPIndex.

type PlanPIndexNested

type PlanPIndexNested struct {
	IndexParams  map[string]interface{} `json:"indexParams,omitempty"`
	SourceParams map[string]interface{} `json:"sourceParams,omitempty"`
	// contains filtered or unexported fields
}

A PlanPIndexNested overrides PlanPIndex with IndexParams and SourceParams fields that are JSON nested objects instead of strings, for easier-to-use API.

type PlanPIndexNode

type PlanPIndexNode struct {
	CanRead  bool `json:"canRead"`
	CanWrite bool `json:"canWrite"`
	Priority int  `json:"priority"` // Lower is higher priority, 0 is highest.
}

A PlanPIndexNode represents the kind of service a node has been assigned to provide for an index partition.

type PlanPIndexNodeRef

type PlanPIndexNodeRef struct {
	UUID string
	Node *PlanPIndexNode
}

PlanPIndexNodeRef represents an assignment of a pindex to a node.

type PlanPIndexNodeRefs

type PlanPIndexNodeRefs []*PlanPIndexNodeRef

PlanPIndexNodeRefs represents assignments of pindexes to nodes.

func (PlanPIndexNodeRefs) Len

func (pms PlanPIndexNodeRefs) Len() int

func (PlanPIndexNodeRefs) Less

func (pms PlanPIndexNodeRefs) Less(i, j int) bool

func (PlanPIndexNodeRefs) Swap

func (pms PlanPIndexNodeRefs) Swap(i, j int)

type PlanPIndexSourceDef

type PlanPIndexSourceDef struct {
	SourceParams string `json:"sourceParams"`
}

PlanPIndexSourceDef represents the shared, repeated source definition part of a PlanPIndex.

type PlanPIndexes

type PlanPIndexes struct {
	// PlanPIndexes.UUID changes whenever any child PlanPIndex changes.
	UUID         string                 `json:"uuid"`         // Like a revision id.
	PlanPIndexes map[string]*PlanPIndex `json:"planPIndexes"` // Key is PlanPIndex.Name.
	ImplVersion  string                 `json:"implVersion"`  // See VERSION.
	Warnings     map[string][]string    `json:"warnings"`     // Key is IndexDef.Name.
}

A PlanPIndexes is comprised of zero or more planPIndexes.

func CalcPlan

func CalcPlan(mode string, indexDefs *IndexDefs, nodeDefs *NodeDefs,
	planPIndexesPrev *PlanPIndexes, version, server string,
	options map[string]string, plannerFilter PlannerFilter) (
	*PlanPIndexes, error)

Split logical indexes into PIndexes and assign PIndexes to nodes. As part of this, planner hook callbacks will be invoked to allow advanced applications to adjust the planning outcome.

func CfgGetPlanPIndexes

func CfgGetPlanPIndexes(cfg Cfg) (*PlanPIndexes, uint64, error)

Retrieves PlanPIndexes from a Cfg provider.

func CopyPlanPIndexes

func CopyPlanPIndexes(planPIndexes *PlanPIndexes,
	version string) *PlanPIndexes

CopyPlanPIndexes returns a copy of the given planPIndexes, albeit with a new UUID and given version.

func NewPlanPIndexes

func NewPlanPIndexes(version string) *PlanPIndexes

Returns an initialized PlanPIndexes.

func PlannerGetPlanPIndexes

func PlannerGetPlanPIndexes(cfg Cfg, version string) (
	*PlanPIndexes, uint64, error)

PlannerGetPlanPIndexes retrieves the planned pindexes from a Cfg.

type PlanPIndexesShared

type PlanPIndexesShared struct {
	PlanPIndexes

	// Key is "indexType/indexName/indexUUID".
	SharedIndexDefs map[string]*PlanPIndexIndexDef `json:"sharedIndexDefs"`

	// Key is "sourceType/sourceName/sourceUUID".
	SharedSourceDefs map[string]*PlanPIndexSourceDef `json:"sharedSourceDefs"`
}

PlanPIndexesShared represents a PlanPIndexes that has been deduplicated into shared parts.

type PlanParams

type PlanParams struct {
	// MaxPartitionsPerPIndex controls the maximum number of source
	// partitions the planner can assign to or clump into a PIndex (or
	// index partition).
	MaxPartitionsPerPIndex int `json:"maxPartitionsPerPIndex,omitempty"`

	// NumReplicas controls the number of replicas for a PIndex, over
	// the first copy.  The first copy is not counted as a replica.
	// For example, a NumReplicas setting of 2 means there should be a
	// primary and 2 replicas... so 3 copies in total.  A NumReplicas
	// of 0 means just the first, primary copy only.
	NumReplicas int `json:"numReplicas,omitempty"`

	// HierarchyRules defines the policy the planner should follow
	// when assigning PIndexes to nodes, especially for replica
	// placement.  Through the HierarchyRules, a user can specify, for
	// example, that the first replica should be not on the same rack
	// and zone as the first copy.  Some examples:
	// Try to put the first replica on the same rack...
	// {"replica":[{"includeLevel":1,"excludeLevel":0}]}
	// Try to put the first replica on a different rack...
	// {"replica":[{"includeLevel":2,"excludeLevel":1}]}
	HierarchyRules blance.HierarchyRules `json:"hierarchyRules,omitempty"`

	// NodePlanParams allows users to specify per-node input to the
	// planner, such as whether PIndexes assigned to different nodes
	// can be readable or writable.  Keyed by node UUID.  Value is
	// keyed by planPIndex.Name or indexDef.Name.  The empty string
	// ("") is used to represent any node UUID and/or any planPIndex
	// and/or any indexDef.
	NodePlanParams map[string]map[string]*NodePlanParam `json:"nodePlanParams,omitempty"`

	// PIndexWeights allows users to specify an optional weight for a
	// PIndex, where weights default to 1.  In a range-partitioned
	// index, for example, some index partitions (or PIndexes) may
	// have more entries (higher weight) than other index partitions.
	PIndexWeights map[string]int `json:"pindexWeights,omitempty"`

	// PlanFrozen means the planner should not change the previous
	// plan for an index, even as nodes join or leave and even if
	// there was no previous plan.  Defaults to false (allow
	// re-planning).
	PlanFrozen bool `json:"planFrozen,omitempty"`
}

A PlanParams holds input parameters to the planner, that control how the planner should split an index definition into one or more index partitions, and how the planner should assign those index partitions to nodes.
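
A hedged example of planParams JSON, decoded here into a PlanParams to show how the fields above line up; the specific values are illustrative only.

func examplePlanParams() (cbgt.PlanParams, error) {
	var pp cbgt.PlanParams
	err := json.Unmarshal([]byte(`{
		"maxPartitionsPerPIndex": 32,
		"numReplicas": 1,
		"hierarchyRules": {"replica": [{"includeLevel": 2, "excludeLevel": 1}]},
		"nodePlanParams": {"": {"": {"canRead": true, "canWrite": true}}},
		"planFrozen": false
	}`), &pp)
	return pp, err
}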

func NewPlanParams

func NewPlanParams(mgr *Manager) PlanParams

type PlannerFilter

type PlannerFilter func(indexDef *IndexDef,
	planPIndexesPrev, planPIndexes *PlanPIndexes) bool

A PlannerFilter callback func should return true if the plans for an indexDef should be updated during CalcPlan(), and should return false if the plans for the indexDef should remain untouched.
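
For example, a hedged sketch of a filter that leaves frozen plans untouched:

var skipFrozenPlans cbgt.PlannerFilter = func(indexDef *cbgt.IndexDef,
	planPIndexesPrev, planPIndexes *cbgt.PlanPIndexes) bool {
	// Return true to (re)plan this index definition; false leaves it untouched.
	return indexDef != nil && !indexDef.PlanParams.PlanFrozen
}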

type PlannerHook

type PlannerHook func(in PlannerHookInfo) (out PlannerHookInfo, skip bool, err error)

A PlannerHook is an optional callback func supplied by the application via PlannerHooks and is invoked during planning.

type PlannerHookInfo

type PlannerHookInfo struct {
	PlannerHookPhase string

	Mode    string
	Version string
	Server  string

	Options map[string]string

	IndexDefs *IndexDefs
	IndexDef  *IndexDef

	NodeDefs          *NodeDefs
	NodeUUIDsAll      []string
	NodeUUIDsToAdd    []string
	NodeUUIDsToRemove []string
	NodeWeights       map[string]int
	NodeHierarchy     map[string]string

	PlannerFilter PlannerFilter

	PlanPIndexesPrev *PlanPIndexes
	PlanPIndexes     *PlanPIndexes

	PlanPIndexesForIndex map[string]*PlanPIndex
}

A PlannerHookInfo is the in/out information provided to PlannerHook callbacks. If the PlannerHook wishes to modify any of these fields to affect the planning outcome, it must copy the field value (e.g., copy-on-write).

func NoopPlannerHook

func NoopPlannerHook(x PlannerHookInfo) (PlannerHookInfo, bool, error)

A NoopPlannerHook is a no-op planner hook that just returns its input.
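
A hedged sketch of a custom hook that mirrors NoopPlannerHook but logs each planning phase; how the hook is wired in via PlannerHooks is assumed rather than shown in this reference.

func loggingPlannerHook(in cbgt.PlannerHookInfo) (cbgt.PlannerHookInfo, bool, error) {
	indexName := ""
	if in.IndexDef != nil {
		indexName = in.IndexDef.Name
	}
	log.Printf("planner hook, phase: %s, mode: %s, index: %s",
		in.PlannerHookPhase, in.Mode, indexName)

	// Returning skip == false lets normal planning continue unchanged.
	return in, false, nil
}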

type PrimaryFeed

type PrimaryFeed struct {
	// contains filtered or unexported fields
}

A PrimaryFeed implements both the Feed and Dest interfaces, for chainability; and is also useful for testing.

One motivation for a PrimaryFeed implementation comes from the realization that some pindex backends might not actually be secondary indexes, but are instead better considered as primary data sources in their own right. For example, you can imagine some kind of KeyValuePIndex backend. The system design, however, still requires hooking such "primary pindexes" up to a feed. Instead of using a NILFeed, you might use a PrimaryFeed, since unlike a NILFeed, the PrimaryFeed provides "NumPartitions" functionality.

func NewPrimaryFeed

func NewPrimaryFeed(name, indexName string, pf DestPartitionFunc,
	dests map[string]Dest) *PrimaryFeed

func (*PrimaryFeed) Close

func (t *PrimaryFeed) Close() error

func (*PrimaryFeed) ConsistencyWait

func (t *PrimaryFeed) ConsistencyWait(partition, partitionUUID string,
	consistencyLevel string,
	consistencySeq uint64,
	cancelCh <-chan bool) error

func (*PrimaryFeed) Count

func (t *PrimaryFeed) Count(pindex *PIndex, cancelCh <-chan bool) (
	uint64, error)

func (*PrimaryFeed) DataDelete

func (t *PrimaryFeed) DataDelete(partition string,
	key []byte, seq uint64,
	cas uint64,
	extrasType DestExtrasType, extras []byte) error

func (*PrimaryFeed) DataUpdate

func (t *PrimaryFeed) DataUpdate(partition string,
	key []byte, seq uint64, val []byte,
	cas uint64,
	extrasType DestExtrasType, extras []byte) error

func (*PrimaryFeed) Dests

func (t *PrimaryFeed) Dests() map[string]Dest

func (*PrimaryFeed) IndexName

func (t *PrimaryFeed) IndexName() string

func (*PrimaryFeed) Name

func (t *PrimaryFeed) Name() string

func (*PrimaryFeed) OpaqueGet

func (t *PrimaryFeed) OpaqueGet(partition string) (
	value []byte, lastSeq uint64, err error)

func (*PrimaryFeed) OpaqueSet

func (t *PrimaryFeed) OpaqueSet(partition string,
	value []byte) error

func (*PrimaryFeed) Query

func (t *PrimaryFeed) Query(pindex *PIndex, req []byte, w io.Writer,
	cancelCh <-chan bool) error

func (*PrimaryFeed) Rollback

func (t *PrimaryFeed) Rollback(partition string,
	rollbackSeq uint64) error

func (*PrimaryFeed) SnapshotStart

func (t *PrimaryFeed) SnapshotStart(partition string,
	snapStart, snapEnd uint64) error

func (*PrimaryFeed) Start

func (t *PrimaryFeed) Start() error

func (*PrimaryFeed) Stats

func (t *PrimaryFeed) Stats(w io.Writer) error

type PrimarySourceParams

type PrimarySourceParams struct {
	NumPartitions int `json:"numPartitions"`
}

PrimarySourceParams represents the JSON for the sourceParams for a primary feed.

type QueryCtl

type QueryCtl struct {
	Timeout     int64              `json:"timeout"`
	Consistency *ConsistencyParams `json:"consistency"`
}

QueryCtl defines the JSON parameters that control query execution and which are independent of any specific pindex type.

type QueryCtlParams

type QueryCtlParams struct {
	Ctl QueryCtl `json:"ctl"`
}

QueryCtlParams defines the JSON that includes the "ctl" part of a query request. These "ctl" query request parameters are independent of any specific pindex type.
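
A hedged example of decoding the "ctl" portion of a query request; the consistency field is omitted here since ConsistencyParams is documented elsewhere, and the request body shown in the comment is hypothetical.

func parseQueryCtl(req []byte) (cbgt.QueryCtlParams, error) {
	// Default the timeout before overlaying the request's own "ctl" values.
	qcp := cbgt.QueryCtlParams{
		Ctl: cbgt.QueryCtl{Timeout: cbgt.QUERY_CTL_DEFAULT_TIMEOUT_MS},
	}
	err := json.Unmarshal(req, &qcp)
	return qcp, err
}

// Example request body (hypothetical): {"ctl": {"timeout": 5000}}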

type RemotePlanPIndex

type RemotePlanPIndex struct {
	PlanPIndex *PlanPIndex
	NodeDef    *NodeDef
}

RemotePlanPIndex associations are returned by CoveringPIndexes().

type StopAfterSourceParams

type StopAfterSourceParams struct {
	// Valid values: "", "markReached".
	StopAfter string `json:"stopAfter"`

	// Keyed by source partition.
	MarkPartitionSeqs map[string]UUIDSeq `json:"markPartitionSeqs"`
}

StopAfterSourceParams defines optional fields for the sourceParams that can stop the data source feed (i.e., index ingest) if the seqs per partition have been reached. It can be used, for example, to help with "one-time indexing" behavior.
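
A hedged sketch of building "one-time indexing" sourceParams; the partition id, UUID, and seq are hypothetical, and in practice these fields would be merged into the index definition's overall sourceParams JSON.

func oneTimeIndexingParams() ([]byte, error) {
	stopParams := cbgt.StopAfterSourceParams{
		StopAfter: "markReached",
		MarkPartitionSeqs: map[string]cbgt.UUIDSeq{
			// Stop feeding source partition "0" once its seq reaches 12345.
			"0": {UUID: "partition-0-uuid", Seq: 12345},
		},
	}
	return json.Marshal(stopParams)
}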

type TAPFeed

type TAPFeed struct {
	// contains filtered or unexported fields
}

A TAPFeed implements the Feed interface and handles the TAP protocol to receive data from a couchbase data source.

func NewTAPFeed

func NewTAPFeed(name, indexName, url, poolName, bucketName, bucketUUID,
	paramsStr string, pf DestPartitionFunc, dests map[string]Dest,
	disable bool) (*TAPFeed, error)

NewTAPFeed creates a new, ready-to-be-started TAPFeed.

func (*TAPFeed) Close

func (t *TAPFeed) Close() error

func (*TAPFeed) Dests

func (t *TAPFeed) Dests() map[string]Dest

func (*TAPFeed) IndexName

func (t *TAPFeed) IndexName() string

func (*TAPFeed) Name

func (t *TAPFeed) Name() string

func (*TAPFeed) Start

func (t *TAPFeed) Start() error

func (*TAPFeed) Stats

func (t *TAPFeed) Stats(w io.Writer) error

type TAPFeedParams

type TAPFeedParams struct {
	BackoffFactor float32 `json:"backoffFactor"`
	SleepInitMS   int     `json:"sleepInitMS"`
	SleepMaxMS    int     `json:"sleepMaxMS"`
}

TAPFeedParams represents the JSON of the sourceParams for a TAP feed.

type UUIDSeq

type UUIDSeq struct {
	UUID string
	Seq  uint64
}

A UUIDSeq associates a UUID (such as from a partition's UUID) with a seq number.

type VBucketMetaData

type VBucketMetaData struct {
	FailOverLog [][]uint64 `json:"failOverLog"`
}

Directories

Path	Synopsis
cmd
    metakv-mock	A fake or mock metakv server for dev/testing purposes.
