projector

package
v0.0.0-...-d8c7374 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 24, 2017 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func FeedConfigParams

func FeedConfigParams() []string

FeedConfigParams return the list of configuration params supported by a feed.

func NewFakeBuckets

func NewFakeBuckets(buckets []string) map[string]*FakeBucket

NewFakeBuckets returns a reference to new FakeBucket.

Types

type Average

type Average struct {
	// contains filtered or unexported fields
}

Average maintains the average and variance of a stream of numbers in a space-efficient manner.

func (*Average) Add

func (av *Average) Add(sample int64)

Add a sample to counting average.

func (*Average) Count

func (av *Average) Count() int64

Count return the number of samples counted so far.

func (*Average) Max

func (av *Average) Max() int64

Max return the maximum value of sample.

func (*Average) Mean

func (av *Average) Mean() int64

Mean return the sum of all samples by number of samples so far.

func (*Average) Min

func (av *Average) Min() int64

Min return the minimum value of sample.

func (*Average) Sd

func (av *Average) Sd() int64

GetStdDev return the standard-deviation of all samples so far.

func (*Average) Sum

func (av *Average) Sum() int64

GetTotal return the sum of all samples so far.

func (*Average) Variance

func (av *Average) Variance() int64

Variance return the variance of all samples so far.

type BucketAccess

type BucketAccess interface {
	// Refresh bucket meta information like vbmap
	Refresh() error

	// GetVBmap returns a map of `kvaddr` to list of vbuckets hosted in a kv
	// node.
	GetVBmap(kvaddrs []string) (map[string][]uint16, error)

	// FailoverLog fetch the failover log for specified vbucket
	GetFailoverLogs(
		opaque uint16,
		vbuckets []uint16,
		config map[string]interface{}) (couchbase.FailoverLog, error)

	// Close this bucket.
	Close()
}

BucketAccess interface manage a subset of vbucket streams with mutiple KV nodes. To be implemented by couchbase.Bucket type.

type BucketFeeder

type BucketFeeder interface {
	// GetChannel return a mutation channel.
	GetChannel() (mutch <-chan *mc.DcpEvent)

	// StartVbStreams starts a set of vbucket streams on this feed.
	// returns list of vbuckets for which StreamRequest is successfully
	// posted.
	StartVbStreams(opaque uint16, ts *protobuf.TsVbuuid) error

	// EndVbStreams ends an existing vbucket stream from this feed.
	EndVbStreams(opaque uint16, endTs *protobuf.TsVbuuid) error

	// CloseFeed ends all active streams on this feed and free its resources.
	CloseFeed() (err error)
}

BucketFeeder interface from a BucketAccess object.

func OpenBucketFeed

func OpenBucketFeed(
	feedname couchbase.DcpFeedName,
	b *couchbase.Bucket,
	opaque uint16,
	kvaddrs []string,
	config map[string]interface{}) (feeder BucketFeeder, err error)

OpenBucketFeed opens feed for bucket.

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine is immutable structure defined for each index, or any other entity that wants projection and routing over kv-mutations.

func NewEngine

func NewEngine(uuid uint64, evaluator c.Evaluator, router c.Router) *Engine

NewEngine creates a new engine instance for `uuid`.

func (*Engine) Endpoints

func (engine *Engine) Endpoints() []string

Endpoints hosting this engine.

func (*Engine) SnapshotData

func (engine *Engine) SnapshotData(
	m *mc.DcpEvent, vbno uint16, vbuuid, seqno uint64) interface{}

SnapshotData from this engine.

func (*Engine) StreamBeginData

func (engine *Engine) StreamBeginData(
	vbno uint16, vbuuid, seqno uint64) interface{}

StreamBeginData from this engine.

func (*Engine) StreamEndData

func (engine *Engine) StreamEndData(
	vbno uint16, vbuuid, seqno uint64) interface{}

StreamEndData from this engine.

func (*Engine) SyncData

func (engine *Engine) SyncData(
	vbno uint16, vbuuid, seqno uint64) interface{}

SyncData from this engine.

func (*Engine) TransformRoute

func (engine *Engine) TransformRoute(
	vbuuid uint64, m *mc.DcpEvent, data map[string]interface{},
	encodeBuf []byte) ([]byte, error)

TransformRoute data to endpoints.

type FakeBucket

type FakeBucket struct {
	C chan *mc.DcpEvent
	// contains filtered or unexported fields
}

FakeBucket fot unit testing.

func (*FakeBucket) Close

func (b *FakeBucket) Close(kvaddr string)

Close is method receiver for BucketAccess interface

func (*FakeBucket) CloseFeed

func (b *FakeBucket) CloseFeed() (err error)

CloseFeed is method receiver for BucketFeeder interface

func (*FakeBucket) EndVbStreams

func (b *FakeBucket) EndVbStreams(
	opaque uint16, ts *protobuf.TsVbuuid) (err error)

EndVbStreams is method receiver for BucketFeeder interface

func (*FakeBucket) GetChannel

func (b *FakeBucket) GetChannel() <-chan *mc.DcpEvent

GetChannel is method receiver for BucketFeeder interface

func (*FakeBucket) GetFailoverLogs

func (b *FakeBucket) GetFailoverLogs(
	opaque uint16,
	vbnos []uint16,
	conf map[string]interface{}) (couchbase.FailoverLog, error)

GetFailoverLogs is method receiver for BucketAccess interface

func (*FakeBucket) GetVBmap

func (b *FakeBucket) GetVBmap(kvaddrs []string) (map[string][]uint16, error)

GetVBmap is method receiver for BucketAccess interface

func (*FakeBucket) OpenKVFeed

func (b *FakeBucket) OpenKVFeed(kvaddr string) (BucketFeeder, error)

OpenKVFeed is method receiver for BucketAccess interface

func (*FakeBucket) SetFailoverLog

func (b *FakeBucket) SetFailoverLog(vbno uint16, flog [][2]uint64)

SetFailoverLog fake initialization method.

func (*FakeBucket) SetVbmap

func (b *FakeBucket) SetVbmap(kvaddr string, vbnos []uint16)

SetVbmap fake initialization method.

func (*FakeBucket) StartVbStreams

func (b *FakeBucket) StartVbStreams(
	opaque uint16, ts *protobuf.TsVbuuid) (err error)

StartVbStreams is method receiver for BucketFeeder interface

type FakeStream

type FakeStream struct {
	// contains filtered or unexported fields
}

FakeStream fot unit testing.

type Feed

type Feed struct {
	// contains filtered or unexported fields
}

Feed is mutation stream - for maintenance, initial-load, catchup etc...

func NewFeed

func NewFeed(
	pooln, topic string,
	projector *Projector,
	config c.Config, opaque uint16) (*Feed, error)

NewFeed creates a new topic feed. `config` contains following keys.

clusterAddr: KV cluster address <host:port>.
feedWaitStreamReqTimeout: wait for a response to StreamRequest
feedWaitStreamEndTimeout: wait for a response to StreamEnd
feedChanSize: channel size for feed's control path and back path
mutationChanSize: channel size of projector's data path routine
syncTimeout: timeout, in ms, for sending periodic Sync messages
kvstatTick: timeout, in ms, for logging kvstats
routerEndpointFactory: endpoint factory

func (*Feed) AddBuckets

func (feed *Feed) AddBuckets(
	req *protobuf.AddBucketsRequest,
	opaque uint16) (*protobuf.TopicResponse, error)

AddBuckets will remove buckets and all its upstream and downstream elements, except endpoints. Synchronous call.

func (*Feed) AddInstances

func (feed *Feed) AddInstances(
	req *protobuf.AddInstancesRequest,
	opaque uint16) (*protobuf.TimestampResponse, error)

AddInstances will restart specified endpoint-address if it is not active already. Synchronous call.

func (*Feed) DelBuckets

func (feed *Feed) DelBuckets(
	req *protobuf.DelBucketsRequest, opaque uint16) error

DelBuckets will remove buckets and all its upstream and downstream elements, except endpoints. Synchronous call.

func (*Feed) DelInstances

func (feed *Feed) DelInstances(
	req *protobuf.DelInstancesRequest, opaque uint16) error

DelInstances will restart specified endpoint-address if it is not active already. Synchronous call.

func (*Feed) DeleteEndpoint

func (feed *Feed) DeleteEndpoint(raddr string) error

DeleteEndpoint will delete the specified endpoint address from feed.

func (*Feed) GetOpaque

func (feed *Feed) GetOpaque() uint16

GetOpaque return the opaque id that created this feed.

func (*Feed) GetStatistics

func (feed *Feed) GetStatistics() c.Statistics

GetStatistics for this feed. Synchronous call.

func (*Feed) GetTopicResponse

func (feed *Feed) GetTopicResponse() *protobuf.TopicResponse

GetTopicResponse for this feed. Synchronous call.

func (*Feed) MutationTopic

func (feed *Feed) MutationTopic(
	req *protobuf.MutationTopicRequest, opaque uint16) (*protobuf.TopicResponse, error)

MutationTopic will start the feed. Synchronous call.

func (*Feed) Ping

func (feed *Feed) Ping() error

Ping whether the feed is active or not.

func (*Feed) PostFinKVdata

func (feed *Feed) PostFinKVdata(bucket string)

PostFinKVdata feedback from data-path. Asynchronous call.

func (*Feed) PostStreamEnd

func (feed *Feed) PostStreamEnd(bucket string, m *mc.DcpEvent)

PostStreamEnd feedback from data-path. Asynchronous call.

func (*Feed) PostStreamRequest

func (feed *Feed) PostStreamRequest(bucket string, m *mc.DcpEvent)

PostStreamRequest feedback from data-path. Asynchronous call.

func (*Feed) RepairEndpoints

func (feed *Feed) RepairEndpoints(
	req *protobuf.RepairEndpointsRequest, opaque uint16) error

RepairEndpoints will restart specified endpoint-address if it is not active already. Synchronous call.

func (*Feed) ResetConfig

func (feed *Feed) ResetConfig(config c.Config) error

ResetConfig for this feed.

func (*Feed) RestartVbuckets

func (feed *Feed) RestartVbuckets(
	req *protobuf.RestartVbucketsRequest, opaque uint16) (*protobuf.TopicResponse, error)

RestartVbuckets will restart upstream vbuckets for specified buckets. Synchronous call.

func (*Feed) Shutdown

func (feed *Feed) Shutdown(opaque uint16) error

Shutdown feed, its upstream connection with kv and downstream endpoints. Synchronous call.

func (*Feed) ShutdownVbuckets

func (feed *Feed) ShutdownVbuckets(
	req *protobuf.ShutdownVbucketsRequest, opaque uint16) error

ShutdownVbuckets will shutdown streams for specified buckets. Synchronous call.

func (*Feed) StaleCheck

func (feed *Feed) StaleCheck(staleTimeout int) (string, error)

StaleCheck will check for feed sanity and return "exit" if feed has was already stale and still stale. Synchronous call.

type KVData

type KVData struct {
	// contains filtered or unexported fields
}

KVData captures an instance of data-path for single kv-node from upstream connection.

func NewKVData

func NewKVData(
	feed *Feed, bucket string,
	opaque uint16,
	reqTs *protobuf.TsVbuuid,
	engines map[uint64]*Engine,
	endpoints map[string]c.RouterEndpoint,
	mutch <-chan *mc.DcpEvent,
	config c.Config) *KVData

NewKVData create a new data-path instance.

func (*KVData) AddEngines

func (kvdata *KVData) AddEngines(
	opaque uint16,
	engines map[uint64]*Engine,
	endpoints map[string]c.RouterEndpoint) (map[uint16]uint64, error)

AddEngines and endpoints, synchronous call.

func (*KVData) Close

func (kvdata *KVData) Close() error

Close kvdata kv data path, synchronous call.

func (*KVData) DeleteEngines

func (kvdata *KVData) DeleteEngines(opaque uint16, engineKeys []uint64) error

DeleteEngines synchronous call.

func (*KVData) GetStatistics

func (kvdata *KVData) GetStatistics() map[string]interface{}

GetStatistics from kv data path, synchronous call.

func (*KVData) ReloadHeartbeat

func (kvdata *KVData) ReloadHeartbeat() error

ReloadHeartbeat for kvdata.

func (*KVData) ResetConfig

func (kvdata *KVData) ResetConfig(config c.Config) error

ResetConfig for kvdata.

func (*KVData) UpdateTs

func (kvdata *KVData) UpdateTs(opaque uint16, ts *protobuf.TsVbuuid) error

UpdateTs with new set of {vbno,seqno}, synchronous call.

type Projector

type Projector struct {
	// contains filtered or unexported fields
}

Projector data structure, a projector is connected to one or more upstream kv-nodes. Works in tandem with projector's adminport.

func NewProjector

func NewProjector(maxvbs int, config c.Config) *Projector

NewProjector creates a news projector instance and starts a corresponding adminport.

func (*Projector) AddFeed

func (p *Projector) AddFeed(topic string, feed *Feed) (err error)

AddFeed object for `topic`. - return ErrorTopicExist if topic is duplicate.

func (*Projector) DelFeed

func (p *Projector) DelFeed(topic string) (err error)

DelFeed object for `topic`. - return ErrorTopicMissing if topic is not started.

func (*Projector) GetConfig

func (p *Projector) GetConfig() c.Config

GetConfig returns the config object from projector.

func (*Projector) GetFeed

func (p *Projector) GetFeed(topic string) (*Feed, error)

GetFeed object for `topic`. - return ErrorTopicMissing if topic is not started.

func (*Projector) GetFeedConfig

func (p *Projector) GetFeedConfig() c.Config

GetFeedConfig from current configuration settings.

func (*Projector) GetFeeds

func (p *Projector) GetFeeds() []*Feed

GetFeeds return a list of all feeds.

func (*Projector) ResetConfig

func (p *Projector) ResetConfig(config c.Config)

ResetConfig accepts a full-set or subset of global configuration and updates projector related fields.

type Subscriber

type Subscriber interface {
	// GetEvaluators will return a map of uuid to Evaluator interface.
	// - return ErrorInconsistentFeed for malformed tables.
	GetEvaluators() (map[uint64]c.Evaluator, error)

	// GetRouters will return a map of uuid to Router interface.
	// - return ErrorInconsistentFeed for malformed tables.
	GetRouters() (map[uint64]c.Router, error)
}

Subscriber interface abstracts engines (aka instances) that can supply `evaluators`, to transform mutations into custom-messages, and `routers`, to supply distribution topology for custom-messages.

type Vbucket

type Vbucket struct {
	// contains filtered or unexported fields
}

Vbucket is immutable structure defined for each vbucket.

func NewVbucket

func NewVbucket(
	cluster, topic, bucket string, opaque, vbno uint16,
	vbuuid, startSeqno uint64, config c.Config) *Vbucket

NewVbucket creates a new routine to handle this vbucket stream.

type VbucketWorker

type VbucketWorker struct {
	// contains filtered or unexported fields
}

VbucketWorker is immutable structure defined for each vbucket.

func NewVbucketWorker

func NewVbucketWorker(
	id int, feed *Feed, bucket string,
	opaque uint16, config c.Config) *VbucketWorker

NewVbucketWorker creates a new routine to handle this vbucket stream.

func (*VbucketWorker) AddEngines

func (worker *VbucketWorker) AddEngines(
	opaque uint16,
	engines map[uint64]*Engine,
	endpoints map[string]c.RouterEndpoint) (map[uint16]uint64, error)

AddEngines update active set of engines and endpoints, synchronous call.

func (*VbucketWorker) Close

func (worker *VbucketWorker) Close() error

Close worker-routine, synchronous call.

func (*VbucketWorker) DeleteEngines

func (worker *VbucketWorker) DeleteEngines(
	opaque uint16, engines []uint64) error

DeleteEngines delete engines and update endpoints synchronous call.

func (*VbucketWorker) Event

func (worker *VbucketWorker) Event(m *mc.DcpEvent) error

Event will post an DcpEvent, asychronous call.

func (*VbucketWorker) GetStatistics

func (worker *VbucketWorker) GetStatistics() (map[string]interface{}, error)

GetStatistics for worker vbucket, synchronous call.

func (*VbucketWorker) GetVbuckets

func (worker *VbucketWorker) GetVbuckets() ([]*Vbucket, error)

GetVbuckets will return the list of active vbuckets managed by this workers.

func (*VbucketWorker) ResetConfig

func (worker *VbucketWorker) ResetConfig(config c.Config) error

ResetConfig for worker-routine, synchronous call.

func (*VbucketWorker) SyncPulse

func (worker *VbucketWorker) SyncPulse() error

SyncPulse will trigger worker to generate a sync pulse for all its vbuckets, asychronous call.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL