package module
v0.0.5 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: May 22, 2018 License: Apache-2.0 Imports: 19 Imported by: 0



Build Status

A ZooKeeper "personality" for etcd. Point a ZooKeeper client at zetcd to dispatch the operations on an etcd cluster.

Protocol encoding and decoding heavily based on go-zookeeper.

Getting started

Running zetcd

Forward ZooKeeper requests on :2181 to an etcd server listening on localhost:2379:

go get github.com/coreos/zetcd/cmd/zetcd
zetcd --zkaddr 0.0.0.0:2181 --endpoints localhost:2379

Simple testing with zkctl:

go get github.com/coreos/zetcd/cmd/zkctl
zkctl watch / &
zkctl create /abc "foo"
Running zetcd on Docker

Official docker images of tagged zetcd releases for containerized environments are hosted at quay.io/coreos/zetcd. Use docker run to launch the zetcd container with the same configuration as the go get example:

docker run --net host -t quay.io/coreos/zetcd -endpoints localhost:2379

In cross-checking mode, zetcd dynamically tests a fresh isolated "candidate" zetcd cluster against a fresh isolated ZooKeeper "oracle" cluster for divergences. This mode dispatches requests to both zetcd and ZooKeeper, then compares the responses to check for equivalence. If the responses disagree, the divergence is flagged in the logs. Use the flag -debug-zkbridge to configure a ZooKeeper endpoint and -debug-oracle zk to enable checking.

Cross-check zetcd's ZooKeeper emulation with a native ZooKeeper server endpoint at localhost:2182 like so:

zetcd --zkaddr 0.0.0.0:2181 --endpoints localhost:2379 --debug-zkbridge localhost:2182 --debug-oracle zk --logtostderr -v 9



See CONTRIBUTING for details on submitting patches and the contribution workflow.


zetcd is under the Apache 2.0 license. See the LICENSE file for details.




View Source
const (
	FlagEphemeral = 1
	FlagSequence  = 2
View Source
const (
	StateUnknown           = State(-1)
	StateDisconnected      = State(0)
	StateConnecting        = State(1)
	StateSyncConnected     = State(3)
	StateAuthFailed        = State(4)
	StateConnectedReadOnly = State(5)
	StateSaslAuthenticated = State(6)
	StateExpired           = State(-112)

	StateConnected  = State(100)
	StateHasSession = State(101)
View Source
const (
	DefaultPort = 2181


View Source
var (
	ErrConnectionClosed        = errors.New("zk: connection closed")
	ErrUnknown                 = errors.New("zk: unknown error")
	ErrAPIError                = errors.New("zk: api error")
	ErrNoNode                  = errors.New("zk: node does not exist")
	ErrNoAuth                  = errors.New("zk: not authenticated")
	ErrBadVersion              = errors.New("zk: version conflict")
	ErrNoChildrenForEphemerals = errors.New("zk: ephemeral nodes may not have children")
	ErrNodeExists              = errors.New("zk: node already exists")
	ErrNotEmpty                = errors.New("zk: node has children")
	ErrSessionExpired          = errors.New("zk: session has been expired by the server")
	ErrInvalidACL              = errors.New("zk: invalid ACL specified")
	ErrAuthFailed              = errors.New("zk: client authentication failed")
	ErrClosing                 = errors.New("zk: zookeeper is closing")
	ErrNothing                 = errors.New("zk: no server responsees to process")
	ErrSessionMoved            = errors.New("zk: session moved to another server, so operation is ignored")

	ErrBadArguments = errors.New("zk: bad arguments")
View Source
var (
	ErrShortBuffer        = errors.New("short buffer")
	ErrPtrExpected        = errors.New("ptr expected")
	ErrUnhandledFieldType = errors.New("unhandled field type")
View Source
var PerfectZXidMode bool = true

PerfectZXidMode, when enabled, inserts err writes so that zxids match ZooKeeper's.


func ReadPacket

func ReadPacket(zk net.Conn, r interface{}) (string, error)

func Serve

func Serve(ctx context.Context, ln net.Listener, auth AuthFunc, zk ZKFunc)

Serve will serve multiple sessions concurrently.

func ServeSerial

func ServeSerial(ctx context.Context, ln net.Listener, auth AuthFunc, zk ZKFunc)

ServeSerial has at most one inflight request at a time so two servers can be reliably cross checked.

func WritePacket

func WritePacket(zk net.Conn, r interface{}) error


type ACL

type ACL struct {
	Perms  int32
	Scheme string
	ID     string

type AuthConn

type AuthConn interface {
	Read() (*AuthRequest, error)
	Write(AuthResponse) (Conn, error)

AuthConn transfers zookeeper handshaking for establishing a session

func NewAuthConn

func NewAuthConn(c net.Conn) AuthConn

type AuthFunc

type AuthFunc func(AuthConn) (Session, error)

func NewAuth

func NewAuth(c *etcd.Client) AuthFunc

type AuthRequest

type AuthRequest struct {
	Req            *ConnectRequest
	FourLetterWord string

type AuthResponse

type AuthResponse struct {
	Resp           *ConnectResponse
	FourLetterWord string

type CheckVersionRequest

type CheckVersionRequest pathVersionRequest

type Client

type Client interface {
	Send(xid Xid, req interface{}) error
	Read() <-chan ZKResponse
	StopNotify() <-chan struct{}

func NewClient

func NewClient(ctx context.Context, zk net.Conn) Client

type CloseRequest

type CloseRequest struct{}

type CloseResponse

type CloseResponse struct{}

type Conn

type Conn interface {
	Send(xid Xid, zxid ZXid, resp interface{}) error
	Read() <-chan ZKRequest
	StopNotify() <-chan struct{}

func NewConn

func NewConn(zk net.Conn) Conn

type ConnectRequest

type ConnectRequest struct {
	ProtocolVersion int32
	LastZxidSeen    ZXid
	TimeOut         int32
	SessionID       Sid
	Passwd          []byte

type ConnectResponse

type ConnectResponse struct {
	ProtocolVersion int32
	TimeOut         int32
	SessionID       Sid
	Passwd          []byte

type CreateRequest

type CreateRequest struct {
	Path  string
	Data  []byte
	Acl   []ACL
	Flags int32

type CreateResponse

type CreateResponse pathResponse

type DeleteRequest

type DeleteRequest pathVersionRequest

type DeleteResponse

type DeleteResponse struct{}

type ErrCode

type ErrCode int32

type EventType

type EventType int32
const (
	EventNodeCreated EventType = iota + 1

	EventSession     = EventType(-1)
	EventNotWatching = EventType(-2)

type ExistsRequest

type ExistsRequest pathWatchRequest

type ExistsResponse

type ExistsResponse statResponse

type GetAclRequest

type GetAclRequest pathRequest

type GetAclResponse

type GetAclResponse struct {
	Acl  []ACL
	Stat Stat

type GetChildren2Request

type GetChildren2Request pathWatchRequest

type GetChildren2Response

type GetChildren2Response struct {
	Children []string
	Stat     Stat

type GetChildrenRequest

type GetChildrenRequest pathWatchRequest

type GetChildrenResponse

type GetChildrenResponse struct {
	Children []string

type GetDataRequest

type GetDataRequest pathWatchRequest

type GetDataResponse

type GetDataResponse struct {
	Data []byte
	Stat Stat

type MultiHeader

type MultiHeader struct {
	Type Op
	Done bool
	Err  ErrCode

type MultiRequest

type MultiRequest struct {
	Ops        []MultiRequestOp
	DoneHeader MultiHeader

func (*MultiRequest) Decode

func (r *MultiRequest) Decode(buf []byte) (int, error)

func (*MultiRequest) Encode

func (r *MultiRequest) Encode(buf []byte) (int, error)

type MultiRequestOp

type MultiRequestOp struct {
	Header MultiHeader
	Op     interface{}

type MultiResponse

type MultiResponse struct {
	Ops        []MultiResponseOp
	DoneHeader MultiHeader

func (*MultiResponse) Decode

func (r *MultiResponse) Decode(buf []byte) (int, error)

func (*MultiResponse) Encode added in v0.0.2

func (r *MultiResponse) Encode(buf []byte) (int, error)

type MultiResponseOp

type MultiResponseOp struct {
	Header MultiHeader
	String string
	Stat   *Stat

type Op

type Op int32

type PingRequest

type PingRequest struct{}

type PingResponse

type PingResponse struct{}

type ResponseHeader

type ResponseHeader struct {
	Xid  Xid
	Zxid ZXid
	Err  ErrCode

type Session

type Session interface {
	Sid() Sid
	ZXid() ZXid
	ConnReq() ConnectRequest
	Backing() interface{}

type SessionPool

type SessionPool struct {
	// contains filtered or unexported fields

func NewSessionPool

func NewSessionPool(client *etcd.Client) *SessionPool

func (*SessionPool) Auth

func (sp *SessionPool) Auth(zka AuthConn) (Session, error)

type SetAclRequest

type SetAclRequest struct {
	Path    string
	Acl     []ACL
	Version Ver

type SetAclResponse

type SetAclResponse statResponse

type SetAuthRequest

type SetAuthRequest auth

type SetAuthResponse

type SetAuthResponse struct{}

type SetDataRequest

type SetDataRequest struct {
	Path    string
	Data    []byte
	Version Ver

type SetDataResponse

type SetDataResponse statResponse

type SetWatchesRequest

type SetWatchesRequest struct {
	RelativeZxid ZXid
	DataWatches  []string
	ExistWatches []string
	ChildWatches []string

type SetWatchesResponse

type SetWatchesResponse struct{}

type Sid

type Sid int64

type Stat

type Stat struct {
	// Czxid is the zxid change that caused this znode to be created.
	Czxid ZXid
	// Mzxid is The zxid change that last modified this znode.
	Mzxid ZXid
	// Ctime is milliseconds from epoch when this znode was created.
	Ctime int64
	// Mtime is The time in milliseconds from epoch when this znode was last modified.
	Mtime          int64
	Version        Ver   // The number of changes to the data of this znode.
	Cversion       Ver   // The number of changes to the children of this znode.
	Aversion       Ver   // The number of changes to the ACL of this znode.
	EphemeralOwner Sid   // The session id of the owner of this znode if the znode is an ephemeral node. If it is not an ephemeral node, it will be zero.
	DataLength     int32 // The length of the data field of this znode.
	NumChildren    int32 // The number of children of this znode.
	Pzxid          ZXid  // last modified children

type State

type State int32

type SyncRequest

type SyncRequest pathRequest

type SyncResponse

type SyncResponse pathResponse

type Ver

type Ver int32 // version

type WatchHandler added in v0.0.3

type WatchHandler func(ZXid, EventType)

type WatcherEvent

type WatcherEvent struct {
	Type  EventType
	State State
	Path  string

type Watches

type Watches interface {
	// Watch creates a watch request on a given path and evtype.
	Watch(rev ZXid, xid Xid, path string, evtype EventType, cb WatchHandler)

	// Wait blocks until all watches that rely on the given rev are dispatched.
	Wait(rev ZXid, path string, evtype EventType)

type Xid

type Xid int32

type ZK

type ZK interface {
	Create(xid Xid, op *CreateRequest) ZKResponse
	Delete(xid Xid, op *DeleteRequest) ZKResponse
	Exists(xid Xid, op *ExistsRequest) ZKResponse
	GetData(xid Xid, op *GetDataRequest) ZKResponse
	SetData(xid Xid, op *SetDataRequest) ZKResponse
	GetAcl(xid Xid, op *GetAclRequest) ZKResponse
	SetAcl(xid Xid, op *SetAclRequest) ZKResponse
	GetChildren(xid Xid, op *GetChildrenRequest) ZKResponse
	Sync(xid Xid, op *SyncRequest) ZKResponse
	Ping(xid Xid, op *PingRequest) ZKResponse
	GetChildren2(xid Xid, op *GetChildren2Request) ZKResponse
	// opCheck		= 13
	Multi(xid Xid, op *MultiRequest) ZKResponse
	Close(xid Xid, op *CloseRequest) ZKResponse
	SetAuth(xid Xid, op *SetAuthRequest) ZKResponse
	SetWatches(xid Xid, op *SetWatchesRequest) ZKResponse

ZK is a synchronous interface

func NewZKEtcd

func NewZKEtcd(c *etcd.Client, s Session) ZK

func NewZKLog

func NewZKLog(zk ZK) ZK

type ZKFunc

type ZKFunc func(Session) (ZK, error)

func NewZK

func NewZK(c *etcd.Client) ZKFunc

type ZKRequest

type ZKRequest struct {
	// contains filtered or unexported fields

func (*ZKRequest) String

func (zk *ZKRequest) String() string

type ZKResponse

type ZKResponse struct {
	Hdr  *ResponseHeader
	Resp interface{}

	// Err is from transmission errors, etc
	Err error

func DispatchZK

func DispatchZK(zk ZK, xid Xid, op interface{}) ZKResponse

type ZXid

type ZXid int64


Path Synopsis
Package integration is used for testing zetcd against a full etcd server.
Package integration is used for testing zetcd against a full etcd server.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL