Version: v0.0.0-...-8223eb1

This package is not in the latest version of its module.

Published: Jan 14, 2020 License: Apache-2.0 Imports: 25 Imported by: 0



Package consulutil contains common routines for setting up a live Consul server for use in unit tests. The server runs within the test process and uses an isolated in-memory data store. This functionality is not currently recommended for use due to data races present in the package.





var CanceledError = errors.New("Consul operation canceled")

CanceledError signifies that the Consul operation was explicitly canceled.

var SessionMaxRetrySeconds = param.Int("session_max_retry_seconds", 300)

SessionMaxRetrySeconds is the maximum number of seconds to wait between failed attempts to acquire a session.

var SessionRetrySeconds = param.Int("session_retry_seconds", 10)

SessionRetrySeconds specifies the base time to wait between retries when establishing a session. In the presence of errors, the effective time is derived from this base following a strategy of exponential backoff with jitter.
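As a rough illustration of how a base delay and a maximum could combine under exponential backoff with jitter, here is a minimal sketch. The function names are hypothetical, and the "full jitter" variant (a uniform draw between 0 and the capped ceiling) is an assumption; the package may derive its delays differently.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// backoffCeilingSeconds returns the un-jittered upper bound for the next
// retry delay: base doubled once per prior failure, capped at limit.
// (Hypothetical helper, not part of consulutil.)
func backoffCeilingSeconds(base, limit, failures int) int {
	d := base
	for i := 0; i < failures && d < limit; i++ {
		d *= 2
	}
	if d > limit {
		d = limit
	}
	return d
}

// jitteredDelaySeconds draws a delay from [0, ceiling]. intn must behave
// like rand.Intn, returning a value in [0, n). Assumes base and limit are
// positive.
func jitteredDelaySeconds(base, limit, failures int, intn func(n int) int) int {
	return intn(backoffCeilingSeconds(base, limit, failures) + 1)
}

func main() {
	// 10 and 300 are the defaults of SessionRetrySeconds and
	// SessionMaxRetrySeconds listed in this package.
	for failures := 0; failures < 7; failures++ {
		ceiling := backoffCeilingSeconds(10, 300, failures)
		delay := time.Duration(jitteredDelaySeconds(10, 300, failures, rand.Intn)) * time.Second
		fmt.Printf("failures=%d ceiling=%ds delay=%v\n", failures, ceiling, delay)
	}
}
```

With these defaults the ceiling grows 10, 20, 40, 80, 160 seconds and then stays at 300 seconds, while the jitter spreads retries from many clients across each interval.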


func Get

func Get(
	ctx context.Context,
	clientKV ConsulGetter,
	key string,
	options *api.QueryOptions,
) (*api.KVPair, *api.QueryMeta, error)

Get is like List, but retrieves a single key rather than all keys under a prefix.

func List

func List(
	clientKV ConsulLister,
	done <-chan struct{},
	prefix string,
	options *api.QueryOptions,
) (api.KVPairs, *api.QueryMeta, error)

List performs a KV List operation that can be canceled. When the "done" channel is closed, CanceledError will be immediately returned. (The HTTP RPC can't be canceled, but it will be ignored.) Errors from Consul will be wrapped in a KVError value.
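The cancellation behaviour described above (return immediately when "done" closes, and ignore the result of the call still in flight) can be sketched with a goroutine and a buffered channel. This is only an illustration of the pattern: cancelable, callResult, and errCanceled are hypothetical names, and the package's own implementation may differ.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errCanceled stands in for the package's CanceledError.
var errCanceled = errors.New("operation canceled")

// callResult carries the outcome of the blocking call back to the caller.
type callResult struct {
	value string
	err   error
}

// cancelable runs blocking in its own goroutine and returns its result, or
// errCanceled as soon as done is closed. The blocking call itself is not
// interrupted; its eventual result is simply dropped.
func cancelable(done <-chan struct{}, blocking func() (string, error)) (string, error) {
	// Buffered so the goroutine can always deliver its result and exit,
	// even when nobody is left to receive it.
	resultCh := make(chan callResult, 1)
	go func() {
		v, err := blocking()
		resultCh <- callResult{v, err}
	}()
	select {
	case <-done:
		return "", errCanceled
	case r := <-resultCh:
		return r.value, r.err
	}
}

func main() {
	done := make(chan struct{})
	slow := func() (string, error) {
		time.Sleep(200 * time.Millisecond) // pretend to be a long HTTP query
		return "value", nil
	}
	// Cancel shortly after starting the call.
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(done)
	}()
	_, err := cancelable(done, slow)
	fmt.Println("canceled:", errors.Is(err, errCanceled))
}
```

The buffer of size one is the important design choice: without it, the goroutine would block forever on its send after a cancellation and leak.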

func SafeKeys

func SafeKeys(
	clientKV ConsulKeyser,
	done <-chan struct{},
	prefix string,
	options *api.QueryOptions,
) ([]string, *api.QueryMeta, error)

SafeKeys performs a KV Keys operation that can be canceled. When the "done" channel is closed, CanceledError will be immediately returned. (The HTTP RPC can't be canceled, but it will be ignored.) Errors from Consul will be wrapped in a KVError value.

func SessionManager

func SessionManager(
	config api.SessionEntry,
	client ConsulClient,
	output chan<- string,
	done chan struct{},
	logger logging.Logger,
)

SessionManager continually creates and maintains Consul sessions. It is intended to be run in its own goroutine. If one session expires, a new one will be created. As sessions come and go, the session ID (or "" for an expired session) will be sent on the output channel.


config:  Configuration passed to Consul when creating a new session.
client:  The Consul client to use.
output:  The channel used for exposing Consul session IDs. This method takes
         ownership of this channel and will close it once no new IDs will be created.
done:    Close this channel to close the current session (if any) and stop creating
         new sessions.
logger:  Errors will be logged to this logger.

func WatchDiff

func WatchDiff(
	prefix string,
	clientKV ConsulLister,
	quitCh <-chan struct{},
) (<-chan *WatchedChanges, <-chan error)

WatchDiff watches a Consul prefix for changes and categorizes them as creates, updates, and deletes. Note that if a KV pair was created and modified before the watch started, the watch will treat it as a create.
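To make the categorization concrete, the following sketch diffs a previous snapshot against a fresh listing. The pair and changes types are stand-ins for api.KVPair and WatchedChanges, and detecting updates by comparing ModifyIndex is an assumption about how such a diff could be computed, not a statement about this package's internals.

```go
package main

import "fmt"

// pair is a stand-in for api.KVPair, reduced to the fields the diff needs.
type pair struct {
	Key         string
	ModifyIndex uint64
}

// changes mirrors the shape of WatchedChanges.
type changes struct {
	Created, Updated, Deleted, Same []pair
}

// diff compares the previous snapshot (keyed by Key) with the latest listing.
func diff(prev map[string]pair, latest []pair) changes {
	var c changes
	seen := make(map[string]bool, len(latest))
	for _, p := range latest {
		seen[p.Key] = true
		old, ok := prev[p.Key]
		switch {
		case !ok:
			// Never seen before, regardless of how often it was
			// modified before the first snapshot.
			c.Created = append(c.Created, p)
		case old.ModifyIndex != p.ModifyIndex:
			c.Updated = append(c.Updated, p)
		default:
			c.Same = append(c.Same, p)
		}
	}
	// Anything in the old snapshot that is missing now was deleted.
	for key, old := range prev {
		if !seen[key] {
			c.Deleted = append(c.Deleted, old)
		}
	}
	return c
}

func main() {
	prev := map[string]pair{"a": {"a", 1}, "b": {"b", 1}, "c": {"c", 1}}
	latest := []pair{{"a", 1}, {"b", 2}, {"d", 5}}
	fmt.Printf("%+v\n", diff(prev, latest))
}
```

Starting from an empty previous snapshot, every key in the first listing lands in Created, which matches the caveat about pairs modified before the watch began.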

func WatchKeys

func WatchKeys(
	prefix string,
	clientKV ConsulKeyser,
	done <-chan struct{},
	pause time.Duration,
) chan WatchedKeys

WatchKeys executes Consul keys queries on a particular prefix and passes the set of keys on an output channel each time the query returns.

func WatchNewKeys

func WatchNewKeys(pairsChan <-chan api.KVPairs, onNewKey NewKeyHandler, done <-chan struct{})

WatchNewKeys watches for changes to a list of Key/Value pairs and lets each key be handled individually through a subscription-like interface.

This function models a key's lifetime in the following way. When a key is first seen, the given NewKeyHandler function will be run, which may return a channel. When the key's value changes, new K/V updates are sent to the key's notification channel. When the key is deleted, `nil` is sent. After being deleted or if the watcher is asked to exit, a key's channel will be closed, to notify the receiver that no further updates are coming.

WatchNewKeys doesn't watch a prefix itself--the caller should arrange a suitable input stream of K/V pairs, probably from WatchPrefix(). This function runs until the input stream closes. Closing "done" will asynchronously cancel the watch and cause it to eventually exit.
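The key lifetime model described above can be sketched as a loop that fans snapshots out to per-key channels. Everything here is illustrative: kv stands in for *api.KVPair, dispatch is a hypothetical name, the "done" channel is omitted for brevity, and sending the first observed value to a freshly created channel is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// kv is a stand-in for api.KVPair.
type kv struct {
	Key   string
	Value string
}

// newKeyHandler mirrors NewKeyHandler; it may return nil to ignore a key.
type newKeyHandler func(key string) chan<- *kv

// dispatch consumes snapshots until the input closes, sending each key's
// changes to that key's own channel.
func dispatch(snapshots <-chan []*kv, onNewKey newKeyHandler) {
	subs := make(map[string]chan<- *kv) // channel per live key
	last := make(map[string]string)     // last value sent per key
	for snap := range snapshots {
		present := make(map[string]bool, len(snap))
		for _, p := range snap {
			present[p.Key] = true
			ch, known := subs[p.Key]
			if !known {
				ch = onNewKey(p.Key) // first sighting of this key
				subs[p.Key] = ch
			} else if last[p.Key] == p.Value {
				continue // unchanged, nothing to report
			}
			last[p.Key] = p.Value
			if ch != nil {
				ch <- p
			}
		}
		for key, ch := range subs {
			if present[key] {
				continue
			}
			if ch != nil {
				ch <- nil // nil signals deletion
				close(ch)
			}
			delete(subs, key)
			delete(last, key)
		}
	}
	// Input closed: tell every remaining subscriber no more updates come.
	for _, ch := range subs {
		if ch != nil {
			close(ch)
		}
	}
}

func main() {
	snapshots := make(chan []*kv)
	go func() {
		defer close(snapshots)
		snapshots <- []*kv{{Key: "a", Value: "1"}}
		snapshots <- []*kv{{Key: "a", Value: "2"}}
		snapshots <- nil // "a" has been deleted
	}()
	var wg sync.WaitGroup
	dispatch(snapshots, func(key string) chan<- *kv {
		ch := make(chan *kv)
		wg.Add(1)
		go func() {
			defer wg.Done()
			for p := range ch {
				fmt.Println(key, "update:", p)
			}
			fmt.Println(key, "closed")
		}()
		return ch
	})
	wg.Wait()
}
```

Because sends are synchronous in this sketch, a handler that stops reading its channel would stall updates for every other key, so receivers should keep draining until the channel closes.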

func WatchPrefix

func WatchPrefix(
	prefix string,
	clientKV ConsulLister,
	outPairs chan<- api.KVPairs,
	done <-chan struct{},
	outErrors chan<- error,
	pause time.Duration,
	jitterWindow time.Duration,
)

WatchPrefix watches a Consul prefix for changes to any keys that have the prefix. When anything changes, all Key/Value pairs having that prefix will be written to the provided channel.

Errors will be sent on the given outErrors channel but do not otherwise affect execution. The given output stream will become owned by this function call, and this call will close it when the function ends. This function will run until explicitly canceled by closing the "done" channel. Data is written to the output channel synchronously, so readers must consume the data or this method will block.

jitterWindow is used to add randomized jitter between Consul requests, which can help mitigate a flood of requests to a server when it becomes available after a period of unavailability. A sleep time will be chosen between 0 and the jitterWindow setting with a uniform probability over that range.
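One way to picture the wait between requests is a timer for the pause plus a uniformly drawn extra delay, abandoned early if "done" closes. The helper below is a hypothetical sketch; in particular, treating the pause and the jitter as additive is an assumption, and the real function may apply them differently.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// pauseWithJitter waits for pause plus a random extra delay drawn uniformly
// from [0, jitterWindow). It returns false if done is closed first.
// int63n must behave like rand.Int63n.
func pauseWithJitter(done <-chan struct{}, pause, jitterWindow time.Duration, int63n func(int64) int64) bool {
	wait := pause
	if jitterWindow > 0 {
		wait += time.Duration(int63n(int64(jitterWindow)))
	}
	timer := time.NewTimer(wait)
	defer timer.Stop()
	select {
	case <-done:
		return false
	case <-timer.C:
		return true
	}
}

func main() {
	done := make(chan struct{})
	ok := pauseWithJitter(done, 10*time.Millisecond, 20*time.Millisecond, rand.Int63n)
	fmt.Println("finished waiting:", ok)
}
```

Selecting on the timer rather than calling time.Sleep keeps the watcher responsive to cancellation even when the pause is long.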

func WatchSingle

func WatchSingle(
	key string,
	clientKV ConsulGetter,
	outKVP chan<- *api.KVPair,
	done <-chan struct{},
	outErrors chan<- error,
)

WatchSingle has the same semantics as WatchPrefix, but for a single key in Consul. If the key is deleted, a nil will be sent on the output channel, but the watch will not be terminated. In addition, if updates happen in rapid succession, intervening updates may be missed. If these semantics are undesirable, consider WatchNewKeys instead.

func WithSession

func WithSession(
	done <-chan struct{},
	sessions <-chan string,
	f func(done <-chan struct{}, session string),
)

WithSession executes the function f when there is an active session. When that session ends, f is signaled to exit. Once f finishes, a new execution will start when a new session begins.

This function runs until the input stream of sessions is closed. Closing the "done" argument provides a shortcut to exit this function when also tearing down the session producer. In either case, any running f will be terminated before returning. A panic in f will also propagate upwards, causing the function to exit.
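The start/stop cycle described above can be approximated as follows. This is a simplified sketch under stated assumptions: withSession is a hypothetical lower-case re-creation, it stops the running f whenever any new value arrives on the sessions channel, and it does not model panic propagation.

```go
package main

import "fmt"

// withSession runs f while a session is active and signals it to stop when
// the session changes, the sessions channel closes, or done closes.
func withSession(done <-chan struct{}, sessions <-chan string, f func(done <-chan struct{}, session string)) {
	var stop chan struct{}     // closed to ask the running f to exit
	var finished chan struct{} // closed once the running f has returned

	// halt stops the current f, if any, and waits for it to return.
	halt := func() {
		if stop != nil {
			close(stop)
			<-finished
			stop, finished = nil, nil
		}
	}
	defer halt()

	for {
		select {
		case <-done:
			return
		case id, ok := <-sessions:
			if !ok {
				return
			}
			halt()
			if id == "" {
				continue // session expired; wait for the next one
			}
			stop, finished = make(chan struct{}), make(chan struct{})
			go func(stop <-chan struct{}, finished chan<- struct{}) {
				defer close(finished)
				f(stop, id)
			}(stop, finished)
		}
	}
}

func main() {
	sessions := make(chan string)
	go func() {
		defer close(sessions)
		sessions <- "session-1"
		sessions <- "" // session-1 expired
		sessions <- "session-2"
	}()
	withSession(make(chan struct{}), sessions, func(done <-chan struct{}, session string) {
		fmt.Println("start", session)
		<-done
		fmt.Println("stop", session)
	})
}
```

A sessions channel fed by SessionManager fits this shape, since it emits session IDs and "" for expired sessions.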


type ConsulClient

type ConsulClient interface {
	KV() ConsulKVClient
	Session() ConsulSessionClient
}

Wrapper interface that allows retrieval of the underlying interfaces.

func ConsulClientFromRaw

func ConsulClientFromRaw(client *api.Client) ConsulClient

Sadly, *api.Client does not implement the ConsulClient interface because the return types of KV() and Session() don't match exactly, e.g. KV() returns an *api.KV, not a ConsulKVClient, even though *api.KV implements ConsulKVClient. This function wraps an *api.Client into something that implements ConsulClient.
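The mismatch arises because Go requires method signatures to match an interface exactly, including return types. The toy types below (all hypothetical stand-ins for *api.Client, *api.KV, ConsulClient, and ConsulKVClient) show the problem and the kind of thin wrapper that resolves it.

```go
package main

import "fmt"

// kvClient plays the role of ConsulKVClient.
type kvClient interface {
	Get(key string) string
}

// client plays the role of ConsulClient.
type client interface {
	KV() kvClient
}

// rawKV plays the role of *api.KV. It satisfies kvClient.
type rawKV struct{ data map[string]string }

func (k *rawKV) Get(key string) string { return k.data[key] }

// rawClient plays the role of *api.Client.
type rawClient struct{ kv *rawKV }

// KV returns the concrete *rawKV, so *rawClient does NOT satisfy client:
// the interface demands a method returning kvClient exactly.
func (c *rawClient) KV() *rawKV { return c.kv }

// wrapped adapts *rawClient to client by widening the return type.
type wrapped struct{ raw *rawClient }

func (w wrapped) KV() kvClient { return w.raw.KV() }

// clientFromRaw plays the role of ConsulClientFromRaw.
func clientFromRaw(raw *rawClient) client { return wrapped{raw} }

func main() {
	raw := &rawClient{kv: &rawKV{data: map[string]string{"greeting": "hello"}}}
	// var c client = raw // would not compile: wrong type for method KV
	c := clientFromRaw(raw)
	fmt.Println(c.KV().Get("greeting"))
}
```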

type ConsulGetter

type ConsulGetter interface {
	Get(key string, opts *api.QueryOptions) (*api.KVPair, *api.QueryMeta, error)
}

type ConsulKVClient

type ConsulKVClient interface {
	Acquire(p *api.KVPair, q *api.WriteOptions) (bool, *api.WriteMeta, error)
	CAS(p *api.KVPair, q *api.WriteOptions) (bool, *api.WriteMeta, error)
	Delete(key string, w *api.WriteOptions) (*api.WriteMeta, error)
	DeleteCAS(p *api.KVPair, q *api.WriteOptions) (bool, *api.WriteMeta, error)
	DeleteTree(prefix string, w *api.WriteOptions) (*api.WriteMeta, error)
	Get(key string, q *api.QueryOptions) (*api.KVPair, *api.QueryMeta, error)
	Keys(prefix, separator string, q *api.QueryOptions) ([]string, *api.QueryMeta, error)
	List(prefix string, q *api.QueryOptions) (api.KVPairs, *api.QueryMeta, error)
	Put(pair *api.KVPair, w *api.WriteOptions) (*api.WriteMeta, error)
	Release(p *api.KVPair, q *api.WriteOptions) (bool, *api.WriteMeta, error)
	Txn(txn api.KVTxnOps, q *api.QueryOptions) (bool, *api.KVTxnResponse, *api.QueryMeta, error)
}

Interface representing the functionality of the api.KV struct returned by calling KV() on an *api.Client. This is useful for swapping in KV implementations in tests, for example.

type ConsulKeyser

type ConsulKeyser interface {
	Keys(prefix, separator string, q *api.QueryOptions) ([]string, *api.QueryMeta, error)
}

type ConsulLister

type ConsulLister interface {
	List(prefix string, opts *api.QueryOptions) (api.KVPairs, *api.QueryMeta, error)
}

ConsulLister is a portion of the interface for api.KV

type ConsulSessionClient

type ConsulSessionClient interface {
	Create(se *api.SessionEntry, q *api.WriteOptions) (string, *api.WriteMeta, error)
	CreateNoChecks(*api.SessionEntry, *api.WriteOptions) (string, *api.WriteMeta, error)
	Destroy(string, *api.WriteOptions) (*api.WriteMeta, error)
	Info(id string, q *api.QueryOptions) (*api.SessionEntry, *api.QueryMeta, error)
	List(q *api.QueryOptions) ([]*api.SessionEntry, *api.QueryMeta, error)
	Renew(id string, q *api.WriteOptions) (*api.SessionEntry, *api.WriteMeta, error)
	RenewPeriodic(initialTTL string, id string, q *api.WriteOptions, doneCh chan struct{}) error
}

Specifies the functionality provided by the *api.Session struct for managing Consul sessions. This is useful for swapping in session client implementations in tests.

type FakeConsulClient

type FakeConsulClient struct {
	KV_ ConsulKVClient
}

func NewFakeClient

func NewFakeClient() *FakeConsulClient

func (FakeConsulClient) KV

func (FakeConsulClient) Session

type FakeKV

type FakeKV struct {
	Entries map[string]*api.KVPair
	// contains filtered or unexported fields
}

Provides a fake implementation of *api.KV, which is useful in tests.

func NewKVWithEntries

func NewKVWithEntries(entries map[string]*api.KVPair) *FakeKV

func (*FakeKV) Acquire

func (f *FakeKV) Acquire(pair *api.KVPair, q *api.WriteOptions) (bool, *api.WriteMeta, error)

The fake implementation of this is the same as writing a key. Callers are expected to use the /lock subtree, so real keys will never look like locks.

func (*FakeKV) CAS

func (f *FakeKV) CAS(p *api.KVPair, q *api.WriteOptions) (bool, *api.WriteMeta, error)

func (*FakeKV) Delete

func (f *FakeKV) Delete(key string, w *api.WriteOptions) (*api.WriteMeta, error)

func (*FakeKV) DeleteCAS

func (f *FakeKV) DeleteCAS(pair *api.KVPair, opts *api.WriteOptions) (bool, *api.WriteMeta, error)

func (*FakeKV) DeleteTree

func (f *FakeKV) DeleteTree(prefix string, w *api.WriteOptions) (*api.WriteMeta, error)

func (*FakeKV) Get

func (f *FakeKV) Get(key string, q *api.QueryOptions) (*api.KVPair, *api.QueryMeta, error)

func (*FakeKV) Keys

func (f *FakeKV) Keys(prefix, separator string, q *api.QueryOptions) ([]string, *api.QueryMeta, error)

func (*FakeKV) List

func (f *FakeKV) List(prefix string, q *api.QueryOptions) (api.KVPairs, *api.QueryMeta, error)

func (*FakeKV) Put

func (f *FakeKV) Put(pair *api.KVPair, q *api.WriteOptions) (*api.WriteMeta, error)

func (*FakeKV) Release

func (f *FakeKV) Release(p *api.KVPair, q *api.WriteOptions) (bool, *api.WriteMeta, error)

func (*FakeKV) Txn

type Fixture

type Fixture struct {
	Agent    *agent.Agent
	Servers  []*agent.HTTPServer
	Client   ConsulClient
	T        *testing.T
	HTTPPort int
}

Fixture sets up a test Consul server and provides the client configuration for accessing it.

func NewFixture

func NewFixture(t *testing.T) Fixture

NewFixture creates a new testing instance of Consul.

func (Fixture) GetKV

func (f Fixture) GetKV(key string) []byte

func (Fixture) SetKV

func (f Fixture) SetKV(key string, val []byte)

func (Fixture) Stop

func (f Fixture) Stop()

Stop will stop the Consul test server and deallocate any other resources in the testing fixture. It should always be called at the end of a unit test.

type KVError

type KVError struct {
	Op      string
	Key     string
	KVError error
	// contains filtered or unexported fields
}

KVError encapsulates a Consul error.

func NewKVError

func NewKVError(op string, key string, err error) KVError

NewKVError constructs a new KVError to wrap errors from Consul.

func (KVError) Error

func (err KVError) Error() string

Error implements the error and "pkg/util".CallsiteError interfaces.

func (KVError) Filename

func (err KVError) Filename() string

Filename implements the "pkg/util".CallsiteError interface.

func (KVError) Function

func (err KVError) Function() string

Function implements the "pkg/util".CallsiteError interface.

func (KVError) LineNumber

func (err KVError) LineNumber() int

LineNumber implements the "pkg/util".CallsiteError interface.
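For readers wondering how an error type can report its own call site, here is a minimal sketch using runtime.Caller. The kvError type, its unexported fields, and the message format are all hypothetical; the actual KVError and the CallsiteError interface in "pkg/util" may record this information differently.

```go
package main

import (
	"fmt"
	"path/filepath"
	"runtime"
)

// kvError is an illustrative stand-in for KVError.
type kvError struct {
	Op  string
	Key string
	Err error

	// Call-site details, captured at construction time (assumed layout).
	file     string
	function string
	line     int
}

// newKVError wraps err and records where newKVError was called from.
func newKVError(op, key string, err error) kvError {
	e := kvError{Op: op, Key: key, Err: err}
	// Skip one frame so the recorded location is the caller of newKVError.
	if pc, file, line, ok := runtime.Caller(1); ok {
		e.file = filepath.Base(file)
		e.line = line
		if fn := runtime.FuncForPC(pc); fn != nil {
			e.function = fn.Name()
		}
	}
	return e
}

// Error makes kvError usable as an ordinary error value.
func (e kvError) Error() string {
	return fmt.Sprintf("%s failed for %q: %v", e.Op, e.Key, e.Err)
}

func (e kvError) Filename() string { return e.file }
func (e kvError) Function() string { return e.function }
func (e kvError) LineNumber() int  { return e.line }

func main() {
	err := newKVError("list", "foo/", fmt.Errorf("connection refused"))
	fmt.Println(err)
	fmt.Printf("%s:%d in %s\n", err.Filename(), err.LineNumber(), err.Function())
}
```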

type NewKeyHandler

type NewKeyHandler func(key string) chan<- *api.KVPair

type WatchedChanges

type WatchedChanges struct {
	Created api.KVPairs
	Updated api.KVPairs
	Deleted api.KVPairs
	Same    api.KVPairs
}

type WatchedKeys

type WatchedKeys struct {
	Keys []string
	Err  error
}
