Documentation ¶
Overview ¶
Package stream provides gRPC stream management for CloudSecure communication.
Note: The LogStream interface is defined in logging/grpc_logger.go, not in this package, to avoid a circular dependency. The logging package's BufferedGrpcWriteSyncer requires LogStream, and the stream package imports logging, so moving LogStream here would create a circular import: stream → logging → stream.
Index ¶
- Constants
- Variables
- func ConnectStreams(ctx context.Context, logger *zap.Logger, envMap Config, ...)
- func ManageStream(ctx context.Context, logger *zap.Logger, conn grpc.ClientConnInterface, ...)
- func NewAuthenticatedConnection(ctx context.Context, logger *zap.Logger, envMap Config) (*grpc.ClientConn, error)
- func ServerIsHealthy() bool
- func SetProcessingResources(processing bool)
- func StartStatsLogger(ctx context.Context, logger *zap.Logger, stats *Stats, period time.Duration)
- type Action
- type Config
- type FactoryConfig
- type ManagedFactory
- type Stats
- func (s *Stats) GetAndResetStats() (flowsReceived, flowsSent, resourceMutations, configuredObjectMutations uint64)
- func (s *Stats) IncrementConfiguredObjectMutations()
- func (s *Stats) IncrementFlowsReceived()
- func (s *Stats) IncrementFlowsSentToClusterSync()
- func (s *Stats) IncrementResourceMutations()
- type StreamClient
- type StreamClientFactory
- type SuccessPeriods
Constants ¶
const (
	// ConnectionRetryInterval is the initial interval between connection attempts.
	ConnectionRetryInterval = 5 * time.Second
	// ConnectionRetryJitter is the jitter percentage for connection retry timers.
	ConnectionRetryJitter = 0.20
)
Connection Retry Configuration.
const (
	// StreamInitialBackoff is the initial backoff duration for stream reconnection.
	StreamInitialBackoff = 1 * time.Second
	// StreamMaxBackoff is the maximum backoff duration for stream reconnection.
	StreamMaxBackoff = 1 * time.Minute
	// StreamMaxJitterPct is the maximum jitter percentage for stream backoff.
	StreamMaxJitterPct = 0.20
	// StreamSevereErrorThreshold is the number of errors before considering severe.
	StreamSevereErrorThreshold = 10
	// StreamExponentialFactor is the exponential factor for backoff calculation.
	StreamExponentialFactor = 2.0
)
Stream Backoff Configuration.
const (
	// ResetInitialBackoff is the initial backoff for reset operations.
	ResetInitialBackoff = 10 * time.Second
	// ResetMaxBackoff is the maximum backoff for reset operations.
	ResetMaxBackoff = 10 * time.Minute
	// ResetMaxJitterPct is the maximum jitter percentage for reset backoff.
	ResetMaxJitterPct = 0.10
)
Reset Backoff Configuration.
const (
	// ResourceProcessingTimeout is the maximum time allowed for resource processing
	// before the server is considered unhealthy.
	ResourceProcessingTimeout = 5 * time.Minute
)
Health Check Configuration.
Variables ¶
var ErrStopRetries = errors.New("stop retries")
ErrStopRetries signals that retries should stop.
Functions ¶
func ConnectStreams ¶
func ConnectStreams(
	ctx context.Context,
	logger *zap.Logger,
	envMap Config,
	factoryConfig FactoryConfig,
)
ConnectStreams orchestrates all streams using the factory pattern.
func ManageStream ¶
func ManageStream(
	ctx context.Context,
	logger *zap.Logger,
	conn grpc.ClientConnInterface,
	factory StreamClientFactory,
	keepalivePeriod time.Duration,
	successPeriods SuccessPeriods,
	done chan struct{},
)
ManageStream manages a stream with backoff and reconnection logic.
func NewAuthenticatedConnection ¶
func NewAuthenticatedConnection(ctx context.Context, logger *zap.Logger, envMap Config) (*grpc.ClientConn, error)
NewAuthenticatedConnection gets a valid token and creates a connection to CloudSecure.
func ServerIsHealthy ¶
func ServerIsHealthy() bool
ServerIsHealthy reports whether the resource listing process is healthy, that is, whether no deadlock has been detected in it.
func SetProcessingResources ¶
func SetProcessingResources(processing bool)
SetProcessingResources updates the deadlock detector state.
Types ¶
type Config ¶
type Config struct {
ClusterCreds string
ClusterName string // Optional: cluster name for self-managed clusters
HttpsProxy string
OnboardingClientID string
OnboardingClientSecret string
OnboardingEndpoint string
PodNamespace string
StatsLogPeriod time.Duration
SuccessPeriods SuccessPeriods
TlsSkipVerify bool
TokenEndpoint string
}
Config holds configuration for stream manager authentication.
type FactoryConfig ¶
type FactoryConfig struct {
// Stream factories - manager is oblivious to specific implementations
Factories []ManagedFactory
// Shared components
Stats *Stats
// Configuration
SuccessPeriods SuccessPeriods
StatsLogPeriod time.Duration
}
FactoryConfig holds all factories and configuration needed to run streams.
type ManagedFactory ¶
type ManagedFactory struct {
Factory StreamClientFactory
KeepalivePeriod time.Duration
}
ManagedFactory pairs a factory with its keepalive period.
type Stats ¶
type Stats struct {
// contains filtered or unexported fields
}
Stats tracks statistics for flows and resource mutations. All counters are safe for concurrent access.
func (*Stats) GetAndResetStats ¶
func (s *Stats) GetAndResetStats() (flowsReceived, flowsSent, resourceMutations, configuredObjectMutations uint64)
GetAndResetStats returns the current stats and resets all counters to zero.
func (*Stats) IncrementConfiguredObjectMutations ¶ added in v1.3.14
func (s *Stats) IncrementConfiguredObjectMutations()
IncrementConfiguredObjectMutations increments the count of configured object mutations.
func (*Stats) IncrementFlowsReceived ¶
func (s *Stats) IncrementFlowsReceived()
IncrementFlowsReceived increments the count of flows received from CNI collectors.
func (*Stats) IncrementFlowsSentToClusterSync ¶
func (s *Stats) IncrementFlowsSentToClusterSync()
IncrementFlowsSentToClusterSync increments the count of flows sent to k8sclustersync.
func (*Stats) IncrementResourceMutations ¶
func (s *Stats) IncrementResourceMutations()
IncrementResourceMutations increments the count of resource mutations.
type StreamClient ¶
type StreamClient interface {
// Run sends/receives messages over a gRPC stream until the stream is closed
// or the context is canceled. This is the main loop for the stream.
Run(ctx context.Context) error
// SendKeepalive sends a keepalive message over the gRPC stream.
SendKeepalive(ctx context.Context) error
// Close gracefully closes the stream.
Close() error
}
StreamClient abstracts all stream operations. Each stream type (config, logs, resources, flows) implements this interface. The stream manager depends only on this interface and needs no knowledge of protobuf types.
type StreamClientFactory ¶
type StreamClientFactory interface {
// NewStreamClient creates a new StreamClient connected to the given gRPC connection.
// Each factory creates its own service client from the connection as needed.
NewStreamClient(ctx context.Context, grpcConn grpc.ClientConnInterface) (StreamClient, error)
// Name returns the name of the stream for logging purposes.
Name() string
}
StreamClientFactory creates StreamClients. Configuration is passed via factory fields, set by cmd/main.go. This enables dependency injection and testability.