cli

package
v0.0.0-...-66d4bb3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 29, 2020 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var WarpPipeCmd = &cobra.Command{
	Use:   "warp-pipe",
	Short: "Run a warp-pipe",
	Long:  `Run a warp-pipe and stream changes from a Postgres database.`,
	RunE: func(cmd *cobra.Command, _ []string) error {
		config, err := parseConfig()
		if err != nil {
			return err
		}

		listener, err := initListener(config)
		if err != nil {
			return err
		}

		wp := warppipe.NewWarpPipe(
			listener,
			warppipe.IgnoreTables(config.IgnoreTables),
			warppipe.WhitelistTables(config.WhitelistTables),
			warppipe.LogLevel(config.LogLevel),
		)
		if err := wp.Open(&config.Database); err != nil {
			log.Fatal(err)
		}

		ctx, cancel := context.WithCancel(context.Background())
		changes, errors := wp.ListenForChanges(ctx)
		go func() {
			for {
				select {
				case change := <-changes:
					b, err := json.Marshal(change)
					if err != nil {
						log.Error(err)
					}
					fmt.Println(string(b))
				case err := <-errors:
					log.Error(err)
				}
			}
		}()

		shutdownCh := make(chan os.Signal, 1)
		signal.Notify(shutdownCh, os.Interrupt, syscall.SIGTERM)
		for {
			<-shutdownCh
			cancel()
			if err := wp.Close(); err != nil {
				return err
			}
			return nil
		}
	},
}

WarpPipeCmd is the root command.

Functions

This section is empty.

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL