root

package
v0.2.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2024 License: Apache-2.0 Imports: 45 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Command is the root cobra command of the wfx server.
//
// PersistentPreRunE loads the configuration into the shared koanf instance k,
// in this order: YAML config files, WFX_-prefixed environment variables, CLI
// flags. It then sets up logging and installs a file watcher that reloads the
// configuration when the config file changes.
//
// RunE initializes the persistence storage (retrying for up to 300 attempts),
// creates and launches the server collections, blocks until a termination
// signal or a server error arrives and then shuts everything down within the
// configured graceful timeout.
var Command = &cobra.Command{
	Use:   "wfx",
	Short: "wfx server",
	Long: `This API allows to create and execute reusable workflows for clients.
Each workflow is modeled as a state machine running in the storage, with tasks to be executed by clients.

Examples of tasks are installation of firmware or other types of commands issued to clients.`,
	PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
		f := cmd.Flags()

		// Every registered flag name is a valid config option; anything else
		// found in a config file or the environment is reported as unknown.
		knownOptions := make(map[string]bool, 64)
		f.VisitAll(func(flag *pflag.Flag) {
			knownOptions[flag.Name] = true
		})

		// mergeFn copies all keys from src into dest. Unknown keys are
		// reported on stderr but still merged.
		mergeFn := koanf.WithMergeFunc(func(src, dest map[string]any) error {
			for key, value := range src {
				if !knownOptions[key] {
					fmt.Fprintf(os.Stderr, "ERROR: Unknown config option '%s'\n", key)
				}
				dest[key] = value
			}
			return nil
		})

		// NOTE(review): the error of GetStringSlice is discarded; on failure
		// cFiles is empty and no config file is loaded — confirm this is intended.
		cFiles, _ := f.GetStringSlice(configFlag)

		// fileProvider ends up pointing at the LAST existing config file; only
		// that file is watched for changes further below.
		var fileProvider *file.File
		for _, fname := range cFiles {
			// Files which cannot be stat'ed (e.g. do not exist) are skipped silently.
			if _, err := os.Stat(fname); err != nil {
				continue
			}
			provider := file.Provider(fname)
			fileProvider = provider
			k.Write(func(k *koanf.Koanf) {
				if err := k.Load(provider, yaml.Parser(), mergeFn); err != nil {
					fmt.Fprintf(os.Stderr, "ERROR: Failed to load config file '%s': %v\n", fname, err)
				}
			})
		}

		// Map environment variables to option names: strip the WFX_ prefix,
		// lowercase, and replace '_' by '-' (e.g. WFX_LOG_LEVEL -> log-level).
		envProvider := env.Provider("WFX_", ".", func(s string) string {
			return strings.ReplaceAll(strings.ToLower(strings.TrimPrefix(s, "WFX_")), "_", "-")
		})
		k.Write(func(k *koanf.Koanf) {
			if err := k.Load(envProvider, nil, mergeFn); err != nil {
				fmt.Fprintf(os.Stderr, "ERROR: Could not load env variables: %v\n", err)
			}
			// CLI flags are loaded last and without mergeFn.
			if err := k.Load(posflag.Provider(f, ".", k), nil); err != nil {
				fmt.Fprintf(os.Stderr, "ERROR: Could not load CLI flags: %v\n", err)
			}
		})

		// now that we have merged all config sources, set up logger
		var logLevel, logFormat string
		k.Read(func(k *koanf.Koanf) {
			logLevel = k.String(logLevelFlag)
			logFormat = k.String(logFormatFlag)
		})
		setupLogging(os.Stdout, logFormat, logLevel)

		if fileProvider != nil {
			// Reload the watched config file whenever it changes. Failures are
			// logged and leave the previous configuration in place; they never
			// abort the running server.
			err := fileProvider.Watch(func(_ any, err error) {
				if err != nil {
					log.Error().Err(err).Msg("Config file watcher reported an error")
					return
				}
				k.Write(func(k *koanf.Koanf) {
					if err := k.Load(fileProvider, yaml.Parser(), mergeFn); err != nil {
						log.Error().Err(err).Msg("Failed to load changed config file")
						return
					}
					if err := reloadConfig(k); err != nil {
						log.Error().Err(err).Msg("Failed to reload config")
					}
				})
			})
			if err != nil {
				log.Error().Err(err).Msg("Failed to set up config file watcher")
			}
		}

		return nil
	},
	RunE: func(cmd *cobra.Command, args []string) error {
		// The user name is informational only; it stays empty if the lookup fails.
		var username string
		if u, err := user.Current(); err == nil {
			username = u.Username
		}

		log.Info().
			Str("version", metadata.Version).
			Str("date", metadata.Date).
			Str("commit", metadata.Commit).
			Str("user", username).
			Msg("Starting wfx")

		var name, options string
		k.Read(func(k *koanf.Koanf) {
			name = k.String(storageFlag)
			options = k.String(storageOptFlag)
		})
		log.Debug().Str("name", name).Str("options", options).Msg("Setting up persistence storage")
		// Drop the default storage options when a storage other than the
		// preferred one was selected and the options were left at their default
		// (presumably the defaults only apply to the preferred storage — confirm).
		if name != preferedStorage && options == defaultStorageOpts {
			options = ""
		}

		storage := persistence.GetStorage(name)
		if storage == nil {
			return fmt.Errorf("unknown storage %s", name)
		}

		// Try to initialize the storage up to 300 times, pausing one second
		// after each failed attempt. err holds the result of the last attempt.
		var err error
		for i := 0; i < 300; i++ {
			log.Debug().Str("name", name).Msg("Initializing storage")
			err = storage.Initialize(context.Background(), options)
			if err == nil {
				log.Info().Str("name", name).Msg("Initialized storage")
				break
			}
			dur := time.Second
			log.Warn().
				Err(err).
				Str("storage", name).
				Msg("Failed to initialize persistent storage. Trying again in one second...")
			time.Sleep(dur)
		}
		if err != nil {
			return fault.Wrap(err)
		}
		defer storage.Shutdown()

		signal.Notify(signalChannel, os.Interrupt, syscall.SIGTERM)

		var schemes []string
		k.Read(func(k *koanf.Koanf) {
			schemes = k.Strings(schemeFlag)
		})
		serverCollections := make([]*serverCollection, 0, 3)
		// chQuit carries errors from servers; it is also handed to the
		// collection constructors.
		chQuit := make(chan error)
		{
			collection, err := createNorthboundCollection(schemes, storage, chQuit)
			if err != nil {
				return fault.Wrap(err)
			}
			serverCollections = append(serverCollections, collection)
		}
		{
			collection, err := createSouthboundCollection(schemes, storage, chQuit)
			if err != nil {
				return fault.Wrap(err)
			}
			serverCollections = append(serverCollections, collection)
		}

		// NOTE(review): the error returned by activation.Listeners is
		// discarded — confirm this is intended.
		listeners, _ := activation.Listeners()
		serverCollections = append(serverCollections, adoptListeners(listeners, storage, chQuit)...)

		for _, collection := range serverCollections {
			for i := range collection.servers {
				// Per-iteration copies so that each goroutine captures its own
				// values regardless of the Go version's loop variable semantics.
				name := collection.name
				srv := collection.servers[i]
				go func() {
					if err := launchServer(name, srv); err != nil {
						chQuit <- err
					}
				}()
			}
		}

		// Block until either a termination signal or the first server error
		// arrives; both trigger the same shutdown sequence.
		select {
		case <-signalChannel:
		case <-chQuit:
		}

		events.ShutdownSubscribers()

		// create a context with a timeout to allow outstanding requests to complete
		var timeout time.Duration
		k.Read(func(k *koanf.Koanf) {
			timeout = k.Duration(gracefulTimeoutFlag)
		})
		ctx, cancel := context.WithTimeout(context.Background(), timeout)
		defer cancel()

		for _, collection := range serverCollections {
			collection.Shutdown(ctx)
		}
		return nil
	},
}

Functions

func LoadNorthboundPlugins added in v0.2.0

func LoadNorthboundPlugins(chan error) ([]middleware.IntermediateMW, error)

func LoadSouthboundPlugins added in v0.2.0

func LoadSouthboundPlugins(chan error) ([]middleware.IntermediateMW, error)

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL