server

package
v0.1.11 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 15, 2026 License: MIT Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Cmd is the cobra "serve" command that starts the DRS Object API server.
//
// Its Run function loads configuration from configFile, opens the configured
// database (sqlite is chosen over postgres when both are present), stores any
// configured S3 credentials, wires routes and middleware onto a mux router,
// and serves HTTP on cfg.Port. It blocks until the listener fails, SIGINT or
// SIGTERM is received, or the command context is cancelled, and then shuts the
// server down with a 30-second limit.
//
// Unrecoverable errors are logged and terminate the process with exit status 1.
var Cmd = &cobra.Command{
	Use:   "serve",
	Short: "Starts the DRS Object API server",
	Run: func(cmd *cobra.Command, args []string) {
		logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
		slog.SetDefault(logger)

		// fatal logs msg at error level and exits with status 1. Because it
		// calls os.Exit, deferred calls in this function do not run.
		fatal := func(msg string, kv ...any) {
			logger.Error(msg, kv...)
			os.Exit(1)
		}

		cfg, err := config.LoadConfig(configFile)
		if err != nil {
			fatal("failed to load config", "err", err)
		}
		// NOTE(review): this only checks that postgres is configured; the
		// selection below still picks sqlite when both are set. Confirm that
		// is the intended behavior for auth.mode=gen3.
		if cfg.Auth.Mode == config.AuthModeGen3 && cfg.Database.Postgres == nil && !isMockAuthEnabled() {
			fatal("auth.mode=gen3 requires postgres database")
		}

		// Init DB. Sqlite takes precedence when both backends are configured.
		var (
			database core.DatabaseInterface
			dbErr    error
		)
		switch {
		case cfg.Database.Sqlite != nil:
			dbPath := cfg.Database.Sqlite.File
			if dbPath == "" {
				dbPath = "drs.db"
			}
			logger.Info("initializing sqlite database", "file", dbPath)
			database, dbErr = sqlite.NewSqliteDB(dbPath)
		case cfg.Database.Postgres != nil:
			pg := cfg.Database.Postgres
			// NOTE(review): user and password are interpolated unescaped, so
			// values containing URL-reserved characters (e.g. '@', '/', '#')
			// will produce a malformed DSN. Consider building this with
			// net/url (url.UserPassword) once that import is available.
			dsn := fmt.Sprintf("postgres://%s:%s@%s:%d/%s?sslmode=%s",
				pg.User,
				pg.Password,
				pg.Host,
				pg.Port,
				pg.Database,
				pg.SSLMode,
			)
			// The password is deliberately left out of the log line.
			logger.Info("initializing postgres database", "host", pg.Host, "database", pg.Database)
			database, dbErr = postgres.NewPostgresDB(dsn)
		default:
			fatal("no database configuration provided")
		}

		if dbErr != nil {
			fatal("failed to initialize database", "err", dbErr)
		}

		if len(cfg.S3Credentials) > 0 {
			// Refuse to store credentials unless encryption is configured.
			encryptionEnabled, encErr := core.CredentialEncryptionEnabled()
			if encErr != nil {
				fatal("invalid credential encryption configuration", "env", core.CredentialMasterKeyEnv, "err", encErr)
			}
			if !encryptionEnabled {
				fatal("s3 credential encryption key is required", "env", core.CredentialMasterKeyEnv)
			}

			logger.Info("loading configured s3 credentials", "count", len(cfg.S3Credentials))

			for _, c := range cfg.S3Credentials {
				cred := &core.S3Credential{
					Bucket:    c.Bucket,
					Provider:  c.Provider,
					Region:    c.Region,
					AccessKey: c.AccessKey,
					SecretKey: c.SecretKey,
					Endpoint:  c.Endpoint,
				}
				// Best effort: a credential that fails to save is logged and
				// the remaining ones are still attempted.
				if err := database.SaveS3Credential(cmd.Context(), cred); err != nil {
					logger.Error("failed to save s3 credential", "bucket", c.Bucket, "err", err)
				}
			}
		}

		uM := urlmanager.NewManager(database, cfg.Signing)

		// Named svc so the imported service package is not shadowed.
		svc := service.NewObjectsAPIService(database, uM)

		objectsController := drs.NewObjectsAPIController(svc)
		serviceInfoController := drs.NewServiceInfoAPIController(svc)
		uploadRequestController := drs.NewUploadRequestAPIController(svc)

		router := mux.NewRouter().StrictSlash(true)

		// The health endpoint is registered on the root router rather than on
		// the api subrouter that carries the middleware below.
		router.HandleFunc(config.RouteHealthz, func(w http.ResponseWriter, r *http.Request) {
			w.WriteHeader(http.StatusOK)
			// A write error here means the client has gone away; there is
			// nothing useful to do with it.
			_, _ = w.Write([]byte("OK"))
		}).Methods(http.MethodGet)

		api := router.PathPrefix("/").Subrouter()
		registerAPIRoutes(api, objectsController, serviceInfoController, uploadRequestController)

		authzMiddleware := middleware.NewAuthzMiddleware(
			logger,
			cfg.Auth.Mode,
			cfg.Auth.Basic.Username,
			cfg.Auth.Basic.Password,
		)
		requestIDMiddleware := middleware.NewRequestIDMiddleware(logger)

		// Request-ID middleware is added before authz.
		api.Use(requestIDMiddleware.Middleware)
		api.Use(authzMiddleware.Middleware)

		docs.RegisterSwaggerRoutes(api)
		coreapi.RegisterCoreRoutes(api, database)
		metrics.RegisterMetricsRoutes(api, database)

		internaldrs.RegisterInternalIndexRoutes(api, database)

		internaldrs.RegisterInternalDataRoutes(api, database, uM)
		logger.Info("internal drs compatibility routes enabled")

		lfs.RegisterLFSRoutes(api, database, uM, lfs.Options{
			MaxBatchObjects:              cfg.LFS.MaxBatchObjects,
			MaxBatchBodyBytes:            cfg.LFS.MaxBatchBodyBytes,
			RequestLimitPerMinute:        cfg.LFS.RequestLimitPerMinute,
			BandwidthLimitBytesPerMinute: cfg.LFS.BandwidthLimitBytesPerMinute,
		})

		addr := fmt.Sprintf(":%d", cfg.Port)
		logger.Info("server starting", "addr", addr)
		server := &http.Server{
			Addr:              addr,
			Handler:           router,
			ReadHeaderTimeout: 10 * time.Second,
			ReadTimeout:       30 * time.Second,
			WriteTimeout:      120 * time.Second,
			IdleTimeout:       120 * time.Second,
		}

		// Subscribe to termination signals before the listener starts, so a
		// signal that arrives while the server is coming up is queued on
		// sigCh and leads to a graceful shutdown instead of the default action.
		sigCh := make(chan os.Signal, 1)
		signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
		defer signal.Stop(sigCh)

		// Buffered so the goroutine can report a listen error and exit even
		// if the select below has already taken another branch.
		errCh := make(chan error, 1)
		go func() {
			if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
				errCh <- err
			}
		}()

		select {
		case err := <-errCh:
			fatal("server listen failed", "err", err)
		case sig := <-sigCh:
			logger.Info("shutdown signal received", "signal", sig.String())
		case <-cmd.Context().Done():
			logger.Info("shutdown requested by context cancellation")
		}

		// Use a fresh context: the command context may already be cancelled.
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()
		if err := server.Shutdown(shutdownCtx); err != nil {
			fatal("server shutdown failed", "err", err)
		}
		logger.Info("server shutdown complete")
	},
}

Functions

This section is empty.

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL