runner

package
v0.7.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 6, 2026 License: MIT Imports: 31 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Environment variable names for configuration. Each constant holds the
	// NAME of a variable, not its value. Note that the pod label selector is
	// named LABEL_SELECTOR (no POD_ prefix) and that KUBECONFIG is the same
	// name used by standard Kubernetes tooling.
	// NOTE(review): presumably these are the variables consulted by
	// NewCommandFromEnvVars — confirm against its implementation.
	EnvVarExporterType     = "EXPORTER_TYPE"
	EnvVarClusterName      = "CLUSTER_NAME"
	EnvVarNamespace        = "NAMESPACE"
	EnvVarPodLabelSelector = "LABEL_SELECTOR"
	EnvVarContainer        = "CONTAINER"
	EnvVarWorkers          = "WORKERS"
	EnvVarTimeout          = "TIMEOUT"
	EnvVarResync           = "RESYNC"
	EnvVarQPS              = "QPS"
	EnvVarBurst            = "BURST"
	EnvVarKubeconfig       = "KUBECONFIG"
	EnvVarLogLevel         = "LOG_LEVEL"
	EnvVarServerPort       = "SERVER_PORT"

	// Default values for configuration parameters.
	//
	// DefaultForceRestart and DefaultRestartOnFail have no EnvVar* counterpart
	// in this block.
	// NOTE(review): DefaultClusterName is empty while ErrNoClusterName reads
	// "cluster name must be specified"; if validation enforces that, the
	// cluster name must always be set explicitly — confirm.
	DefaultExporterType     = "stdout" // Simple stdout output for easy debugging
	DefaultClusterName      = ""
	DefaultNamespace        = "gpu-operator"
	DefaultPodLabelSelector = "app=nvidia-device-plugin-daemonset"
	DefaultContainer        = "nvidia-device-plugin"
	DefaultForceRestart     = false            // Conservative default
	DefaultRestartOnFail    = true             // Fail-fast recovery pattern
	DefaultWorkers          = 16               // Balanced concurrency for most workloads
	DefaultTimeout          = 30 * time.Second // Reasonable for most commands
	DefaultResync           = 0                // Disable periodic resync by default (event-driven only)
	DefaultQPS              = 50               // Conservative API server rate limiting
	DefaultBurst            = 100              // Allow short bursts while maintaining average QPS
	DefaultKubeconfig       = ""               // Use standard kubeconfig resolution
	DefaultLogLevel         = "info"           // Balanced verbosity
	DefaultServerPort       = 8080             // Default port for metrics and health server
)

Variables

View Source
var (
	// Sentinel errors for invalid configuration. Each message states the
	// constraint that was violated.
	// NOTE(review): presumably returned by (*Command).Validate and matched by
	// callers with errors.Is — confirm.
	// NOTE(review): none of these messages use formatting verbs, so errors.New
	// would be the conventional constructor; fmt.Errorf is kept here so the
	// declarations stay unchanged.
	// NOTE(review): ErrInvalidServerPort states a lower bound of 1000 (not
	// 1024); verify the message matches the actual range check.
	ErrInvalidExporter   = fmt.Errorf("invalid exporter type")
	ErrNoClusterName     = fmt.Errorf("cluster name must be specified")
	ErrInvalidWorkers    = fmt.Errorf("workers must be > 0")
	ErrInvalidTimeout    = fmt.Errorf("timeout must be > 0")
	ErrInvalidQPS        = fmt.Errorf("qps must be > 0")
	ErrInvalidBurst      = fmt.Errorf("burst must be > 0")
	ErrNoLabelSelector   = fmt.Errorf("label selector must be specified")
	ErrNoContainer       = fmt.Errorf("container must be specified")
	ErrInvalidResync     = fmt.Errorf("resync period must be >= 0 (0 disables periodic resync)")
	ErrInvalidServerPort = fmt.Errorf("server port must be a valid integer between 1000 and 65535")
)

Functions

func ListEnvVars

func ListEnvVars() []string

func LookupEnv

func LookupEnv(name string) (string, bool)

func Run

func Run()

Run starts the pod execution controller with proper lifecycle management.

Types

type Command

// Command holds the configuration of the pod execution controller. All
// listed fields are exported; additional unexported fields exist but are not
// shown in this view.
//
// NOTE(review): the ExporterType example "postgress" may be a typo for
// "postgres" — confirm against the exporter type strings actually accepted.
// NOTE(review): the Container comment says an empty value selects the first
// container, while ErrNoContainer reads "container must be specified";
// confirm which behavior validation implements.
type Command struct {
	ExporterType     string        // Exporter type (e.g., "stdout", "postgress", etc.)
	Cluster          string        // Cluster name for metrics labeling
	Namespace        string        // Kubernetes namespace to watch
	PodLabelSelector string        // Label selector for pod filtering
	Container        string        // Container name within the pod (empty = first container)
	Workers          int           // Number of concurrent workers
	Timeout          time.Duration // Per-command execution timeout
	Resync           time.Duration // Informer resync period (0 = no periodic resync)
	QPS              float32       // Kubernetes API client QPS limit
	Burst            int           // Kubernetes API client burst limit
	Kubeconfig       string        // Path to kubeconfig file
	LogLevel         string        // Logging verbosity level
	ServerPort       int           // Port for metrics and health server
	// contains filtered or unexported fields
}

Command encapsulates all configuration for the pod execution controller. The structure is designed to be immutable after validation, which prevents race conditions in concurrent environments and makes the behavior predictable.

func NewCommand

func NewCommand(opts ...Option) *Command

NewCommand creates a Command with production-ready defaults. The defaults are chosen based on common Kubernetes controller patterns and have been battle-tested in high-throughput environments.

func NewCommandFromEnvVars

func NewCommandFromEnvVars() *Command

NewCommandFromEnvVars creates a Command by reading configuration from environment variables.

func (*Command) Init

func (c *Command) Init(ctx context.Context, log *slog.Logger) error

func (*Command) Validate

func (c *Command) Validate() error

Validate performs comprehensive validation of the command configuration. This validation is crucial in distributed systems where invalid config can cause cascading failures or resource exhaustion.

type Exporter

// Exporter pairs an exporter type name with the configuration it was built
// from. Both exported fields carry JSON tags ("type" and "config").
// NOTE(review): the backend implementation is presumably held in one of the
// unexported fields not shown here — confirm.
type Exporter struct {
	Type   string         `json:"type"`   // Exporter type name
	Config ExporterConfig `json:"config"` // Configuration used for this exporter
	// contains filtered or unexported fields
}

Exporter wraps an ExporterBackend with metadata and provides a high-level interface for exporting GPU serial number data in distributed systems.

func GetExporter

func GetExporter(ctx context.Context, log *slog.Logger, config ExporterConfig) (*Exporter, error)

GetExporter initializes an exporter based on the provided configuration.

func GetExporterSimple

func GetExporterSimple(ctx context.Context, log *slog.Logger, exporterType string) (*Exporter, error)

GetExporterSimple provides a backwards-compatible constructor for simple use cases.

func (*Exporter) Close

func (e *Exporter) Close(ctx context.Context) error

Close performs cleanup of the exporter's resources.

func (*Exporter) Export

func (e *Exporter) Export(ctx context.Context, log *slog.Logger, cluster string, pod *corev1.Pod, node string, serials []*gpu.Serials) error

Export handles exporting GPU serial numbers for a given pod using the configured exporter.

func (*Exporter) Health

func (e *Exporter) Health(ctx context.Context) error

Health performs a health check of the exporter's backend.

type ExporterBackend

// ExporterBackend is the three-method contract (Write, Close, Health) that an
// exporter implementation provides. Every method takes a context as its
// first argument and reports failure through its error result.
type ExporterBackend interface {
	// Write exports the provided GPU serial number readings.
	// The logger is supplied per call rather than held by the backend.
	Write(ctx context.Context, log *slog.Logger, records []*gpu.SerialNumberReading) error

	// Close performs cleanup of any resources held by the exporter.
	Close(ctx context.Context) error

	// Health performs a health check of the exporter's dependencies.
	Health(ctx context.Context) error
}

ExporterBackend defines the interface that all exporter implementations must satisfy.

type ExporterConfig

// ExporterConfig carries the settings used to initialize an exporter. Every
// field has matching JSON and YAML keys; all fields except Type are tagged
// omitempty, so zero values are left out when encoding.
type ExporterConfig struct {
	Type       string        `json:"type" yaml:"type"`                                   // Exporter type name (always emitted)
	BatchSize  int           `json:"batch_size,omitempty" yaml:"batch_size,omitempty"`   // NOTE(review): presumably records per write — confirm
	RetryCount int           `json:"retry_count,omitempty" yaml:"retry_count,omitempty"` // NOTE(review): presumably retries per failed write — confirm
	Timeout    time.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"`         // Timeout as a time.Duration
}

ExporterConfig holds configuration for initializing exporters. Individual exporters load their specific configuration from environment variables.

type Option

// Option is a functional option: a function that receives a *Command and may
// modify it. NewCommand accepts a variadic list of Options, and the With*
// functions in this package each return one.
type Option func(*Command)

func WithBurst

func WithBurst(burst int) Option

func WithClusterName

func WithClusterName(cluster string) Option

func WithContainer

func WithContainer(container string) Option

func WithExporterType

func WithExporterType(exporter string) Option

func WithKubeconfig

func WithKubeconfig(kubeconfig string) Option

func WithLogLevel

func WithLogLevel(level string) Option

func WithNamespace

func WithNamespace(ns string) Option

func WithPodLabelSelector

func WithPodLabelSelector(labelSel string) Option

func WithQPS

func WithQPS(qps float32) Option

func WithResync

func WithResync(resync time.Duration) Option

func WithServerPort

func WithServerPort(port int) Option

func WithTimeout

func WithTimeout(timeout time.Duration) Option

func WithWorkers

func WithWorkers(workers int) Option

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL