controller

package
v1.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 24, 2025 License: Apache-2.0 Imports: 57 Imported by: 0

Documentation

Overview

Copyright 2024 Illumio, Inc. All Rights Reserved.

Copyright 2024 Illumio, Inc. All Rights Reserved.

Index

Constants

View Source
const (
	ONBOARDING_CLIENT_ID     = onboardingCredentialRequiredField("client_id")
	ONBOARDING_CLIENT_SECRET = onboardingCredentialRequiredField("client_secret")
)
View Source
const (
	// Protocols
	ICMP = "icmp"
	TCP  = "tcp"
	UDP  = "udp"
	SCTP = "sctp"

	// IP Versions
	IPv4 = "ipv4"
	IPv6 = "ipv6"
)
View Source
const (
	STREAM_NETWORK_FLOWS = StreamType("network_flows")
	STREAM_RESOURCES     = StreamType("resources")
	STREAM_LOGS          = StreamType("logs")
	STREAM_CONFIGURATION = StreamType("configuration")
)

Variables

View Source
var (
	// In the onboarding flow, an administrator gives cloud-operator credentials
	// via helm's value.yaml mechanism. For the sake of operability,
	// cloud-operator then persists these credentials into a k8s secret, so
	// subsequent installs on the same cluster do not require the administrator to
	// repeat the credentials every time. There are multiple specific fields in
	// this secret.
	//
	// This error type indicates that at least one of the required fields is
	// missing from the secret.
	ErrCredentialNotFoundInK8sSecret error = &credentialNotFoundInK8sSecretError{}
)
View Source
var ErrFalcoEventIsNotFlow = errors.New("ignoring falco event, not a network flow")
View Source
var ErrFalcoIncompleteL3Flow = errors.New("ignoring incomplete falco l3 network flow")
View Source
var ErrFalcoIncompleteL4Flow = errors.New("ignoring incomplete falco l4 network flow")
View Source
var ErrFalcoInvalidPort = errors.New("ignoring incomplete falco flow due to bad ports")
View Source
var ErrFalcoTimestamp = errors.New("incomplete or incorrectly formatted timestamp found in Falco flow")
View Source
var ErrStopRetries = errors.New("stop retries")

Functions

func ConnectStreams added in v0.0.6

func ConnectStreams(ctx context.Context, logger *zap.Logger, envMap EnvironmentConfig, bufferedGrpcSyncer *BufferedGrpcWriteSyncer)

ConnectStreams will continue to reboot and restart the main operations within the operator if any disconnects or errors occur.

func GetClusterID

func GetClusterID(ctx context.Context, logger *zap.Logger) (string, error)

GetClusterID returns the uid of the k8s cluster's kube-system namespace, which is used as the cluster's globally unique ID.

func GetTLSConfig added in v0.0.8

func GetTLSConfig(skipVerify bool) *tls.Config

GetTLSConfig returns a TLS configuration.

func GetTokenSource added in v0.0.8

func GetTokenSource(ctx context.Context, config clientcredentials.Config, tlsConfig *tls.Config) oauth2.TokenSource

GetTokenSource returns an OAuth2 token source.

func IsRunningInCluster

func IsRunningInCluster() bool

IsRunningInCluster helps determine if the application is running inside a Kubernetes cluster.

func NewAuthenticatedConnection added in v0.0.6

func NewAuthenticatedConnection(ctx context.Context, logger *zap.Logger, envMap EnvironmentConfig) (*grpc.ClientConn, pb.KubernetesInfoServiceClient, error)

NewAuthenticatedConnection gets a valid token and creates a connection to CloudSecure.

func NewClientSet

func NewClientSet() (*kubernetes.Clientset, error)

NewClientSet returns a new Kubernetes clientset based on the execution environment.

func NewCredentialNotFoundInK8sSecretError added in v1.0.3

func NewCredentialNotFoundInK8sSecretError(requiredField onboardingCredentialRequiredField) error

func NewFalcoEventHandler added in v0.0.8

func NewFalcoEventHandler(eventChan chan<- string) http.HandlerFunc

NewFalcoEventHandler creates a new HTTP handler function for processing Falco events.

func NewGRPCLogger added in v1.0.5

func NewGRPCLogger(grpcSyncer *BufferedGrpcWriteSyncer, addCaller bool, clock zapcore.Clock) *zap.Logger

NewGRPCLogger creates a Zap logger with multiple write syncers: one for stdout and one for the gRPC write stream.

func NewProductionGRPCLogger added in v1.0.5

func NewProductionGRPCLogger(grpcSyncer *BufferedGrpcWriteSyncer) *zap.Logger

NewProductionGRPCLogger creates a Zap logger configured for production.

func ParseToken added in v0.0.8

func ParseToken(tokenString string) (jwt.MapClaims, error)

ParseToken parses the JWT token and returns the claims.

func ServerIsHealthy

func ServerIsHealthy() bool

ServerIsHealthy checks if a deadlock has occurred within the threaded resource listing process.

func SetUpOAuthConnection

func SetUpOAuthConnection(
	ctx context.Context,
	logger *zap.Logger,
	tokenURL string,
	tlsSkipVerify bool,
	clientID string,
	clientSecret string,
) (*grpc.ClientConn, error)

SetUpOAuthConnection establishes a gRPC connection using OAuth credentials and logs the process.

Types

type Action added in v1.1.0

type Action func() error

type Authenticator added in v0.0.6

type Authenticator struct {
	Logger *zap.Logger
}

Authenticator keeps a logger for its own methods.

func (*Authenticator) DoesK8sSecretExist added in v0.0.6

func (authn *Authenticator) DoesK8sSecretExist(ctx context.Context, secretName string, podNamespace string) bool

func (*Authenticator) GetOnboardingCredentials added in v0.0.6

func (authn *Authenticator) GetOnboardingCredentials(ctx context.Context, clientID string, clientSecret string) (Credentials, error)

GetOnboardingCredentials returns credentials to onboard this cluster with CloudSecure.

func (*Authenticator) ReadCredentialsK8sSecrets added in v0.0.6

func (authn *Authenticator) ReadCredentialsK8sSecrets(ctx context.Context, secretName string, podNamespace string) (string, string, error)

ReadCredentialsK8sSecrets takes a secretName and reads the file.

func (*Authenticator) WriteK8sSecret added in v0.0.6

func (authn *Authenticator) WriteK8sSecret(ctx context.Context, keyData OnboardResponse, ClusterCreds string, podNamespace string) error

WriteK8sSecret updates the data in an existing Kubernetes Secret without overwriting annotations or labels.

type BufferedGrpcWriteSyncer added in v0.0.2

type BufferedGrpcWriteSyncer struct {
	// contains filtered or unexported fields
}

BufferedGrpcWriteSyncer is a custom zap write syncer that writes to a gRPC stream. If the stream is not connected, it buffers to memory.

func NewBufferedGrpcWriteSyncer added in v0.0.2

func NewBufferedGrpcWriteSyncer() *BufferedGrpcWriteSyncer

NewBufferedGrpcWriteSyncer returns a new BufferedGrpcWriteSyncer.

func (*BufferedGrpcWriteSyncer) Close added in v0.0.2

func (b *BufferedGrpcWriteSyncer) Close() error

Close flushes buffered log data into grpc stream if possible, and closes the connection.

func (*BufferedGrpcWriteSyncer) UpdateClient added in v0.0.2

UpdateClient updates the gRPC client and connection in the BufferedGrpcWriteSyncer.

type CiliumFlowCollector added in v0.0.5

type CiliumFlowCollector struct {
	// contains filtered or unexported fields
}

CiliumFlowCollector collects flows from Cilium Hubble Relay running in this cluster.

type ClientConnInterface added in v0.0.2

type ClientConnInterface interface {
	GetState() connectivity.State
	Close() error
}

type Credentials

type Credentials struct {
	ClientID     string `json:"client_id"`
	ClientSecret string `json:"client_secret"`
}

Credentials contains attributes that are needed for onboarding.

type EnvironmentConfig

type EnvironmentConfig struct {
	// Namespace of Cilium.
	CiliumNamespace string
	// K8s cluster secret name.
	ClusterCreds string
	// Client ID for onboarding. "" if not specified, i.e. if the operator is not meant to onboard itself.
	OnboardingClientId string
	// Client secret for onboarding. "" if not specified, i.e. if the operator is not meant to onboard itself.
	OnboardingClientSecret string
	// URL of the onboarding endpoint.
	OnboardingEndpoint string
	// Port for the IPFIX collector
	IPFIXCollectorPort string
	// Namespace of OVN-Kubernetes
	OVNKNamespace string
	// URL of the token endpoint.
	TokenEndpoint string
	// Whether to skip TLS certificate verification when starting a stream.
	TlsSkipVerify bool
	// KeepalivePeriods specifies the period (minus jitter) between two keepalives sent on each stream
	KeepalivePeriods KeepalivePeriods
	// PodNamespace is the namespace where the cloud-operator is deployed
	PodNamespace string
	// How long must a stream be in a state for our exponentialBackoff function to
	// consider it a success.
	StreamSuccessPeriod StreamSuccessPeriod
	// HTTP Proxy URL
	HttpsProxy string
	// Whether to enable verbose debugging.
	VerboseDebugging bool
}

type FalcoEvent added in v0.0.8

type FalcoEvent struct {
	// Timestamp is the time the network event occurred, in ISO 8601 format.
	Timestamp *timestamppb.Timestamp `json:"time"`
	// SrcIP is the source IP address involved in the network event.
	SrcIP string `json:"srcip"`
	// DstIP is the destination IP address involved in the network event.
	DstIP string `json:"dstip"`
	// SrcPort is the source port number involved in the network event.
	SrcPort string `json:"srcport"`
	// DstPort is the destination port number involved in the network event.
	DstPort string `json:"dstport"`
	// Proto is the protocol used in the network event (e.g., TCP, UDP).
	Proto string `json:"proto"`
	// IpVersion is the version used in the network event (e.g. ipv4, ipv6).
	IpVersion string `json:"prototype"`
}

FalcoEvent represents the network information extracted from a Falco event.

type FlowCache added in v1.1.3

type FlowCache struct {
	// contains filtered or unexported fields
}

FlowCache caches flows to be exported. It evicts flows from the cache, to be sent to the collector, for any of the following reasons: (1) lack of resources: the cache has reached the maximum capacity configured in maxFlows; (2) active timeout: the flow has been cached longer than the configured activeTimeout. See https://www.rfc-editor.org/rfc/rfc5102.html#section-5.11.3 for the definition of those reasons.

func NewFlowCache added in v1.1.3

func NewFlowCache(
	activeTimeout time.Duration,
	maxFlows int,
	outFlows chan pb.Flow,
) *FlowCache

func (*FlowCache) CacheFlow added in v1.1.3

func (c *FlowCache) CacheFlow(ctx context.Context, flow pb.Flow) error

CacheFlow aggregates and caches the given flow.

func (*FlowCache) Close added in v1.1.3

func (c *FlowCache) Close() error

Close closes this flow cache's channels. This method must be called exactly once on every FlowCache after use.

func (*FlowCache) Run added in v1.1.3

func (c *FlowCache) Run(ctx context.Context, logger *zap.Logger) error

Run manages the flow cache by evicting expired flows based on the active timeout, processing new flows, and resetting the timer for the next expiration.

type KeepalivePeriods added in v1.1.0

type KeepalivePeriods struct {
	KubernetesNetworkFlows time.Duration
	Logs                   time.Duration
	KubernetesResources    time.Duration
	Configuration          time.Duration
}

type OVNKFlow added in v1.3.0

type OVNKFlow struct {
	SourceIP        string
	DestinationIP   string
	SourcePort      uint16
	DestinationPort uint16
	Protocol        string
	IPVersion       string
	StartTimestamp  *timestamppb.Timestamp
	EndTimestamp    *timestamppb.Timestamp
}

OVNKFlow represents a flow captured from OVN-Kubernetes.

type OnboardResponse

type OnboardResponse struct {
	ClusterClientId     string `json:"cluster_client_id"`
	ClusterClientSecret string `json:"cluster_client_secret"`
}

func Onboard added in v0.0.6

func Onboard(ctx context.Context, TlsSkipVerify bool, OnboardingEndpoint string, credentials Credentials, logger *zap.Logger) (OnboardResponse, error)

Onboard onboards this cluster with CloudSecure using the onboarding credentials and obtains OAuth 2 credentials for this cluster.

type ResourceManager

type ResourceManager struct {
	// contains filtered or unexported fields
}

ResourceManager encapsulates components for listing and managing Kubernetes resources.

func NewResourceManager added in v1.1.5

func NewResourceManager(config ResourceManagerConfig) *ResourceManager

NewResourceManager creates a new ResourceManager for a specific resource type. The logger will automatically include the resource name in all log messages.

func (*ResourceManager) DynamicListResources

func (r *ResourceManager) DynamicListResources(ctx context.Context, logger *zap.Logger, apiGroup string) (string, error)

DynamicListResources lists a specified resource dynamically and sends it down the current gRPC stream.

func (*ResourceManager) ExtractObjectMetas added in v0.0.8

func (r *ResourceManager) ExtractObjectMetas(resources *unstructured.UnstructuredList) ([]metav1.ObjectMeta, error)

ExtractObjectMetas extracts ObjectMeta from a list of unstructured resources.

func (*ResourceManager) FetchResources added in v0.0.8

func (r *ResourceManager) FetchResources(ctx context.Context, resource schema.GroupVersionResource, namespace string) (*unstructured.UnstructuredList, error)

FetchResources retrieves unstructured resources from the K8s API.

func (*ResourceManager) ListResources added in v0.0.8

func (r *ResourceManager) ListResources(ctx context.Context, resource schema.GroupVersionResource, namespace string) ([]metav1.ObjectMeta, string, string, error)

ListResources fetches resources of a specified type and namespace, returning their ObjectMeta, the last resource version observed, and any error encountered.

func (*ResourceManager) WatchK8sResources added in v1.1.2

func (r *ResourceManager) WatchK8sResources(ctx context.Context, cancel context.CancelFunc, apiGroup string, resourceVersion string, mutationChan chan *pb.KubernetesResourceMutation)

WatchK8sResources initiates a watch stream for the specified Kubernetes resource starting from the given resourceVersion. This function blocks until the watch ends or the context is canceled.

type ResourceManagerConfig added in v1.1.5

type ResourceManagerConfig struct {
	ResourceName  string
	Clientset     *kubernetes.Clientset
	BaseLogger    *zap.Logger
	DynamicClient dynamic.Interface
	StreamManager *streamManager
	Limiter       *rate.Limiter
}

ResourceManagerConfig holds the configuration for creating a new ResourceManager.

type StreamSuccessPeriod added in v1.1.2

type StreamSuccessPeriod struct {
	Connect time.Duration
	Auth    time.Duration
}

type StreamType added in v1.1.0

type StreamType string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL