server

package
v0.0.0-...-4108e51 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 11, 2026 License: OSL-3.0 Imports: 46 Imported by: 0

Documentation

Overview

Package server provides a cloud-agnostic server implementation for CUDly. It supports both AWS Lambda and standard HTTP server modes.

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateHTTPServer

func CreateHTTPServer(app *Application, port int) *http.Server

CreateHTTPServer builds the HTTP server with routes and timeouts configured, but does not start listening. This is useful for testing.
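
Because the returned server is not listening, a test can drive its Handler in memory. The following is a minimal sketch of that pattern, not the package's own test code: buildServer is a hypothetical stand-in for CreateHTTPServer, and the "/health" route and the timeout value are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// buildServer is a hypothetical stand-in for CreateHTTPServer: it wires
// routes and timeouts but never calls ListenAndServe.
func buildServer(port int) *http.Server {
	mux := http.NewServeMux()
	// "/health" is an assumed route, used here only for illustration.
	mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "ok")
	})
	return &http.Server{
		Addr:              fmt.Sprintf(":%d", port),
		Handler:           mux,
		ReadHeaderTimeout: 10 * time.Second, // assumed value
	}
}

// probe sends one in-memory GET through the server's handler and returns
// the status code and body. No socket is opened.
func probe(srv *http.Server, path string) (int, string) {
	rec := httptest.NewRecorder()
	srv.Handler.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	code, body := probe(buildServer(8080), "/health")
	fmt.Println(code, body)
}
```

The same probe helper works against any *http.Server value, so a test can assert on routing without binding a port.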

func StartHTTPServer

func StartHTTPServer(app *Application, port int) error

StartHTTPServer starts the HTTP server and shuts it down gracefully on SIGINT or SIGTERM. It blocks until the server exits cleanly. In container orchestrators (Cloud Run, Container Apps, Fargate), SIGTERM is the normal stop signal; without this wiring the process is killed before the deferred app.Close() runs, which cuts off in-flight requests and leaves the DB pool and advisory locks undrained (issue #1025).
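
The shutdown wiring described above can be sketched with the standard library alone. This is an illustration under assumptions, not the package's implementation: serveUntilDone and demo are hypothetical names, the 5 second grace period is made up, and the 100ms timeout exists only so the sketch exits without an external signal.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"os/signal"
	"syscall"
	"time"
)

// serveUntilDone serves on ln until ctx is cancelled, then lets in-flight
// requests drain for up to grace before returning. Hypothetical helper.
func serveUntilDone(ctx context.Context, srv *http.Server, ln net.Listener, grace time.Duration) error {
	errCh := make(chan error, 1)
	go func() { errCh <- srv.Serve(ln) }()

	select {
	case err := <-errCh:
		return err // the server failed before any stop request
	case <-ctx.Done():
	}

	shutdownCtx, cancel := context.WithTimeout(context.Background(), grace)
	defer cancel()
	if err := srv.Shutdown(shutdownCtx); err != nil {
		return err
	}
	// Serve reports http.ErrServerClosed after a clean Shutdown.
	if err := <-errCh; !errors.Is(err, http.ErrServerClosed) {
		return err
	}
	return nil // the caller's deferred cleanup (e.g. app.Close()) can now run
}

// demo serves on an ephemeral local port and stops itself after 100ms so
// the sketch terminates on its own.
func demo() error {
	// ctx is cancelled when the process receives SIGINT or SIGTERM.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()
	ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
	defer cancel()

	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return err
	}
	srv := &http.Server{Handler: http.NotFoundHandler()}
	return serveUntilDone(ctx, srv, ln, 5*time.Second)
}

func main() {
	fmt.Println("clean exit:", demo() == nil)
}
```

The key design point is that the function returns normally after a stop signal, so deferred cleanup in the caller still executes.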

func StartLambdaHandler

func StartLambdaHandler(app *Application)

StartLambdaHandler starts the AWS Lambda handler.

Types

type AnalyticsCollectorInterface

type AnalyticsCollectorInterface interface {
	Collect(ctx context.Context) error
}

AnalyticsCollectorInterface aggregates current savings into a point-in-time snapshot row per (tenant, provider, service, region, commitment_type) bucket.

type AnalyticsConfig

type AnalyticsConfig struct {
	// Enabled gates the analytics_collect scheduled task. When false the task
	// returns a "disabled" status without touching the DB. Default true.
	Enabled bool
	// RetentionMonths is how many months of snapshot partitions to keep before
	// the retention job drops them. Default 24. Must be >= 1.
	RetentionMonths int
	// PartitionsAhead is how many future monthly partitions to keep provisioned
	// ahead of the current month so inserts never fall into the catch-all
	// default partition (M3). Default 3. Must be >= 1.
	PartitionsAhead int
}

AnalyticsConfig holds the savings-snapshot collector knobs, read from env at startup and validated at the boundary (see Validate).

func LoadAnalyticsConfig

func LoadAnalyticsConfig() AnalyticsConfig

LoadAnalyticsConfig reads the collector knobs from env, falling back to defaults for unset/blank values. Out-of-range or unparseable values are preserved as-is here so Validate can reject them with a clear message at startup (fail-fast at the boundary) rather than being silently clamped.

func (AnalyticsConfig) Validate

func (c AnalyticsConfig) Validate() error

Validate rejects out-of-range analytics knobs so a misconfiguration fails fast at startup instead of silently producing a broken retention/partition policy at the first scheduled run.
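
The load-then-validate split can be sketched as follows. This is a minimal illustration, not the package's code: the env var names are hypothetical, only the two integer knobs are modelled, and mapping unparseable input to 0 is an assumption chosen so that validation rejects it.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
)

// analyticsConfig mirrors the two integer knobs of AnalyticsConfig.
type analyticsConfig struct {
	RetentionMonths int
	PartitionsAhead int
}

// intKnob returns def when raw is unset or blank. Unparseable input maps to
// 0 (an assumption of this sketch) so validate rejects it rather than the
// loader silently substituting a default.
func intKnob(raw string, def int) int {
	raw = strings.TrimSpace(raw)
	if raw == "" {
		return def
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return 0
	}
	return n
}

// load reads the knobs through getenv. The variable names are hypothetical.
func load(getenv func(string) string) analyticsConfig {
	return analyticsConfig{
		RetentionMonths: intKnob(getenv("ANALYTICS_RETENTION_MONTHS"), 24),
		PartitionsAhead: intKnob(getenv("ANALYTICS_PARTITIONS_AHEAD"), 3),
	}
}

// validate enforces the documented lower bounds (both must be >= 1).
func (c analyticsConfig) validate() error {
	if c.RetentionMonths < 1 {
		return fmt.Errorf("retention months must be >= 1, got %d", c.RetentionMonths)
	}
	if c.PartitionsAhead < 1 {
		return fmt.Errorf("partitions ahead must be >= 1, got %d", c.PartitionsAhead)
	}
	return nil
}

func main() {
	cfg := load(os.Getenv)
	if err := cfg.validate(); err != nil {
		fmt.Println("refusing to start:", err)
		return
	}
	fmt.Printf("%+v\n", cfg)
}
```

Keeping the loader permissive and the validator strict means a bad value surfaces as one clear startup error instead of a silently clamped setting.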

type AnalyticsStoreInterface

type AnalyticsStoreInterface interface {
	RefreshMaterializedViews(ctx context.Context) error
	// CreateFuturePartitions ensures partitions exist for the current month
	// plus monthsAhead months ahead (M3: partitions otherwise stop after the
	// seeded months and every insert falls into the catch-all default).
	CreateFuturePartitions(ctx context.Context, monthsAhead int) error
	// DropOldPartitions drops partitions older than retentionMonths (retention).
	DropOldPartitions(ctx context.Context, retentionMonths int) error
}

AnalyticsStoreInterface defines the methods required for analytics storage. Beyond the materialized-view refresh, the scheduled analytics task also keeps monthly partitions provisioned ahead of time and applies retention.
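
To make the month arithmetic behind CreateFuturePartitions and DropOldPartitions concrete, here is a small sketch. It is an assumption-laden illustration: the "2006-01" label format is hypothetical, and treating the retention boundary as "current month minus retentionMonths" is one plausible reading of "older than retentionMonths", not something the documentation confirms.

```go
package main

import (
	"fmt"
	"time"
)

// labelLayout is a hypothetical naming scheme for monthly partitions.
const labelLayout = "2006-01"

// partitionLabels lists the current month plus monthsAhead following
// months, i.e. monthsAhead+1 labels in total.
func partitionLabels(year int, month time.Month, monthsAhead int) []string {
	first := time.Date(year, month, 1, 0, 0, 0, 0, time.UTC)
	labels := make([]string, 0, monthsAhead+1)
	for i := 0; i <= monthsAhead; i++ {
		// AddDate from the first of a month never overflows into the next one.
		labels = append(labels, first.AddDate(0, i, 0).Format(labelLayout))
	}
	return labels
}

// retentionCutoff returns the oldest month still kept; partitions for
// earlier months would be candidates for dropping (assumed boundary).
func retentionCutoff(year int, month time.Month, retentionMonths int) string {
	first := time.Date(year, month, 1, 0, 0, 0, 0, time.UTC)
	return first.AddDate(0, -retentionMonths, 0).Format(labelLayout)
}

func main() {
	now := time.Now().UTC()
	fmt.Println("ensure:", partitionLabels(now.Year(), now.Month(), 3))
	fmt.Println("keep from:", retentionCutoff(now.Year(), now.Month(), 24))
}
```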

type Application

type Application struct {
	Config      config.StoreInterface
	API         *api.Handler
	Scheduler   SchedulerInterface
	Purchase    PurchaseManagerInterface
	Email       email.SenderInterface // Multi-cloud email sender (AWS SES, GCP SendGrid, Azure ACS)
	Auth        *auth.Service
	RateLimiter api.RateLimiterInterface // Distributed rate limiter (DB-backed for multi-instance)
	Analytics   AnalyticsStoreInterface  // Analytics store for savings data
	// AnalyticsCollector aggregates savings into snapshots on a schedule.
	// Nil until reinitializeAfterConnect wires it; the collect task no-ops
	// when nil so test builds without a DB stay quiet.
	AnalyticsCollector AnalyticsCollectorInterface
	Version            string
	DB                 *database.Connection // PostgreSQL database connection
	TaskLocker         TaskLocker           // Advisory lock for scheduled tasks (defaults to DB)
	// contains filtered or unexported fields
}

Application holds all components of the CUDly server.

func NewApplication

func NewApplication(ctx context.Context, version string) (*Application, error)

NewApplication creates and initializes a new Application instance. version overrides the VERSION env var when non-empty, so cmd entrypoints can pass the ldflags-stamped value directly instead of round-tripping through os.Setenv / os.Getenv (04-N1). Pass "" to fall back to the env.
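
The override rule can be sketched in a few lines. resolveVersion is a hypothetical helper, not part of the package, and the -ldflags target shown in the comment is only an example of how an entrypoint might stamp the value.

```go
package main

import (
	"fmt"
	"os"
)

// version is meant to be stamped at build time, for example with
//
//	go build -ldflags "-X main.version=1.2.3"
var version string

// resolveVersion prefers the explicit value and falls back to the VERSION
// env var only when the explicit value is empty.
func resolveVersion(explicit string, getenv func(string) string) string {
	if explicit != "" {
		return explicit
	}
	return getenv("VERSION")
}

func main() {
	fmt.Printf("version=%q\n", resolveVersion(version, os.Getenv))
}
```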

func NewApplicationFromDeps

func NewApplicationFromDeps(ctx context.Context, cfg ApplicationConfig, deps ExternalDeps) (*Application, error)

NewApplicationFromDeps creates an Application from pre-built configuration and dependencies. This is the testable constructor: all external I/O is done before calling it.

func (*Application) Close

func (app *Application) Close() error

Close gracefully shuts down the application.

func (*Application) HandleLambdaEvent

func (app *Application) HandleLambdaEvent(ctx context.Context, rawEvent json.RawMessage) (any, error)

HandleLambdaEvent processes any Lambda event type.

func (*Application) HandleSQSMessage

func (app *Application) HandleSQSMessage(ctx context.Context, body string) error

HandleSQSMessage processes an SQS message for asynchronous purchase processing.

func (*Application) HandleScheduledTask

func (app *Application) HandleScheduledTask(ctx context.Context, taskType ScheduledTaskType) (any, error)

HandleScheduledTask processes a scheduled task by type. It acquires a PostgreSQL advisory lock to prevent concurrent execution of the same task.
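
The lock-then-run pattern can be sketched against the TaskLocker method set documented in this package. Everything else here is an assumption for illustration: memLocker is an in-memory stand-in for the DB-backed locker, runExclusive is a hypothetical helper, and skipping silently when the lock is held is one plausible policy.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// taskLocker copies the TaskLocker method set shown in this package.
type taskLocker interface {
	TryAdvisoryLock(ctx context.Context, lockID int64) (bool, error)
	ReleaseAdvisoryLock(ctx context.Context, lockID int64)
}

// memLocker is an in-memory stand-in for the PostgreSQL-backed locker.
type memLocker struct {
	mu   sync.Mutex
	held map[int64]bool
}

func (m *memLocker) TryAdvisoryLock(_ context.Context, id int64) (bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.held[id] {
		return false, nil
	}
	if m.held == nil {
		m.held = make(map[int64]bool)
	}
	m.held[id] = true
	return true, nil
}

func (m *memLocker) ReleaseAdvisoryLock(_ context.Context, id int64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.held, id)
}

// runExclusive runs fn only when the lock is free. It returns false with a
// nil error when another holder has the lock and the run was skipped.
func runExclusive(ctx context.Context, l taskLocker, lockID int64, fn func(context.Context) error) (bool, error) {
	ok, err := l.TryAdvisoryLock(ctx, lockID)
	if err != nil {
		return false, fmt.Errorf("acquire lock %d: %w", lockID, err)
	}
	if !ok {
		return false, nil
	}
	defer l.ReleaseAdvisoryLock(ctx, lockID)
	return true, fn(ctx)
}

// demo starts a task that tries to start itself again while it holds the
// lock, then checks that the lock is free once the task has finished.
func demo() (outerRan, innerRan, freeAfter bool) {
	ctx := context.Background()
	l := &memLocker{}
	noop := func(context.Context) error { return nil }
	outerRan, _ = runExclusive(ctx, l, 42, func(ctx context.Context) error {
		innerRan, _ = runExclusive(ctx, l, 42, noop)
		return nil
	})
	freeAfter, _ = runExclusive(ctx, l, 42, noop)
	return outerRan, innerRan, freeAfter
}

func main() {
	fmt.Println(demo())
}
```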

type ApplicationConfig

type ApplicationConfig struct {
	Version                string
	NotificationDaysBefore int
	DefaultTerm            int
	DefaultPaymentOption   string
	DefaultCoverage        float64
	DefaultRampSchedule    string
	APIKeySecretARN        string
	EnableDashboard        bool
	DashboardBucket        string
	DashboardURL           string
	// IssuerURL is the canonical OIDC issuer URL published under
	// /.well-known/* and used as the iss claim in JWTs minted by the
	// KMS-backed signer. Falls back to DashboardURL. Set via the
	// CUDLY_ISSUER_URL env var; in the AWS Lambda deploy the Terraform
	// module wires this to the Function URL so the deployment is
	// self-contained without needing a frontend domain.
	IssuerURL         string
	CORSAllowedOrigin string
	// ScheduledTaskSecret is the shared secret checked on the /scheduled
	// endpoint. In production (Azure Container Apps, Lambda-with-KV) it is
	// resolved lazily from SCHEDULED_TASK_SECRET_NAME via the SecretResolver
	// in NewApplicationFromDeps, so the value never lives in a container
	// env var. In dev the plaintext SCHEDULED_TASK_SECRET env var is still
	// accepted as a fallback.
	ScheduledTaskSecret     string
	ScheduledTaskSecretName string
	IsLambda                bool

	// Analytics snapshot collector knobs. See analytics_collect.go for
	// defaults and boundary validation (LoadAnalyticsConfig).
	Analytics AnalyticsConfig
}

ApplicationConfig holds all env-based configuration for the application.

func LoadApplicationConfig

func LoadApplicationConfig() ApplicationConfig

LoadApplicationConfig reads all configuration from environment variables.
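
Two fallbacks described in the ApplicationConfig field comments can be sketched as plain functions. This is an illustration only: resolveFunc is a hypothetical stand-in for secrets.Resolver (whose method set is not shown here), the lazy resolution is collapsed into a direct call, and erroring on a missing resolver is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// resolveFunc is a hypothetical stand-in for the secret resolver.
type resolveFunc func(name string) (string, error)

// effectiveIssuer applies the documented fallback: IssuerURL when set,
// otherwise DashboardURL.
func effectiveIssuer(issuerURL, dashboardURL string) string {
	if issuerURL != "" {
		return issuerURL
	}
	return dashboardURL
}

// scheduledTaskSecret prefers the named secret and falls back to the
// plaintext value only when no secret name is configured.
func scheduledTaskSecret(secretName, plaintext string, resolve resolveFunc) (string, error) {
	if secretName != "" {
		if resolve == nil {
			return "", errors.New("secret name set but no resolver configured")
		}
		return resolve(secretName)
	}
	return plaintext, nil
}

func main() {
	vault := func(name string) (string, error) { return "resolved:" + name, nil }
	fmt.Println(effectiveIssuer("", "https://dash.example"))
	fmt.Println(scheduledTaskSecret("cudly-scheduled", "", vault))
	fmt.Println(scheduledTaskSecret("", "dev-only", vault))
}
```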

type CheckResult

type CheckResult struct {
	Status  string `json:"status"`
	Message string `json:"message,omitempty"`
}

CheckResult represents the result of a health check.

type ExternalDeps

type ExternalDeps struct {
	EmailSender    email.SenderInterface
	ConfigStore    config.StoreInterface
	DBConfig       *database.Config
	SecretResolver secrets.Resolver
	STSClient      purchase.STSClient
}

ExternalDeps holds pre-built external dependencies that require infrastructure.

type HealthStatus

type HealthStatus struct {
	Status    string                 `json:"status"`
	Version   string                 `json:"version"`
	Timestamp time.Time              `json:"timestamp"`
	Checks    map[string]CheckResult `json:"checks"`
}

HealthStatus represents the overall health of the application.
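
The JSON shape implied by the struct tags can be seen by marshalling a value. The types below copy the documented fields; the "healthy"/"unhealthy" status strings, the "database" check name, and the all-checks-must-pass rule in summarize are assumptions made for this sketch.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

type CheckResult struct {
	Status  string `json:"status"`
	Message string `json:"message,omitempty"`
}

type HealthStatus struct {
	Status    string                 `json:"status"`
	Version   string                 `json:"version"`
	Timestamp time.Time              `json:"timestamp"`
	Checks    map[string]CheckResult `json:"checks"`
}

// summarize reports "healthy" only when every check is "healthy".
// The status strings are assumed, not taken from the package.
func summarize(version string, now time.Time, checks map[string]CheckResult) HealthStatus {
	overall := "healthy"
	for _, c := range checks {
		if c.Status != "healthy" {
			overall = "unhealthy"
			break
		}
	}
	return HealthStatus{Status: overall, Version: version, Timestamp: now.UTC(), Checks: checks}
}

// demo renders a report with a single "database" check at a fixed time so
// the output is deterministic.
func demo(dbOK bool) string {
	db := CheckResult{Status: "healthy"}
	if !dbOK {
		db = CheckResult{Status: "unhealthy", Message: "connection refused"}
	}
	now := time.Date(2026, 6, 11, 0, 0, 0, 0, time.UTC)
	b, err := json.Marshal(summarize("dev", now, map[string]CheckResult{"database": db}))
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	fmt.Println(demo(true))
	fmt.Println(demo(false))
}
```

Note that the omitempty tag drops the message field for passing checks, so clients should treat it as optional.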

type PurchaseManagerInterface

type PurchaseManagerInterface interface {
	ProcessScheduledPurchases(ctx context.Context) (*purchase.ProcessResult, error)
	SendUpcomingPurchaseNotifications(ctx context.Context) (*purchase.NotificationResult, error)
	ProcessMessage(ctx context.Context, body string) error
	ApproveExecution(ctx context.Context, execID, token, actor string) error
	ApproveAndExecute(ctx context.Context, execID, actor string) error
	CancelExecution(ctx context.Context, execID, token, actor string) error
	// ReapStuckExecutions sweeps purchase_executions stuck in
	// approved/running longer than reapAfter and flips them to "failed"
	// via the existing TransitionExecutionStatus CAS. Wired into the
	// "reap_stuck_purchases" scheduled task. See issue #678.
	ReapStuckExecutions(ctx context.Context, reapAfter time.Duration) (*purchase.ReapResult, error)
	// FireScheduledDelayedPurchases fires purchase_executions in status=scheduled
	// whose scheduled_execution_at is in the past (Gmail-style pre-fire delay,
	// issue #291 wave-2). Called on the "fire_scheduled_purchases" scheduler tick.
	FireScheduledDelayedPurchases(ctx context.Context) (*purchase.FireResult, error)
	// FinalizeInFlightRevocations sweeps purchase_history rows with
	// revocation_in_flight=true and retries MarkPurchaseRevoked for each. Handles
	// the partial-success case where Azure Return succeeded but the DB write
	// failed (issue #290 Finding #6). Called on the "finalize_revocations" tick.
	FinalizeInFlightRevocations(ctx context.Context) (*purchase.FinalizeResult, error)
}

PurchaseManagerInterface defines the methods required for the purchase manager component.
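
The reaper behaviour described for ReapStuckExecutions can be illustrated in memory. This sketch is not the purchase package's code: the execution type, its UpdatedAt field, and the "completed" status are hypothetical, and casStatus is a plain compare-then-set that stands in for what would be a single conditional update in the database.

```go
package main

import (
	"fmt"
	"time"
)

// execution is a hypothetical in-memory row.
type execution struct {
	ID        string
	Status    string
	UpdatedAt time.Time
}

// casStatus flips the status only if it still equals from. Unlike a
// database CAS, this is not safe for concurrent use.
func casStatus(e *execution, from, to string) bool {
	if e.Status != from {
		return false
	}
	e.Status = to
	return true
}

// reap marks executions as "failed" when they have sat in approved or
// running for longer than reapAfter, and returns how many were flipped.
func reap(execs []*execution, now time.Time, reapAfter time.Duration) int {
	reaped := 0
	for _, e := range execs {
		if now.Sub(e.UpdatedAt) <= reapAfter {
			continue
		}
		if casStatus(e, "approved", "failed") || casStatus(e, "running", "failed") {
			reaped++
		}
	}
	return reaped
}

// demo reaps a fixed set of rows with a one-hour threshold.
func demo() (int, []string) {
	now := time.Date(2026, 6, 11, 12, 0, 0, 0, time.UTC)
	execs := []*execution{
		{ID: "a", Status: "approved", UpdatedAt: now.Add(-2 * time.Hour)},
		{ID: "b", Status: "running", UpdatedAt: now.Add(-10 * time.Minute)},
		{ID: "c", Status: "completed", UpdatedAt: now.Add(-3 * time.Hour)},
		{ID: "d", Status: "running", UpdatedAt: now.Add(-90 * time.Minute)},
	}
	n := reap(execs, now, time.Hour)
	statuses := make([]string, len(execs))
	for i, e := range execs {
		statuses[i] = e.Status
	}
	return n, statuses
}

func main() {
	fmt.Println(demo())
}
```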

type ScheduledEvent

type ScheduledEvent struct {
	Source     string          `json:"source"`
	DetailType string          `json:"detail-type"`
	Action     string          `json:"action"`
	Detail     json.RawMessage `json:"detail"`
}

ScheduledEvent represents a generic scheduled event.

type ScheduledTaskType

type ScheduledTaskType string

ScheduledTaskType represents different types of scheduled tasks.

const (
	TaskCollectRecommendations    ScheduledTaskType = "collect_recommendations"
	TaskProcessScheduledPurchases ScheduledTaskType = "process_scheduled_purchases"
	TaskSendNotifications         ScheduledTaskType = "send_notifications"
	TaskCleanupExpiredRecords     ScheduledTaskType = "cleanup"
	TaskRefreshAnalytics          ScheduledTaskType = "analytics_refresh"
	// TaskCollectAnalytics runs the savings-snapshot collector end to end:
	// ensure upcoming partitions, collect a snapshot across all tenants, apply
	// retention, and refresh the materialized views. Scheduled separately from
	// TaskRefreshAnalytics (the legacy refresh-only task) so the snapshot
	// ingestion cadence can differ from a pure view refresh. See issues
	// #1023 / #1033.
	TaskCollectAnalytics  ScheduledTaskType = "analytics_collect"
	TaskRIExchangeReshape ScheduledTaskType = "ri_exchange_reshape"
	// TaskReapStuckPurchases sweeps purchase_executions stuck in
	// approved/running longer than PURCHASE_APPROVED_REAP_AFTER and flips
	// them to "failed" via the existing TransitionExecutionStatus CAS.
	// Backstop for synchronous-executor crashes (Lambda timeout, OOM,
	// network hang) that leave rows orphaned in an in-flight state.
	// See internal/purchase/reaper.go + issue #678.
	TaskReapStuckPurchases ScheduledTaskType = "reap_stuck_purchases"
	// TaskFireScheduledPurchases fires purchase_executions in status=scheduled
	// whose scheduled_execution_at is in the past (Gmail-style pre-fire delay,
	// issue #291 wave-2). Wires the "fire_scheduled_purchases" event action
	// to purchase.Manager.FireScheduledDelayedPurchases.
	TaskFireScheduledPurchases ScheduledTaskType = "fire_scheduled_purchases"
	// TaskFinalizeRevocations sweeps purchase_history rows with
	// revocation_in_flight=true and retries MarkPurchaseRevoked for each.
	// These rows represent partial-success cases where the Azure Return API call
	// succeeded but the subsequent DB write failed. The sweep ensures the audit
	// record is eventually consistent without requiring the user to retry (which
	// would be rejected by Azure). See issue #290 Finding #6.
	TaskFinalizeRevocations ScheduledTaskType = "finalize_revocations"
)

func ParseScheduledEvent

func ParseScheduledEvent(rawEvent json.RawMessage) (ScheduledTaskType, error)

ParseScheduledEvent parses a scheduled event and returns the task type.
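
A minimal sketch of such a parser, assuming the task type is carried in the event's action field and that unknown actions are rejected. The real function may also consult source, detail-type, or detail; parse, taskType, and known are hypothetical names, while the action strings are the constants listed above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// scheduledEvent copies the documented ScheduledEvent fields.
type scheduledEvent struct {
	Source     string          `json:"source"`
	DetailType string          `json:"detail-type"`
	Action     string          `json:"action"`
	Detail     json.RawMessage `json:"detail"`
}

type taskType string

// known lists the task type values documented in this package.
var known = map[taskType]bool{
	"collect_recommendations":     true,
	"process_scheduled_purchases": true,
	"send_notifications":          true,
	"cleanup":                     true,
	"analytics_refresh":           true,
	"analytics_collect":           true,
	"ri_exchange_reshape":         true,
	"reap_stuck_purchases":        true,
	"fire_scheduled_purchases":    true,
	"finalize_revocations":        true,
}

// parse decodes the event and maps its action to a task type, rejecting
// malformed JSON and unrecognised actions (assumed behaviour).
func parse(raw json.RawMessage) (taskType, error) {
	var ev scheduledEvent
	if err := json.Unmarshal(raw, &ev); err != nil {
		return "", fmt.Errorf("decode scheduled event: %w", err)
	}
	t := taskType(ev.Action)
	if !known[t] {
		return "", fmt.Errorf("unknown scheduled action %q", ev.Action)
	}
	return t, nil
}

func main() {
	fmt.Println(parse([]byte(`{"source":"scheduler","action":"analytics_collect"}`)))
	fmt.Println(parse([]byte(`{"action":"nope"}`)))
}
```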

type SchedulerInterface

type SchedulerInterface interface {
	CollectRecommendations(ctx context.Context) (*scheduler.CollectResult, error)
	ListRecommendations(ctx context.Context, filter config.RecommendationFilter) ([]config.RecommendationRecord, error)
	// GetRecommendationByID fetches a single rec by application-level id,
	// bypassing account-override filtering. hiddenBy is non-nil when the rec
	// exists but would be dropped by the override filter. Returns nil, nil,
	// nil when absent or fully suppressed.
	GetRecommendationByID(ctx context.Context, id string) (rec *config.RecommendationRecord, hiddenBy []string, err error)
}

SchedulerInterface defines the methods required for the scheduler component.
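
The three-value return of GetRecommendationByID gives a caller three outcomes to distinguish besides an error. The sketch below shows one way to branch on them; record is a hypothetical stand-in for config.RecommendationRecord, and the override name used in main is invented.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// record is a hypothetical stand-in for config.RecommendationRecord.
type record struct{ ID string }

// describe classifies the (rec, hiddenBy, err) triple in the order a
// caller would normally check it.
func describe(rec *record, hiddenBy []string, err error) string {
	switch {
	case err != nil:
		return "error: " + err.Error()
	case rec == nil:
		return "not found" // absent or fully suppressed
	case hiddenBy != nil:
		return "hidden by " + strings.Join(hiddenBy, ", ")
	default:
		return "visible"
	}
}

func main() {
	fmt.Println(describe(&record{ID: "r1"}, nil, nil))
	fmt.Println(describe(&record{ID: "r1"}, []string{"acct-override"}, nil))
	fmt.Println(describe(nil, nil, nil))
	fmt.Println(describe(nil, nil, errors.New("db unavailable")))
}
```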

type TaskLocker

type TaskLocker interface {
	TryAdvisoryLock(ctx context.Context, lockID int64) (bool, error)
	ReleaseAdvisoryLock(ctx context.Context, lockID int64)
}

TaskLocker abstracts advisory lock operations for scheduled task concurrency control.
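
Advisory locks are keyed by an int64, so each task needs a stable lock ID. How this package derives its IDs is not shown here; one common approach, sketched below purely as an assumption, is to hash the task name. lockIDFor is a hypothetical helper.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// lockIDFor derives a stable int64 from a task name using 64-bit FNV-1a.
// This is an illustrative scheme, not the package's documented behaviour.
func lockIDFor(task string) int64 {
	h := fnv.New64a()
	h.Write([]byte(task)) // Write on a hash.Hash never returns an error
	return int64(h.Sum64())
}

func main() {
	for _, task := range []string{"cleanup", "analytics_refresh"} {
		fmt.Printf("%s -> %d\n", task, lockIDFor(task))
	}
}
```

A hash keeps IDs stable across instances without a shared registry, at the cost of a small theoretical collision risk between task names.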

Directories

Path Synopsis
Package scheduledauth provides authentication for the /api/scheduled/* endpoints invoked by Cloud Scheduler / Logic Apps.