cache

package
v1.0.2
Published: May 18, 2023 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Constants

const (
	DefaultSVIDCacheMaxSize = 1000
	SVIDSyncInterval        = 500 * time.Millisecond
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Bundle

type Bundle = bundleutil.Bundle

type BundleCache

type BundleCache struct {
	// contains filtered or unexported fields
}

func NewBundleCache

func NewBundleCache(trustDomain spiffeid.TrustDomain, bundle *Bundle) *BundleCache

func (*BundleCache) Bundle

func (c *BundleCache) Bundle() *Bundle

func (*BundleCache) Bundles

func (c *BundleCache) Bundles() map[spiffeid.TrustDomain]*Bundle

func (*BundleCache) SubscribeToBundleChanges

func (c *BundleCache) SubscribeToBundleChanges() *BundleStream

func (*BundleCache) Update

func (c *BundleCache) Update(bundles map[spiffeid.TrustDomain]*Bundle)

type BundleStream

type BundleStream struct {
	// contains filtered or unexported fields
}

BundleStream wraps an observer stream to provide a type-safe interface.

func NewBundleStream

func NewBundleStream(stream observer.Stream) *BundleStream

func (*BundleStream) Changes

func (b *BundleStream) Changes() chan struct{}

Changes returns the channel that is closed when a new value is available.

func (*BundleStream) Clone

func (b *BundleStream) Clone() *BundleStream

Clone creates a new independent stream from this one but sharing the same Property. Updates to the property will be reflected in both streams but they may have different values depending on when they advance the stream with Next.

func (*BundleStream) HasNext

func (b *BundleStream) HasNext() bool

HasNext checks whether there is a new value available.

func (*BundleStream) Next

func (b *BundleStream) Next() map[spiffeid.TrustDomain]*Bundle

Next advances this stream to the next state. You should never call this unless the Changes channel is closed.

func (*BundleStream) Value

func (b *BundleStream) Value() map[spiffeid.TrustDomain]*Bundle

Value returns the current value for this stream.

func (*BundleStream) WaitNext

func (b *BundleStream) WaitNext() map[spiffeid.TrustDomain]*Bundle

WaitNext waits for Changes to be closed, advances the stream and returns the current value.
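The stream semantics described above (a Changes channel that is closed when a newer value exists, and clones that advance independently) can be illustrated with a small stand-in. This is a minimal sketch using only the standard library; the property, state, and stream types are hypothetical and are not the observer package that BundleStream actually wraps.

```go
package main

import (
	"fmt"
	"sync"
)

// state is one node in a linked list of values. Its done channel is closed
// once a newer state has been linked after it.
type state struct {
	value string
	next  *state
	done  chan struct{}
}

// property is a hypothetical stand-in for the observable behind a stream.
type property struct {
	mu   sync.Mutex
	tail *state
}

func newProperty(v string) *property {
	return &property{tail: &state{value: v, done: make(chan struct{})}}
}

// update links a new state, then closes the previous state's channel so that
// every stream positioned on it sees the change.
func (p *property) update(v string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	s := &state{value: v, done: make(chan struct{})}
	p.tail.next = s
	close(p.tail.done)
	p.tail = s
}

// stream is a cursor into the list. Clones share the list but not the cursor.
type stream struct{ cur *state }

func (p *property) observe() *stream {
	p.mu.Lock()
	defer p.mu.Unlock()
	return &stream{cur: p.tail}
}

func (s *stream) changes() chan struct{} { return s.cur.done }
func (s *stream) value() string          { return s.cur.value }
func (s *stream) clone() *stream         { return &stream{cur: s.cur} }

// hasNext reports whether the changes channel is already closed.
func (s *stream) hasNext() bool {
	select {
	case <-s.changes():
		return true
	default:
		return false
	}
}

// waitNext blocks until changes is closed, advances one state, and returns
// the new current value.
func (s *stream) waitNext() string {
	<-s.changes()
	s.cur = s.cur.next
	return s.cur.value
}

func main() {
	p := newProperty("v1")
	a := p.observe()
	b := a.clone()
	p.update("v2")
	p.update("v3")
	fmt.Println(a.waitNext(), a.waitNext()) // a advances through both updates
	fmt.Println(b.value(), b.hasNext())     // b has not advanced yet
}
```

Because each state carries its own channel, a slow reader never misses an intermediate value; it simply walks the list one state at a time.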

type Cache

type Cache struct {
	*BundleCache
	*JWTSVIDCache
	// contains filtered or unexported fields
}

Cache caches each registration entry, signed X509-SVIDs for those entries, bundles, and JWT SVIDs for the agent. It allows subscriptions by (workload) selector sets and notifies subscribers when:

1) a registration entry related to the selectors:

  • is modified
  • has a new X509-SVID signed for it
  • federates with a federated bundle that is updated

2) the trust bundle for the agent trust domain is updated

When notified, the subscriber is given a WorkloadUpdate containing related identities and trust bundles.

The cache does this efficiently by building an index for each unique selector it encounters. Each selector index tracks the subscribers (i.e., workloads) and registration entries that have that selector.

When registration entries are added, updated, or removed, the set of relevant selectors is gathered and the indexes for those selectors are combed for all relevant subscribers.

For each relevant subscriber, the selector index for each selector of the subscriber is combed for registration entries whose selectors are a subset of the subscriber selector set. Identities for those entries are added to the workload update returned to the subscriber.
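The subset-matching step can be sketched as follows. The selector, entry, and index types here are hypothetical simplifications (the real cache works with *common.Selector and *common.RegistrationEntry and also indexes subscribers); the sketch only shows how a per-selector index narrows the candidates before the subset check.

```go
package main

import "fmt"

// selector is a hypothetical, comparable stand-in for *common.Selector.
type selector struct{ Type, Value string }

// entry is a hypothetical stand-in for a registration entry.
type entry struct {
	ID        string
	Selectors []selector
}

// index maps each unique selector to the entries that carry it.
type index map[selector][]*entry

func buildIndex(entries []*entry) index {
	idx := index{}
	for _, e := range entries {
		for _, s := range e.Selectors {
			idx[s] = append(idx[s], e)
		}
	}
	return idx
}

// matching returns the IDs of entries whose selectors are all contained in
// the workload's selector set. Only entries sharing at least one selector
// with the workload are ever inspected.
func (idx index) matching(workload []selector) []string {
	set := make(map[selector]struct{}, len(workload))
	for _, s := range workload {
		set[s] = struct{}{}
	}
	seen := map[string]bool{}
	var ids []string
	for _, s := range workload {
		for _, e := range idx[s] {
			if seen[e.ID] {
				continue
			}
			seen[e.ID] = true
			if isSubset(e.Selectors, set) {
				ids = append(ids, e.ID)
			}
		}
	}
	return ids
}

func isSubset(sels []selector, set map[selector]struct{}) bool {
	for _, s := range sels {
		if _, ok := set[s]; !ok {
			return false
		}
	}
	return true
}

func main() {
	idx := buildIndex([]*entry{
		{ID: "a", Selectors: []selector{{"unix", "uid:1000"}}},
		{ID: "b", Selectors: []selector{{"unix", "uid:1000"}, {"k8s", "ns:prod"}}},
		{ID: "c", Selectors: []selector{{"unix", "uid:0"}}},
	})
	// Entry b is a candidate but fails the subset check, because the
	// workload does not present the k8s selector.
	fmt.Println(idx.matching([]selector{{"unix", "uid:1000"}, {"docker", "label:x"}}))
}
```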

NOTE: The cache is intended to be able to handle thousands of workload subscriptions, which can involve thousands of certificates, keys, bundles, and registration entries, etc. The selector index itself is intended to be scalable, but the objects themselves can take a considerable amount of memory. For maximal safety, the objects should be cloned both coming in and leaving the cache. However, during global updates (e.g. trust bundle is updated for the agent trust domain) in particular, cloning all of the relevant objects for each subscriber causes HUGE amounts of memory pressure which adds non-trivial amounts of latency and causes a giant memory spike that could OOM the agent on smaller VMs. For this reason, the cache is presumed to own ALL data passing in and out of the cache. Producers and consumers MUST NOT mutate the data.
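Since the cache hands out shared data, a consumer that needs a modified view has to build its own copy first. The following is a minimal sketch of that discipline; the identity type and the withDNSName helper are hypothetical and exist only for illustration.

```go
package main

import "fmt"

// identity is a hypothetical stand-in for data received from the cache.
type identity struct {
	SpiffeID string
	DNSNames []string
}

// withDNSName returns a modified copy and leaves the shared value untouched.
// The slice is reallocated so that append can never write into the backing
// array owned by the cache.
func withDNSName(shared *identity, name string) *identity {
	cp := *shared
	cp.DNSNames = make([]string, 0, len(shared.DNSNames)+1)
	cp.DNSNames = append(cp.DNSNames, shared.DNSNames...)
	cp.DNSNames = append(cp.DNSNames, name)
	return &cp
}

func main() {
	shared := &identity{SpiffeID: "spiffe://example.org/web", DNSNames: []string{"web"}}
	mine := withDNSName(shared, "web.internal")
	fmt.Println(shared.DNSNames, mine.DNSNames)
}
```

Copying on the rare write path keeps the common read path allocation-free, which matches the trade-off described in the note.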

func New

func New(log logrus.FieldLogger, trustDomain spiffeid.TrustDomain, bundle *Bundle, metrics telemetry.Metrics) *Cache

func (*Cache) CountSVIDs

func (c *Cache) CountSVIDs() int

func (*Cache) Entries

func (c *Cache) Entries() []*common.RegistrationEntry

func (*Cache) FetchWorkloadUpdate

func (c *Cache) FetchWorkloadUpdate(selectors []*common.Selector) *WorkloadUpdate

func (*Cache) GetStaleEntries

func (c *Cache) GetStaleEntries() []*StaleEntry

GetStaleEntries obtains a list of stale entries

func (*Cache) Identities

func (c *Cache) Identities() []Identity

Identities is only used by manager tests. TODO: We should remove this and find a better way.

func (*Cache) MatchingIdentities

func (c *Cache) MatchingIdentities(selectors []*common.Selector) []Identity

func (*Cache) MatchingRegistrationEntries

func (c *Cache) MatchingRegistrationEntries(selectors []*common.Selector) []*common.RegistrationEntry

func (*Cache) SubscribeToWorkloadUpdates

func (c *Cache) SubscribeToWorkloadUpdates(ctx context.Context, selectors Selectors) (Subscriber, error)

func (*Cache) SyncSVIDsWithSubscribers

func (c *Cache) SyncSVIDsWithSubscribers()

func (*Cache) UpdateEntries

func (c *Cache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.RegistrationEntry, *common.RegistrationEntry, *X509SVID) bool)

UpdateEntries updates the cache with the provided registration entries and bundles and notifies impacted subscribers. The checkSVID callback, if provided, is used to determine if the SVID for the entry is stale, or otherwise in need of rotation. Entries marked stale through the checkSVID callback are returned from GetStaleEntries() until the SVID is updated through a call to UpdateSVIDs.
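A checkSVID-style callback might look like the sketch below. The registrationEntry and x509SVID types are local stand-ins for common.RegistrationEntry and X509SVID, and both the parameter order (existing entry, updated entry, SVID) and the half-lifetime rotation policy are assumptions made for illustration, not behavior confirmed by this documentation.

```go
package main

import (
	"crypto/x509"
	"fmt"
	"time"
)

// Hypothetical stand-ins for common.RegistrationEntry and cache.X509SVID.
type registrationEntry struct{ EntryID string }

type x509SVID struct{ Chain []*x509.Certificate }

// newCheckSVID returns a callback with the same shape as checkSVID. It
// reports an SVID as stale when none is cached or when the leaf certificate
// has passed half of its validity window (an assumed policy).
func newCheckSVID(now func() time.Time) func(existing, updated *registrationEntry, svid *x509SVID) bool {
	return func(existing, updated *registrationEntry, svid *x509SVID) bool {
		if svid == nil || len(svid.Chain) == 0 {
			return true // nothing cached yet, so the entry needs an SVID
		}
		leaf := svid.Chain[0]
		half := leaf.NotBefore.Add(leaf.NotAfter.Sub(leaf.NotBefore) / 2)
		return !now().Before(half)
	}
}

// Helpers and example values for the demonstration below.
var (
	issued  = time.Date(2023, 5, 18, 0, 0, 0, 0, time.UTC)
	expires = issued.Add(time.Hour)
)

func clockAt(t time.Time) func() time.Time { return func() time.Time { return t } }

func svidValid(from, to time.Time) *x509SVID {
	return &x509SVID{Chain: []*x509.Certificate{{NotBefore: from, NotAfter: to}}}
}

func main() {
	svid := svidValid(issued, expires)
	early := newCheckSVID(clockAt(issued.Add(10 * time.Minute)))
	late := newCheckSVID(clockAt(issued.Add(45 * time.Minute)))
	fmt.Println(early(nil, nil, svid), late(nil, nil, svid), early(nil, nil, nil))
}
```

Injecting the clock keeps the policy testable without waiting for real certificates to age.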

func (*Cache) UpdateSVIDs

func (c *Cache) UpdateSVIDs(update *UpdateSVIDs)

type Identity

type Identity struct {
	Entry      *common.RegistrationEntry
	SVID       []*x509.Certificate
	PrivateKey crypto.Signer
}

Identity holds the data for a single workload identity

type JWTSVIDCache

type JWTSVIDCache struct {
	// contains filtered or unexported fields
}

func NewJWTSVIDCache

func NewJWTSVIDCache() *JWTSVIDCache

func (*JWTSVIDCache) GetJWTSVID

func (c *JWTSVIDCache) GetJWTSVID(spiffeID spiffeid.ID, audience []string) (*client.JWTSVID, bool)

func (*JWTSVIDCache) SetJWTSVID

func (c *JWTSVIDCache) SetJWTSVID(spiffeID spiffeid.ID, audience []string, svid *client.JWTSVID)

type LRUCache

type LRUCache struct {
	*BundleCache
	*JWTSVIDCache
	// contains filtered or unexported fields
}

LRUCache caches each registration entry, bundles, and JWT SVIDs for the agent. The signed X509-SVIDs for those entries are stored in an LRU-like cache. It allows subscriptions by (workload) selector sets and notifies subscribers when:

1) a registration entry related to the selectors:

  • is modified
  • has a new X509-SVID signed for it
  • federates with a federated bundle that is updated

2) the trust bundle for the agent trust domain is updated

When notified, the subscriber is given a WorkloadUpdate containing related identities and trust bundles.

The cache does this efficiently by building an index for each unique selector it encounters. Each selector index tracks the subscribers (i.e., workloads) and registration entries that have that selector.

The LRU-like SVID cache has a configurable size limit and expiry period.

  1. The size limit of the SVID cache is a soft limit. If an SVID has a subscriber, it is never removed from the cache.
  2. Least recently used SVIDs are removed from the cache only after the cache expiry period has passed. This reduces overall cache churn.
  3. The last access timestamp for an SVID cache entry is updated when a new subscriber is created.
  4. When a new subscriber is created and there is a cache miss, the subscriber must wait for the next SVID sync event to receive a WorkloadUpdate with the newly minted SVID.

The advantage of this approach is that if the agent's entry count is below the cache size, all SVIDs are cached at all times. If the entry count exceeds the cache size, subscribers continue to get SVID updates (with a potential delay for the first WorkloadUpdate on a cache miss), and the least recently used SVIDs are removed from the cache to save memory. This allows the agent to support environments where the number of simultaneously active workloads is a small percentage of the large number of registrations assigned to the agent.
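The soft-limit eviction rules above can be sketched as follows. This is an illustrative model only: the record and svidCache types, their fields, and the eviction order are assumptions, and the actual bookkeeping inside LRUCache may differ.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// record is a hypothetical cache record for one entry's SVID. lastAccess is
// refreshed whenever a subscriber is created for the entry (rule 3).
type record struct {
	lastAccess  time.Time
	subscribers int
}

type svidCache struct {
	maxSize int
	expiry  time.Duration
	records map[string]*record // keyed by registration entry ID
}

// evict removes least recently used records while the cache is over maxSize.
// Records with subscribers are never removed (rule 1), and neither are
// records accessed within the expiry period (rule 2), so the cache may stay
// above maxSize after eviction.
func (c *svidCache) evict(now time.Time) []string {
	ids := make([]string, 0, len(c.records))
	for id := range c.records {
		ids = append(ids, id)
	}
	sort.Slice(ids, func(i, j int) bool {
		return c.records[ids[i]].lastAccess.Before(c.records[ids[j]].lastAccess)
	})
	var removed []string
	for _, id := range ids {
		if len(c.records) <= c.maxSize {
			break
		}
		r := c.records[id]
		if r.subscribers > 0 || now.Sub(r.lastAccess) < c.expiry {
			continue
		}
		delete(c.records, id)
		removed = append(removed, id)
	}
	return removed
}

// demoCache builds a cache that is over its limit, plus the time at which
// eviction runs.
func demoCache() (*svidCache, time.Time) {
	start := time.Date(2023, 5, 18, 12, 0, 0, 0, time.UTC)
	c := &svidCache{
		maxSize: 1,
		expiry:  5 * time.Minute,
		records: map[string]*record{
			"a": {lastAccess: start, subscribers: 1},       // oldest, but in use
			"b": {lastAccess: start.Add(time.Minute)},      // idle and expired
			"c": {lastAccess: start.Add(10 * time.Minute)}, // idle but recent
		},
	}
	return c, start.Add(10*time.Minute + 30*time.Second)
}

func main() {
	c, now := demoCache()
	// Only b is removable; the cache stays above maxSize afterwards.
	fmt.Println(c.evict(now), len(c.records))
}
```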

When registration entries are added, updated, or removed, the set of relevant selectors is gathered and the indexes for those selectors are combed for all relevant subscribers.

For each relevant subscriber, the selector index for each selector of the subscriber is combed for registration entries whose selectors are a subset of the subscriber selector set. Identities for those entries are added to the workload update returned to the subscriber.

NOTE: The cache is intended to be able to handle thousands of workload subscriptions, which can involve thousands of certificates, keys, bundles, and registration entries, etc. The selector index itself is intended to be scalable, but the objects themselves can take a considerable amount of memory. For maximal safety, the objects should be cloned both coming in and leaving the cache. However, during global updates (e.g. trust bundle is updated for the agent trust domain) in particular, cloning all of the relevant objects for each subscriber causes HUGE amounts of memory pressure which adds non-trivial amounts of latency and causes a giant memory spike that could OOM the agent on smaller VMs. For this reason, the cache is presumed to own ALL data passing in and out of the cache. Producers and consumers MUST NOT mutate the data.

func NewLRUCache

func NewLRUCache(log logrus.FieldLogger, trustDomain spiffeid.TrustDomain, bundle *Bundle, metrics telemetry.Metrics,
	svidCacheMaxSize int, clk clock.Clock) *LRUCache

func (*LRUCache) CountSVIDs

func (c *LRUCache) CountSVIDs() int

func (*LRUCache) Entries

func (c *LRUCache) Entries() []*common.RegistrationEntry

func (*LRUCache) FetchWorkloadUpdate

func (c *LRUCache) FetchWorkloadUpdate(selectors []*common.Selector) *WorkloadUpdate

func (*LRUCache) GetStaleEntries

func (c *LRUCache) GetStaleEntries() []*StaleEntry

GetStaleEntries obtains a list of stale entries

func (*LRUCache) Identities

func (c *LRUCache) Identities() []Identity

Identities is only used by manager tests. TODO: We should remove this and find a better way.

func (*LRUCache) MatchingRegistrationEntries

func (c *LRUCache) MatchingRegistrationEntries(selectors []*common.Selector) []*common.RegistrationEntry

func (*LRUCache) NewSubscriber

func (c *LRUCache) NewSubscriber(selectors []*common.Selector) Subscriber

NewSubscriber creates a subscriber for the given selector set. After invoking this method, call Notify separately for the first time to receive the latest updates.

func (*LRUCache) Notify

func (c *LRUCache) Notify(selectors []*common.Selector) bool

Notify notifies subscribers of the selector set only if all SVIDs for the corresponding selector set are cached. It returns whether all SVIDs are cached. This method should be retried with backoff to avoid lock contention.
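One way to follow the retry advice is a small exponential backoff loop around the call. This is a sketch under assumptions: notifyWithBackoff, cachedAfter, and retryDemo are hypothetical helpers, and in real use the notify closure would wrap a call such as c.Notify(selectors).

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// notifyWithBackoff calls notify until it reports that all SVIDs are cached,
// doubling the wait between attempts up to max. It returns the number of
// attempts made, and the context error if the context ends first.
func notifyWithBackoff(ctx context.Context, notify func() bool, initial, max time.Duration) (int, error) {
	wait := initial
	for attempt := 1; ; attempt++ {
		if notify() {
			return attempt, nil
		}
		timer := time.NewTimer(wait)
		select {
		case <-ctx.Done():
			timer.Stop()
			return attempt, ctx.Err()
		case <-timer.C:
		}
		if wait *= 2; wait > max {
			wait = max
		}
	}
}

// cachedAfter simulates a cache that reports success on the n-th call.
func cachedAfter(n int) func() bool {
	calls := 0
	return func() bool {
		calls++
		return calls >= n
	}
}

// retryDemo runs the loop against the simulated cache with short waits.
func retryDemo(n int) (int, error) {
	return notifyWithBackoff(context.Background(), cachedAfter(n), time.Millisecond, 4*time.Millisecond)
}

func main() {
	attempts, err := retryDemo(3)
	fmt.Println(attempts, err)
}
```

Capping the wait bounds the delay a workload sees once its SVID has been minted, while the doubling keeps failed attempts from contending for the cache lock.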

func (*LRUCache) SubscribeToWorkloadUpdates

func (c *LRUCache) SubscribeToWorkloadUpdates(ctx context.Context, selectors Selectors) (Subscriber, error)

func (*LRUCache) SyncSVIDsWithSubscribers

func (c *LRUCache) SyncSVIDsWithSubscribers()

SyncSVIDsWithSubscribers syncs the SVID cache. Entries with active subscribers that are not cached are put in the staleEntries map. Other records that are not cached are also put in the staleEntries map, up to the remaining capacity of the max cache size.

func (*LRUCache) UpdateEntries

func (c *LRUCache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.RegistrationEntry, *common.RegistrationEntry, *X509SVID) bool)

UpdateEntries updates the cache with the provided registration entries and bundles and notifies impacted subscribers. The checkSVID callback, if provided, is used to determine if the SVID for the entry is stale, or otherwise in need of rotation. Entries marked stale through the checkSVID callback are returned from GetStaleEntries() until the SVID is updated through a call to UpdateSVIDs.

func (*LRUCache) UpdateSVIDs

func (c *LRUCache) UpdateSVIDs(update *UpdateSVIDs)

type Selectors

type Selectors []*common.Selector

type StaleEntry

type StaleEntry struct {
	// Entry stale registration entry
	Entry *common.RegistrationEntry
	// SVIDs expiration time
	ExpiresAt time.Time
}

StaleEntry holds a stale registration entry together with the expiration time of its SVID.

type Subscriber

type Subscriber interface {
	Updates() <-chan *WorkloadUpdate
	Finish()
}
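A consumer of this interface typically drains Updates until it is done and then calls Finish. The sketch below mirrors the interface with local stand-in types (workloadUpdate, subscriber, fakeSubscriber) so that it runs on its own. Whether the real updates channel is ever closed is not stated here, so the closed-channel branch is a defensive assumption.

```go
package main

import (
	"context"
	"fmt"
)

// workloadUpdate and subscriber are local stand-ins for WorkloadUpdate and
// Subscriber.
type workloadUpdate struct{ Identities []string }

type subscriber interface {
	Updates() <-chan *workloadUpdate
	Finish()
}

// fakeSubscriber is a hypothetical in-memory implementation for the demo.
type fakeSubscriber struct {
	ch       chan *workloadUpdate
	finished bool
}

func (f *fakeSubscriber) Updates() <-chan *workloadUpdate { return f.ch }
func (f *fakeSubscriber) Finish()                         { f.finished = true }

// consume handles updates until the context ends or the channel is closed,
// and always releases the subscription with Finish.
func consume(ctx context.Context, sub subscriber, handle func(*workloadUpdate)) {
	defer sub.Finish()
	for {
		select {
		case <-ctx.Done():
			return
		case u, ok := <-sub.Updates():
			if !ok {
				return
			}
			handle(u)
		}
	}
}

// runDemo feeds two updates through a fake subscriber and reports how many
// were handled and whether Finish was called.
func runDemo() (int, bool) {
	f := &fakeSubscriber{ch: make(chan *workloadUpdate, 2)}
	f.ch <- &workloadUpdate{Identities: []string{"spiffe://example.org/a"}}
	f.ch <- &workloadUpdate{}
	close(f.ch)
	seen := 0
	consume(context.Background(), f, func(*workloadUpdate) { seen++ })
	return seen, f.finished
}

func main() {
	fmt.Println(runDemo())
}
```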

type UpdateEntries

type UpdateEntries struct {
	// Bundles is a set of ALL trust bundles available to the agent, keyed by trust domain
	Bundles map[spiffeid.TrustDomain]*bundleutil.Bundle

	// RegistrationEntries is a set of ALL registration entries available to the
	// agent, keyed by registration entry id.
	RegistrationEntries map[string]*common.RegistrationEntry
}

UpdateEntries holds information for an entries update to the cache.

type UpdateSVIDs

type UpdateSVIDs struct {
	// X509SVIDs is a set of updated X509-SVIDs that should be merged into
	// the cache, keyed by registration entry id.
	X509SVIDs map[string]*X509SVID
}

UpdateSVIDs holds information for an SVIDs update to the cache.

type WorkloadUpdate

type WorkloadUpdate struct {
	Identities       []Identity
	Bundle           *bundleutil.Bundle
	FederatedBundles map[spiffeid.TrustDomain]*bundleutil.Bundle
}

WorkloadUpdate is used to convey workload information to cache subscribers

func (*WorkloadUpdate) HasIdentity

func (u *WorkloadUpdate) HasIdentity() bool

type X509SVID

type X509SVID struct {
	Chain      []*x509.Certificate
	PrivateKey crypto.Signer
}

X509SVID holds onto the SVID certificate chain and private key.
