store

package
v0.5.6
Published: Jan 15, 2016 License: Apache-2.0 Imports: 12 Imported by: 0

README

Storage

The goal of pkg/store is to abstract common store operations for multiple Key/Value backends.

For example, you can use it to store your metadata or for service discovery to register machines and endpoints inside your cluster.

As of now, pkg/store offers support for Consul, Etcd and Zookeeper.

Example of usage

Create a new store and use Put/Get
package main

import (
	"time"

	log "github.com/Sirupsen/logrus"
	"github.com/docker/swarm/store"
)

func main() {
	var (
		// Consul local address
		client = "localhost:8500"
	)

	// Initialize a new store with consul
	kv, err := store.NewStore(
		store.CONSUL, // or "consul"
		[]string{client},
		&store.Config{
			// Config exposes ConnectionTimeout (see the Config type below)
			ConnectionTimeout: 10 * time.Second,
		},
	)
	if err != nil {
		// Without a store there is nothing left to do
		log.Fatal("Cannot create store consul: ", err)
	}
	defer kv.Close()

	key := "foo"
	err = kv.Put(key, []byte("bar"), nil)
	if err != nil {
		log.Error("Error trying to put value at key `", key, "`")
		return
	}

	pair, err := kv.Get(key)
	if err != nil {
		log.Error("Error trying to access value at key `", key, "`")
		return
	}

	log.Info("value: ", string(pair.Value))
}

Contributing to a new storage backend

A new storage backend should implement the following calls:

type Store interface {
	Put(key string, value []byte, options *WriteOptions) error
	Get(key string) (*KVPair, error)
	Delete(key string) error
	Exists(key string) (bool, error)
	Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error)
	WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error)
	NewLock(key string, options *LockOptions) (Locker, error)
	List(prefix string) ([]*KVPair, error)
	DeleteTree(prefix string) error
	AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, *KVPair, error)
	AtomicDelete(key string, previous *KVPair) (bool, error)
	Close()
}

For Swarm, a K/V store implementation that is only meant to serve as a discovery backend should offer at least Get, Put, WatchTree and List.

Put should support a TTL so that entries can be removed when a node fails.
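
As a rough illustration of that minimal subset, the sketch below declares a narrow interface and a toy in-memory backend that satisfies it. The names DiscoveryStore, memStore and newMemStore are hypothetical and do not exist in the package; KVPair and WriteOptions are redeclared locally so the example compiles on its own, and TTL handling is left out entirely.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"strings"
	"sync"
)

// Local stand-ins for the package's KVPair and WriteOptions types.
type KVPair struct {
	Key       string
	Value     []byte
	LastIndex uint64
}

type WriteOptions struct{}

// DiscoveryStore (hypothetical name) is the subset needed for discovery.
type DiscoveryStore interface {
	Get(key string) (*KVPair, error)
	Put(key string, value []byte, options *WriteOptions) error
	WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error)
	List(prefix string) ([]*KVPair, error)
}

var errKeyNotFound = errors.New("Key not found in store")

// memStore is a toy in-memory backend used only for illustration.
type memStore struct {
	mu    sync.Mutex
	index uint64
	data  map[string]*KVPair
}

// Compile-time check that memStore satisfies the subset.
var _ DiscoveryStore = (*memStore)(nil)

func newMemStore() *memStore { return &memStore{data: map[string]*KVPair{}} }

func (m *memStore) Put(key string, value []byte, _ *WriteOptions) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.index++
	m.data[key] = &KVPair{Key: key, Value: value, LastIndex: m.index}
	return nil
}

func (m *memStore) Get(key string) (*KVPair, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if pair, ok := m.data[key]; ok {
		return pair, nil
	}
	return nil, errKeyNotFound
}

// List returns every pair whose key starts with prefix, sorted by key.
func (m *memStore) List(prefix string) ([]*KVPair, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	var out []*KVPair
	for key, pair := range m.data {
		if strings.HasPrefix(key, prefix) {
			out = append(out, pair)
		}
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Key < out[j].Key })
	return out, nil
}

// WatchTree sends the current listing once and closes the channel when
// stopCh is closed. A real backend would also push later changes.
func (m *memStore) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error) {
	current, err := m.List(prefix)
	if err != nil {
		return nil, err
	}
	ch := make(chan []*KVPair, 1)
	ch <- current
	go func() {
		<-stopCh
		close(ch)
	}()
	return ch, nil
}

func main() {
	s := newMemStore()
	s.Put("nodes/a", []byte("10.0.0.1:2375"), nil)
	s.Put("nodes/b", []byte("10.0.0.2:2375"), nil)
	stop := make(chan struct{})
	events, _ := s.WatchTree("nodes/", stop)
	fmt.Println("nodes seen:", len(<-events))
	close(stop)
}
```

The compile-time assertion is a cheap way to notice when a backend drifts out of sync with the interface it is meant to satisfy.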

You can get inspiration from existing backends to create a new one. This interface could be subject to changes to improve the experience of using the library and contributing to a new backend.

Documentation

Constants

const (
	// MOCK backend
	MOCK Backend = "mock"
	// CONSUL backend
	CONSUL = "consul"
	// ETCD backend
	ETCD = "etcd"
	// ZK backend
	ZK = "zk"
)
const (
	// DefaultWatchWaitTime is how long we block for at a time to check if the
	// watched key has changed.  This affects the minimum time it takes to
	// cancel a watch.
	DefaultWatchWaitTime = 15 * time.Second
)

Variables

var (
	// ErrInvalidTTL is a specific error to consul
	ErrInvalidTTL = errors.New("Invalid TTL, please change the value to the miminum allowed ttl for the chosen store")
	// ErrNotSupported is exported
	ErrNotSupported = errors.New("Backend storage not supported yet, please choose another one")
	// ErrNotImplemented is exported
	ErrNotImplemented = errors.New("Call not implemented in current backend")
	// ErrNotReachable is exported
	ErrNotReachable = errors.New("Api not reachable")
	// ErrCannotLock is exported
	ErrCannotLock = errors.New("Error acquiring the lock")
	// ErrWatchDoesNotExist is exported
	ErrWatchDoesNotExist = errors.New("No watch found for specified key")
	// ErrKeyModified is exported
	ErrKeyModified = errors.New("Unable to complete atomic operation, key modified")
	// ErrKeyNotFound is exported
	ErrKeyNotFound = errors.New("Key not found in store")
	// ErrPreviousNotSpecified is exported
	ErrPreviousNotSpecified = errors.New("Previous K/V pair should be provided for the Atomic operation")
)
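
Because these are plain sentinel values, a caller can compare a returned error against them directly. The sketch below assumes a backend returns the sentinel unwrapped, which this page does not state explicitly; get and getOrDefault are hypothetical helpers, and ErrKeyNotFound is redeclared locally so the example stands alone.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrKeyNotFound is redeclared locally with the message listed above.
var ErrKeyNotFound = errors.New("Key not found in store")

// get is a hypothetical stand-in for a backend's Get call.
func get(data map[string][]byte, key string) ([]byte, error) {
	value, ok := data[key]
	if !ok {
		return nil, ErrKeyNotFound
	}
	return value, nil
}

// getOrDefault treats a missing key as a normal condition and falls back
// to def, while passing every other error on to the caller.
func getOrDefault(data map[string][]byte, key string, def []byte) ([]byte, error) {
	value, err := get(data, key)
	if err == ErrKeyNotFound {
		return def, nil
	}
	return value, err
}

func main() {
	data := map[string][]byte{"foo": []byte("bar")}
	value, err := getOrDefault(data, "missing", []byte("fallback"))
	fmt.Println(string(value), err)
}
```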

Functions

This section is empty.

Types

type Backend

type Backend string

Backend represents a KV Store Backend

type Config

type Config struct {
	TLS               *tls.Config
	ConnectionTimeout time.Duration
	EphemeralTTL      time.Duration
}

Config contains the options for a storage client

type Consul

type Consul struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Consul embeds the client and watches

func (*Consul) AtomicDelete

func (s *Consul) AtomicDelete(key string, previous *KVPair) (bool, error)

AtomicDelete deletes the value at "key" only if the key has not been modified in the meantime; it returns an error if the key has been modified

func (*Consul) AtomicPut

func (s *Consul) AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, *KVPair, error)

AtomicPut puts a value at "key" only if the key has not been modified in the meantime; it returns an error if the key has been modified

func (*Consul) Close

func (s *Consul) Close()

Close closes the client connection

func (*Consul) Delete

func (s *Consul) Delete(key string) error

Delete a value at "key"

func (*Consul) DeleteTree

func (s *Consul) DeleteTree(prefix string) error

DeleteTree deletes a range of keys based on prefix

func (*Consul) Exists

func (s *Consul) Exists(key string) (bool, error)

Exists checks that the key exists inside the store

func (*Consul) Get

func (s *Consul) Get(key string) (*KVPair, error)

Get the value at "key"; returns the last modified index for use with CAS calls

func (*Consul) List

func (s *Consul) List(prefix string) ([]*KVPair, error)

List the content of a given prefix

func (*Consul) NewLock

func (s *Consul) NewLock(key string, options *LockOptions) (Locker, error)

NewLock returns a handle to a lock struct which can be used to acquire and release the mutex.

func (*Consul) Put

func (s *Consul) Put(key string, value []byte, opts *WriteOptions) error

Put a value at "key"

func (*Consul) Watch

func (s *Consul) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error)

Watch changes on a key. Returns a channel that will receive changes or an error. Upon creating a watch, the current value will be sent to the channel. Providing a non-nil stopCh can be used to stop watching.
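
A consumer typically ranges over the returned channel and closes stopCh once it has seen enough. The sketch below shows that consumption pattern with a fake producer; fakeWatch and collect are hypothetical helpers, and the assumption that the channel is closed after the watch stops is made for illustration and is not confirmed by this page.

```go
package main

import "fmt"

// KVPair is redeclared locally with the fields documented on this page.
type KVPair struct {
	Key       string
	Value     []byte
	LastIndex uint64
}

// fakeWatch imitates the shape of Watch: it emits each value in turn
// (the first one standing in for the current value) until the values run
// out or stopCh is closed, then closes the returned channel.
func fakeWatch(key string, values []string, stopCh <-chan struct{}) (<-chan *KVPair, error) {
	ch := make(chan *KVPair)
	go func() {
		defer close(ch)
		for i, v := range values {
			pair := &KVPair{Key: key, Value: []byte(v), LastIndex: uint64(i + 1)}
			select {
			case ch <- pair:
			case <-stopCh:
				return
			}
		}
	}()
	return ch, nil
}

// collect reads up to max events, then closes stopCh to end the watch.
func collect(events <-chan *KVPair, stopCh chan struct{}, max int) []string {
	var seen []string
	for pair := range events {
		seen = append(seen, string(pair.Value))
		if len(seen) == max {
			close(stopCh)
			break
		}
	}
	return seen
}

func main() {
	stopCh := make(chan struct{})
	events, err := fakeWatch("foo", []string{"v1", "v2", "v3"}, stopCh)
	if err != nil {
		fmt.Println("watch failed:", err)
		return
	}
	fmt.Println(collect(events, stopCh, 2))
}
```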

func (*Consul) WatchTree

func (s *Consul) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error)

WatchTree watches changes on a "directory". Returns a channel that will receive changes or an error. Upon creating a watch, the current value will be sent to the channel. Providing a non-nil stopCh can be used to stop watching.

type Etcd

type Etcd struct {
	// contains filtered or unexported fields
}

Etcd embeds the client

func (*Etcd) AtomicDelete

func (s *Etcd) AtomicDelete(key string, previous *KVPair) (bool, error)

AtomicDelete deletes the value at "key" only if the key has not been modified in the meantime; it returns an error if the key has been modified

func (*Etcd) AtomicPut

func (s *Etcd) AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, *KVPair, error)

AtomicPut puts a value at "key" only if the key has not been modified in the meantime; it returns an error if the key has been modified

func (*Etcd) Close

func (s *Etcd) Close()

Close closes the client connection

func (*Etcd) Delete

func (s *Etcd) Delete(key string) error

Delete a value at "key"

func (*Etcd) DeleteTree

func (s *Etcd) DeleteTree(prefix string) error

DeleteTree deletes a range of keys based on prefix

func (*Etcd) Exists

func (s *Etcd) Exists(key string) (bool, error)

Exists checks if the key exists inside the store

func (*Etcd) Get

func (s *Etcd) Get(key string) (*KVPair, error)

Get the value at "key"; returns the last modified index for use with CAS calls

func (*Etcd) List

func (s *Etcd) List(prefix string) ([]*KVPair, error)

List the content of a given prefix

func (*Etcd) NewLock

func (s *Etcd) NewLock(key string, options *LockOptions) (Locker, error)

NewLock returns a handle to a lock struct which can be used to acquire and release the mutex.

func (*Etcd) Put

func (s *Etcd) Put(key string, value []byte, opts *WriteOptions) error

Put a value at "key"

func (*Etcd) Watch

func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error)

Watch changes on a key. Returns a channel that will receive changes or an error. Upon creating a watch, the current value will be sent to the channel. Providing a non-nil stopCh can be used to stop watching.

func (*Etcd) WatchTree

func (s *Etcd) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error)

WatchTree watches changes on a "directory". Returns a channel that will receive changes or an error. Upon creating a watch, the current value will be sent to the channel. Providing a non-nil stopCh can be used to stop watching.

type Initialize

type Initialize func(addrs []string, options *Config) (Store, error)

Initialize creates a new Store object, initializing the client

type KVPair

type KVPair struct {
	Key       string
	Value     []byte
	LastIndex uint64
}

KVPair represents a {Key, Value, LastIndex} tuple

type LockOptions

type LockOptions struct {
	Value []byte        // Optional, value to associate with the lock
	TTL   time.Duration // Optional, expiration ttl associated with the lock
}

LockOptions contains optional request parameters

type Locker

type Locker interface {
	Lock() (<-chan struct{}, error)
	Unlock() error
}

Locker provides a locking mechanism on top of the store. It is similar to `sync.Locker`, except that its methods may return errors.

type Mock

type Mock struct {
	mock.Mock

	// Endpoints passed to InitializeMock
	Endpoints []string
	// Options passed to InitializeMock
	Options *Config
}

Mock store. Mocks all Store functions using testify.Mock.

func (*Mock) AtomicDelete

func (s *Mock) AtomicDelete(key string, previous *KVPair) (bool, error)

AtomicDelete mock

func (*Mock) AtomicPut

func (s *Mock) AtomicPut(key string, value []byte, previous *KVPair, opts *WriteOptions) (bool, *KVPair, error)

AtomicPut mock

func (*Mock) Close

func (s *Mock) Close()

Close mock

func (*Mock) Delete

func (s *Mock) Delete(key string) error

Delete mock

func (*Mock) DeleteTree

func (s *Mock) DeleteTree(prefix string) error

DeleteTree mock

func (*Mock) Exists

func (s *Mock) Exists(key string) (bool, error)

Exists mock

func (*Mock) Get

func (s *Mock) Get(key string) (*KVPair, error)

Get mock

func (*Mock) List

func (s *Mock) List(prefix string) ([]*KVPair, error)

List mock

func (*Mock) NewLock

func (s *Mock) NewLock(key string, options *LockOptions) (Locker, error)

NewLock mock

func (*Mock) Put

func (s *Mock) Put(key string, value []byte, opts *WriteOptions) error

Put mock

func (*Mock) Watch

func (s *Mock) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error)

Watch mock

func (*Mock) WatchTree

func (s *Mock) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error)

WatchTree mock

type MockLock

type MockLock struct {
	mock.Mock
}

MockLock mock implementation of Locker

func (*MockLock) Lock

func (l *MockLock) Lock() (<-chan struct{}, error)

Lock mock

func (*MockLock) Unlock

func (l *MockLock) Unlock() error

Unlock mock

type Store

type Store interface {
	// Put a value at the specified key
	Put(key string, value []byte, options *WriteOptions) error

	// Get a value given its key
	Get(key string) (*KVPair, error)

	// Delete the value at the specified key
	Delete(key string) error

	// Verify if a Key exists in the store
	Exists(key string) (bool, error)

	// Watch changes on a key.
	// Returns a channel that will receive changes or an error.
	// Upon creating a watch, the current value will be sent to the channel.
	// Providing a non-nil stopCh can be used to stop watching.
	Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error)

	// WatchTree watches changes on a "directory"
	// Returns a channel that will receive changes or an error.
	// Upon creating a watch, the current value will be sent to the channel.
	// Providing a non-nil stopCh can be used to stop watching.
	WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error)

	// NewLock creates a lock for a given key.
	// The returned Locker is not held and must be acquired with `.Lock`.
	// The value in options is optional.
	NewLock(key string, options *LockOptions) (Locker, error)

	// List the content of a given prefix
	List(prefix string) ([]*KVPair, error)

	// DeleteTree deletes a range of keys based on prefix
	DeleteTree(prefix string) error

	// Atomic operation on a single value
	AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, *KVPair, error)

	// Atomic delete of a single value
	AtomicDelete(key string, previous *KVPair) (bool, error)

	// Close the store connection
	Close()
}

Store represents the backend K/V storage. Each store should support every call listed here; otherwise it cannot be implemented as a K/V backend for libkv.

func InitializeConsul

func InitializeConsul(endpoints []string, options *Config) (Store, error)

InitializeConsul creates a new Consul client given a list of endpoints and optional tls config

func InitializeEtcd

func InitializeEtcd(addrs []string, options *Config) (Store, error)

InitializeEtcd creates a new Etcd client given a list of endpoints and optional tls config

func InitializeMock

func InitializeMock(endpoints []string, options *Config) (Store, error)

InitializeMock creates a Mock store.

func InitializeZookeeper

func InitializeZookeeper(endpoints []string, options *Config) (Store, error)

InitializeZookeeper creates a new Zookeeper client given a list of endpoints and optional tls config

func NewStore

func NewStore(backend Backend, addrs []string, options *Config) (Store, error)

NewStore creates an instance of store

type WatchCallback

type WatchCallback func(entries ...*KVPair)

WatchCallback is used for watch methods on keys and is triggered on key change

type WriteOptions

type WriteOptions struct {
	Heartbeat time.Duration
	Ephemeral bool
}

WriteOptions contains optional request parameters

type Zookeeper

type Zookeeper struct {
	// contains filtered or unexported fields
}

Zookeeper embeds the zookeeper client

func (*Zookeeper) AtomicDelete

func (s *Zookeeper) AtomicDelete(key string, previous *KVPair) (bool, error)

AtomicDelete deletes the value at "key" only if the key has not been modified in the meantime; it returns an error if the key has been modified

func (*Zookeeper) AtomicPut

func (s *Zookeeper) AtomicPut(key string, value []byte, previous *KVPair, _ *WriteOptions) (bool, *KVPair, error)

AtomicPut puts a value at "key" only if the key has not been modified in the meantime; it returns an error if the key has been modified

func (*Zookeeper) Close

func (s *Zookeeper) Close()

Close closes the client connection

func (*Zookeeper) Delete

func (s *Zookeeper) Delete(key string) error

Delete a value at "key"

func (*Zookeeper) DeleteTree

func (s *Zookeeper) DeleteTree(prefix string) error

DeleteTree deletes a range of keys based on prefix

func (*Zookeeper) Exists

func (s *Zookeeper) Exists(key string) (bool, error)

Exists checks if the key exists inside the store

func (*Zookeeper) Get

func (s *Zookeeper) Get(key string) (*KVPair, error)

Get the value at "key"; returns the last modified index for use with CAS calls

func (*Zookeeper) List

func (s *Zookeeper) List(prefix string) ([]*KVPair, error)

List the content of a given prefix

func (*Zookeeper) NewLock

func (s *Zookeeper) NewLock(key string, options *LockOptions) (Locker, error)

NewLock returns a handle to a lock struct which can be used to acquire and release the mutex.

func (*Zookeeper) Put

func (s *Zookeeper) Put(key string, value []byte, opts *WriteOptions) error

Put a value at "key"

func (*Zookeeper) Watch

func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error)

Watch changes on a key. Returns a channel that will receive changes or an error. Upon creating a watch, the current value will be sent to the channel. Providing a non-nil stopCh can be used to stop watching.

func (*Zookeeper) WatchTree

func (s *Zookeeper) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error)

WatchTree watches changes on a "directory". Returns a channel that will receive changes or an error. Upon creating a watch, the current value will be sent to the channel. Providing a non-nil stopCh can be used to stop watching.
