Documentation
¶
Overview ¶
Package server provides a cloud-agnostic server implementation for CUDly. It supports both AWS Lambda and standard HTTP server modes.
Index ¶
- func CreateHTTPServer(app *Application, port int) *http.Server
- func StartHTTPServer(app *Application, port int) error
- func StartLambdaHandler(app *Application)
- type AnalyticsCollectorInterface
- type AnalyticsConfig
- type AnalyticsStoreInterface
- type Application
- func (app *Application) Close() error
- func (app *Application) HandleLambdaEvent(ctx context.Context, rawEvent json.RawMessage) (any, error)
- func (app *Application) HandleSQSMessage(ctx context.Context, body string) error
- func (app *Application) HandleScheduledTask(ctx context.Context, taskType ScheduledTaskType) (any, error)
- type ApplicationConfig
- type CheckResult
- type ExternalDeps
- type HealthStatus
- type PurchaseManagerInterface
- type ScheduledEvent
- type ScheduledTaskType
- type SchedulerInterface
- type TaskLocker
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CreateHTTPServer ¶
func CreateHTTPServer(app *Application, port int) *http.Server
CreateHTTPServer builds the HTTP server with routes and timeouts configured, but does not start listening. This is useful for testing.
func StartHTTPServer ¶
func StartHTTPServer(app *Application, port int) error
StartHTTPServer starts the HTTP server with graceful shutdown on SIGINT/SIGTERM. It blocks until the server exits cleanly. In container orchestrators (Cloud Run, Container Apps, Fargate) SIGTERM is the normal stop signal; without this wiring the process is killed before deferred app.Close() runs, leaving in-flight requests cut and the DB pool/advisory locks undrained (issue #1025).
func StartLambdaHandler ¶
func StartLambdaHandler(app *Application)
StartLambdaHandler starts the AWS Lambda handler
Types ¶
type AnalyticsCollectorInterface ¶
AnalyticsCollectorInterface aggregates current savings into a point-in-time snapshot row per (tenant, provider, service, region, commitment_type) bucket.
type AnalyticsConfig ¶
type AnalyticsConfig struct {
// Enabled gates the analytics_collect scheduled task. When false the task
// returns a "disabled" status without touching the DB. Default true.
Enabled bool
// RetentionMonths is how many months of snapshot partitions to keep before
// the retention job drops them. Default 24. Must be >= 1.
RetentionMonths int
// PartitionsAhead is how many future monthly partitions to keep provisioned
// ahead of the current month so inserts never fall into the catch-all
// default partition (M3). Default 3. Must be >= 1.
PartitionsAhead int
}
AnalyticsConfig holds the savings-snapshot collector knobs, read from env at startup and validated at the boundary (see Validate).
func LoadAnalyticsConfig ¶
func LoadAnalyticsConfig() AnalyticsConfig
LoadAnalyticsConfig reads the collector knobs from env, falling back to defaults for unset/blank values. Out-of-range or unparseable values are preserved as-is here so Validate can reject them with a clear message at startup (fail-fast at the boundary) rather than being silently clamped.
func (AnalyticsConfig) Validate ¶
func (c AnalyticsConfig) Validate() error
Validate rejects out-of-range analytics knobs so a misconfiguration fails fast at startup instead of silently producing a broken retention/partition policy at the first scheduled run.
type AnalyticsStoreInterface ¶
type AnalyticsStoreInterface interface {
RefreshMaterializedViews(ctx context.Context) error
// CreateFuturePartitions ensures partitions exist for the current month
// plus monthsAhead months ahead (M3: partitions otherwise stop after the
// seeded months and every insert falls into the catch-all default).
CreateFuturePartitions(ctx context.Context, monthsAhead int) error
// DropOldPartitions drops partitions older than retentionMonths (retention).
DropOldPartitions(ctx context.Context, retentionMonths int) error
}
AnalyticsStoreInterface defines the methods required for analytics storage. Beyond the materialized-view refresh, the scheduled analytics task also keeps monthly partitions provisioned ahead of time and applies retention.
type Application ¶
type Application struct {
Config config.StoreInterface
API *api.Handler
Scheduler SchedulerInterface
Purchase PurchaseManagerInterface
Email email.SenderInterface // Multi-cloud email sender (AWS SES, GCP SendGrid, Azure ACS)
Auth *auth.Service
RateLimiter api.RateLimiterInterface // Distributed rate limiter (DB-backed for multi-instance)
Analytics AnalyticsStoreInterface // Analytics store for savings data
// AnalyticsCollector aggregates savings into snapshots on a schedule.
// Nil until reinitializeAfterConnect wires it; the collect task no-ops
// when nil so test builds without a DB stay quiet.
AnalyticsCollector AnalyticsCollectorInterface
Version string
DB *database.Connection // PostgreSQL database connection
TaskLocker TaskLocker // Advisory lock for scheduled tasks (defaults to DB)
// contains filtered or unexported fields
}
Application holds all components of the CUDly server
func NewApplication ¶
func NewApplication(ctx context.Context, version string) (*Application, error)
NewApplication creates and initializes a new Application instance. version overrides the VERSION env var when non-empty, so cmd entrypoints can pass the ldflags-stamped value directly instead of round-tripping through os.Setenv / os.Getenv (04-N1). Pass "" to fall back to the env.
func NewApplicationFromDeps ¶
func NewApplicationFromDeps(ctx context.Context, cfg ApplicationConfig, deps ExternalDeps) (*Application, error)
NewApplicationFromDeps creates an Application from pre-built configuration and dependencies. This is the testable constructor - all external I/O is done before calling this.
func (*Application) Close ¶
func (app *Application) Close() error
Close gracefully shuts down the application
func (*Application) HandleLambdaEvent ¶
func (app *Application) HandleLambdaEvent(ctx context.Context, rawEvent json.RawMessage) (any, error)
HandleLambdaEvent processes any Lambda event type
func (*Application) HandleSQSMessage ¶
func (app *Application) HandleSQSMessage(ctx context.Context, body string) error
HandleSQSMessage processes an SQS message for async purchase processing
func (*Application) HandleScheduledTask ¶
func (app *Application) HandleScheduledTask(ctx context.Context, taskType ScheduledTaskType) (any, error)
HandleScheduledTask processes a scheduled task by type. It acquires a PostgreSQL advisory lock to prevent concurrent execution of the same task.
type ApplicationConfig ¶
type ApplicationConfig struct {
Version string
NotificationDaysBefore int
DefaultTerm int
DefaultPaymentOption string
DefaultCoverage float64
DefaultRampSchedule string
APIKeySecretARN string
EnableDashboard bool
DashboardBucket string
DashboardURL string
// IssuerURL is the canonical OIDC issuer URL published under
// /.well-known/* and used as the iss claim in JWTs minted by the
// KMS-backed signer. Falls back to DashboardURL. Set via the
// CUDLY_ISSUER_URL env var; in the AWS Lambda deploy the Terraform
// module wires this to the Function URL so the deployment is
// self-contained without needing a frontend domain.
IssuerURL string
CORSAllowedOrigin string
// ScheduledTaskSecret is the shared secret checked on the /scheduled
// endpoint. In production (Azure Container Apps, Lambda-with-KV) it is
// resolved lazily from SCHEDULED_TASK_SECRET_NAME via the SecretResolver
// in NewApplicationFromDeps, so the value never lives in a container
// env var. In dev the plaintext SCHEDULED_TASK_SECRET env var is still
// accepted as a fallback.
ScheduledTaskSecret string
ScheduledTaskSecretName string
IsLambda bool
// Analytics snapshot collector knobs. See analytics_collect.go for
// defaults and boundary validation (LoadAnalyticsConfig).
Analytics AnalyticsConfig
}
ApplicationConfig holds all env-based configuration for the application
func LoadApplicationConfig ¶
func LoadApplicationConfig() ApplicationConfig
LoadApplicationConfig reads all configuration from environment variables
type CheckResult ¶
CheckResult represents the result of a health check
type ExternalDeps ¶
type ExternalDeps struct {
EmailSender email.SenderInterface
ConfigStore config.StoreInterface
DBConfig *database.Config
SecretResolver secrets.Resolver
STSClient purchase.STSClient
}
ExternalDeps holds pre-built external dependencies that require infrastructure
type HealthStatus ¶
type HealthStatus struct {
Status string `json:"status"`
Version string `json:"version"`
Timestamp time.Time `json:"timestamp"`
Checks map[string]CheckResult `json:"checks"`
}
HealthStatus represents the overall health of the application
type PurchaseManagerInterface ¶
type PurchaseManagerInterface interface {
ProcessScheduledPurchases(ctx context.Context) (*purchase.ProcessResult, error)
SendUpcomingPurchaseNotifications(ctx context.Context) (*purchase.NotificationResult, error)
ProcessMessage(ctx context.Context, body string) error
ApproveExecution(ctx context.Context, execID, token, actor string) error
ApproveAndExecute(ctx context.Context, execID, actor string) error
CancelExecution(ctx context.Context, execID, token, actor string) error
// ReapStuckExecutions sweeps purchase_executions stuck in
// approved/running longer than reapAfter and flips them to "failed"
// via the existing TransitionExecutionStatus CAS. Wired into the
// "reap_stuck_purchases" scheduled task. See issue #678.
ReapStuckExecutions(ctx context.Context, reapAfter time.Duration) (*purchase.ReapResult, error)
// FireScheduledDelayedPurchases fires purchase_executions in status=scheduled
// whose scheduled_execution_at is in the past (Gmail-style pre-fire delay,
// issue #291 wave-2). Called on the "fire_scheduled_purchases" scheduler tick.
FireScheduledDelayedPurchases(ctx context.Context) (*purchase.FireResult, error)
// FinalizeInFlightRevocations sweeps purchase_history rows with
// revocation_in_flight=true and retries MarkPurchaseRevoked for each. Handles
// the partial-success case where Azure Return succeeded but the DB write
// failed (issue #290 Finding #6). Called on the "finalize_revocations" tick.
FinalizeInFlightRevocations(ctx context.Context) (*purchase.FinalizeResult, error)
}
PurchaseManagerInterface defines the methods required for the purchase manager component
type ScheduledEvent ¶
type ScheduledEvent struct {
Source string `json:"source"`
DetailType string `json:"detail-type"`
Action string `json:"action"`
Detail json.RawMessage `json:"detail"`
}
ScheduledEvent represents a generic scheduled event
type ScheduledTaskType ¶
type ScheduledTaskType string
ScheduledTaskType represents different types of scheduled tasks
const ( TaskCollectRecommendations ScheduledTaskType = "collect_recommendations" TaskProcessScheduledPurchases ScheduledTaskType = "process_scheduled_purchases" TaskSendNotifications ScheduledTaskType = "send_notifications" TaskCleanupExpiredRecords ScheduledTaskType = "cleanup" TaskRefreshAnalytics ScheduledTaskType = "analytics_refresh" // TaskCollectAnalytics runs the savings-snapshot collector end to end: // ensure upcoming partitions, collect a snapshot across all tenants, apply // retention, and refresh the materialized views. Scheduled separately from // TaskRefreshAnalytics (the legacy refresh-only task) so the snapshot // ingestion cadence can differ from a pure view refresh. See issues // #1023 / #1033. TaskCollectAnalytics ScheduledTaskType = "analytics_collect" TaskRIExchangeReshape ScheduledTaskType = "ri_exchange_reshape" // TaskReapStuckPurchases sweeps purchase_executions stuck in // approved/running longer than PURCHASE_APPROVED_REAP_AFTER and flips // them to "failed" via the existing TransitionExecutionStatus CAS. // Backstop for synchronous-executor crashes (Lambda timeout, OOM, // network hang) that leave rows orphaned in an in-flight state. // See internal/purchase/reaper.go + issue #678. TaskReapStuckPurchases ScheduledTaskType = "reap_stuck_purchases" // TaskFireScheduledPurchases fires purchase_executions in status=scheduled // whose scheduled_execution_at is in the past (Gmail-style pre-fire delay, // issue #291 wave-2). Wires the "fire_scheduled_purchases" event action // to purchase.Manager.FireScheduledDelayedPurchases. TaskFireScheduledPurchases ScheduledTaskType = "fire_scheduled_purchases" // TaskFinalizeRevocations sweeps purchase_history rows with // revocation_in_flight=true and retries MarkPurchaseRevoked for each. // These rows represent partial-success cases where the Azure Return API call // succeeded but the subsequent DB write failed. The sweep ensures the audit // record is eventually consistent without requiring the user to retry (which // would be rejected by Azure). See issue #290 Finding #6. TaskFinalizeRevocations ScheduledTaskType = "finalize_revocations" )
func ParseScheduledEvent ¶
func ParseScheduledEvent(rawEvent json.RawMessage) (ScheduledTaskType, error)
ParseScheduledEvent parses a scheduled event and returns the task type
type SchedulerInterface ¶
type SchedulerInterface interface {
CollectRecommendations(ctx context.Context) (*scheduler.CollectResult, error)
ListRecommendations(ctx context.Context, filter config.RecommendationFilter) ([]config.RecommendationRecord, error)
// GetRecommendationByID fetches a single rec by application-level id,
// bypassing account-override filtering. hiddenBy is non-nil when the rec
// exists but would be dropped by the override filter. Returns nil, nil,
// nil when absent or fully suppressed.
GetRecommendationByID(ctx context.Context, id string) (rec *config.RecommendationRecord, hiddenBy []string, err error)
}
SchedulerInterface defines the methods required for the scheduler component
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package scheduledauth provides authentication for the /api/scheduled/* endpoints invoked by Cloud Scheduler / Logic Apps.
|
Package scheduledauth provides authentication for the /api/scheduled/* endpoints invoked by Cloud Scheduler / Logic Apps. |