couchbase

package module
Version: v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 20, 2021 License: MIT Imports: 27 Imported by: 509

README

A smart client for couchbase in go

This is a unoffical version of a Couchbase Golang client. If you are looking for the Offical Couchbase Golang client please see [CB-go])[https://github.com/couchbaselabs/gocb].

This is an evolving package, but does provide a useful interface to a couchbase server including all of the pool/bucket discovery features, compatible key distribution with other clients, and vbucket motion awareness so application can continue to operate during rebalances.

It also supports view querying with source node randomization so you don't bang on all one node to do all the work.

Install

go get github.com/couchbase/go-couchbase

Example

c, err := couchbase.Connect("http://dev-couchbase.example.com:8091/")
if err != nil {
	log.Fatalf("Error connecting:  %v", err)
}

pool, err := c.GetPool("default")
if err != nil {
	log.Fatalf("Error getting pool:  %v", err)
}

bucket, err := pool.GetBucket("default")
if err != nil {
	log.Fatalf("Error getting bucket:  %v", err)
}

bucket.Set("someKey", 0, []string{"an", "example", "list"})

Documentation

Overview

Package couchbase provides a smart client for go.

Usage:

client, err := couchbase.Connect("http://myserver:8091/")
handleError(err)
pool, err := client.GetPool("default")
handleError(err)
bucket, err := pool.GetBucket("MyAwesomeBucket")
handleError(err)
...

or a shortcut for the bucket directly

bucket, err := couchbase.GetBucket("http://myserver:8091/", "default", "default")

in any case, you can specify authentication credentials using standard URL userinfo syntax:

b, err := couchbase.GetBucket("http://bucketname:bucketpass@myserver:8091/",
        "default", "bucket")

Index

Constants

View Source
const (
	StatCount = BucketStats(iota)
	StatSize
)
View Source
const (
	// Raw specifies that the value is raw []byte or nil; don't
	// JSON-encode it.
	Raw = WriteOptions(1 << iota)
	// AddOnly indicates an item should only be written if it
	// doesn't exist, otherwise ErrKeyExists is returned.
	AddOnly
	// Persist causes the operation to block until the server
	// confirms the item is persisted.
	Persist
	// Indexable causes the operation to block until it's availble via the index.
	Indexable
	// Append indicates the given value should be appended to the
	// existing value for the given key.
	Append
)
View Source
const (
	PersistNone   = PersistTo(0x00)
	PersistMaster = PersistTo(0x01)
	PersistOne    = PersistTo(0x02)
	PersistTwo    = PersistTo(0x03)
	PersistThree  = PersistTo(0x04)
	PersistFour   = PersistTo(0x05)
)
View Source
const (
	ObserveNone           = ObserveTo(0x00)
	ObserveReplicateOne   = ObserveTo(0x01)
	ObserveReplicateTwo   = ObserveTo(0x02)
	ObserveReplicateThree = ObserveTo(0x03)
	ObserveReplicateFour  = ObserveTo(0x04)
)
View Source
const (
	OBSERVE = JobType(0x00)
	PERSIST = JobType(0x01)
)
View Source
const ABS_MAX_RETRIES = 10
View Source
const ABS_MIN_RETRIES = 3
View Source
const DEFAULT_WINDOW_SIZE = 20 * 1024 * 1024 // 20 Mb
View Source
const DISCONNECT_PERIOD = 120 * time.Second
View Source
const HTTP_MAX_RETRY = 5

arbitary number, may need to be tuned #FIXME

View Source
const MAX_RETRY_COUNT = 5
View Source
const START_NODE_ID = -1
View Source
const UpdateCancel = memcached.CASQuit

Return this as the error from an UpdateFunc to cancel the Update operation.

Variables

View Source
var AsynchronousCloser = false

AsynchronousCloser turns on asynchronous closing for overflow connections

View Source
var ClientOpCallback func(opname, k string, start time.Time, err error)

ClientOpCallback is called for each invocation of Do.

View Source
var ClientTimeOut = 10 * time.Second
View Source
var ConnCloserInterval = time.Second * 30

overflow connection closer cycle time

View Source
var ConnPoolAvailWaitTime = time.Millisecond

ConnPoolAvailWaitTime is the amount of time to wait for an existing connection from the pool before considering the creation of a new one.

View Source
var ConnPoolCallback func(host string, source string, start time.Time, err error)

ConnPoolTimeout is notified whenever connections are acquired from a pool.

View Source
var ConnPoolTimeout = time.Hour * 24 * 30

Default timeout for retrieving a connection from the pool.

View Source
var DefaultDialTimeout = time.Duration(0)
View Source
var DefaultTimeout = time.Duration(0)
View Source
var EnableCollections = false

Enable Collections

View Source
var EnableDataType = false

Enable Data Type response

View Source
var EnableMutationToken = false

Enable MutationToken

View Source
var EnableXattr = false

Enable Xattr

View Source
var ErrKeyExists = errors.New("key exists")

Error returned from Write with AddOnly flag, when key already exists in the bucket.

View Source
var ErrOverwritten = errors.New("overwritten")

Returned from WaitForPersistence (or Write, if the Persistent or Indexable flag is used) if the value has been overwritten by another before being persisted.

View Source
var ErrTimeout = errors.New("timeout")

Returned from WaitForPersistence (or Write, if the Persistent or Indexable flag is used) if the value hasn't been persisted by the timeout interval

View Source
var HTTPClient = &http.Client{Transport: HTTPTransport, Timeout: ClientTimeOut}
View Source
var HTTPClientForStreaming = &http.Client{Transport: HTTPTransport, Timeout: 0}

Use this client for reading from streams that should be open for an extended duration.

View Source
var MaxBackOffRetries = 25 // exponentail backOff result in over 30sec (25*13*0.1s)
View Source
var MaxBulkRetries = 5000

Maximum number of times to retry a chunk of a bulk get on error.

View Source
var MaxIdleConnsPerHost = 256

HTTPClient to use for REST and view operations.

View Source
var OPJobChan = make(chan *ObservePersistJob, 1024)
View Source
var OPJobDone = make(chan bool)
View Source
var ObservePersistPool = NewPool(1024)
View Source
var PoolOverflow = 16

PoolOverflow is the number of overflow connections allowed in a pool.

View Source
var PoolSize = 64

PoolSize is the size of each connection pool (per host).

View Source
var SlowServerCallWarningThreshold time.Duration

If this is set to a nonzero duration, Do() and ViewCustom() will log a warning if the call takes longer than that.

View Source
var TCPKeepalive = false

TCP KeepAlive enabled/disabled

View Source
var TCPKeepaliveInterval = 30 * 60

TCP keepalive interval in seconds. Default 30 minutes

View Source
var TimeoutError = errors.New("timeout waiting to build connection")

Error raised when a connection can't be retrieved from a pool.

View Source
var ViewCallback func(ddoc, name string, start time.Time, err error)

ViewCallback is called for each view invocation.

Functions

func AlreadyExistsError

func AlreadyExistsError(err error) bool

func CleanupHost

func CleanupHost(h, commonSuffix string) string

CleanupHost returns the hostname with the given suffix removed.

func ClientConfigForX509

func ClientConfigForX509(certFile, keyFile, rootFile string) (*tls.Config, error)

func DisableOverflowConnections

func DisableOverflowConnections()

Turn off overflow connections

func DropSystemBucket

func DropSystemBucket(c *Client, name string) error

func EnableAsynchronousCloser

func EnableAsynchronousCloser(closer bool)

Toggle asynchronous overflow closer

func FindCommonSuffix

func FindCommonSuffix(input []string) string

FindCommonSuffix returns the longest common suffix from the given strings.

func InitBulkGet

func InitBulkGet()

func IsKeyEExistsError

func IsKeyEExistsError(err error) bool

Return true if error is KEY_EEXISTS. Required by cbq-engine

func IsKeyNoEntError

func IsKeyNoEntError(err error) bool

Return true if error is KEY_ENOENT. Required by cbq-engine

func IsReadTimeOutError

func IsReadTimeOutError(err error) bool

func IsRefreshRequired

func IsRefreshRequired(err error) bool

Return true if error suggests a bucket refresh is required. Required by cbq-engine

func IsUnknownCollection

func IsUnknownCollection(err error) bool

Return true if a collection is not known. Required by cbq-engine

func MapKVtoSSL

func MapKVtoSSL(hostport string, ps *PoolServices) (string, bool, error)

Accepts a "host:port" string representing the KV TCP port and the pools nodeServices payload and returns a host:port string representing the KV TLS port on the same node as the KV TCP port. Returns the original host:port if in case of local communication (services on the same node as source)

func MapKVtoSSLExt

func MapKVtoSSLExt(hostport string, ps *PoolServices, force bool) (string, bool, error)

func ParseURL

func ParseURL(urlStr string) (result *url.URL, err error)

ParseURL is a wrapper around url.Parse with some sanity-checking

func SetCertFile

func SetCertFile(cert string)

func SetConnectionPoolParams

func SetConnectionPoolParams(size, overflow int)

Allow applications to speciify the Poolsize and Overflow

func SetKeyFile

func SetKeyFile(cert string)

func SetRootFile

func SetRootFile(cert string)

func SetSkipVerify

func SetSkipVerify(skip bool)

func SetTcpKeepalive

func SetTcpKeepalive(enabled bool, interval int)

Allow TCP keepalive parameters to be set by the application

func SetViewUpdateParams

func SetViewUpdateParams(baseU string, params map[string]interface{}) (viewOpts map[string]interface{}, err error)

Set viewUpdateDaemonOptions

Types

type AuditSpec

type AuditSpec struct {
	Disabled       []uint32    `json:"disabled"`
	Uid            string      `json:"uid"`
	AuditdEnabled  bool        `json:"auditdEnabled`
	DisabledUsers  []AuditUser `json:"disabledUsers"`
	LogPath        string      `json:"logPath"`
	RotateInterval int64       `json:"rotateInterval"`
	RotateSize     int64       `json:"rotateSize"`
}

Sample data: {"disabled":["12333", "22244"],"uid":"132492431","auditdEnabled":true,

"disabledUsers":[{"name":"bill","domain":"local"},{"name":"bob","domain":"local"}],
"logPath":"/Users/johanlarson/Library/Application Support/Couchbase/var/lib/couchbase/logs",
"rotateInterval":86400,"rotateSize":20971520}

type AuditUser

type AuditUser struct {
	Name   string `json:"name"`
	Domain string `json:"domain"`
}

type AuthHandler

type AuthHandler interface {
	GetCredentials() (string, string, string)
}

AuthHandler is a callback that gets the auth username and password for the given bucket.

type AuthWithSaslHandler

type AuthWithSaslHandler interface {
	AuthHandler
	GetSaslCredentials() (string, string)
}

AuthHandler is a callback that gets the auth username and password for the given bucket and sasl for memcached.

type Bucket

type Bucket struct {
	sync.RWMutex
	Capabilities           []string           `json:"bucketCapabilities"`
	CapabilitiesVersion    string             `json:"bucketCapabilitiesVer"`
	CollectionsManifestUid string             `json:"collectionsManifestUid"`
	Type                   string             `json:"bucketType"`
	Name                   string             `json:"name"`
	NodeLocator            string             `json:"nodeLocator"`
	Quota                  map[string]float64 `json:"quota,omitempty"`
	Replicas               int                `json:"replicaNumber"`
	URI                    string             `json:"uri"`
	StreamingURI           string             `json:"streamingUri"`
	LocalRandomKeyURI      string             `json:"localRandomKeyUri,omitempty"`
	UUID                   string             `json:"uuid"`
	ConflictResolutionType string             `json:"conflictResolutionType,omitempty"`
	DDocs                  struct {
		URI string `json:"uri"`
	} `json:"ddocs,omitempty"`
	BasicStats  map[string]interface{} `json:"basicStats,omitempty"`
	Controllers map[string]interface{} `json:"controllers,omitempty"`

	// These are used for JSON IO, but isn't used for processing
	// since it needs to be swapped out safely.
	VBSMJson  VBucketServerMap `json:"vBucketServerMap"`
	NodesJSON []Node           `json:"nodes"`
	// contains filtered or unexported fields
}

Bucket is the primary entry point for most data operations. Bucket is a locked data structure. All access to its fields should be done using read or write locking, as appropriate.

Some access methods require locking, but rely on the caller to do so. These are appropriate for calls from methods that have already locked the structure. Methods like this take a boolean parameter "bucketLocked".

func ConnectWithAuthAndGetBucket

func ConnectWithAuthAndGetBucket(endpoint, poolname, bucketname string,
	ah AuthHandler) (*Bucket, error)

ConnectWithAuthAndGetBucket is a convenience function for getting a named bucket from a given URL and an auth callback

func GetBucket

func GetBucket(endpoint, poolname, bucketname string) (*Bucket, error)

GetBucket is a convenience function for getting a named bucket from a URL

func GetSystemBucket

func GetSystemBucket(c *Client, p *Pool, name string) (*Bucket, error)

func (*Bucket) Add

func (b *Bucket) Add(k string, exp int, v interface{}, context ...*memcached.ClientContext) (added bool, err error)

Add adds a value to this bucket; like Set except that nothing happens if the key exists. The value will be serialized into a JSON document.

func (*Bucket) AddRaw

func (b *Bucket) AddRaw(k string, exp int, v []byte, context ...*memcached.ClientContext) (added bool, err error)

AddRaw adds a value to this bucket; like SetRaw except that nothing happens if the key exists. The value will be stored as raw bytes.

func (*Bucket) AddRawWithMT

func (b *Bucket) AddRawWithMT(k string, exp int, v []byte, context ...*memcached.ClientContext) (added bool, mt *MutationToken, err error)

AddRaw adds a value to this bucket; like SetRaw except that nothing happens if the key exists. The value will be stored as raw bytes.

func (*Bucket) AddWithCAS

func (b *Bucket) AddWithCAS(k string, exp int, v interface{}, context ...*memcached.ClientContext) (bool, uint64, error)

Add adds a value to this bucket; like Set except that nothing happens if the key exists. Return the CAS value.

func (*Bucket) AddWithMT

func (b *Bucket) AddWithMT(k string, exp int, v interface{}, context ...*memcached.ClientContext) (added bool, mt *MutationToken, err error)

Add adds a value to this bucket; like Set except that nothing happens if the key exists. The value will be serialized into a JSON document.

func (*Bucket) Append

func (b *Bucket) Append(k string, data []byte, context ...*memcached.ClientContext) error

Append appends raw data to an existing item.

func (*Bucket) Cas

func (b *Bucket) Cas(k string, exp int, cas uint64, v interface{}, context ...*memcached.ClientContext) (uint64, error)

Set a value in this bucket with Cas and return the new Cas value

func (*Bucket) CasRaw

func (b *Bucket) CasRaw(k string, exp int, cas uint64, v interface{}, context ...*memcached.ClientContext) (uint64, error)

Set a value in this bucket with Cas without json encoding it

func (*Bucket) CasWithMeta

func (b *Bucket) CasWithMeta(k string, flags int, exp int, cas uint64, v interface{}, context ...*memcached.ClientContext) (uint64, *MutationToken, error)

Extended CAS operation. These functions will return the mutation token, i.e vbuuid & guard

func (*Bucket) CasWithMetaRaw

func (b *Bucket) CasWithMetaRaw(k string, flags int, exp int, cas uint64, v interface{}, context ...*memcached.ClientContext) (uint64, *MutationToken, error)

func (*Bucket) Close

func (b *Bucket) Close()

Close marks this bucket as no longer needed, closing connections it may have open.

func (*Bucket) CommonAddressSuffix

func (b *Bucket) CommonAddressSuffix() string

CommonAddressSuffix finds the longest common suffix of all host:port strings in the node list.

func (*Bucket) CreateCollection

func (b *Bucket) CreateCollection(scope string, collection string) error

func (*Bucket) CreateScope

func (b *Bucket) CreateScope(scope string) error

func (*Bucket) Decr

func (b *Bucket) Decr(k string, amt, def uint64, exp int, context ...*memcached.ClientContext) (val uint64, err error)

Decr decrements the value at a given key by amt and defaults to def if no value present

func (*Bucket) Delete

func (b *Bucket) Delete(k string, context ...*memcached.ClientContext) error

Delete a key from this bucket.

func (*Bucket) DeleteDDoc

func (b *Bucket) DeleteDDoc(docname string) error

DeleteDDoc removes a design document.

func (*Bucket) Do

func (b *Bucket) Do(k string, f func(mc *memcached.Client, vb uint16) error) (err error)

Do executes a function on a memcached connection to the node owning key "k"

Note that this automatically handles transient errors by replaying your function on a "not-my-vbucket" error, so don't assume your command will only be executed only once.

func (*Bucket) Do2

func (b *Bucket) Do2(k string, f func(mc *memcached.Client, vb uint16) error, deadline bool) (err error)

func (*Bucket) DropCollection

func (b *Bucket) DropCollection(scope string, collection string) error

func (*Bucket) DropScope

func (b *Bucket) DropScope(scope string) error

func (*Bucket) FlushCollection

func (b *Bucket) FlushCollection(scope string, collection string) error

func (*Bucket) GatherStats

func (b *Bucket) GatherStats(which string) map[string]GatheredStats

GatherStats returns a map of server ID -> GatheredStats from all servers.

func (*Bucket) GatherStatsFunc

func (b *Bucket) GatherStatsFunc(which string, fn func(key, val []byte)) map[string]error

GatherStats returns a map of server ID -> GatheredStats from all servers.

func (*Bucket) Get

func (b *Bucket) Get(k string, rv interface{}, context ...*memcached.ClientContext) error

Get a value from this bucket. The value is expected to be a JSON stream and will be deserialized into rv.

func (*Bucket) GetAndTouchRaw

func (b *Bucket) GetAndTouchRaw(k string, exp int, context ...*memcached.ClientContext) (data []byte,
	cas uint64, err error)

GetAndTouchRaw gets a raw value from this bucket including its CAS counter and flags, and updates the expiry on the doc.

func (*Bucket) GetBulk

func (b *Bucket) GetBulk(keys []string, reqDeadline time.Time, subPaths []string, context ...*memcached.ClientContext) (map[string]*gomemcached.MCResponse, error)

func (*Bucket) GetBulkRaw

func (b *Bucket) GetBulkRaw(keys []string, context ...*memcached.ClientContext) (map[string][]byte, error)

Fetches multiple keys concurrently, with []byte values

This is a wrapper around GetBulk which converts all values returned by GetBulk from raw memcached responses into []byte slices. Returns one document for duplicate keys

func (*Bucket) GetCollectionCID

func (b *Bucket) GetCollectionCID(scope string, collection string, reqDeadline time.Time) (uint32, uint32, error)

Returns collectionUid, manifestUid, error.

func (*Bucket) GetCollectionsManifest

func (b *Bucket) GetCollectionsManifest() (*Manifest, error)

This function assumes the bucket is locked.

func (*Bucket) GetCount

func (b *Bucket) GetCount(refresh bool, context ...*memcached.ClientContext) (count int64, err error)

Get bucket count through the bucket stats

func (*Bucket) GetDDoc

func (b *Bucket) GetDDoc(docname string, into interface{}) error

GetDDoc retrieves a specific a design doc.

func (*Bucket) GetDDocWithRetry

func (b *Bucket) GetDDocWithRetry(docname string, into interface{}) error

func (*Bucket) GetDDocs

func (b *Bucket) GetDDocs() (DDocsResult, error)

GetDDocs lists all design documents

func (*Bucket) GetDDocsWithRetry

func (b *Bucket) GetDDocsWithRetry() (DDocsResult, error)

func (*Bucket) GetFailoverLogs

func (b *Bucket) GetFailoverLogs(vBuckets []uint16) (FailoverLog, error)

GetFailoverLogs, get the failover logs for a set of vbucket ids

func (*Bucket) GetIntStats

func (b *Bucket) GetIntStats(refresh bool, which []BucketStats, context ...*memcached.ClientContext) ([]int64, error)

Get selected bucket or collection stats

func (*Bucket) GetMeta

func (b *Bucket) GetMeta(k string, flags *int, expiry *int, cas *uint64, seqNo *uint64, context ...*memcached.ClientContext) (err error)

GetMeta returns the meta values for a key

func (*Bucket) GetName

func (b *Bucket) GetName() string

func (*Bucket) GetNodeList

func (b *Bucket) GetNodeList(vb uint16) []string

func (*Bucket) GetPool

func (b *Bucket) GetPool() *Pool

GetPool gets the pool to which this bucket belongs.

func (*Bucket) GetPoolServices

func (b *Bucket) GetPoolServices(name string) (*PoolServices, error)

func (*Bucket) GetRandomDoc

func (b *Bucket) GetRandomDoc(context ...*memcached.ClientContext) (*gomemcached.MCResponse, error)

func (*Bucket) GetRaw

func (b *Bucket) GetRaw(k string, context ...*memcached.ClientContext) ([]byte, error)

GetRaw gets a raw value from this bucket. No marshaling is performed.

func (*Bucket) GetSize

func (b *Bucket) GetSize(refresh bool, context ...*memcached.ClientContext) (size int64, err error)

Get bucket document size through the bucket stats

func (*Bucket) GetStats

func (b *Bucket) GetStats(which string) map[string]map[string]string

GetStats gets a set of stats from all servers.

Returns a map of server ID -> map of stat key to map value.

func (*Bucket) GetUUID

func (b *Bucket) GetUUID() string

func (*Bucket) GetVBmap

func (b *Bucket) GetVBmap(addrs []string) (map[string][]uint16, error)

func (*Bucket) Gets

func (b *Bucket) Gets(k string, rv interface{}, caso *uint64, context ...*memcached.ClientContext) error

Gets gets a value from this bucket, including its CAS counter. The value is expected to be a JSON stream and will be deserialized into rv.

func (*Bucket) GetsMC

func (b *Bucket) GetsMC(key string, reqDeadline time.Time, context ...*memcached.ClientContext) (*gomemcached.MCResponse, error)

Get a value straight from Memcached

func (*Bucket) GetsRaw

func (b *Bucket) GetsRaw(k string, context ...*memcached.ClientContext) (data []byte, flags int,
	cas uint64, err error)

GetsRaw gets a raw value from this bucket including its CAS counter and flags.

func (*Bucket) GetsSubDoc

func (b *Bucket) GetsSubDoc(key string, reqDeadline time.Time, subPaths []string, context ...*memcached.ClientContext) (*gomemcached.MCResponse, error)

Get a value through the subdoc API

func (*Bucket) HealthyNodes

func (b *Bucket) HealthyNodes() []Node

return the list of healthy nodes

func (*Bucket) Incr

func (b *Bucket) Incr(k string, amt, def uint64, exp int, context ...*memcached.ClientContext) (val uint64, err error)

Incr increments the value at a given key by amt and defaults to def if no value present.

func (*Bucket) NodeAddresses

func (b *Bucket) NodeAddresses() []string

NodeAddresses gets the (sorted) list of memcached node addresses (hostname:port).

func (*Bucket) NodeListChanged

func (b *Bucket) NodeListChanged() bool

This API lets the caller know, if the list of nodes a bucket is connected to has gone through an edit (a rebalance operation) since the last update to the bucket, in which case a Refresh is advised.

func (*Bucket) Nodes

func (b *Bucket) Nodes() []Node

Nodes returns the current list of nodes servicing this bucket.

func (*Bucket) OPJobPoll

func (b *Bucket) OPJobPoll()

func (*Bucket) Observe

func (b *Bucket) Observe(k string) (result memcached.ObserveResult, err error)

Observe observes the current state of a document.

func (*Bucket) ObserveAndPersistPoll

func (b *Bucket) ObserveAndPersistPoll(vb uint16, vbuuid uint64, seqNo uint64) (err error, failover bool)

func (*Bucket) PutDDoc

func (b *Bucket) PutDDoc(docname string, value interface{}) error

PutDDoc installs a design document.

func (*Bucket) Refresh

func (b *Bucket) Refresh() error

func (*Bucket) RefreshFully

func (b *Bucket) RefreshFully() error

func (*Bucket) ReleaseGetBulkPools

func (b *Bucket) ReleaseGetBulkPools(rv map[string]*gomemcached.MCResponse)

func (*Bucket) RunBucketUpdater

func (b *Bucket) RunBucketUpdater(notify NotifyFn)

func (*Bucket) RunBucketUpdater2

func (b *Bucket) RunBucketUpdater2(streamingFn StreamingFn, notify NotifyFn)

func (*Bucket) Set

func (b *Bucket) Set(k string, exp int, v interface{}, context ...*memcached.ClientContext) error

Set a value in this bucket. The value will be serialized into a JSON document.

func (*Bucket) SetObserveAndPersist

func (b *Bucket) SetObserveAndPersist(nPersist PersistTo, nObserve ObserveTo) (err error)

func (*Bucket) SetRaw

func (b *Bucket) SetRaw(k string, exp int, v []byte, context ...*memcached.ClientContext) error

SetRaw sets a value in this bucket without JSON encoding it.

func (*Bucket) SetWithCAS

func (b *Bucket) SetWithCAS(k string, exp int, v interface{}, context ...*memcached.ClientContext) (uint64, error)

Set a value in this bucket.

func (*Bucket) SetWithMeta

func (b *Bucket) SetWithMeta(k string, flags int, exp int, v interface{}, context ...*memcached.ClientContext) (*MutationToken, error)

Set a value in this bucket with with flags

func (*Bucket) StartOPPollers

func (b *Bucket) StartOPPollers(maxWorkers int)

func (*Bucket) StartTapFeed

func (b *Bucket) StartTapFeed(args *memcached.TapArguments) (*TapFeed, error)

StartTapFeed creates and starts a new Tap feed

func (*Bucket) StartUprFeed

func (b *Bucket) StartUprFeed(name string, sequence uint32) (*UprFeed, error)

func (*Bucket) StartUprFeedWithConfig

func (b *Bucket) StartUprFeedWithConfig(name string, sequence uint32, data_chan_size int, dcp_buffer_size uint32) (*UprFeed, error)

StartUprFeed creates and starts a new Upr feed No data will be sent on the channel unless vbuckets streams are requested

func (*Bucket) Update

func (b *Bucket) Update(k string, exp int, callback UpdateFunc) error

Update performs a Safe update of a document, avoiding conflicts by using CAS.

The callback function will be invoked with the current raw document contents (or nil if the document doesn't exist); it should return the updated raw contents (or nil to delete.) If it decides not to change anything it can return UpdateCancel as the error.

If another writer modifies the document between the get and the set, the callback will be invoked again with the newer value.

func (*Bucket) UpdateBucket

func (b *Bucket) UpdateBucket() error

func (*Bucket) UpdateBucket2

func (b *Bucket) UpdateBucket2(streamingFn StreamingFn) error

func (*Bucket) VBHash

func (b *Bucket) VBHash(key string) uint32

VBHash finds the vbucket for the given key.

func (*Bucket) VBServerMap

func (b *Bucket) VBServerMap() *VBucketServerMap

VBServerMap returns the current VBucketServerMap.

func (*Bucket) View

func (b *Bucket) View(ddoc, name string, params map[string]interface{}) (ViewResult, error)

View executes a view.

The ddoc parameter is just the bare name of your design doc without the "_design/" prefix.

Parameters are string keys with values that correspond to couchbase view parameters. Primitive should work fairly naturally (booleans, ints, strings, etc...) and other values will attempt to be JSON marshaled (useful for array indexing on on view keys, for example).

Example:

res, err := couchbase.View("myddoc", "myview", map[string]interface{}{
    "group_level": 2,
    "startkey_docid":    []interface{}{"thing"},
    "endkey_docid":      []interface{}{"thing", map[string]string{}},
    "stale": false,
    })

func (*Bucket) ViewCustom

func (b *Bucket) ViewCustom(ddoc, name string, params map[string]interface{},
	vres interface{}) (err error)

ViewCustom performs a view request that can map row values to a custom type.

See the source to View for an example usage.

func (*Bucket) ViewURL

func (b *Bucket) ViewURL(ddoc, name string,
	params map[string]interface{}) (string, error)

ViewURL constructs a URL for a view with the given ddoc, view name, and parameters.

func (*Bucket) WaitForPersistence

func (b *Bucket) WaitForPersistence(k string, cas uint64, deletion bool) error

WaitForPersistence waits for an item to be considered durable.

Besides transport errors, ErrOverwritten may be returned if the item is overwritten before it reaches durability. ErrTimeout may occur if the item isn't found durable in a reasonable amount of time.

func (*Bucket) Write

func (b *Bucket) Write(k string, flags, exp int, v interface{},
	opt WriteOptions, context ...*memcached.ClientContext) (err error)

General-purpose value setter.

The Set, Add and Delete methods are just wrappers around this. The interpretation of `v` depends on whether the `Raw` option is given. If it is, v must be a byte array or nil. (A nil value causes a delete.) If `Raw` is not given, `v` will be marshaled as JSON before being written. It must be JSON-marshalable and it must not be nil.

func (*Bucket) WriteCas

func (b *Bucket) WriteCas(k string, flags, exp int, cas uint64, v interface{},
	opt WriteOptions, context ...*memcached.ClientContext) (newCas uint64, err error)

func (*Bucket) WriteCasWithMT

func (b *Bucket) WriteCasWithMT(k string, flags, exp int, cas uint64, v interface{},
	opt WriteOptions, context ...*memcached.ClientContext) (newCas uint64, mt *MutationToken, err error)

func (*Bucket) WriteUpdate

func (b *Bucket) WriteUpdate(k string, exp int, callback WriteUpdateFunc) error

WriteUpdate performs a Safe update of a document, avoiding conflicts by using CAS. WriteUpdate is like Update, except that the callback can return a set of WriteOptions, of which Persist and Indexable are recognized: these cause the call to wait until the document update has been persisted to disk and/or become available to index.

func (*Bucket) WriteWithCAS

func (b *Bucket) WriteWithCAS(k string, flags, exp int, v interface{},
	opt WriteOptions, context ...*memcached.ClientContext) (cas uint64, err error)

func (*Bucket) WriteWithMT

func (b *Bucket) WriteWithMT(k string, flags, exp int, v interface{},
	opt WriteOptions, context ...*memcached.ClientContext) (mt *MutationToken, err error)

type BucketAuth

type BucketAuth struct {
	// contains filtered or unexported fields
}

func (*BucketAuth) GetCredentials

func (ba *BucketAuth) GetCredentials() (string, string, string)

type BucketInfo

type BucketInfo struct {
	Name     string // name of bucket
	User     string // Username to use for access
	Password string // SASL password of bucket
}

func GetBucketList

func GetBucketList(baseU string) (bInfo []BucketInfo, err error)

Get SASL buckets

type BucketNotFoundError

type BucketNotFoundError struct {
	// contains filtered or unexported fields
}

func (*BucketNotFoundError) Error

func (e *BucketNotFoundError) Error() string

type BucketStats

type BucketStats int

type Client

type Client struct {
	BaseURL *url.URL

	Info Pools
	// contains filtered or unexported fields
}

A Client is the starting point for all services across all buckets in a Couchbase cluster.

func Connect

func Connect(baseU string) (Client, error)

Connect to a couchbase cluster. An authentication handler will be created from the userinfo in the URL if provided.

func ConnectWithAuth

func ConnectWithAuth(baseU string, ah AuthHandler) (c Client, err error)

ConnectWithAuth connects to a couchbase cluster with the given authentication handler.

func ConnectWithAuthCreds

func ConnectWithAuthCreds(baseU, username, password string) (c Client, err error)

ConnectWithAuthCreds connects to a couchbase cluster with the give authorization creds returned by cb_auth

func (*Client) ClearTLS

func (c *Client) ClearTLS()

func (*Client) GetAuditSpec

func (c *Client) GetAuditSpec() (*AuditSpec, error)

func (*Client) GetPool

func (c *Client) GetPool(name string) (p Pool, err error)

GetPool gets a pool from within the couchbase cluster (usually "default").

func (*Client) GetPoolServices

func (c *Client) GetPoolServices(name string) (ps PoolServices, err error)

GetPoolServices returns all the bucket-independent services in a pool. (See "Exposing services outside of bucket context" in http://goo.gl/uuXRkV)

func (*Client) GetRolesAll

func (c *Client) GetRolesAll() ([]RoleDescription, error)

func (*Client) GetUserInfoAll

func (c *Client) GetUserInfoAll() ([]User, error)

func (*Client) GetUserRoles

func (c *Client) GetUserRoles() ([]interface{}, error)

Return user-role data, as parsed JSON. Sample:

[{"id":"ivanivanov","name":"Ivan Ivanov","roles":[{"role":"cluster_admin"},{"bucket_name":"default","role":"bucket_admin"}]},
 {"id":"petrpetrov","name":"Petr Petrov","roles":[{"role":"replication_admin"}]}]

func (*Client) InitTLS

func (c *Client) InitTLS(certFile string) error

Call this method with a TLS certificate file name to make communication with the KV engine encrypted.

This method should be called immediately after a Connect*() method.

func (*Client) ProcessStream

func (c *Client) ProcessStream(path string, callb func(interface{}) error, data interface{}) error

func (*Client) PutUserInfo

func (c *Client) PutUserInfo(u *User) error

type Collection

type Collection struct {
	Name string
	Uid  uint64
}

type DDoc

type DDoc struct {
	Language string                    `json:"language,omitempty"`
	Views    map[string]ViewDefinition `json:"views"`
}

DDoc is the document body of a design document specifying a view.

type DDocsResult

type DDocsResult struct {
	Rows []struct {
		DDoc struct {
			Meta map[string]interface{}
			JSON DDoc
		} `json:"doc"`
	} `json:"rows"`
}

DDocsResult represents the result from listing the design documents.

type DocID

type DocID string

DocID is the document ID type for the startkey_docid parameter in views.

type DurablitySettings

type DurablitySettings struct {
	Persist PersistTo
	Observe ObserveTo
}

type FailoverLog

type FailoverLog map[uint16]memcached.FailoverLog

type FeedInfo

type FeedInfo struct {
	// contains filtered or unexported fields
}

UprFeed from a single connection

type GatheredStats

type GatheredStats struct {
	Server string
	Stats  map[string]string
	Err    error
}

type GenericMcdAuthHandler

type GenericMcdAuthHandler interface {
	AuthHandler
	AuthenticateMemcachedConn(host string, conn *memcached.Client) error
}

GenericMcdAuthHandler is a kind of AuthHandler that performs special auth exchange (like non-standard auth, possibly followed by select-bucket).

type HTTPAuthHandler

type HTTPAuthHandler interface {
	AuthHandler
	SetCredsForRequest(req *http.Request) error
}

HTTPAuthHandler is kind of AuthHandler that performs more general for outgoing http requests than is possible via simple GetCredentials() call (i.e. digest auth or different auth per different destinations).

type InputCollection

type InputCollection struct {
	Name string
	Uid  string
}

type InputManifest

type InputManifest struct {
	Uid    string
	Scopes []InputScope
}

Structures for parsing collections manifest. The map key is the name of the scope. Example data: {"uid":"b","scopes":[

{"name":"_default","uid":"0","collections":[
   {"name":"_default","uid":"0"}]},
{"name":"myScope1","uid":"8","collections":[
   {"name":"myCollectionB","uid":"c"},
   {"name":"myCollectionA","uid":"b"}]},
{"name":"myScope2","uid":"9","collections":[
   {"name":"myCollectionC","uid":"d"}]}]}

type InputScope

type InputScope struct {
	Name        string
	Uid         string
	Collections []InputCollection
}

type JobType

type JobType uint8

type Manifest

type Manifest struct {
	Uid    uint64
	Scopes map[string]*Scope // map by name
}

Structures for storing collections information.

type MultiBucketAuthHandler

type MultiBucketAuthHandler interface {
	AuthHandler
	ForBucket(bucket string) AuthHandler
}

MultiBucketAuthHandler is kind of AuthHandler that may perform different auth for different buckets.

type MutationToken

type MutationToken struct {
	VBid  uint16 // vbucket id
	Guard uint64 // vbuuid
	Value uint64 // sequence number
}

Mutation Token

type Node

type Node struct {
	ClusterCompatibility int                           `json:"clusterCompatibility"`
	ClusterMembership    string                        `json:"clusterMembership"`
	CouchAPIBase         string                        `json:"couchApiBase"`
	Hostname             string                        `json:"hostname"`
	AlternateNames       map[string]NodeAlternateNames `json:"alternateAddresses"`
	InterestingStats     map[string]float64            `json:"interestingStats,omitempty"`
	MCDMemoryAllocated   float64                       `json:"mcdMemoryAllocated"`
	MCDMemoryReserved    float64                       `json:"mcdMemoryReserved"`
	MemoryFree           float64                       `json:"memoryFree"`
	MemoryTotal          float64                       `json:"memoryTotal"`
	OS                   string                        `json:"os"`
	Ports                map[string]int                `json:"ports"`
	Services             []string                      `json:"services"`
	Status               string                        `json:"status"`
	Uptime               int                           `json:"uptime,string"`
	Version              string                        `json:"version"`
	ThisNode             bool                          `json:"thisNode,omitempty"`
}

A Node is a computer in a cluster running the couchbase software.

type NodeAlternateNames

type NodeAlternateNames struct {
	Hostname string         `json:"hostname"`
	Ports    map[string]int `json:"ports"`
}

type NodeServices

type NodeServices struct {
	Services       map[string]int                `json:"services,omitempty"`
	Hostname       string                        `json:"hostname"`
	ThisNode       bool                          `json:"thisNode"`
	AlternateNames map[string]NodeAlternateNames `json:"alternateAddresses"`
}

NodeServices is all the bucket-independent services running on a node (given by Hostname)

type NotifyFn

type NotifyFn func(bucket string, err error)

type OPErrResponse

type OPErrResponse struct {
	// contains filtered or unexported fields
}

type OPpool

type OPpool struct {
	// contains filtered or unexported fields
}

pool of ObservePersist Jobs

func NewPool

func NewPool(max int) *OPpool

NewPool creates a new pool of jobs

func (*OPpool) Get

func (p *OPpool) Get() *ObservePersistJob

Borrow a Client from the pool.

func (*OPpool) Put

func (p *OPpool) Put(o *ObservePersistJob)

Return returns a Client to the pool.

type ObservePersistJob

type ObservePersistJob struct {
	// contains filtered or unexported fields
}

type ObserveTo

type ObserveTo uint8

type PersistTo

type PersistTo uint8

type Pool

type Pool struct {
	BucketMap map[string]*Bucket
	Nodes     []Node

	BucketURL map[string]string `json:"buckets"`

	MemoryQuota         float64 `json:"memoryQuota"`
	CbasMemoryQuota     float64 `json:"cbasMemoryQuota"`
	EventingMemoryQuota float64 `json:"eventingMemoryQuota"`
	FtsMemoryQuota      float64 `json:"ftsMemoryQuota"`
	IndexMemoryQuota    float64 `json:"indexMemoryQuota"`
	// contains filtered or unexported fields
}

A Pool of nodes and buckets.

func (*Pool) Close

func (p *Pool) Close()

Release bucket connections when the pool is no longer in use

func (*Pool) GetBucket

func (p *Pool) GetBucket(name string) (*Bucket, error)

GetBucket gets a bucket from within this pool.

func (*Pool) GetBucketWithAuth

func (p *Pool) GetBucketWithAuth(bucket, username, password string) (*Bucket, error)

GetBucket gets a bucket from within this pool.

func (*Pool) GetClient

func (p *Pool) GetClient() *Client

GetClient gets the client from which we got this pool.

type PoolServices

type PoolServices struct {
	Rev          int             `json:"rev"`
	NodesExt     []NodeServices  `json:"nodesExt"`
	Capabilities json.RawMessage `json:"clusterCapabilities"`
}

PoolServices is all the bucket-independent services in a pool

func ParsePoolServices

func ParsePoolServices(jsonInput string) (*PoolServices, error)

type Pools

type Pools struct {
	ComponentsVersion     map[string]string `json:"componentsVersion,omitempty"`
	ImplementationVersion string            `json:"implementationVersion"`
	IsAdmin               bool              `json:"isAdminCreds"`
	UUID                  string            `json:"uuid"`
	Pools                 []RestPool        `json:"pools"`
}

Pools represents the collection of pools as returned from the REST API.

type RestPool

type RestPool struct {
	Name         string `json:"name"`
	StreamingURI string `json:"streamingUri"`
	URI          string `json:"uri"`
}

RestPool represents a single pool returned from the pools REST API.

type Role

type Role struct {
	Role           string
	BucketName     string `json:"bucket_name"`
	ScopeName      string `json:"scope_name"`
	CollectionName string `json:"collection_name"`
}

type RoleDescription

type RoleDescription struct {
	Role       string
	Name       string
	Desc       string
	Ce         bool
	BucketName string `json:"bucket_name"`
}

Sample: {"role":"admin","name":"Admin","desc":"Can manage ALL cluster features including security.","ce":true} {"role":"query_select","bucket_name":"*","name":"Query Select","desc":"Can execute SELECT statement on bucket to retrieve data"}

type Scope

type Scope struct {
	Name        string
	Uid         uint64
	Collections map[string]*Collection // map by name
}

type StreamingFn

type StreamingFn func(bucket *Bucket)

type TapFeed

type TapFeed struct {
	C <-chan memcached.TapEvent
	// contains filtered or unexported fields
}

A TapFeed streams mutation events from a bucket.

Events from the bucket can be read from the channel 'C'. Remember to call Close() on it when you're done, unless its channel has closed itself already.

func (*TapFeed) Close

func (feed *TapFeed) Close() error

Close a Tap feed.

type UpdateFunc

type UpdateFunc func(current []byte) (updated []byte, err error)

An UpdateFunc is a callback function to update a document

type UprFeed

type UprFeed struct {
	C <-chan *memcached.UprEvent
	// contains filtered or unexported fields
}

A UprFeed streams mutation events from a bucket.

Events from the bucket can be read from the channel 'C'. Remember to call Close() on it when you're done, unless its channel has closed itself already.

func (*UprFeed) Close

func (feed *UprFeed) Close() error

Close a Upr feed.

func (*UprFeed) UprCloseStream

func (feed *UprFeed) UprCloseStream(vb, opaqueMSB uint16) error

UprCloseStream ends a vbucket stream.

func (*UprFeed) UprRequestStream

func (feed *UprFeed) UprRequestStream(vb uint16, opaque uint16, flags uint32,
	vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error

UprRequestStream starts a stream for a vb on a feed

type User

type User struct {
	Name   string
	Id     string
	Domain string
	Roles  []Role
}

type VBucketServerMap

type VBucketServerMap struct {
	HashAlgorithm string   `json:"hashAlgorithm"`
	NumReplicas   int      `json:"numReplicas"`
	ServerList    []string `json:"serverList"`
	VBucketMap    [][]int  `json:"vBucketMap"`
}

VBucketServerMap is the a mapping of vbuckets to nodes.

type ViewDefinition

type ViewDefinition struct {
	Map    string `json:"map"`
	Reduce string `json:"reduce,omitempty"`
}

ViewDefinition represents a single view within a design document.

type ViewError

type ViewError struct {
	From   string
	Reason string
}

A ViewError is a node-specific error indicating a partial failure within a view result.

func (ViewError) Error

func (ve ViewError) Error() string

type ViewResult

type ViewResult struct {
	TotalRows int `json:"total_rows"`
	Rows      []ViewRow
	Errors    []ViewError
}

ViewResult holds the entire result set from a view request, including the rows and the errors.

type ViewRow

type ViewRow struct {
	ID    string
	Key   interface{}
	Value interface{}
	Doc   *interface{}
}

ViewRow represents a single result from a view.

Doc is present only if include_docs was set on the request.

type WriteOptions

type WriteOptions int

WriteOptions is the set of option flags availble for the Write method. They are ORed together to specify the desired request.

func (WriteOptions) String

func (w WriteOptions) String() string

String representation of WriteOptions

type WriteUpdateFunc

type WriteUpdateFunc func(current []byte) (updated []byte, opt WriteOptions, err error)

A WriteUpdateFunc is a callback function to update a document

Directories

Path Synopsis
Package cbdatasource streams data from a Couchbase cluster.
Package cbdatasource streams data from a Couchbase cluster.
examples
upr_bench
Tool receives raw events from go-couchbase UPR client.
Tool receives raw events from go-couchbase UPR client.
tools
Package trace provides a ring buffer utility to trace events.
Package trace provides a ring buffer utility to trace events.
Package couchbaseutil offers some convenience functions for apps that use couchbase.
Package couchbaseutil offers some convenience functions for apps that use couchbase.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
t or T : Toggle theme light dark auto