Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var Cmd = &cobra.Command{ Use: "serve", Short: "Starts the DRS Object API server", Run: func(cmd *cobra.Command, args []string) { logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) slog.SetDefault(logger) fatal := func(msg string, args ...any) { logger.Error(msg, args...) os.Exit(1) } cfg, err := config.LoadConfig(configFile) if err != nil { fatal("failed to load config", "err", err) } if cfg.Auth.Mode == config.AuthModeGen3 && cfg.Database.Postgres == nil && !isMockAuthEnabled() { fatal("auth.mode=gen3 requires postgres database") } // Init DB var database core.DatabaseInterface var errDb error if cfg.Database.Sqlite != nil { dbPath := cfg.Database.Sqlite.File if dbPath == "" { dbPath = "drs.db" } logger.Info("initializing sqlite database", "file", dbPath) database, errDb = sqlite.NewSqliteDB(dbPath) } else if cfg.Database.Postgres != nil { dsn := fmt.Sprintf("postgres://%s:%s@%s:%d/%s?sslmode=%s", cfg.Database.Postgres.User, cfg.Database.Postgres.Password, cfg.Database.Postgres.Host, cfg.Database.Postgres.Port, cfg.Database.Postgres.Database, cfg.Database.Postgres.SSLMode, ) logger.Info("initializing postgres database", "host", cfg.Database.Postgres.Host, "database", cfg.Database.Postgres.Database) database, errDb = postgres.NewPostgresDB(dsn) } else { fatal("no database configuration provided") } if errDb != nil { fatal("failed to initialize database", "err", errDb) } if len(cfg.S3Credentials) > 0 { encryptionEnabled, encErr := core.CredentialEncryptionEnabled() if encErr != nil { fatal("invalid credential encryption configuration", "env", core.CredentialMasterKeyEnv, "err", encErr) } if !encryptionEnabled { fatal("s3 credential encryption key is required", "env", core.CredentialMasterKeyEnv) } logger.Info("loading configured s3 credentials", "count", len(cfg.S3Credentials)) for _, c := range cfg.S3Credentials { cred := &core.S3Credential{ Bucket: c.Bucket, Provider: c.Provider, Region: c.Region, AccessKey: c.AccessKey, SecretKey: c.SecretKey, Endpoint: c.Endpoint, } if err := database.SaveS3Credential(cmd.Context(), cred); err != nil { logger.Error("failed to save s3 credential", "bucket", c.Bucket, "err", err) } } } uM := urlmanager.NewManager(database, cfg.Signing) service := service.NewObjectsAPIService(database, uM) objectsController := drs.NewObjectsAPIController(service) serviceInfoController := drs.NewServiceInfoAPIController(service) uploadRequestController := drs.NewUploadRequestAPIController(service) router := mux.NewRouter().StrictSlash(true) router.HandleFunc(config.RouteHealthz, func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write([]byte("OK")) }).Methods(http.MethodGet) api := router.PathPrefix("/").Subrouter() registerAPIRoutes(api, objectsController, serviceInfoController, uploadRequestController) slogLogger := logger authzMiddleware := middleware.NewAuthzMiddleware( slogLogger, cfg.Auth.Mode, cfg.Auth.Basic.Username, cfg.Auth.Basic.Password, ) requestIDMiddleware := middleware.NewRequestIDMiddleware(slogLogger) api.Use(requestIDMiddleware.Middleware) api.Use(authzMiddleware.Middleware) docs.RegisterSwaggerRoutes(api) coreapi.RegisterCoreRoutes(api, database) metrics.RegisterMetricsRoutes(api, database) internaldrs.RegisterInternalIndexRoutes(api, database) internaldrs.RegisterInternalDataRoutes(api, database, uM) logger.Info("internal drs compatibility routes enabled") lfs.RegisterLFSRoutes(api, database, uM, lfs.Options{ MaxBatchObjects: cfg.LFS.MaxBatchObjects, MaxBatchBodyBytes: cfg.LFS.MaxBatchBodyBytes, RequestLimitPerMinute: cfg.LFS.RequestLimitPerMinute, BandwidthLimitBytesPerMinute: cfg.LFS.BandwidthLimitBytesPerMinute, }) addr := fmt.Sprintf(":%d", cfg.Port) logger.Info("server starting", "addr", addr) server := &http.Server{ Addr: addr, Handler: router, ReadHeaderTimeout: 10 * time.Second, ReadTimeout: 30 * time.Second, WriteTimeout: 120 * time.Second, IdleTimeout: 120 * time.Second, } errCh := make(chan error, 1) go func() { if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { errCh <- err } }() sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) defer signal.Stop(sigCh) select { case err := <-errCh: fatal("server listen failed", "err", err) case sig := <-sigCh: logger.Info("shutdown signal received", "signal", sig.String()) case <-cmd.Context().Done(): logger.Info("shutdown requested by context cancellation") } shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() if err := server.Shutdown(shutdownCtx); err != nil { fatal("server shutdown failed", "err", err) } logger.Info("server shutdown complete") }, }
Functions ¶
This section is empty.
Types ¶
This section is empty.
Click to show internal directories.
Click to hide internal directories.