gomemcached

package module
Version: v0.1.4
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 30, 2021 License: MIT Imports: 7 Imported by: 142

README

gomemcached

This is a memcached binary protocol toolkit in Go.

It provides client and server functionality as well as a little sample server showing how I might make a server if I valued purity over performance.

Server Design

overview

The basic design can be seen in gocache. A storage server is run as a goroutine that receives an MCRequest on a channel, and then issues an MCResponse to a channel contained within the request.

Each connection is a separate goroutine, of course, and is responsible for all IO for that connection until the connection drops or the dataServer decides it's stupid and sends a fatal response back over the channel.

There is currently no work at all in making the thing perform (there are specific areas I know need work). This is just my attempt to learn the language somewhat.

Documentation

Overview

Package gomemcached provides binary protocol packet formats and constants.

Index

Constants

View Source
const (
	REQ_MAGIC      = 0x80
	RES_MAGIC      = 0x81
	FLEX_MAGIC     = 0x08
	FLEX_RES_MAGIC = 0x18
)
View Source
const (
	GET        = CommandCode(0x00)
	SET        = CommandCode(0x01)
	ADD        = CommandCode(0x02)
	REPLACE    = CommandCode(0x03)
	DELETE     = CommandCode(0x04)
	INCREMENT  = CommandCode(0x05)
	DECREMENT  = CommandCode(0x06)
	QUIT       = CommandCode(0x07)
	FLUSH      = CommandCode(0x08)
	GETQ       = CommandCode(0x09)
	NOOP       = CommandCode(0x0a)
	VERSION    = CommandCode(0x0b)
	GETK       = CommandCode(0x0c)
	GETKQ      = CommandCode(0x0d)
	APPEND     = CommandCode(0x0e)
	PREPEND    = CommandCode(0x0f)
	STAT       = CommandCode(0x10)
	SETQ       = CommandCode(0x11)
	ADDQ       = CommandCode(0x12)
	REPLACEQ   = CommandCode(0x13)
	DELETEQ    = CommandCode(0x14)
	INCREMENTQ = CommandCode(0x15)
	DECREMENTQ = CommandCode(0x16)
	QUITQ      = CommandCode(0x17)
	FLUSHQ     = CommandCode(0x18)
	APPENDQ    = CommandCode(0x19)
	AUDIT      = CommandCode(0x27)
	PREPENDQ   = CommandCode(0x1a)
	GAT        = CommandCode(0x1d)
	HELLO      = CommandCode(0x1f)
	RGET       = CommandCode(0x30)
	RSET       = CommandCode(0x31)
	RSETQ      = CommandCode(0x32)
	RAPPEND    = CommandCode(0x33)
	RAPPENDQ   = CommandCode(0x34)
	RPREPEND   = CommandCode(0x35)
	RPREPENDQ  = CommandCode(0x36)
	RDELETE    = CommandCode(0x37)
	RDELETEQ   = CommandCode(0x38)
	RINCR      = CommandCode(0x39)
	RINCRQ     = CommandCode(0x3a)
	RDECR      = CommandCode(0x3b)
	RDECRQ     = CommandCode(0x3c)

	SASL_LIST_MECHS = CommandCode(0x20)
	SASL_AUTH       = CommandCode(0x21)
	SASL_STEP       = CommandCode(0x22)

	SET_VBUCKET = CommandCode(0x3d)

	TAP_CONNECT          = CommandCode(0x40) // Client-sent request to initiate Tap feed
	TAP_MUTATION         = CommandCode(0x41) // Notification of a SET/ADD/REPLACE/etc. on the server
	TAP_DELETE           = CommandCode(0x42) // Notification of a DELETE on the server
	TAP_FLUSH            = CommandCode(0x43) // Replicates a flush_all command
	TAP_OPAQUE           = CommandCode(0x44) // Opaque control data from the engine
	TAP_VBUCKET_SET      = CommandCode(0x45) // Sets state of vbucket in receiver (used in takeover)
	TAP_CHECKPOINT_START = CommandCode(0x46) // Notifies start of new checkpoint
	TAP_CHECKPOINT_END   = CommandCode(0x47) // Notifies end of checkpoint
	GET_ALL_VB_SEQNOS    = CommandCode(0x48) // Get current high sequence numbers from all vbuckets located on the server

	UPR_OPEN        = CommandCode(0x50) // Open a UPR connection with a name
	UPR_ADDSTREAM   = CommandCode(0x51) // Sent by ebucketMigrator to UPR Consumer
	UPR_CLOSESTREAM = CommandCode(0x52) // Sent by eBucketMigrator to UPR Consumer
	UPR_FAILOVERLOG = CommandCode(0x54) // Request failover logs
	UPR_STREAMREQ   = CommandCode(0x53) // Stream request from consumer to producer
	UPR_STREAMEND   = CommandCode(0x55) // Sent by producer when it has no more messages to stream
	UPR_SNAPSHOT    = CommandCode(0x56) // Start of a new snapshot
	UPR_MUTATION    = CommandCode(0x57) // Key mutation
	UPR_DELETION    = CommandCode(0x58) // Key deletion
	UPR_EXPIRATION  = CommandCode(0x59) // Key expiration
	UPR_FLUSH       = CommandCode(0x5a) // Delete all the data for a vbucket
	UPR_NOOP        = CommandCode(0x5c) // UPR NOOP
	UPR_BUFFERACK   = CommandCode(0x5d) // UPR Buffer Acknowledgement
	UPR_CONTROL     = CommandCode(0x5e) // Set flow control params

	GET_REPLICA   = CommandCode(0x83) // Get from replica
	SELECT_BUCKET = CommandCode(0x89) // Select bucket

	OBSERVE_SEQNO = CommandCode(0x91) // Sequence Number based Observe
	OBSERVE       = CommandCode(0x92)

	GET_META                 = CommandCode(0xA0) // Get meta. returns with expiry, flags, cas etc
	SET_WITH_META            = CommandCode(0xa2)
	ADD_WITH_META            = CommandCode(0xa4)
	DELETE_WITH_META         = CommandCode(0xa8)
	GET_COLLECTIONS_MANIFEST = CommandCode(0xba) // Get entire collections manifest.
	COLLECTIONS_GET_CID      = CommandCode(0xbb) // Get collection id.
	SET_TIME_SYNC            = CommandCode(0xc1)

	SUBDOC_GET            = CommandCode(0xc5) // Get subdoc. Returns with xattrs
	SUBDOC_DICT_UPSERT    = CommandCode(0xc8)
	SUBDOC_DELETE         = CommandCode(0xc9) // Delete a path
	SUBDOC_MULTI_LOOKUP   = CommandCode(0xd0) // Multi lookup. Doc xattrs and meta.
	SUBDOC_MULTI_MUTATION = CommandCode(0xd1) // Multi mutation. Doc and xattr

	DCP_SYSTEM_EVENT = CommandCode(0x5f) // A system event has occurred
	DCP_SEQNO_ADV    = CommandCode(0x64) // Sent when the vb seqno has advanced due to an unsubscribed event
	DCP_OSO_SNAPSHOT = CommandCode(0x65) // Marks the begin and end of out-of-sequence-number stream
)
View Source
const (
	SUCCESS            = Status(0x00)
	KEY_ENOENT         = Status(0x01)
	KEY_EEXISTS        = Status(0x02)
	E2BIG              = Status(0x03)
	EINVAL             = Status(0x04)
	NOT_STORED         = Status(0x05)
	DELTA_BADVAL       = Status(0x06)
	NOT_MY_VBUCKET     = Status(0x07)
	NO_BUCKET          = Status(0x08)
	LOCKED             = Status(0x09)
	AUTH_STALE         = Status(0x1f)
	AUTH_ERROR         = Status(0x20)
	AUTH_CONTINUE      = Status(0x21)
	ERANGE             = Status(0x22)
	ROLLBACK           = Status(0x23)
	EACCESS            = Status(0x24)
	NOT_INITIALIZED    = Status(0x25)
	UNKNOWN_COMMAND    = Status(0x81)
	ENOMEM             = Status(0x82)
	NOT_SUPPORTED      = Status(0x83)
	EINTERNAL          = Status(0x84)
	EBUSY              = Status(0x85)
	TMPFAIL            = Status(0x86)
	XATTR_EINVAL       = Status(0x87)
	UNKNOWN_COLLECTION = Status(0x88)

	DURABILITY_INVALID_LEVEL      = Status(0xa0)
	DURABILITY_IMPOSSIBLE         = Status(0xa1)
	SYNC_WRITE_IN_PROGRESS        = Status(0xa2)
	SYNC_WRITE_AMBIGUOUS          = Status(0xa3)
	SYNC_WRITE_RECOMMITINPROGRESS = Status(0xa4)

	// SUBDOC
	SUBDOC_PATH_NOT_FOUND             = Status(0xc0)
	SUBDOC_BAD_MULTI                  = Status(0xcc)
	SUBDOC_SUCCESS_DELETED            = Status(0xcd)
	SUBDOC_MULTI_PATH_FAILURE_DELETED = Status(0xd3)

	// Not a Memcached status
	UNKNOWN_STATUS = Status(0xffff)
)

These values match protocol_binary.h, which is the source of truth.

View Source
const (
	// doc level flags for subdoc commands
	SUBDOC_FLAG_NONE              uint8 = 0x00
	SUBDOC_FLAG_MKDOC             uint8 = 0x01 // Create if it does not exist
	SUBDOC_FLAG_ADD               uint8 = 0x02 // Add doc only if it does not exist.
	SUBDOC_FLAG_ACCESS_DELETED    uint8 = 0x04 // allow access to XATTRs for deleted document
	SUBDOC_FLAG_CREATE_AS_DELETED uint8 = 0x08 // Used with mkdoc/add
	SUBDOC_FLAG_REVIVED_DOC       uint8 = 0x10

	// path level flags for subdoc commands
	SUBDOC_FLAG_NONE_PATH          uint8 = 0x00
	SUBDOC_FLAG_MKDIR_PATH         uint8 = 0x01 // create path
	SUBDOC_FLAG_XATTR_PATH         uint8 = 0x04 // if set, the path refers to an XATTR
	SUBDOC_FLAG_EXPAND_MACRRO_PATH uint8 = 0x10 // Expand macro value inside XATTR
)
View Source
const (
	UdTagBegin = "<ud>"
	UdTagEnd   = "</ud>"
)

UdTagBegin and UdTagEnd are the tags used for log redaction.

View Source
const (
	BACKFILL           = TapConnectFlag(0x01)
	DUMP               = TapConnectFlag(0x02)
	LIST_VBUCKETS      = TapConnectFlag(0x04)
	TAKEOVER_VBUCKETS  = TapConnectFlag(0x08)
	SUPPORT_ACK        = TapConnectFlag(0x10)
	REQUEST_KEYS_ONLY  = TapConnectFlag(0x20)
	CHECKPOINT         = TapConnectFlag(0x40)
	REGISTERED_CLIENT  = TapConnectFlag(0x80)
	FIX_FLAG_BYTEORDER = TapConnectFlag(0x100)
)

Tap connect option flags

View Source
const (
	TAP_OPAQUE_ENABLE_AUTO_NACK       = 0
	TAP_OPAQUE_INITIAL_VBUCKET_STREAM = 1
	TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC = 2
	TAP_OPAQUE_CLOSE_TAP_STREAM       = 7
	TAP_OPAQUE_CLOSE_BACKFILL         = 8
)

Tap opaque event subtypes

View Source
const (
	TAP_ACK                     = 1
	TAP_NO_VALUE                = 2
	TAP_FLAG_NETWORK_BYTE_ORDER = 4
)

Tap item flags

View Source
const FAST_FRAME_LEN = 15
View Source
const HDR_LEN = 24

Number of bytes in a binary protocol header.

View Source
const MAX_USER_LEN = 128

Variables

Command codes that are counted toward the DCP control buffer. When DCP clients receive DCP messages with these command codes, they need to provide acknowledgement.

View Source
var CommandNames map[CommandCode]string

Mapping of CommandCode -> name of command (not exhaustive)

View Source
var DCP_OPEN_INCLUDE_DELETE_TIMES uint32 = 0x20

DCP_OPEN_INCLUDE_DELETE_TIMES is the include-deletion-time bit in the DCP flags.

View Source
var DCP_OPEN_INCLUDE_XATTRS uint32 = 0x04

DCP_OPEN_INCLUDE_XATTRS is the include-XATTRs bit in the DCP flags.

View Source
var DCP_PRODUCER uint32 = 0x01

DCP_PRODUCER is the producer/consumer bit in the DCP flags.

View Source
var ErrorInvalidOp error = fmt.Errorf("Specified method is not applicable")
View Source
var ErrorObjLenNotMatch error = fmt.Errorf("Object length does not match data")
View Source
var MaxBodyLen = int(20 * 1024 * 1024)

The maximum reasonable body length to expect. Anything larger than this will result in an error. The current limit, 20MB, is the size limit supported by ep-engine.

View Source
var SUBDOC_FLAG_XATTR uint8 = 0x04

Datatype to Include XATTRS in SUBDOC GET

View Source
var StatusNames map[Status]string

StatusNames maps each Status to a human-readable name for memcached responses.

View Source
var TapConnectFlagNames = map[TapConnectFlag]string{
	BACKFILL:           "BACKFILL",
	DUMP:               "DUMP",
	LIST_VBUCKETS:      "LIST_VBUCKETS",
	TAKEOVER_VBUCKETS:  "TAKEOVER_VBUCKETS",
	SUPPORT_ACK:        "SUPPORT_ACK",
	REQUEST_KEYS_ONLY:  "REQUEST_KEYS_ONLY",
	CHECKPOINT:         "CHECKPOINT",
	REGISTERED_CLIENT:  "REGISTERED_CLIENT",
	FIX_FLAG_BYTEORDER: "FIX_FLAG_BYTEORDER",
}

TapConnectFlagNames maps each TapConnectFlag to its name.

TapFlagParsers holds the parser functions for TAP fields.

Functions

func IsFatal

func IsFatal(e error) bool

IsFatal is false if this error isn't believed to be fatal to a connection.

func IsNotFound

func IsNotFound(e error) bool

IsNotFound is true if this error represents a "not found" response.

func Merge2HalfByteSlices

func Merge2HalfByteSlices(src1, src2 []byte) (output []byte)

func ShiftByteSliceLeft4Bits

func ShiftByteSliceLeft4Bits(slice []byte) (replacement []byte)

func ShiftByteSliceRight4Bits

func ShiftByteSliceRight4Bits(slice []byte) (replacement []byte)

The following is used to theoretically support frameInfo ObjID extensions for completeness, but it is not very efficient.

func TapParseBool

func TapParseBool(r io.Reader) (interface{}, error)

TapParseBool is a function to parse a single tap boolean.

func TapParseUint16

func TapParseUint16(r io.Reader) (interface{}, error)

TapParseUint16 is a function to parse a single tap uint16.

func TapParseUint64

func TapParseUint64(r io.Reader) (interface{}, error)

TapParseUint64 is a function to parse a single tap uint64.

func TapParseVBList

func TapParseVBList(r io.Reader) (interface{}, error)

TapParseVBList parses a list of vBucket numbers as []uint16.

Types

type CommandCode

type CommandCode uint8

CommandCode for memcached packets.

func (CommandCode) IsQuiet

func (o CommandCode) IsQuiet() bool

IsQuiet will return true if a command is a "quiet" command.

func (CommandCode) String

func (o CommandCode) String() (rv string)

String returns the string representation of an op code.

type DurabilityLvl

type DurabilityLvl uint8
const (
	DuraInvalid                    DurabilityLvl = iota // Not used (0x0)
	DuraMajority                   DurabilityLvl = iota // (0x01)
	DuraMajorityAndPersistOnMaster DurabilityLvl = iota // (0x02)
	DuraPersistToMajority          DurabilityLvl = iota // (0x03)
)

type FrameInfo

type FrameInfo struct {
	ObjId   FrameObjType
	ObjLen  int
	ObjData []byte
}

func (*FrameInfo) Bytes

func (f *FrameInfo) Bytes() ([]byte, bool)

func (*FrameInfo) GetDurabilityRequirements

func (f *FrameInfo) GetDurabilityRequirements() (lvl DurabilityLvl, timeoutProvided bool, timeoutMs uint16, err error)

func (*FrameInfo) GetStreamId

func (f *FrameInfo) GetStreamId() (uint16, error)

func (*FrameInfo) Validate

func (f *FrameInfo) Validate() error

type FrameObjType

type FrameObjType int
const (
	FrameBarrier        FrameObjType = iota // 0
	FrameDurability     FrameObjType = iota // 1
	FrameDcpStreamId    FrameObjType = iota // 2
	FrameOpenTracing    FrameObjType = iota // 3
	FrameImpersonate    FrameObjType = iota // 4
	FramePreserveExpiry FrameObjType = iota // 5
)

type MCItem

type MCItem struct {
	Cas               uint64
	Flags, Expiration uint32
	Data              []byte
}

MCItem is an internal representation of an item.

type MCRequest

type MCRequest struct {
	// The command being issued
	Opcode CommandCode
	// The CAS (if applicable, or 0)
	Cas uint64
	// An opaque value to be returned with this request
	Opaque uint32
	// The vbucket to which this command belongs
	VBucket uint16
	// Command extras, key, and body
	Extras, Key, Body, ExtMeta []byte
	// Datatype identifier
	DataType uint8
	// len() calls are expensive - cache this in case for collection
	Keylen int
	// Collection id for collection based operations
	CollId [binary.MaxVarintLen32]byte
	// Length of collection id
	CollIdLen int
	// Impersonate user name - could go in FramingExtras, but for efficiency
	Username [MAX_USER_LEN]byte
	// Length of Impersonate user name
	UserLen int
	// Flexible Framing Extras
	FramingExtras []FrameInfo
	// Stored length of incoming framing extras
	FramingElen int
}

MCRequest is memcached Request

func (*MCRequest) Bytes

func (req *MCRequest) Bytes() []byte

Bytes will return the wire representation of this request.

func (*MCRequest) FillHeaderBytes

func (req *MCRequest) FillHeaderBytes(data []byte) (int, bool)

func (*MCRequest) HdrSize

func (req *MCRequest) HdrSize() int

HdrSize gives the number of bytes the header of this request requires.

func (*MCRequest) HeaderBytes

func (req *MCRequest) HeaderBytes() []byte

HeaderBytes will return the wire representation of the request header (with the extras and key).

func (*MCRequest) ParseTapCommands

func (req *MCRequest) ParseTapCommands() (TapConnect, error)

ParseTapCommands parses the tap request into the interesting bits we may need to do something with.

func (*MCRequest) Receive

func (req *MCRequest) Receive(r io.Reader, hdrBytes []byte) (int, error)

Receive will fill this MCRequest with the data from a reader.

func (*MCRequest) Size

func (req *MCRequest) Size() int

func (MCRequest) String

func (req MCRequest) String() string

A debugging string representation of this request

func (*MCRequest) Transmit

func (req *MCRequest) Transmit(w io.Writer) (n int, err error)

Transmit will send this request message across a writer.

type MCResponse

type MCResponse struct {
	// The command opcode of the command that sent the request
	Opcode CommandCode
	// The status of the response
	Status Status
	// The opaque sent in the request
	Opaque uint32
	// The CAS identifier (if applicable)
	Cas uint64
	// Extras, key, and body for this response
	Extras, Key, Body []byte
	// If true, this represents a fatal condition and we should hang up
	Fatal bool
	// Datatype identifier
	DataType uint8
}

MCResponse is memcached response

func (*MCResponse) Bytes

func (res *MCResponse) Bytes() []byte

Bytes will return the actual bytes transmitted for this response.

func (*MCResponse) Error

func (res *MCResponse) Error() string

Response as an error.

func (*MCResponse) HeaderBytes

func (res *MCResponse) HeaderBytes() []byte

HeaderBytes will get just the header bytes for this response.

func (*MCResponse) Receive

func (res *MCResponse) Receive(r io.Reader, hdrBytes []byte) (n int, err error)

Receive will fill this MCResponse with the data from this reader.

func (*MCResponse) ReceiveWithBuf

func (res *MCResponse) ReceiveWithBuf(r io.Reader, hdrBytes, buf []byte) (n int, err error)

ReceiveWithBuf takes an optional pre-allocated []byte buf which will be used if its capacity is large enough, otherwise a new []byte slice is allocated.

func (*MCResponse) Size

func (res *MCResponse) Size() int

Size is number of bytes this response consumes on the wire.

func (MCResponse) String

func (res MCResponse) String() string

A debugging string representation of this response

func (*MCResponse) Transmit

func (res *MCResponse) Transmit(w io.Writer) (n int, err error)

Transmit will send this response message across a writer.

type MCResponsePool

type MCResponsePool struct {
	// contains filtered or unexported fields
}

func NewMCResponsePool

func NewMCResponsePool() *MCResponsePool

func (*MCResponsePool) Get

func (this *MCResponsePool) Get() *MCResponse

func (*MCResponsePool) Put

func (this *MCResponsePool) Put(r *MCResponse)

type Status

type Status uint16

Status field for memcached response.

func (Status) String

func (s Status) String() (rv string)

String returns the string representation of a status.

type StringMCResponsePool

type StringMCResponsePool struct {
	// contains filtered or unexported fields
}

func NewStringMCResponsePool

func NewStringMCResponsePool(size int) *StringMCResponsePool

func (*StringMCResponsePool) Get

func (this *StringMCResponsePool) Get() map[string]*MCResponse

func (*StringMCResponsePool) Put

func (this *StringMCResponsePool) Put(m map[string]*MCResponse)

type TapConnect

type TapConnect struct {
	Flags         map[TapConnectFlag]interface{}
	RemainingBody []byte
	Name          string
}

type TapConnectFlag

type TapConnectFlag uint32

func (TapConnectFlag) SplitFlags

func (f TapConnectFlag) SplitFlags() []TapConnectFlag

SplitFlags will split the ORed flags into the individual bit flags.

func (TapConnectFlag) String

func (f TapConnectFlag) String() string

type TapItemParser

type TapItemParser func(io.Reader) (interface{}, error)

TapItemParser is a function to parse a single tap extra.

Directories

Path Synopsis
Package memcached provides a memcached binary protocol client.
Package memcached provides a memcached binary protocol client.
Package mcdebug provides memcached client op statistics via expvar.
Package mcdebug provides memcached client op statistics via expvar.
Package memcached provides useful functions for building your own memcached server.
Package memcached provides useful functions for building your own memcached server.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL