xds

package
v1.11.1
Warning

This package is not in the latest version of its module.

Published: Jan 19, 2022 License: Apache-2.0 Imports: 21 Imported by: 4

Documentation

Overview

Package xds is an implementation of Envoy's xDS (Discovery Service) protocol.

Server is the base implementation of any gRPC server which supports the xDS protocol. All xDS bi-directional gRPC streams from Stream* calls must be handled by calling Server.HandleRequestStream. For example, to implement the ADS protocol:

func (s *myGRPCServer) StreamAggregatedResources(stream api.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
    return s.xdsServer.HandleRequestStream(stream.Context(), stream, xds.AnyTypeURL)
}

Server is parameterized by a map of supported resource type URLs to resource sets, e.g. to support the LDS and RDS protocols:

ldsCache := xds.NewCache()
lds := xds.NewAckingResourceMutatorWrapper(ldsCache)
rdsCache := xds.NewCache()
rds := xds.NewAckingResourceMutatorWrapper(rdsCache)

resTypes := map[string]*xds.ResourceTypeConfiguration{
    "type.googleapis.com/envoy.config.listener.v3.Listener": {Source: ldsCache, AckObserver: lds},
    "type.googleapis.com/envoy.config.route.v3.RouteConfiguration": {Source: rdsCache, AckObserver: rds},
}

server := xds.NewServer(resTypes, 5*time.Second)

It is recommended to use a distinct resource set for each resource type to minimize the volume of messages sent and received by xDS clients.

Resource sets must implement the ResourceSource interface to provide read access to resources of one or multiple resource types:

type ResourceSource interface {
    GetResources(ctx context.Context, typeURL string, lastVersion uint64,
        nodeIP string, resourceNames []string) (*VersionedResources, error)
    EnsureVersion(typeURL string, version uint64)
}

Resource sets should implement the ResourceSet interface to provide read-write access. It provides an API to atomically update the resources in the set: Upsert inserts or updates a single resource in the set, and Delete deletes a single resource from the set.

Cache is an efficient, ready-to-use implementation of ResourceSet:

typeURL := "type.googleapis.com/envoy.config.listener.v3.Listener"
ldsCache := xds.NewCache()
ldsCache.Upsert(typeURL, "listener123", listenerA)
ldsCache.Delete(typeURL, "listener456")

In order to wait for acknowledgements of updates by Envoy nodes, each resource set should be wrapped in an AckingResourceMutatorWrapper, which should then be passed to NewServer() as the ACK observer. AckingResourceMutatorWrapper provides an extended API which accepts a completion.WaitGroup and a callback to notify of ACKs.

typeURL := "type.googleapis.com/envoy.config.listener.v3.Listener"
ldsCache := xds.NewCache()
lds := xds.NewAckingResourceMutatorWrapper(ldsCache)

ctx, cancel := context.WithTimeout(..., 5*time.Second)
wg := completion.NewWaitGroup(ctx)
nodes := []string{"10.0.0.1"} // Nodes to wait for an ACK from.
onDone := func(err error) {}  // Callback invoked with the outcome of each update.
lds.Upsert(typeURL, "listener123", listenerA, nodes, wg, onDone)
lds.Delete(typeURL, "listener456", nodes, wg, onDone)
wg.Wait()
cancel()

Constants

const (
	// AnyTypeURL is the default type URL to use for ADS resource sets.
	AnyTypeURL = ""
)

Variables

var (
	// ErrNoADSTypeURL is the error returned when receiving a request without
	// a type URL from an ADS stream.
	ErrNoADSTypeURL = errors.New("type URL is required for ADS")

	// ErrUnknownTypeURL is the error returned when receiving a request with
	// an unknown type URL.
	ErrUnknownTypeURL = errors.New("unknown type URL")

	// ErrInvalidVersionInfo is the error returned when receiving a request
	// with a version info that is not a positive integer.
	ErrInvalidVersionInfo = errors.New("invalid version info")

	// ErrInvalidResponseNonce is the error returned when receiving a request
	// with a response nonce that is not a positive integer.
	ErrInvalidResponseNonce = errors.New("invalid response nonce info")

	// ErrInvalidNodeFormat is the error returned when receiving a request
	// with a node that is not formatted correctly.
	ErrInvalidNodeFormat = errors.New("invalid node format")

	// ErrResourceWatch is the error returned whenever an internal error
	// occurs while waiting for new versions of resources.
	ErrResourceWatch = errors.New("resource watch failed")
)
var (
	ErrNackReceived error = errors.New("NACK received")
)

Functions

func IstioNodeToIP

func IstioNodeToIP(nodeId string) (string, error)

IstioNodeToIP extracts the IP address from an Envoy node identifier configured by Istio's pilot-agent.

Istio's pilot-agent structures the nodeId as the concatenation of the following parts separated by ~:

- node type: one of "sidecar", "ingress", or "router"
- node IP address
- node ID: the unique platform-specific sidecar proxy ID
- node domain: the DNS domain suffix for short hostnames, e.g. "default.svc.cluster.local"

For instance:

"sidecar~10.1.1.0~v0.default~default.svc.cluster.local"
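
The format above can be parsed with a split on ~ followed by IP validation. The following is a minimal sketch under that assumption, not the package's implementation; parseNodeIP is a hypothetical name, and the requirement of exactly four parts is inferred from the list above.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// parseNodeIP is a hypothetical helper, not the package's implementation.
// It splits an Istio-style node ID on "~" and validates the second field
// as an IP address.
func parseNodeIP(nodeID string) (string, error) {
	parts := strings.Split(nodeID, "~")
	if len(parts) != 4 {
		return "", fmt.Errorf("node ID %q: want 4 parts separated by ~, got %d", nodeID, len(parts))
	}
	ip := net.ParseIP(parts[1])
	if ip == nil {
		return "", fmt.Errorf("node ID %q: %q is not a valid IP address", nodeID, parts[1])
	}
	return ip.String(), nil
}

func main() {
	ip, err := parseNodeIP("sidecar~10.1.1.0~v0.default~default.svc.cluster.local")
	fmt.Println(ip, err)
}
```

Validating the IP field, rather than returning it verbatim, lets callers treat a malformed node ID as an error instead of using a bogus address as a node key.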

Types

type AckingResourceMutator

type AckingResourceMutator interface {
	// Upsert inserts or updates a resource from this set by name and increases
	// the set's version number atomically if the resource is actually inserted
	// or updated.
	// The completion is called back when the new upserted resources' version is
	// ACKed by the Envoy nodes whose IDs are given in nodeIDs.
	// A call to the returned revert function reverts the effects of this
	// method call.
	Upsert(typeURL string, resourceName string, resource proto.Message, nodeIDs []string, wg *completion.WaitGroup, callback func(error)) AckingResourceMutatorRevertFunc

	// UseCurrent inserts a completion that allows the caller to wait for the current
	// version of the given typeURL to be ACKed.
	UseCurrent(typeURL string, nodeIDs []string, wg *completion.WaitGroup)

	// DeleteNode frees resources held for the named node
	DeleteNode(nodeID string)

	// Delete deletes a resource from this set by name and increases the cache's
	// version number atomically if the resource is actually deleted.
	// The completion is called back when the new deleted resources' version is
	// ACKed by the Envoy nodes whose IDs are given in nodeIDs.
	// A call to the returned revert function reverts the effects of this
	// method call.
	Delete(typeURL string, resourceName string, nodeIDs []string, wg *completion.WaitGroup, callback func(error)) AckingResourceMutatorRevertFunc
}

AckingResourceMutator is a variant of ResourceMutator which calls back a Completion when a resource update is ACKed by a set of Envoy nodes.

type AckingResourceMutatorRevertFunc

type AckingResourceMutatorRevertFunc func(completion *completion.Completion)

AckingResourceMutatorRevertFunc is a function which reverts the effects of an update on an AckingResourceMutator. The completion is called back when the new resource update is ACKed by the Envoy nodes.

type AckingResourceMutatorWrapper

type AckingResourceMutatorWrapper struct {
	// contains filtered or unexported fields
}

AckingResourceMutatorWrapper is an AckingResourceMutator which wraps a ResourceMutator to notify callers when resource updates are ACKed by nodes. AckingResourceMutatorWrapper also implements ResourceVersionAckObserver in order to be notified of ACKs from nodes.

func NewAckingResourceMutatorWrapper

func NewAckingResourceMutatorWrapper(mutator ResourceMutator) *AckingResourceMutatorWrapper

NewAckingResourceMutatorWrapper creates a new AckingResourceMutatorWrapper to wrap the given ResourceMutator.

func (*AckingResourceMutatorWrapper) Delete

func (m *AckingResourceMutatorWrapper) Delete(typeURL string, resourceName string, nodeIDs []string, wg *completion.WaitGroup, callback func(error)) AckingResourceMutatorRevertFunc

func (*AckingResourceMutatorWrapper) DeleteNode

func (m *AckingResourceMutatorWrapper) DeleteNode(nodeID string)

DeleteNode frees resources held for the named node.

func (*AckingResourceMutatorWrapper) HandleResourceVersionAck

func (m *AckingResourceMutatorWrapper) HandleResourceVersionAck(ackVersion uint64, nackVersion uint64, nodeIP string, resourceNames []string, typeURL string, detail string)

'ackVersion' is the last version that was ACKed. 'nackVersion', if greater than 'ackVersion', is the last version that was NACKed.
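
The relationship between the two version arguments can be expressed as a single comparison: a NACK is signalled only when nackVersion exceeds ackVersion. The sketch below illustrates that reading; ackState and interpretAck are hypothetical names that do not exist in the package.

```go
package main

import "fmt"

// ackState is a hypothetical summary type used only in this sketch.
type ackState struct {
	Acked  uint64 // last version the node applied successfully
	Nacked bool   // whether a newer version was rejected by the node
}

// interpretAck treats nackVersion as meaningful only when it is greater
// than ackVersion.
func interpretAck(ackVersion, nackVersion uint64) ackState {
	return ackState{Acked: ackVersion, Nacked: nackVersion > ackVersion}
}

func main() {
	fmt.Println(interpretAck(7, 7)) // plain ACK of version 7
	fmt.Println(interpretAck(7, 8)) // version 8 rejected, 7 still applied
}
```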

func (*AckingResourceMutatorWrapper) Upsert

func (m *AckingResourceMutatorWrapper) Upsert(typeURL string, resourceName string, resource proto.Message, nodeIDs []string, wg *completion.WaitGroup, callback func(error)) AckingResourceMutatorRevertFunc

func (*AckingResourceMutatorWrapper) UseCurrent

func (m *AckingResourceMutatorWrapper) UseCurrent(typeURL string, nodeIDs []string, wg *completion.WaitGroup)

UseCurrent adds a completion to the WaitGroup if the current version of the cached resource has not been acked yet, allowing the caller to wait for the ACK.

type BaseObservableResourceSource

type BaseObservableResourceSource struct {
	// contains filtered or unexported fields
}

BaseObservableResourceSource implements the AddResourceVersionObserver and RemoveResourceVersionObserver methods to handle the notification of new resource versions. This is meant to be used as a base to implement ObservableResourceSource.

func NewBaseObservableResourceSource

func NewBaseObservableResourceSource() *BaseObservableResourceSource

NewBaseObservableResourceSource creates and initializes a new BaseObservableResourceSource.

func (*BaseObservableResourceSource) AddResourceVersionObserver

func (s *BaseObservableResourceSource) AddResourceVersionObserver(observer ResourceVersionObserver)

AddResourceVersionObserver registers an observer to be notified of new resource versions.

func (*BaseObservableResourceSource) NotifyNewResourceVersionRLocked

func (s *BaseObservableResourceSource) NotifyNewResourceVersionRLocked(typeURL string, version uint64)

NotifyNewResourceVersionRLocked notifies registered observers that a new version of the resources of the given type is available. This function MUST be called with locker's lock acquired.

func (*BaseObservableResourceSource) RemoveResourceVersionObserver

func (s *BaseObservableResourceSource) RemoveResourceVersionObserver(observer ResourceVersionObserver)

RemoveResourceVersionObserver unregisters an observer that was previously registered by calling AddResourceVersionObserver.

type Cache

type Cache struct {
	*BaseObservableResourceSource
	// contains filtered or unexported fields
}

Cache is a key-value container whose entries are updated atomically: whenever the cache is actually modified, its version number is incremented and observers are notified. Cache implements the ObservableResourceSet interface. This cache implementation ignores the proxy node identifiers, i.e. the same resources are available under the same names to all nodes.

func NewCache

func NewCache() *Cache

NewCache creates a new, empty cache with 0 as its current version.

func (*Cache) Clear

func (c *Cache) Clear(typeURL string) (version uint64, updated bool)

func (*Cache) Delete

func (c *Cache) Delete(typeURL string, resourceName string) (version uint64, updated bool, revert ResourceMutatorRevertFunc)

func (*Cache) EnsureVersion

func (c *Cache) EnsureVersion(typeURL string, version uint64)

func (*Cache) GetResources

func (c *Cache) GetResources(ctx context.Context, typeURL string, lastVersion uint64, nodeIP string, resourceNames []string) (*VersionedResources, error)

func (*Cache) Lookup

func (c *Cache) Lookup(typeURL string, resourceName string) (proto.Message, error)

Lookup finds the resource corresponding to the specified typeURL and resourceName, if available, and returns it. Otherwise, returns nil. If an error occurs while fetching the resource, also returns the error.

func (*Cache) Upsert

func (c *Cache) Upsert(typeURL string, resourceName string, resource proto.Message) (version uint64, updated bool, revert ResourceMutatorRevertFunc)

type MockStream

type MockStream struct {
	// contains filtered or unexported fields
}

MockStream is a mock implementation of Stream used for testing.

func NewMockStream

func NewMockStream(ctx context.Context, recvSize, sentSize int, recvTimeout, sentTimeout time.Duration) *MockStream

NewMockStream creates a new mock Stream for testing.

func (*MockStream) Close

func (s *MockStream) Close()

Close closes the resources used by this MockStream.

func (*MockStream) Recv

func (*MockStream) RecvResponse

RecvResponse receives a response that was queued by calling Send.

func (*MockStream) Send

func (*MockStream) SendRequest

SendRequest queues a request to be received by calling Recv.

type ObservableResourceSet

type ObservableResourceSet interface {
	ObservableResourceSource
	ResourceMutator
}

ObservableResourceSet is a ResourceSet that allows registering observers of new resource versions from this source.

type ObservableResourceSource

type ObservableResourceSource interface {
	ResourceSource

	// AddResourceVersionObserver registers an observer of new versions of
	// resources from this source.
	AddResourceVersionObserver(listener ResourceVersionObserver)

	// RemoveResourceVersionObserver unregisters an observer of new versions of
	// resources from this source.
	RemoveResourceVersionObserver(listener ResourceVersionObserver)
}

ObservableResourceSource is a ResourceSource that allows registering observers of new resource versions from this source.

type ProxyError

type ProxyError struct {
	Err    error
	Detail string
}

ProxyError wraps the error and the detail received from the proxy into a new type that implements the error interface.

func (*ProxyError) Error

func (pe *ProxyError) Error() string

type ResourceMutator

type ResourceMutator interface {
	// Upsert inserts or updates a resource from this set by name.
	// If the set is modified (the resource is actually inserted or updated),
	// the set's version number is incremented atomically and the returned
	// updated value is true.
	// Otherwise, the version number is not modified and the returned updated
	// value is false.
	// The returned version value is the set's version after update.
	// A call to the returned revert function reverts the effects of this
	// method call.
	Upsert(typeURL string, resourceName string, resource proto.Message) (version uint64, updated bool, revert ResourceMutatorRevertFunc)

	// Delete deletes a resource from this set by name.
	// If the set is modified (the resource is actually deleted), the set's
	// version number is incremented atomically and the returned updated value
	// is true.
	// Otherwise, the version number is not modified and the returned updated
	// value is false.
	// The returned version value is the set's version after update.
	// A call to the returned revert function reverts the effects of this
	// method call.
	Delete(typeURL string, resourceName string) (version uint64, updated bool, revert ResourceMutatorRevertFunc)

	// Clear deletes all the resources of the given type from this set.
	// If the set is modified (at least one resource is actually deleted),
	// the set's version number is incremented atomically and the returned
	// updated value is true.
	// Otherwise, the version number is not modified and the returned updated
	// value is false.
	// The returned version value is the set's version after update.
	// This method call cannot be reverted.
	Clear(typeURL string) (version uint64, updated bool)
}

ResourceMutator provides write access to a versioned set of resources. A single version is associated to all the contained resources. The version is monotonically increased for any change to the set.
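
The versioning contract described above (bump the version only on an actual change, and return a revert function) can be illustrated with a small stand-in. This is a sketch only: miniSet is hypothetical, it stores strings instead of proto.Message, omits type URLs and locking, and assumes that reverting is itself a change that produces a new, higher version.

```go
package main

import "fmt"

// miniSet is a hypothetical, single-goroutine stand-in for a ResourceMutator.
type miniSet struct {
	version   uint64
	resources map[string]string
}

type revertFunc func() (version uint64, updated bool)

func newMiniSet() *miniSet { return &miniSet{resources: map[string]string{}} }

// noop is the revert function returned when nothing was changed.
func (s *miniSet) noop() (uint64, bool) { return s.version, false }

// Upsert stores value under name; the version is bumped only on a real change.
func (s *miniSet) Upsert(name, value string) (uint64, bool, revertFunc) {
	old, existed := s.resources[name]
	if existed && old == value {
		return s.version, false, s.noop
	}
	s.resources[name] = value
	s.version++
	revert := func() (uint64, bool) {
		if existed {
			v, updated, _ := s.Upsert(name, old)
			return v, updated
		}
		v, updated, _ := s.Delete(name)
		return v, updated
	}
	return s.version, true, revert
}

// Delete removes name; the version is bumped only if the resource existed.
func (s *miniSet) Delete(name string) (uint64, bool, revertFunc) {
	old, existed := s.resources[name]
	if !existed {
		return s.version, false, s.noop
	}
	delete(s.resources, name)
	s.version++
	revert := func() (uint64, bool) {
		v, updated, _ := s.Upsert(name, old)
		return v, updated
	}
	return s.version, true, revert
}

func main() {
	s := newMiniSet()
	v1, updated1, _ := s.Upsert("listener123", "A")
	v2, updated2, _ := s.Upsert("listener123", "A") // no-op: same value
	v3, _, revert := s.Upsert("listener123", "B")
	v4, reverted := revert() // restores "A" as a new version
	fmt.Println(v1, updated1, v2, updated2, v3, v4, reverted, s.resources["listener123"])
}
```

Because the version only ever increases, a revert does not roll the counter back; clients that already saw the reverted update receive the restored content under a newer version.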

type ResourceMutatorRevertFunc

type ResourceMutatorRevertFunc func() (version uint64, updated bool)

ResourceMutatorRevertFunc is a function which reverts the effects of an update on a ResourceMutator. The returned version value is the set's version after update.

type ResourceSet

type ResourceSet interface {
	ResourceSource
	ResourceMutator
}

ResourceSet provides read-write access to a versioned set of resources. A single version is associated to all the contained resources. The version is monotonically increased for any change to the set.

type ResourceSource

type ResourceSource interface {
	// GetResources returns the current version of the resources with the given
	// names.
	// If lastVersion is not nil and the resources with the given names haven't
	// changed since lastVersion, nil is returned.
	// If resourceNames is empty, all resources are returned.
	// Should not be blocking.
	GetResources(ctx context.Context, typeURL string, lastVersion uint64,
		nodeIP string, resourceNames []string) (*VersionedResources, error)

	// EnsureVersion increases this resource set's version to be at least the
	// given version. If the current version is already higher than the
	// given version, this has no effect.
	EnsureVersion(typeURL string, version uint64)
}

ResourceSource provides read access to a versioned set of resources. A single version is associated to all the contained resources. The version is monotonically increased for any change to the set.
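
The EnsureVersion contract (raise the version to at least the given value, never lower it) reduces to a single comparison. A minimal sketch, using a hypothetical versionCounter type that ignores the typeURL argument and any locking:

```go
package main

import "fmt"

// versionCounter is a hypothetical holder for a resource set's version.
type versionCounter struct{ version uint64 }

// EnsureVersion raises the version to at least atLeast and never lowers it.
func (c *versionCounter) EnsureVersion(atLeast uint64) {
	if c.version < atLeast {
		c.version = atLeast
	}
}

func main() {
	c := &versionCounter{version: 3}
	c.EnsureVersion(10) // raises 3 to 10
	c.EnsureVersion(5)  // no effect: 10 is already higher
	fmt.Println(c.version)
}
```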

type ResourceTypeConfiguration

type ResourceTypeConfiguration struct {
	// Source contains the resources of this type.
	Source ObservableResourceSource

	// AckObserver is called back whenever a node acknowledges having applied a
	// version of the resources of this type.
	AckObserver ResourceVersionAckObserver
}

ResourceTypeConfiguration is the configuration of the XDS server for a resource type.

type ResourceVersionAckObserver

type ResourceVersionAckObserver interface {
	// HandleResourceVersionAck notifies that the node with the given NodeIP
	// has acknowledged having applied the resources.
	// Calls to this function must not block.
	HandleResourceVersionAck(ackVersion uint64, nackVersion uint64, nodeIP string, resourceNames []string, typeURL string, detail string)
}

ResourceVersionAckObserver defines the HandleResourceVersionAck method which is called whenever a node acknowledges having applied a version of the resources of a given type.

type ResourceVersionObserver

type ResourceVersionObserver interface {
	// HandleNewResourceVersion notifies of a new version of the resources of
	// the given type.
	HandleNewResourceVersion(typeURL string, version uint64)
}

ResourceVersionObserver defines the HandleNewResourceVersion method which is called whenever the version of the resources of a given type has changed.
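
The observer registration and notification flow can be sketched without the package itself. Below, the ResourceVersionObserver interface is redeclared locally to match the definition above; logObserver and notifier are hypothetical types, and notifier is an unsynchronized stand-in for an observable source.

```go
package main

import "fmt"

// ResourceVersionObserver mirrors the interface documented above so that
// the sketch compiles on its own.
type ResourceVersionObserver interface {
	HandleNewResourceVersion(typeURL string, version uint64)
}

// logObserver is a hypothetical observer that records what it was told.
type logObserver struct{ seen []string }

func (o *logObserver) HandleNewResourceVersion(typeURL string, version uint64) {
	o.seen = append(o.seen, fmt.Sprintf("%s@%d", typeURL, version))
}

// notifier is a hypothetical, unsynchronized stand-in for an observable source.
type notifier struct {
	observers map[ResourceVersionObserver]struct{}
}

func (n *notifier) add(o ResourceVersionObserver)    { n.observers[o] = struct{}{} }
func (n *notifier) remove(o ResourceVersionObserver) { delete(n.observers, o) }

// notify calls every registered observer with the new version.
func (n *notifier) notify(typeURL string, version uint64) {
	for o := range n.observers {
		o.HandleNewResourceVersion(typeURL, version)
	}
}

func main() {
	n := &notifier{observers: map[ResourceVersionObserver]struct{}{}}
	obs := &logObserver{}
	n.add(obs)
	n.notify("example.type/Listener", 1)
	n.remove(obs)
	n.notify("example.type/Listener", 2) // not delivered: observer was removed
	fmt.Println(obs.seen)
}
```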

type ResourceWatcher

type ResourceWatcher struct {
	// contains filtered or unexported fields
}

ResourceWatcher watches and retrieves new versions of resources from a resource set. ResourceWatcher implements ResourceVersionObserver to get notified when new resource versions are available in the set.

func NewResourceWatcher

func NewResourceWatcher(typeURL string, resourceSet ResourceSource, resourceAccessTimeout time.Duration) *ResourceWatcher

NewResourceWatcher creates a new ResourceWatcher backed by the given resource set.

func (*ResourceWatcher) HandleNewResourceVersion

func (w *ResourceWatcher) HandleNewResourceVersion(typeURL string, version uint64)

func (*ResourceWatcher) WatchResources

func (w *ResourceWatcher) WatchResources(ctx context.Context, typeURL string, lastVersion uint64, nodeIP string,
	resourceNames []string, out chan<- *VersionedResources)

WatchResources watches for new versions of specific resources and sends them into the given out channel.

A call to this method blocks until a version greater than lastVersion is available. Therefore, every call must be done in a separate goroutine. A watch can be canceled by canceling the given context.

lastVersion is the last version successfully applied by the client; nil if this is the first request for resources. This method call must always close the out channel.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server implements the handling of xDS streams.

func NewServer

func NewServer(resourceTypes map[string]*ResourceTypeConfiguration,
	resourceAccessTimeout time.Duration) *Server

NewServer creates an xDS gRPC stream handler using the given resource sources. resourceTypes maps each supported resource type URL to its corresponding resource source and ACK observer.

func (*Server) HandleRequestStream

func (s *Server) HandleRequestStream(ctx context.Context, stream Stream, defaultTypeURL string) error

HandleRequestStream receives and processes the requests from an xDS stream.

type Stream

type Stream interface {
	// Send sends an xDS response back to the client.
	Send(*envoy_service_discovery.DiscoveryResponse) error

	// Recv receives an xDS request from the client.
	Recv() (*envoy_service_discovery.DiscoveryRequest, error)
}

Stream is the subset of the gRPC bi-directional stream types which is used by Server.

type VersionedResources

type VersionedResources struct {
	// Version is the version of the resources.
	Version uint64

	// ResourceNames is the list of names of resources.
	// May be empty.
	ResourceNames []string

	// Resources is the list of protobuf-encoded resources.
	// May be empty. Must be of the same length as ResourceNames.
	Resources []proto.Message

	// Canary indicates whether the client should only do a dry run of
	// using the resources.
	Canary bool
}

VersionedResources is a set of protobuf-encoded resources along with their version.
