stream

package
v1.3.14
Published: May 13, 2026 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Overview

Package stream provides gRPC stream management for CloudSecure communication.

Note: The LogStream interface is defined in logging/grpc_logger.go rather than in this package, to avoid a circular dependency. The logging package's BufferedGrpcWriteSyncer requires LogStream, and the stream package imports logging, so moving LogStream here would create the import cycle stream → logging → stream.

Constants

const (
	// ConnectionRetryInterval is the initial interval between connection attempts.
	ConnectionRetryInterval = 5 * time.Second
	// ConnectionRetryJitter is the jitter percentage for connection retry timers.
	ConnectionRetryJitter = 0.20
)

Connection Retry Configuration.

const (
	// StreamInitialBackoff is the initial backoff duration for stream reconnection.
	StreamInitialBackoff = 1 * time.Second
	// StreamMaxBackoff is the maximum backoff duration for stream reconnection.
	StreamMaxBackoff = 1 * time.Minute
	// StreamMaxJitterPct is the maximum jitter percentage for stream backoff.
	StreamMaxJitterPct = 0.20
	// StreamSevereErrorThreshold is the number of errors after which the failure is considered severe.
	StreamSevereErrorThreshold = 10
	// StreamExponentialFactor is the exponential factor for backoff calculation.
	StreamExponentialFactor = 2.0
)

Stream Backoff Configuration.

const (
	// ResetInitialBackoff is the initial backoff for reset operations.
	ResetInitialBackoff = 10 * time.Second
	// ResetMaxBackoff is the maximum backoff for reset operations.
	ResetMaxBackoff = 10 * time.Minute
	// ResetMaxJitterPct is the maximum jitter percentage for reset backoff.
	ResetMaxJitterPct = 0.10
)

Reset Backoff Configuration.

const (
	// ResourceProcessingTimeout is the maximum time allowed for resource processing
	// before the server is considered unhealthy.
	ResourceProcessingTimeout = 5 * time.Minute
)

Health Check Configuration.

Variables

var ErrStopRetries = errors.New("stop retries")

ErrStopRetries signals that retries should stop.

Functions

func ConnectStreams

func ConnectStreams(
	ctx context.Context,
	logger *zap.Logger,
	envMap Config,
	factoryConfig FactoryConfig,
)

ConnectStreams orchestrates all streams using the factory pattern.

func ManageStream

func ManageStream(
	ctx context.Context,
	logger *zap.Logger,
	conn grpc.ClientConnInterface,
	factory StreamClientFactory,
	keepalivePeriod time.Duration,
	successPeriods SuccessPeriods,
	done chan struct{},
)

ManageStream manages a stream with backoff and reconnection logic.

func NewAuthenticatedConnection

func NewAuthenticatedConnection(ctx context.Context, logger *zap.Logger, envMap Config) (*grpc.ClientConn, error)

NewAuthenticatedConnection gets a valid token and creates a connection to CloudSecure.

func ServerIsHealthy

func ServerIsHealthy() bool

ServerIsHealthy checks if a deadlock has occurred within the resource listing process.

func SetProcessingResources

func SetProcessingResources(processing bool)

SetProcessingResources updates the deadlock detector state.

func StartStatsLogger

func StartStatsLogger(ctx context.Context, logger *zap.Logger, stats *Stats, period time.Duration)

StartStatsLogger starts a goroutine that logs stream statistics at the configured period.

Types

type Action

type Action func() error

type Config

type Config struct {
	ClusterCreds           string
	ClusterName            string // Optional: cluster name for self-managed clusters
	HttpsProxy             string
	OnboardingClientID     string
	OnboardingClientSecret string
	OnboardingEndpoint     string
	PodNamespace           string
	StatsLogPeriod         time.Duration
	SuccessPeriods         SuccessPeriods
	TlsSkipVerify          bool
	TokenEndpoint          string
}

Config holds configuration for stream manager authentication.

type FactoryConfig

type FactoryConfig struct {
	// Stream factories - manager is oblivious to specific implementations
	Factories []ManagedFactory

	// Shared components
	Stats *Stats

	// Configuration
	SuccessPeriods SuccessPeriods
	StatsLogPeriod time.Duration
}

FactoryConfig holds all factories and configuration needed to run streams.

type ManagedFactory

type ManagedFactory struct {
	Factory         StreamClientFactory
	KeepalivePeriod time.Duration
}

ManagedFactory pairs a factory with its keepalive period.

type Stats

type Stats struct {
	// contains filtered or unexported fields
}

Stats tracks statistics for flows and resource mutations. All counters are safe for concurrent access.

func NewStats

func NewStats() *Stats

NewStats creates a new Stats instance.

func (*Stats) GetAndResetStats

func (s *Stats) GetAndResetStats() (flowsReceived, flowsSent, resourceMutations, configuredObjectMutations uint64)

GetAndResetStats returns the current stats and resets all counters to zero.

func (*Stats) IncrementConfiguredObjectMutations added in v1.3.14

func (s *Stats) IncrementConfiguredObjectMutations()

IncrementConfiguredObjectMutations increments the count of configured object mutations.

func (*Stats) IncrementFlowsReceived

func (s *Stats) IncrementFlowsReceived()

IncrementFlowsReceived increments the count of flows received from CNI collectors.

func (*Stats) IncrementFlowsSentToClusterSync

func (s *Stats) IncrementFlowsSentToClusterSync()

IncrementFlowsSentToClusterSync increments the count of flows sent to k8sclustersync.

func (*Stats) IncrementResourceMutations

func (s *Stats) IncrementResourceMutations()

IncrementResourceMutations increments the count of resource mutations.

type StreamClient

type StreamClient interface {
	// Run sends/receives messages over a gRPC stream until the stream is closed
	// or the context is canceled. This is the main loop for the stream.
	Run(ctx context.Context) error

	// SendKeepalive sends a keepalive message over the gRPC stream.
	SendKeepalive(ctx context.Context) error

	// Close gracefully closes the stream.
	Close() error
}

StreamClient abstracts all stream operations. Each stream type (config, logs, resources, flows) implements this interface. The stream manager depends only on this interface and needs no knowledge of protobuf types.

type StreamClientFactory

type StreamClientFactory interface {
	// NewStreamClient creates a new StreamClient connected to the given gRPC connection.
	// Each factory creates its own service client from the connection as needed.
	NewStreamClient(ctx context.Context, grpcConn grpc.ClientConnInterface) (StreamClient, error)

	// Name returns the name of the stream for logging purposes.
	Name() string
}

StreamClientFactory creates StreamClients. Configuration is passed via factory fields, set by cmd/main.go. This enables dependency injection and testability.

type SuccessPeriods

type SuccessPeriods struct {
	Auth    time.Duration
	Connect time.Duration
}

SuccessPeriods defines how long a stream must be active to be considered successful.
