cache

package
v0.9.9
Published: Apr 21, 2021 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Overview

Package cache defines a configuration cache for the server.

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetResourceName

func GetResourceName(res types.Resource) string

GetResourceName returns the resource name for a valid xDS response type.

func GetResourceReferences

func GetResourceReferences(resources map[string]types.ResourceWithTtl) map[string]bool

GetResourceReferences returns the names for dependent resources (EDS cluster names for CDS, RDS routes names for LDS).

func GetResponseType

func GetResponseType(typeURL string) types.ResponseType

GetResponseType returns the response type enumeration for a valid xDS type URL.

func GetResponseTypeURL

func GetResponseTypeURL(responseType types.ResponseType) (string, error)

GetResponseTypeURL returns the type URL for a valid response type enumeration.
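
The two lookups above are inverses of each other. The following is a minimal sketch of that mapping, not the package's implementation: responseType, typeURLs, responseTypeFor, and typeURLFor are hypothetical names, only four of the xDS v3 type URLs are listed, and falling back to an unknown sentinel for unrecognized URLs is an assumption.

```go
package main

import "fmt"

// responseType is a hypothetical stand-in for types.ResponseType.
type responseType int

const (
	endpoint responseType = iota
	cluster
	route
	listener
	unknownType // sentinel for URLs this sketch does not recognize
)

const apiPrefix = "type.googleapis.com/"

// typeURLs lists a subset of the xDS v3 type URLs.
var typeURLs = map[responseType]string{
	endpoint: apiPrefix + "envoy.config.endpoint.v3.ClusterLoadAssignment",
	cluster:  apiPrefix + "envoy.config.cluster.v3.Cluster",
	route:    apiPrefix + "envoy.config.route.v3.RouteConfiguration",
	listener: apiPrefix + "envoy.config.listener.v3.Listener",
}

// responseTypeFor maps a type URL to its enumeration value.
func responseTypeFor(typeURL string) responseType {
	for rt, u := range typeURLs {
		if u == typeURL {
			return rt
		}
	}
	return unknownType
}

// typeURLFor maps an enumeration value back to its type URL.
func typeURLFor(rt responseType) (string, error) {
	u, ok := typeURLs[rt]
	if !ok {
		return "", fmt.Errorf("no type URL for response type %d", rt)
	}
	return u, nil
}

func main() {
	rt := responseTypeFor(apiPrefix + "envoy.config.cluster.v3.Cluster")
	u, err := typeURLFor(rt)
	fmt.Println(rt == cluster, u, err)
}
```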

func HashResource

func HashResource(resource []byte) string

HashResource computes the SHA-256 hash sum of a resource's marshaled bytes and returns it as a string.
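
As an illustration of hashing marshaled bytes, here is a minimal sketch using only the standard library. hashBytes is a hypothetical stand-in, and hex-encoding the digest is an assumption about the string form.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// hashBytes returns the hex-encoded SHA-256 digest of the given bytes.
// Hex encoding is an assumption made for this sketch.
func hashBytes(resource []byte) string {
	sum := sha256.Sum256(resource)
	return hex.EncodeToString(sum[:])
}

func main() {
	a := hashBytes([]byte("cluster-a"))
	b := hashBytes([]byte("cluster-b"))
	// Different payloads yield different 64-character hex digests.
	fmt.Println(len(a), a != b)
}
```

Because the hash depends only on the bytes, two resources with identical marshaled content get the same version string, which is what makes it usable for change detection.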

func IndexRawResourcesByName

func IndexRawResourcesByName(items []types.Resource) map[string]types.Resource

IndexRawResourcesByName creates a map from the resource name to the resource.

func IndexResourcesByName

func IndexResourcesByName(items []types.ResourceWithTtl) map[string]types.ResourceWithTtl

IndexResourcesByName creates a map from the resource name to the resource and its TTL.
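
Indexing by name can be sketched as follows. namedResource and indexByName are hypothetical; the real functions operate on xDS protobuf messages and derive the key with GetResourceName. Letting a later item overwrite an earlier one with the same name is an assumption of this sketch.

```go
package main

import "fmt"

// namedResource is a hypothetical stand-in for types.Resource.
type namedResource struct {
	Name    string
	Payload string
}

// indexByName builds a name-to-resource map. Later items overwrite earlier
// ones that share a name.
func indexByName(items []namedResource) map[string]namedResource {
	out := make(map[string]namedResource, len(items))
	for _, item := range items {
		out[item.Name] = item
	}
	return out
}

func main() {
	idx := indexByName([]namedResource{
		{Name: "a", Payload: "1"},
		{Name: "b", Payload: "2"},
	})
	fmt.Println(len(idx), idx["b"].Payload)
}
```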

func MarshalResource

func MarshalResource(resource types.Resource) (types.MarshaledResource, error)

MarshalResource converts a Resource to a MarshaledResource.

Types

type Cache

type Cache interface {
	ConfigWatcher
	ConfigFetcher
}

Cache is a generic config cache with a watcher.

type ConfigFetcher

type ConfigFetcher interface {
	// Fetch implements the polling method of the config cache using a non-empty request.
	Fetch(context.Context, *Request) (Response, error)
}

ConfigFetcher fetches configuration resources from the cache.

type ConfigWatcher

type ConfigWatcher interface {
	// CreateWatch returns a new open watch from a non-empty request.
	// An individual consumer normally issues a single open watch per type URL.
	//
	// Value channel produces requested resources, once they are available.  If
	// the channel is closed prior to cancellation of the watch, an unrecoverable
	// error has occurred in the producer, and the consumer should close the
	// corresponding stream.
	//
	// Cancel is an optional function to release resources in the producer. If
	// provided, the consumer may call this function multiple times.
	CreateWatch(*Request) (value chan Response, cancel func())

	// CreateDeltaWatch returns a new open incremental xDS watch.
	//
	// Value channel produces requested resources, or spontaneous updates in accordance
	// with the incremental xDS specification. If the channel is closed
	// prior to cancellation of the watch, an unrecoverable error has occurred in the producer,
	// and the consumer should close the corresponding stream.
	//
	// Cancel is an optional function to release resources in the producer. If
	// provided, the consumer may call this function multiple times.
	CreateDeltaWatch(*DeltaRequest, *stream.StreamState) (value chan DeltaResponse, cancel func())
}

ConfigWatcher requests watches for configuration resources by a node, last applied version identifier, and resource names hint. The watch should send the responses when they are ready. The consumer can cancel the watch, in effect terminating the watch for the request. Implementations of ConfigWatcher must be thread-safe.
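
The watch contract described above (a value channel plus a cancel function that may be called several times) can be sketched as follows. This is a toy producer under stated assumptions, not the package's code: response, watcher, createWatch, publish, and open are hypothetical names, and each watch is assumed to deliver at most one response.

```go
package main

import (
	"fmt"
	"sync"
)

// response is a hypothetical stand-in for the Response interface.
type response struct{ Version string }

// watcher is a toy producer that tracks open watches by ID.
type watcher struct {
	mu      sync.Mutex
	nextID  int
	watches map[int]chan response
}

func newWatcher() *watcher { return &watcher{watches: map[int]chan response{}} }

// createWatch opens a watch and returns its value channel together with a
// cancel function that is safe to call more than once.
func (w *watcher) createWatch() (chan response, func()) {
	w.mu.Lock()
	defer w.mu.Unlock()
	id := w.nextID
	w.nextID++
	ch := make(chan response, 1)
	w.watches[id] = ch
	var once sync.Once
	cancel := func() {
		once.Do(func() {
			w.mu.Lock()
			defer w.mu.Unlock()
			delete(w.watches, id)
		})
	}
	return ch, cancel
}

// publish delivers r to every open watch and then forgets the watch.
func (w *watcher) publish(r response) {
	w.mu.Lock()
	defer w.mu.Unlock()
	for id, ch := range w.watches {
		ch <- r // capacity 1 and one response per watch, so this never blocks
		delete(w.watches, id)
	}
}

// open reports the number of watches still waiting for a response.
func (w *watcher) open() int {
	w.mu.Lock()
	defer w.mu.Unlock()
	return len(w.watches)
}

func main() {
	w := newWatcher()
	ch, cancel := w.createWatch()
	w.publish(response{Version: "v1"})
	fmt.Println((<-ch).Version, w.open())
	cancel()
	cancel() // repeated cancellation is a no-op
}
```

Guarding the cancel body with sync.Once is one simple way to make repeated cancellation harmless; the mutex covers the thread-safety requirement.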

type DeltaPassthroughResponse

type DeltaPassthroughResponse struct {
	// Request is the latest delta request on the stream
	DeltaRequest *discovery.DeltaDiscoveryRequest

	// NextVersionMap consists of updated version mappings after this response is applied
	NextVersionMap map[string]string

	// The discovery response that needs to be sent as is, without any marshalling transformations
	DeltaDiscoveryResponse *discovery.DeltaDiscoveryResponse
}

DeltaPassthroughResponse is a pre-constructed xDS response that need not go through marshalling transformations.

func (*DeltaPassthroughResponse) GetDeltaDiscoveryResponse

func (r *DeltaPassthroughResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscoveryResponse, error)

GetDeltaDiscoveryResponse returns the final passthrough Delta Discovery Response.

func (*DeltaPassthroughResponse) GetDeltaRequest

func (r *DeltaPassthroughResponse) GetDeltaRequest() *discovery.DeltaDiscoveryRequest

GetDeltaRequest returns the original Delta Discovery Request.

func (*DeltaPassthroughResponse) GetNextVersionMap

func (r *DeltaPassthroughResponse) GetNextVersionMap() map[string]string

GetNextVersionMap returns the version map from a DeltaPassthroughResponse.

func (*DeltaPassthroughResponse) GetSystemVersion

func (r *DeltaPassthroughResponse) GetSystemVersion() (string, error)

GetSystemVersion returns the response version.

type DeltaRequest

type DeltaRequest = discovery.DeltaDiscoveryRequest

DeltaRequest is an alias for the delta discovery request type.

type DeltaResources

type DeltaResources struct {
	// Version information
	SystemVersion string

	// Items in the group indexed by name
	Items resourceItems
}

DeltaResources is a versioned group of resources that also contains individual resource versions, per the incremental xDS protocol.

type DeltaResponse

type DeltaResponse interface {
	// Get the constructed DeltaDiscoveryResponse
	GetDeltaDiscoveryResponse() (*discovery.DeltaDiscoveryResponse, error)

	// Get the request that created the watch that we're now responding to. This is provided to allow the caller to correlate the
	// response with a request. Generally this will be the latest request seen on the stream for the specific type.
	GetDeltaRequest() *discovery.DeltaDiscoveryRequest

	// Get the version in the DeltaResponse. This field is generally used for debugging purposes as noted by the Envoy documentation.
	GetSystemVersion() (string, error)

	// Get the version map of the internal cache.
	// The version map consists of updated version mappings after this response is applied
	GetNextVersionMap() map[string]string
}

DeltaResponse is a wrapper around Envoy's DeltaDiscoveryResponse.

type DeltaResponseWatch

type DeltaResponseWatch struct {
	// Request is the most recent delta request for the watch
	Request *DeltaRequest

	// Response is the channel to push the delta responses to
	Response chan DeltaResponse

	// StreamState holds the version map for the stream
	StreamState *stream.StreamState
}

DeltaResponseWatch is a watch record keeping both the delta request and an open channel for the delta response.

type IDHash

type IDHash struct{}

IDHash uses ID field as the node hash.

func (IDHash) ID

func (IDHash) ID(node *core.Node) string

ID uses the node ID field.

type LinearCache

type LinearCache struct {
	// contains filtered or unexported fields
}

LinearCache supports collections of opaque resources. This cache has a single collection indexed by resource names and manages resource versions internally. It implements the cache interface for a single type URL and should be combined with other caches via type URL muxing. It can be used to supply EDS entries, for example, uniformly across a fleet of proxies.

func NewLinearCache

func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache

NewLinearCache creates a new cache. See the comments on the struct definition.

func (*LinearCache) CreateDeltaWatch

func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, st *stream.StreamState) (chan DeltaResponse, func())

func (*LinearCache) CreateWatch

func (cache *LinearCache) CreateWatch(request *Request) (chan Response, func())

func (*LinearCache) DeleteResource

func (cache *LinearCache) DeleteResource(name string) error

DeleteResource removes a resource in the collection.

func (*LinearCache) Fetch

func (cache *LinearCache) Fetch(ctx context.Context, request *Request) (Response, error)

func (*LinearCache) NumWatches

func (cache *LinearCache) NumWatches(name string) int

NumWatches returns the number of active watches for a resource name.

func (*LinearCache) UpdateResource

func (cache *LinearCache) UpdateResource(name string, res types.Resource) error

UpdateResource updates a resource in the collection.

type LinearCacheOption

type LinearCacheOption func(*LinearCache)

LinearCacheOption is an option for modifying the behavior of the linear cache.

func WithInitialResources

func WithInitialResources(resources map[string]types.Resource) LinearCacheOption

WithInitialResources sets the initial set of resources.

func WithVersionPrefix

func WithVersionPrefix(prefix string) LinearCacheOption

WithVersionPrefix sets a version prefix of the form "prefixN" in the version info. Version prefix can be used to distinguish replicated instances of the cache, in case a client re-connects to another instance.
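
The option functions and the "prefixN" version form can be sketched as below. This is an illustration only: linear, option, withVersionPrefix, withInitialResources, update, and versionInfo are hypothetical names, resources are plain strings, and the counter starting at zero and increasing by one per update is an assumption.

```go
package main

import (
	"fmt"
	"strconv"
)

// linear is a toy single-collection cache with an internal version counter.
type linear struct {
	prefix    string
	version   uint64
	resources map[string]string
}

// option mirrors the shape of LinearCacheOption.
type option func(*linear)

func withVersionPrefix(prefix string) option {
	return func(c *linear) { c.prefix = prefix }
}

func withInitialResources(res map[string]string) option {
	return func(c *linear) {
		for name, value := range res {
			c.resources[name] = value
		}
	}
}

func newLinear(opts ...option) *linear {
	c := &linear{resources: map[string]string{}}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

// update stores a resource and bumps the collection version.
func (c *linear) update(name, value string) {
	c.version++
	c.resources[name] = value
}

// versionInfo renders the version as "prefixN".
func (c *linear) versionInfo() string {
	return c.prefix + strconv.FormatUint(c.version, 10)
}

func main() {
	c := newLinear(
		withVersionPrefix("replica-a-"),
		withInitialResources(map[string]string{"eds/foo": "10.0.0.1"}),
	)
	c.update("eds/bar", "10.0.0.2")
	fmt.Println(c.versionInfo(), len(c.resources))
}
```

With distinct prefixes per replica, a client that reconnects to another instance presents a version the new instance does not recognize as its own, so it receives a fresh response instead of being treated as up to date.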

type MuxCache

type MuxCache struct {
	// Classification functions.
	Classify func(Request) string
	// Muxed caches.
	Caches map[string]Cache
}

MuxCache multiplexes across several caches using a classification function. If there is no matching cache for a classification result, the cache responds with an empty closed channel, which effectively terminates the stream on the server. When the stream is aggregated, it may be preferable to respond with a nil channel instead, which leaves the stream open; this can be arranged by making sure there is always a matching cache.
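
The dispatch behavior described above can be sketched as follows. The types here (request, response, cache, staticCache, mux) are simplified, hypothetical stand-ins, and only the watch path is shown.

```go
package main

import "fmt"

type request struct{ TypeURL string }
type response struct{ From string }

// cache is a minimal stand-in for the Cache interface.
type cache interface {
	createWatch(req request) chan response
}

// staticCache answers every watch immediately with its own name.
type staticCache struct{ name string }

func (s staticCache) createWatch(request) chan response {
	ch := make(chan response, 1)
	ch <- response{From: s.name}
	return ch
}

// mux routes each request to the cache selected by classify.
type mux struct {
	classify func(request) string
	caches   map[string]cache
}

func (m mux) createWatch(req request) chan response {
	c, ok := m.caches[m.classify(req)]
	if !ok {
		// No matching cache: hand back an empty, closed channel.
		ch := make(chan response)
		close(ch)
		return ch
	}
	return c.createWatch(req)
}

func main() {
	m := mux{
		classify: func(r request) string { return r.TypeURL },
		caches:   map[string]cache{"eds": staticCache{name: "eds-cache"}},
	}
	fmt.Println((<-m.createWatch(request{TypeURL: "eds"})).From)
	_, open := <-m.createWatch(request{TypeURL: "cds"})
	fmt.Println(open)
}
```

A receive on the closed channel returns immediately with ok set to false, which is how a consumer can tell an unrecoverable condition apart from a real response.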

func (*MuxCache) CreateDeltaWatch

func (cache *MuxCache) CreateDeltaWatch(request *DeltaRequest, st *stream.StreamState) (chan DeltaResponse, func())

func (*MuxCache) CreateWatch

func (mux *MuxCache) CreateWatch(request *Request) (chan Response, func())

func (*MuxCache) Fetch

func (mux *MuxCache) Fetch(ctx context.Context, request *Request) (Response, error)

type NodeHash

type NodeHash interface {
	// ID function defines a unique string identifier for the remote Envoy node.
	ID(node *core.Node) string
}

NodeHash computes string identifiers for Envoy nodes.
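
A node hash decides which nodes share a snapshot. The sketch below shows the interface shape with two implementations; node, nodeHash, idHash, and clusterHash are hypothetical stand-ins, and returning an empty string for a nil node is an assumption.

```go
package main

import "fmt"

// node is a stand-in for core.Node with only the fields used here.
type node struct {
	ID      string
	Cluster string
}

// nodeHash mirrors the shape of the NodeHash interface.
type nodeHash interface {
	ID(n *node) string
}

// idHash keys snapshots by node ID, in the manner of IDHash.
type idHash struct{}

func (idHash) ID(n *node) string {
	if n == nil {
		return ""
	}
	return n.ID
}

// clusterHash is an alternative that groups all nodes of a cluster under
// one snapshot key.
type clusterHash struct{}

func (clusterHash) ID(n *node) string {
	if n == nil {
		return ""
	}
	return n.Cluster
}

func main() {
	n := &node{ID: "sidecar-1", Cluster: "web"}
	for _, h := range []nodeHash{idHash{}, clusterHash{}} {
		fmt.Println(h.ID(n))
	}
}
```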

type PassthroughResponse

type PassthroughResponse struct {
	// Request is the original request.
	Request *discovery.DiscoveryRequest

	// The discovery response that needs to be sent as is, without any marshalling transformations.
	DiscoveryResponse *discovery.DiscoveryResponse
}

PassthroughResponse is a pre-constructed xDS response that need not go through marshalling transformations.

func (*PassthroughResponse) GetDiscoveryResponse

func (r *PassthroughResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, error)

GetDiscoveryResponse returns the final passthrough Discovery Response.

func (*PassthroughResponse) GetRequest

func (r *PassthroughResponse) GetRequest() *discovery.DiscoveryRequest

GetRequest returns the original Discovery Request.

func (*PassthroughResponse) GetVersion

func (r *PassthroughResponse) GetVersion() (string, error)

GetVersion returns the response version.

type RawDeltaResponse

type RawDeltaResponse struct {
	// Request is the latest delta request on the stream.
	DeltaRequest *discovery.DeltaDiscoveryRequest

	// SystemVersionInfo holds the currently applied response system version and should be used for debugging purposes only.
	SystemVersionInfo string

	// Resources to be included in the response.
	Resources []types.Resource

	// RemovedResources is a list of resource aliases which should be dropped by the consuming client.
	RemovedResources []string

	// NextVersionMap consists of updated version mappings after this response is applied
	NextVersionMap map[string]string
	// contains filtered or unexported fields
}

RawDeltaResponse is a pre-serialized xDS response that utilizes the delta discovery request/response objects.

func (*RawDeltaResponse) GetDeltaDiscoveryResponse

func (r *RawDeltaResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscoveryResponse, error)

GetDeltaDiscoveryResponse performs the marshalling the first time it is called and uses the cached response subsequently. We can do this because the marshalled response does not change across calls. This caching behavior is important in high-throughput scenarios because gRPC marshalling has a cost and drives CPU utilization under load.

func (*RawDeltaResponse) GetDeltaRequest

func (r *RawDeltaResponse) GetDeltaRequest() *discovery.DeltaDiscoveryRequest

GetDeltaRequest returns the original DeltaRequest.

func (*RawDeltaResponse) GetNextVersionMap

func (r *RawDeltaResponse) GetNextVersionMap() map[string]string

GetNextVersionMap returns the version map, which consists of updated version mappings after this response is applied.

func (*RawDeltaResponse) GetSystemVersion

func (r *RawDeltaResponse) GetSystemVersion() (string, error)

GetSystemVersion returns the raw SystemVersion.

type RawResponse

type RawResponse struct {
	// Request is the original request.
	Request *discovery.DiscoveryRequest

	// Version of the resources as tracked by the cache for the given type.
	// Proxy responds with this version as an acknowledgement.
	Version string

	// Resources to be included in the response.
	Resources []types.ResourceWithTtl

	// Whether this is a heartbeat response. For xDS versions that support TTL, this
	// will be converted into a response that doesn't contain the actual resource protobuf.
	// This allows for more lightweight updates that serve only to update the TTL timer.
	Heartbeat bool
	// contains filtered or unexported fields
}

RawResponse is a pre-serialized xDS response containing the raw resources to be included in the final Discovery Response.

func (*RawResponse) GetDiscoveryResponse

func (r *RawResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, error)

GetDiscoveryResponse performs the marshalling the first time it is called and uses the cached response subsequently. This is safe because the marshalled response does not change across calls. This caching behavior is important in high-throughput scenarios because gRPC marshalling has a cost and drives CPU utilization under load.
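
The marshal-once behavior can be illustrated with a small sketch. It is not the package's implementation: it uses sync.Once and JSON encoding as stand-ins for the real caching mechanism and protobuf marshalling, and rawResponse, wire, discoveryResponse, and marshalOps are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// wire is the serialized form used by this sketch.
type wire struct {
	Version   string   `json:"version"`
	Resources []string `json:"resources"`
}

// rawResponse holds unmarshaled data and memoizes its serialized form.
type rawResponse struct {
	Version   string
	Resources []string

	once       sync.Once
	marshaled  []byte
	err        error
	marshalOps int // counts how often marshalling actually ran
}

// discoveryResponse marshals on the first call and returns the cached bytes
// (and cached error) on every later call.
func (r *rawResponse) discoveryResponse() ([]byte, error) {
	r.once.Do(func() {
		r.marshalOps++
		r.marshaled, r.err = json.Marshal(wire{Version: r.Version, Resources: r.Resources})
	})
	return r.marshaled, r.err
}

func main() {
	r := &rawResponse{Version: "v1", Resources: []string{"a"}}
	first, _ := r.discoveryResponse()
	second, _ := r.discoveryResponse()
	fmt.Println(string(first) == string(second), r.marshalOps)
}
```

When one response is fanned out to many streams, every stream after the first pays only for a slice lookup rather than a full serialization.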

func (*RawResponse) GetRequest

func (r *RawResponse) GetRequest() *discovery.DiscoveryRequest

GetRequest returns the original Discovery Request.

func (*RawResponse) GetVersion

func (r *RawResponse) GetVersion() (string, error)

GetVersion returns the response version.

type Request

type Request = discovery.DiscoveryRequest

Request is an alias for the discovery request type.

type ResourceWithTtl

type ResourceWithTtl struct {
	Resources []types.Resource
	Ttl       *time.Duration
}

type Resources

type Resources struct {
	// Version information.
	Version string

	// Items in the group indexed by name.
	Items map[string]types.ResourceWithTtl
}

Resources is a versioned group of resources.

func NewResources

func NewResources(version string, items []types.Resource) Resources

NewResources creates a new resource group.

func NewResourcesWithTtl

func NewResourcesWithTtl(version string, items []types.ResourceWithTtl) Resources

NewResourcesWithTtl creates a new resource group from resources with TTLs.

type Response

type Response interface {
	// Get the Constructed DiscoveryResponse
	GetDiscoveryResponse() (*discovery.DiscoveryResponse, error)

	// Get the original Request for the Response.
	GetRequest() *discovery.DiscoveryRequest

	// Get the version in the Response.
	GetVersion() (string, error)
}

Response is a wrapper around Envoy's DiscoveryResponse.

type ResponseWatch

type ResponseWatch struct {
	// Request is the original request for the watch.
	Request *Request

	// Response is the channel to push responses to.
	Response chan Response
}

ResponseWatch is a watch record keeping both the request and an open channel for the response.

type Snapshot

type Snapshot struct {
	Resources [types.UnknownType]Resources

	// VersionMap holds the current hash map of all resources in the snapshot.
	// This field should remain nil until it is used, at which point it should be
	// instantiated by calling ConstructVersionMap().
	// VersionMap is only to be used with delta xDS.
	VersionMap map[string]map[string]string
}

Snapshot is an internally consistent snapshot of xDS resources. Consistency is important for the convergence as different resource types from the snapshot may be delivered to the proxy in arbitrary order.

func NewSnapshot

func NewSnapshot(version string,
	endpoints []types.Resource,
	clusters []types.Resource,
	routes []types.Resource,
	listeners []types.Resource,
	runtimes []types.Resource,
	secrets []types.Resource) Snapshot

NewSnapshot creates a snapshot from response types and a version.

func NewSnapshotWithResources

func NewSnapshotWithResources(version string, resources SnapshotResources) Snapshot

NewSnapshotWithResources creates a snapshot from response types and a version.

func NewSnapshotWithTtls

func NewSnapshotWithTtls(version string,
	endpoints []types.ResourceWithTtl,
	clusters []types.ResourceWithTtl,
	routes []types.ResourceWithTtl,
	listeners []types.ResourceWithTtl,
	runtimes []types.ResourceWithTtl,
	secrets []types.ResourceWithTtl) Snapshot

func (*Snapshot) Consistent

func (s *Snapshot) Consistent() error

Consistent check verifies that the dependent resources are exactly listed in the snapshot:

- all EDS resources are listed by name in CDS resources
- all RDS resources are listed by name in LDS resources

Note that clusters and listeners are requested without name references, so Envoy will accept the snapshot list of clusters as-is even if it does not match all references found in xDS.
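
An "exactly listed" check of this kind can be sketched as a set comparison between the names referenced by one resource type and the names provided by the dependent type. superset and consistent are hypothetical helpers operating on plain name sets; the error wording is illustrative.

```go
package main

import "fmt"

// superset returns an error if any referenced name is missing from provided.
func superset(referenced, provided map[string]bool) error {
	for name := range referenced {
		if _, ok := provided[name]; !ok {
			return fmt.Errorf("%q is referenced but not listed", name)
		}
	}
	return nil
}

// consistent checks that the referenced names and the provided names are
// the same set: equal sizes plus containment imply equality.
func consistent(referenced, provided map[string]bool) error {
	if len(referenced) != len(provided) {
		return fmt.Errorf("mismatched reference and resource counts: %d != %d",
			len(referenced), len(provided))
	}
	return superset(referenced, provided)
}

func main() {
	// Names of EDS resources referenced by CDS clusters versus the endpoint
	// resources actually present in the snapshot.
	referenced := map[string]bool{"backend": true, "auth": true}
	provided := map[string]bool{"backend": true}
	fmt.Println(consistent(referenced, provided))
}
```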

func (*Snapshot) ConstructVersionMap

func (s *Snapshot) ConstructVersionMap() error

ConstructVersionMap constructs a version map based on the current state of a snapshot.

func (*Snapshot) GetResources

func (s *Snapshot) GetResources(typeURL string) map[string]types.Resource

GetResources selects snapshot resources by type, returning the map of resources.

func (*Snapshot) GetResourcesAndTtl

func (s *Snapshot) GetResourcesAndTtl(typeURL string) map[string]types.ResourceWithTtl

GetResourcesAndTtl selects snapshot resources by type, returning the map of resources and the associated TTL.

func (*Snapshot) GetVersion

func (s *Snapshot) GetVersion(typeURL string) string

GetVersion returns the version for a resource type.

func (*Snapshot) GetVersionMap

func (s *Snapshot) GetVersionMap() map[string]map[string]string

GetVersionMap will return the internal version map of the currently applied snapshot.

type SnapshotCache

type SnapshotCache interface {
	Cache

	// SetSnapshot sets a response snapshot for a node. For ADS, the snapshots
	// should have distinct versions and be internally consistent (e.g. all
	// referenced resources must be included in the snapshot).
	//
	// This method will cause the server to respond to all open watches, for which
	// the version differs from the snapshot version.
	SetSnapshot(node string, snapshot Snapshot) error

	// GetSnapshot gets the snapshot for a node.
	GetSnapshot(node string) (Snapshot, error)

	// ClearSnapshot removes all status and snapshot information associated with a node.
	ClearSnapshot(node string)

	// GetStatusInfo retrieves status information for a node ID.
	GetStatusInfo(string) StatusInfo

	// GetStatusKeys retrieves node IDs for all statuses.
	GetStatusKeys() []string
}

SnapshotCache is a snapshot-based cache that maintains a single versioned snapshot of responses per node. SnapshotCache consistently replies with the latest snapshot. For the protocol to work correctly in ADS mode, EDS/RDS requests are answered only when all resources in the snapshot xDS response are named as part of the request. It is expected that the CDS response names all EDS clusters, and the LDS response names all RDS routes in a snapshot, to ensure that Envoy eventually makes the request for all EDS clusters or RDS routes.

SnapshotCache can operate as a REST or regular xDS backend. The snapshot can be partial, e.g. only include RDS or EDS resources.
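
The relationship between snapshots and open watches can be sketched as below. This toy model is an assumption-laden simplification: it tracks one version per node rather than per resource type, ignores resource names and ADS ordering, and snapshot, watch, snapCache, createWatch, and setSnapshot are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

type snapshot struct{ Version string }

// watch remembers the version the client already has.
type watch struct {
	knownVersion string
	out          chan snapshot
}

type snapCache struct {
	mu        sync.Mutex
	snapshots map[string]snapshot
	watches   map[string][]watch
}

func newSnapCache() *snapCache {
	return &snapCache{snapshots: map[string]snapshot{}, watches: map[string][]watch{}}
}

// createWatch answers immediately if the node's snapshot differs from the
// version the client reports, and otherwise leaves the watch open.
func (c *snapCache) createWatch(node, knownVersion string) chan snapshot {
	c.mu.Lock()
	defer c.mu.Unlock()
	out := make(chan snapshot, 1)
	if s, ok := c.snapshots[node]; ok && s.Version != knownVersion {
		out <- s
		return out
	}
	c.watches[node] = append(c.watches[node], watch{knownVersion: knownVersion, out: out})
	return out
}

// setSnapshot stores the snapshot and responds to every open watch whose
// known version differs from the new snapshot version.
func (c *snapCache) setSnapshot(node string, s snapshot) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.snapshots[node] = s
	var pending []watch
	for _, w := range c.watches[node] {
		if w.knownVersion != s.Version {
			w.out <- s
		} else {
			pending = append(pending, w)
		}
	}
	c.watches[node] = pending
}

func main() {
	c := newSnapCache()
	ch := c.createWatch("node-1", "")
	c.setSnapshot("node-1", snapshot{Version: "v1"})
	fmt.Println((<-ch).Version)
}
```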

func NewSnapshotCache

func NewSnapshotCache(ads bool, hash NodeHash, logger log.Logger) SnapshotCache

NewSnapshotCache initializes a simple cache.

The ADS flag forces a delay in responding to streaming requests until all resources are explicitly named in the request. This avoids the problem of a partial request over a single stream for a subset of resources, which would require generating a fresh version for acknowledgement. The ADS flag requires snapshot consistency. In the non-ADS case (and for fetch), multiple partial requests are sent across multiple streams, and re-using the snapshot version is OK.

Logger is optional.

func NewSnapshotCacheWithHeartbeating

func NewSnapshotCacheWithHeartbeating(ctx context.Context, ads bool, hash NodeHash, logger log.Logger, heartbeatInterval time.Duration) SnapshotCache

NewSnapshotCacheWithHeartbeating initializes a simple cache that sends periodic heartbeat responses for resources with a TTL.

The ADS flag forces a delay in responding to streaming requests until all resources are explicitly named in the request. This avoids the problem of a partial request over a single stream for a subset of resources, which would require generating a fresh version for acknowledgement. The ADS flag requires snapshot consistency. In the non-ADS case (and for fetch), multiple partial requests are sent across multiple streams, and re-using the snapshot version is OK.

Logger is optional.

The context provides a way to cancel the heartbeating routine, while the heartbeatInterval parameter controls how often heartbeating occurs.
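
A context-cancelled heartbeat loop of the kind described can be sketched with a ticker. runHeartbeats and countBeats are hypothetical helpers, and what a beat does (here, a callback) is left abstract; the real cache sends heartbeat responses for resources with a TTL.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runHeartbeats calls beat once per interval until ctx is cancelled.
func runHeartbeats(ctx context.Context, interval time.Duration, beat func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Re-check cancellation so no beat fires after cancel, even if
			// both channels were ready at the same time.
			if ctx.Err() != nil {
				return
			}
			beat()
		}
	}
}

// countBeats runs the loop with a callback that cancels the context after
// limit beats and reports how many beats were observed.
func countBeats(limit int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	n := 0
	runHeartbeats(ctx, time.Millisecond, func() {
		n++
		if n >= limit {
			cancel()
		}
	})
	return n
}

func main() {
	fmt.Println(countBeats(3))
}
```

In a server, the loop would typically run in its own goroutine, with the context tied to the lifetime of the process or the cache.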

type SnapshotResources

type SnapshotResources struct {
	Endpoints        []types.Resource
	Clusters         []types.Resource
	Routes           []types.Resource
	Listeners        []types.Resource
	Runtimes         []types.Resource
	Secrets          []types.Resource
	ExtensionConfigs []types.Resource
}

SnapshotResources contains the resources to construct a snapshot from.

type StatusInfo

type StatusInfo interface {
	// GetNode returns the node metadata.
	GetNode() *core.Node

	// GetNumWatches returns the number of open watches.
	GetNumWatches() int

	// GetNumDeltaWatches returns the number of open delta watches.
	GetNumDeltaWatches() int

	// GetLastWatchRequestTime returns the timestamp of the last discovery watch request.
	GetLastWatchRequestTime() time.Time

	// GetLastDeltaWatchRequestTime returns the timestamp of the last delta discovery watch request.
	GetLastDeltaWatchRequestTime() time.Time

	// SetLastDeltaWatchRequestTime will set the current time of the last delta discovery watch request
	SetLastDeltaWatchRequestTime(time.Time)

	// SetDeltaResponseWatch will set the provided delta response watch for the associated watch ID
	SetDeltaResponseWatch(int64, DeltaResponseWatch)
}

StatusInfo tracks the server state for the remote Envoy node. Not all fields are used by all cache implementations.
