core

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 22, 2022 License: Apache-2.0, MIT Imports: 64 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var BargeAddCmd = &cli.Command{
	Name:        "add",
	Description: `'barge add <file>' is a command to add a file'`,
	Usage:       "barge add <file>",
	Flags: []cli.Flag{
		&cli.BoolFlag{
			Name: "progress",
		},
	},
	Action: func(cctx *cli.Context) error {
		r, err := OpenRepo()
		if err != nil {
			return err
		}

		progress := cctx.Bool("progress")

		var paths []string

		for _, f := range cctx.Args().Slice() {
			matches, err := filepath.Glob(f)
			if err != nil {
				return err
			}

			for _, m := range matches {

				st, err := os.Stat(m)
				if err != nil {
					return err
				}

				if st.IsDir() {
					sub, err := expandDirectory(m)
					if err != nil {
						return err
					}

					paths = append(paths, sub...)
				} else {
					paths = append(paths, m)
				}
			}
		}

		progcb := func(int64) {}
		incrTotal := func(int64) {}
		finish := func() {}

		if progress {
			bar := pb.New64(0)

			bar.Set(pb.Bytes, true)
			bar.SetTemplate(pb.Full)
			bar.Start()

			progcb = func(amt int64) {
				bar.Add64(amt)
			}

			var total int64
			var totlk sync.Mutex

			incrTotal = func(amt int64) {
				totlk.Lock()
				total += amt
				bar.SetTotal(total)
				totlk.Unlock()
			}

			finish = func() {
				bar.Finish()
			}

		}

		type addJob struct {
			Path  string
			Found []File
			Stat  os.FileInfo
		}

		type updateJob struct {
			Path  string
			Found []File
			Stat  os.FileInfo
			Cid   cid.Cid
		}

		tocheck := make(chan string, 1)
		tobuffer := make(chan *addJob, 128)
		toadd := make(chan *addJob)
		toupdate := make(chan updateJob, 128)

		go func() {
			defer close(tocheck)
			for _, f := range paths {
				tocheck <- f
			}
		}()

		go func() {
			defer close(tobuffer)
			for p := range tocheck {
				st, err := os.Stat(p)
				if err != nil {
					fmt.Println(err)
					return
				}

				incrTotal(st.Size())

				var found []File
				if err := r.DB.Find(&found, "path = ?", p).Error; err != nil {
					fmt.Println(err)
					return
				}

				if len(found) > 0 {
					existing := found[0]

					if st.ModTime().Equal(existing.Mtime) {

						continue
					}
				}

				tobuffer <- &addJob{
					Path:  p,
					Found: found,
					Stat:  st,
				}
			}
		}()

		go func() {
			defer close(toadd)
			var next *addJob
			var buffer []*addJob
			var out chan *addJob
			var inputDone bool

			for {
				select {
				case aj, ok := <-tobuffer:
					if !ok {
						inputDone = true
						if next == nil && len(buffer) == 0 {
							return
						}
						continue
					}
					if out == nil {
						next = aj
						out = toadd
					} else {
						buffer = append(buffer, aj)
					}
				case out <- next:
					if len(buffer) > 0 {
						next = buffer[0]
						buffer = buffer[1:]
					} else {
						out = nil
						next = nil
						if inputDone {
							return
						}
					}
				}
			}
		}()

		var wg sync.WaitGroup
		for i := 0; i < 10; i++ {
			wg.Add(1)

			go func() {
				defer wg.Done()
				for aj := range toadd {
					nd, _, err := filestoreAdd(r.Filestore, aj.Path, progcb)
					if err != nil {
						fmt.Println(err)
						return
					}

					toupdate <- updateJob{
						Path:  aj.Path,
						Found: aj.Found,
						Cid:   nd.Cid(),
						Stat:  aj.Stat,
					}
				}
			}()
		}

		go func() {
			wg.Wait()
			close(toupdate)
		}()

		var batchCreates []*File
		for uj := range toupdate {
			if len(uj.Found) > 0 {
				existing := uj.Found[0]
				if existing.Cid != uj.Cid.String() {
					if err := r.DB.Model(File{}).Where("id = ?", existing.ID).UpdateColumns(map[string]interface{}{
						"cid":   uj.Cid.String(),
						"mtime": uj.Stat.ModTime(),
					}).Error; err != nil {
						return err
					}
				}

				continue
			}

			abs, err := filepath.Abs(uj.Path)
			if err != nil {
				return err
			}

			rel, err := filepath.Rel(r.Dir, abs)
			if err != nil {
				return err
			}

			batchCreates = append(batchCreates, &File{
				Path:  rel,
				Cid:   uj.Cid.String(),
				Mtime: uj.Stat.ModTime(),
			})

			if len(batchCreates) > 200 {
				if err := r.DB.CreateInBatches(batchCreates, 100).Error; err != nil {
					return err
				}
				batchCreates = nil
			}
		}

		if err := r.DB.CreateInBatches(batchCreates, 100).Error; err != nil {
			return err
		}

		finish()

		return nil
	},
}
View Source
var BargeCheckCmd = &cli.Command{
	Name:        "check",
	Description: `'barge check' to check the state of the object'`,
	Usage:       "barge check <cid>",
	Action: func(cctx *cli.Context) error {
		r, err := OpenRepo()
		if err != nil {
			return err
		}

		for _, path := range cctx.Args().Slice() {
			var file File
			if err := r.DB.First(&file, "path = ?", path).Error; err != nil {
				return err
			}

			fcid, err := cid.Decode(file.Cid)
			if err != nil {
				return err
			}

			ctx := context.TODO()
			lres := filestore.Verify(ctx, r.Filestore, fcid)
			fmt.Println(lres.Status.String())
			fmt.Println(lres.ErrorMsg)
		}

		return nil
	},
}
View Source
var BargeShareCmd = &cli.Command{
	Name:        "share",
	Description: `'barge check' to share objects'`,
	Usage:       "barge share <cid>",
	Action: func(cctx *cli.Context) error {
		r, err := OpenRepo()
		if err != nil {
			return err
		}

		pc, err := setupBitswap(cctx.Context, r.Filestore)
		if err != nil {
			return err
		}

		h := pc.host

		for _, a := range h.Addrs() {
			fmt.Printf("%s/p2p/%s\n", a, h.ID())
		}

		select {}

	},
}
View Source
var BargeStatusCmd = &cli.Command{
	Name:        "status",
	Description: `'barge status' is a command to check the status of the file'`,
	Usage:       "barge status",
	Action: func(cctx *cli.Context) error {
		r, err := OpenRepo()
		if err != nil {
			return err
		}

		var allfiles []File
		if err := r.DB.Order("path asc").Find(&allfiles).Error; err != nil {
			return err
		}

		fmt.Println("Changes not yet staged:")

		var unpinned []File
		for _, f := range allfiles {
			ch, reason, err := maybeChanged(f)
			if err != nil {
				return err
			}

			var pins []Pin
			if err := r.DB.Find(&pins, "file = ?", f.ID).Error; err != nil {
				return err
			}

			if !ch {
				if len(pins) > 0 {
					pin := pins[0]

					if pin.Status == types.PinningStatusPinned {

						continue
					}
				}

				unpinned = append(unpinned, f)
				continue
			}

			fmt.Printf("\t%s: %s\n", reason, f.Path)
		}

		if len(unpinned) > 0 {
			fmt.Println()
			fmt.Println("Unpinned files:")
			for _, f := range unpinned {
				fmt.Printf("\t%s\n", f.Path)
			}
		}

		return nil
	},
}
View Source
var BargeSyncCmd = &cli.Command{
	Name:        "sync",
	Description: `'barge sync' is a command to synchronize the state of the objects in this barge instance'`,
	Usage:       "barge sync",
	Flags: []cli.Flag{
		&cli.BoolFlag{
			Name: "recover",
		},
		&cli.Int64Flag{
			Name: "new-pin-limit",
		},
	},
	Action: func(cctx *cli.Context) error {
		ctx := cctx.Context
		r, err := OpenRepo()
		if err != nil {
			return err
		}

		c, err := LoadClient(cctx)
		if err != nil {
			return err
		}

		coluuid := r.Cfg.GetString("collection.uuid")
		if coluuid == "" {
			return fmt.Errorf("barge repo does not have a collection set")
		}

		var filespins []FileWithPin
		if err := r.DB.Model(File{}).Joins("left join pins on pins.file = files.id AND pins.cid = files.cid").Select("files.id as file_id, pins.id as pin_id, path, status, request_id, files.cid as cid").Scan(&filespins).Error; err != nil {
			return err
		}

		pc, err := setupBitswap(ctx, r.Filestore)
		if err != nil {
			return err
		}

		h := pc.host

		var addrs []string
		for _, a := range h.Addrs() {
			addrs = append(addrs, fmt.Sprintf("%s/p2p/%s", a, h.ID()))
		}

		var pinComplete []FileWithPin
		var needsNewPin []FileWithPin
		var inProgress []*Pin
		var checkProgress []FileWithPin
		for _, f := range filespins {
			if f.PinID == 0 {
				needsNewPin = append(needsNewPin, f)
				continue
			}

			if f.Status == types.PinningStatusPinned {

				continue
			}

			checkProgress = append(checkProgress, f)
		}

		batchSize := 500
		fmt.Printf("need to check progress of %d pins\n", len(checkProgress))
		for i := 0; i < len(checkProgress); i += batchSize {
			log.Printf("getting pin statuses: %d / %d\n", i, len(checkProgress))
			end := i + batchSize
			if end > len(checkProgress) {
				end = len(checkProgress)
			}

			var reqids []string
			for _, p := range checkProgress[i:end] {
				reqids = append(reqids, p.RequestID)
			}

			resp, err := c.PinStatuses(ctx, reqids)
			if err != nil {
				return fmt.Errorf("failed to recheck pin statuses: %w", err)
			}

			for _, fp := range checkProgress[i:end] {
				st, ok := resp[fp.RequestID]
				if !ok {
					return fmt.Errorf("did not get status back for requestid %s", fp.RequestID)
				}

				switch st.Status {
				case types.PinningStatusPinned:
					pinComplete = append(pinComplete, fp)
					if err := r.DB.Model(Pin{}).Where("id = ?", fp.PinID).UpdateColumn("status", st.Status).Error; err != nil {
						return err
					}
				case types.PinningStatusFailed:
					needsNewPin = append(needsNewPin, fp)
					if err := r.DB.Delete(Pin{ID: fp.PinID}).Error; err != nil {
						return err
					}
				default:

					inProgress = append(inProgress, &Pin{
						ID:        fp.PinID,
						File:      fp.FileID,
						Status:    fp.Status,
						RequestID: fp.RequestID,
					})
				}
			}
		}

		if cctx.Bool("recover") {
			fmt.Println("recovery requested, searching for pins on estuary not tracked locally...")
			for i, nnp := range needsNewPin {
				fmt.Printf("                                \r")
				fmt.Printf("[%d / %d]\r", i, len(needsNewPin))

				st, err := c.PinStatusByCid(ctx, []string{nnp.Cid})
				if err != nil {
					fmt.Println("failed to get pin status: ", err)
					continue
				}

				pin, ok := st[nnp.Cid]
				if !ok {
					continue
				}

				if pin.Status == types.PinningStatusFailed {

					continue
				}

				if err := r.DB.Create(&Pin{
					File:      nnp.FileID,
					Cid:       nnp.Cid,
					RequestID: pin.RequestID,
					Status:    pin.Status,
				}).Error; err != nil {
					return err
				}
			}

			return nil
		}

		fmt.Printf("need to make %d new pins\n", len(needsNewPin))
		if lim := cctx.Int64("new-pin-limit"); lim > 0 {
			if int64(len(needsNewPin)) > lim {
				needsNewPin = needsNewPin[:lim]
				fmt.Printf("only making %d for now...\n", lim)
			}
		}

		var dplk sync.Mutex
		var donePins int
		var wg sync.WaitGroup
		newpins := make([]*Pin, len(needsNewPin))
		errs := make([]error, len(needsNewPin))
		sema := make(chan struct{}, 20)
		var delegates []string
		for i := range needsNewPin {
			wg.Add(1)
			go func(ix int) {
				defer wg.Done()

				f := needsNewPin[ix]

				fcid, err := cid.Decode(f.Cid)
				if err != nil {
					errs[ix] = err
					return
				}

				sema <- struct{}{}
				defer func() {
					<-sema
				}()

				resp, err := c.PinAdd(ctx, fcid, filepath.Base(f.Path), addrs, map[string]interface{}{
					"coluuid": coluuid,
					"colpath": "/" + f.Path,
				})
				if err != nil {
					errs[ix] = err
					return
				}

				dplk.Lock()
				delegates = append(delegates, resp.Delegates...)
				donePins++
				fmt.Printf("                                                 \r")
				fmt.Printf("creating new pins %d/%d", donePins, len(needsNewPin))
				dplk.Unlock()

				p := &Pin{
					File:      f.FileID,
					Cid:       fcid.String(),
					RequestID: resp.RequestID,
					Status:    resp.Status,
				}

				newpins[ix] = p
			}(i)
		}
		wg.Wait()

		if err := connectToDelegates(ctx, h, delegates); err != nil {
			_, err := fmt.Fprintf(os.Stderr, "failed to connect to deletegates for new pin: %s\n", err)
			if err != nil {
				return err
			}
		}

		var tocreate []*Pin
		for _, p := range newpins {
			if p != nil {
				tocreate = append(tocreate, p)
				inProgress = append(inProgress, p)
			}
		}

		if len(tocreate) > 0 {
			if err := r.DB.CreateInBatches(tocreate, 100).Error; err != nil {
				return err
			}
		}

		for _, err := range errs {
			if err != nil {
				return err
			}
		}

		fmt.Println()
		fmt.Println("transferring data...")

		complete := make(map[string]bool)
		failed := make(map[string]bool)
		for range time.Tick(time.Second * 2) {

		loopstart:
			var tocheck []string
			for _, p := range inProgress {
				if complete[p.RequestID] || failed[p.RequestID] {
					continue
				}

				tocheck = append(tocheck, p.RequestID)

				if len(tocheck) >= 300 {
					break
				}
			}

			if len(inProgress)-(len(complete)+len(failed)) > batchSize*2 {
				for i := 0; i < 200; i++ {
					p := inProgress[rand.Intn(len(inProgress))]
					if complete[p.RequestID] || failed[p.RequestID] {
						continue
					}

					tocheck = append(tocheck, p.RequestID)
				}
			}

			statuses, err := c.PinStatuses(ctx, tocheck)
			if err != nil {
				return fmt.Errorf("failed to check pin statuses: %w", err)
			}

			var newdone int
			for _, req := range tocheck {
				status, ok := statuses[req]
				if !ok {
					fmt.Printf("didnt get expected pin status back in request: %s\n", req)
					continue
				}

				switch status.Status {
				case types.PinningStatusPinned:
					newdone++
					complete[req] = true
					if err := r.DB.Model(Pin{}).Where("request_id = ?", req).UpdateColumn("status", types.PinningStatusPinned).Error; err != nil {
						return err
					}
				case types.PinningStatusFailed:
					newdone++
					failed[req] = true
					if err := r.DB.Model(Pin{}).Where("request_id = ?", req).Delete(Pin{}).Error; err != nil {
						return err
					}
				default:
				}

				if err := connectToDelegates(ctx, h, status.Delegates); err != nil {
					fmt.Println("failed to connect to pin delegates: ", err)
				}
			}

			st := pc.bwc.GetBandwidthForProtocol("/ipfs/bitswap/1.2.0")
			fmt.Printf("pinned: %d, pinning: %d, failed: %d, xfer rate: %s/s (connections: %d)\n", len(complete), len(inProgress)-(len(complete)+len(failed)), len(failed), humanize.Bytes(uint64(st.RateOut)), len(h.Network().Conns()))

			if len(failed)+len(complete) >= len(inProgress) {
				break
			}

			if newdone > 100 {
				goto loopstart
			}
		}

		return nil

	},
}
View Source
var BsGetCmd = &cli.Command{
	Name:  "bsget",
	Usage: "bsget [flags] <cid> <peer multiaddress>",

	Flags: []cli.Flag{
		&cli.StringFlag{
			Name:    "output",
			Aliases: []string{"o"},
			Usage:   "Specify file to which write the requested CIDs",
		},
	},
	Action: func(cctx *cli.Context) error {

		if cctx.Args().Len() < 2 {
			return fmt.Errorf("usage: bsget {CID} {PEER_MULTIADDRESS}")
		}

		root, err := cid.Decode(cctx.Args().Get(0))
		if err != nil {
			return err
		}

		maddr, err := multiaddr.NewMultiaddr(cctx.Args().Get(1))
		if err != nil {
			return err
		}
		ai, err := peer.AddrInfoFromP2pAddr(maddr)
		if err != nil {
			return err
		}

		ctx := context.Background()
		h, err := libp2p.New()
		if err != nil {
			return err
		}

		ds := sync.MutexWrap(datastore.NewMapDatastore())
		bstore := blockstore.NewBlockstore(ds)

		bsnet := bsnet.NewFromIpfsHost(h, &rhelp.Null{})
		bswap := bitswap.New(ctx, bsnet, bstore)

		bserv := blockservice.New(bstore, bswap)
		dag := merkledag.NewDAGService(bserv)

		if err := h.Connect(ctx, *ai); err != nil {
			return fmt.Errorf("failed to connect to target peer: %w", err)
		}

		bar := pb.StartNew(-1)
		bar.Set(pb.Bytes, true)

		cset := cid.NewSet()

		getLinks := func(ctx context.Context, c cid.Cid) ([]*ipld.Link, error) {
			node, err := dag.Get(ctx, c)
			if err != nil {
				return nil, err
			}
			bar.Add(len(node.RawData()))

			return node.Links(), nil

		}
		if err := merkledag.Walk(ctx, getLinks, root, cset.Visit, merkledag.Concurrency(2)); err != nil {
			return err
		}

		bar.Finish()

		fmt.Println("CIDs retrieved successfully")
		return nil
	},
}
View Source
var CollectionsCmd = &cli.Command{
	Name:        "collections",
	Description: `'barge collections' is a command to list all collections`,
	Usage:       "barge collections",
	Subcommands: []*cli.Command{
		CollectionsCreateCmd,
		CollectionsLsDirCmd,
	},
	Action: listCollections,
}
View Source
var CollectionsCreateCmd = &cli.Command{
	Name: "create",
	Flags: []cli.Flag{
		&cli.StringFlag{
			Name:     "name",
			Required: true,
		},
		&cli.StringFlag{
			Name: "description",
		},
	},
	Action: func(cctx *cli.Context) error {
		c, err := LoadClient(cctx)
		if err != nil {
			return err
		}

		col, err := c.CollectionsCreate(cctx.Context, cctx.String("name"), cctx.String("description"))
		if err != nil {
			return err
		}

		fmt.Println("new collection created")
		fmt.Println(col.Name)
		fmt.Println(col.UUID)

		return nil
	},
}
View Source
var CollectionsLsDirCmd = &cli.Command{
	Name:  "ls",
	Flags: []cli.Flag{},
	Action: func(cctx *cli.Context) error {
		c, err := LoadClient(cctx)
		if err != nil {
			return err
		}

		if cctx.Args().Len() < 2 {
			return fmt.Errorf("must specify collection ID and path to list")
		}

		col := cctx.Args().Get(0)
		path := cctx.Args().Get(1)

		ents, err := c.CollectionsListDir(cctx.Context, col, path)
		if err != nil {
			return err
		}

		for _, e := range ents {
			if e.Dir {
				fmt.Println(e.Name + "/")
			} else {
				fmt.Println(e.Name)
			}
		}

		return nil
	},
}
View Source
var ConfigCmd = &cli.Command{
	Name:        "config",
	Description: `'barge config' is a command to set up the local barge configuration`,
	Usage:       "barge config <command>",
	Subcommands: []*cli.Command{
		ConfigSetCmd,
		ConfigShowCmd,
	},
}
View Source
var ConfigSetCmd = &cli.Command{
	Name:        "set",
	Description: `'barge config set <key> <value>' is a command to set up key value configuration'`,
	Action: func(cctx *cli.Context) error {
		if cctx.Args().Len() != 2 {
			return fmt.Errorf("must pass two arguments: key and value")
		}
		viper.Set(cctx.Args().Get(0), cctx.Args().Get(1))
		if err := viper.WriteConfig(); err != nil {
			return fmt.Errorf("failed to write config file: %w", err)
		}
		return nil
	},
}
View Source
var ConfigShowCmd = &cli.Command{
	Name:        "show",
	Description: `'barge config show' is a command to show the existing configuration'`,
	Action: func(cctx *cli.Context) error {
		var m map[string]interface{}
		if err := viper.Unmarshal(&m); err != nil {
			return err
		}

		b, err := json.MarshalIndent(m, "  ", "")
		if err != nil {
			return err
		}

		fmt.Println(string(b))
		return nil
	},
}
View Source
var InitCmd = &cli.Command{
	Name:        "init",
	Description: "initialize a barge repo in the current directory",
	Usage:       "barge init",
	Flags: []cli.Flag{
		&cli.StringFlag{
			Name:  "collection",
			Usage: "specify an alternative name for this collection of data",
		},
		&cli.StringFlag{
			Name:  "description",
			Usage: "optionally set a description for this collection of data",
		},
		&cli.StringFlag{
			Name:  "dbdir",
			Usage: "set the location of the barge repo database",
		},
	},
	Action: func(cctx *cli.Context) error {
		ctx := cctx.Context

		inited, err := repoIsInitialized()
		if err != nil {
			return err
		}

		if inited {
			fmt.Println("repo already initialized")
			return nil
		}

		c, err := LoadClient(cctx)
		if err != nil {
			return err
		}

		if err := os.Mkdir(".barge", 0775); err != nil {
			return err
		}

		cwd, err := os.Getwd()
		if err != nil {
			return err
		}

		v := viper.New()
		v.SetConfigName("config")
		v.SetConfigType("json")
		v.AddConfigPath(filepath.Join(cwd, ".barge"))

		if dbdir := cctx.String("dbdir"); dbdir != "" {
			parent := filepath.Dir(dbdir)
			if st, err := os.Stat(parent); err != nil {
				return err
			} else {
				if !st.IsDir() {
					return fmt.Errorf("invalid path for dbdir, %s is not a directory", parent)
				}

				if err := os.MkdirAll(dbdir, 0775); err != nil {
					return err
				}

			}
			v.Set("database.directory", dbdir)
		}

		if err := v.WriteConfigAs(filepath.Join(filepath.Join(cwd, ".barge", "config.json"))); err != nil {
			return err
		}

		r, err := OpenRepo()
		if err != nil {
			return err
		}

		colname := cctx.String("collection")
		desc := cctx.String("description")

		wd, err := os.Getwd()
		if err != nil {
			return err
		}

		if colname == "" {
			buf := make([]byte, 3)
			_, err := rand.Read(buf)
			if err != nil {
				return err
			}

			colname = fmt.Sprintf("%s-%x", filepath.Base(wd), buf)
		}
		if desc == "" {
			desc = wd
		}

		col, err := c.CollectionsCreate(ctx, colname, desc)
		if err != nil {
			return err
		}

		r.Cfg.Set("collection.uuid", col.UUID)
		r.Cfg.Set("collection.name", col.Name)

		return r.Cfg.WriteConfig()
	},
}
View Source
var LoginCmd = &cli.Command{
	Name:        "login",
	Description: "Login to the Estuary node",
	Usage:       "barge login <api key>",
	Flags: []cli.Flag{
		&cli.StringFlag{
			Name:  "host",
			Value: "https://api.estuary.tech",
		},
	},
	Action: func(cctx *cli.Context) error {
		if !cctx.Args().Present() {
			return fmt.Errorf("must specify api token")
		}

		tok := cctx.Args().First()

		ec := &EstClient{
			Host: cctx.String("host"),
			Tok:  tok,
		}

		vresp, err := ec.Viewer(cctx.Context)
		if err != nil {
			return err
		}

		fmt.Println("logging in as user: ", vresp.Username)

		if len(vresp.Settings.UploadEndpoints) > 0 {
			sh := vresp.Settings.UploadEndpoints[0]
			u, err := url.Parse(sh)
			if err != nil {
				return err
			}

			u.Path = ""
			u.RawQuery = ""
			u.Fragment = ""

			fmt.Printf("selecting %s as our primary shuttle\n", u.String())

			viper.Set("estuary.primaryShuttle", u.String())
		}

		viper.Set("estuary.token", tok)
		viper.Set("estuary.host", ec.Host)

		return viper.WriteConfig()
	},
}
View Source
var PlumbCmd = &cli.Command{
	Name:        "plumb",
	Hidden:      true,
	Description: "low level plumbing commands",
	Usage:       "plumb <command> [<args>]",
	Subcommands: []*cli.Command{
		PlumbPutFileCmd,
		PlumbPutCarCmd,
		PlumbSplitAddFileCmd,
		PlumbPutDirCmd,
	},
}
View Source
var PlumbPutCarCmd = &cli.Command{
	Name: "put-car",
	Flags: []cli.Flag{
		&cli.StringFlag{
			Name:  "name",
			Usage: "specify alternate name for file to be added with",
		},
	},
	Action: func(cctx *cli.Context) error {
		if !cctx.Args().Present() {
			return fmt.Errorf("must specify car file to upload")
		}

		c, err := LoadClient(cctx)
		if err != nil {
			return err
		}

		c.DoProgress = true

		f := cctx.Args().First()
		fname := filepath.Base(f)
		if oname := cctx.String("name"); oname != "" {
			fname = oname
		}

		resp, err := c.AddCar(f, fname)
		if err != nil {
			return err
		}

		fmt.Println(resp.Cid)
		return nil
	},
}
View Source
var PlumbPutDirCmd = &cli.Command{
	Name: "put-dir",
	Action: func(cctx *cli.Context) error {
		ctx := cctx.Context
		client, err := LoadClient(cctx)
		if err != nil {
			return err
		}

		ds := dsync.MutexWrap(datastore.NewMapDatastore())
		fsm := filestore.NewFileManager(ds, "/")
		bs := blockstore.NewBlockstore(ds)

		fsm.AllowFiles = true
		fstore := filestore.NewFilestore(bs, fsm)
		dserv := merkledag.NewDAGService(blockservice.New(fstore, nil))
		fname := cctx.Args().First()

		dnd, err := addDirectory(ctx, fstore, dserv, fname)

		if err != nil {
			return err
		}

		fmt.Println("imported directory: ", dnd.Cid())
		return doAddPin(ctx, fstore, client, dnd.Cid(), fname)
	},
}
View Source
var PlumbPutFileCmd = &cli.Command{
	Name:  "put-file",
	Usage: "put-file <file> [<name>]",
	Flags: []cli.Flag{
		&cli.StringFlag{
			Name:  "name",
			Usage: "specify alternate name for file to be added with",
		},
		&cli.StringFlag{
			Name:  "password",
			Usage: "specify password to encrypt the file with a password",
		},
	},
	Action: func(cctx *cli.Context) error {
		if !cctx.Args().Present() {
			return fmt.Errorf("must specify filename to upload")
		}

		c, err := LoadClient(cctx)
		if err != nil {
			return err
		}

		f := cctx.Args().First()
		fname := filepath.Base(f)
		if oname := cctx.String("name"); oname != "" {
			fname = oname
		}

		resp, err := c.AddFile(f, fname)
		if err != nil {
			return err
		}

		fmt.Println(resp.Cid)
		return nil
	},
}
View Source
var PlumbSplitAddFileCmd = &cli.Command{
	Name: "split-add",
	Flags: []cli.Flag{
		&cli.Uint64Flag{
			Name:  "chunk",
			Value: uint64(abi.PaddedPieceSize(16 << 30).Unpadded()),
		},
		&cli.BoolFlag{
			Name: "no-pin-only-split",
		},
	},
	Action: func(cctx *cli.Context) error {
		ctx := cctx.Context
		client, err := LoadClient(cctx)
		if err != nil {
			return err
		}

		ds := dsync.MutexWrap(datastore.NewMapDatastore())
		fsm := filestore.NewFileManager(ds, "/")

		bs := blockstore.NewBlockstore(ds)

		fsm.AllowFiles = true
		fstore := filestore.NewFilestore(bs, fsm)
		cst := cbor.NewCborStore(fstore)

		fname := cctx.Args().First()

		progcb := func(int64) {}
		nd, _, err := filestoreAdd(fstore, fname, progcb)
		if err != nil {
			return err
		}

		fmt.Println("imported file: ", nd.Cid())

		dserv := merkledag.NewDAGService(blockservice.New(fstore, nil))
		builder := dagsplit.NewBuilder(dserv, cctx.Uint64("chunk"), 0)

		if err := builder.Pack(ctx, nd.Cid()); err != nil {
			return err
		}

		for i, box := range builder.Boxes() {
			cc, err := cst.Put(ctx, box)
			if err != nil {
				return err
			}

			tsize := 0

			cset := cid.NewSet()
			if err := merkledag.Walk(ctx, func(ctx context.Context, c cid.Cid) ([]*ipld.Link, error) {
				node, err := dserv.Get(ctx, c)
				if err != nil {
					return nil, err
				}

				tsize += len(node.RawData())

				return node.Links(), nil
			}, cc, cset.Visit); err != nil {
				return err
			}
			fmt.Printf("%d: %s %d\n", i, cc, tsize)
		}

		if cctx.Bool("no-pin-only-split") {
			return nil
		}

		pc, err := setupBitswap(ctx, fstore)
		if err != nil {
			return err
		}

		h := pc.host

		var addrs []string
		for _, a := range h.Addrs() {
			addrs = append(addrs, fmt.Sprintf("%s/p2p/%s", a, h.ID()))
		}
		fmt.Println("addresses: ", addrs)

		basename := filepath.Base(fname)

		var pins []string
		var cids []cid.Cid
		for i, box := range builder.Boxes() {
			cc, err := cst.Put(ctx, box)
			if err != nil {
				return err
			}

			cids = append(cids, cc)
			fmt.Println("box: ", i, cc)

			st, err := client.PinAdd(ctx, cc, fmt.Sprintf("%s-%d", basename, i), addrs, nil)
			if err != nil {
				return xerrors.Errorf("failed to pin box %d to estuary: %w", i, err)
			}

			if err := connectToDelegates(ctx, h, st.Delegates); err != nil {
				fmt.Println("failed to connect to pin delegates: ", err)
			}

			pins = append(pins, st.RequestID)
		}

		for range time.Tick(time.Second * 2) {
			var pinning, queued, pinned, failed int
			for _, p := range pins {
				status, err := client.PinStatus(ctx, p)
				if err != nil {
					fmt.Println("error getting pin status: ", err)
					continue
				}

				switch status.Status {
				case types.PinningStatusPinned:
					pinned++
				case types.PinningStatusFailed:
					failed++
				case types.PinningStatusPinning:
					pinning++
				case types.PinningStatusQueued:
					queued++
				}

				if err := connectToDelegates(ctx, h, status.Delegates); err != nil {
					fmt.Println("failed to connect to pin delegates: ", err)
				}
			}

			fmt.Printf("pinned: %d, pinning: %d, queued: %d, failed: %d (num conns: %d)\n", pinned, pinning, queued, failed, len(h.Network().Conns()))
			if failed+pinned >= len(pins) {
				break
			}
		}

		fmt.Println("finished pinning: ", nd.Cid())

		return nil
	},
}
View Source
var UiWebCmd = &cli.Command{
	Name:        "web",
	Description: "barge web is a command to start the web UI",
	Usage:       "barge web",
	Action: func(context *cli.Context) error {

		os.Mkdir("upload", 0775)

		fs := http.FileServer(http.Dir("./web"))

		http.Handle("/", fs)
		http.HandleFunc("/api/v0/plumb/file", func(w http.ResponseWriter, r *http.Request) {
			enableCors(&w)
			var contentResponse *util.ContentAddResponse
			var jsonResponse []byte
			var err error

			if r.Method == "POST" {

				file, handler, err := r.FormFile("file")
				if err != nil {
					return
				}

				defer file.Close()
				defer func() {

					os.Remove("upload/" + handler.Filename)
				}()

				fmt.Printf("Uploaded File: %+v\n", handler.Filename)
				fmt.Printf("File Size: %+v\n", handler.Size)
				fmt.Printf("MIME Header: %+v\n", handler.Header)

				fileBytes, err := ioutil.ReadAll(file)
				ioutil.WriteFile("./upload/"+handler.Filename, fileBytes, 0644)

				fmt.Println(r.FormValue("fpath"))
				contentResponse, err = PlumbAddFile(context, "./upload/"+handler.Filename, handler.Filename)
				if err != nil {
					log.Println(err)
					w.WriteHeader(http.StatusInternalServerError)

					jsonResponse, _ = json.Marshal(map[string]string{
						"status": fmt.Sprint(http.StatusBadRequest),
						"error":  err.Error(),
					})
					_, err = io.WriteString(w, string(jsonResponse))
				}
			}

			contentResponseJson, err := json.Marshal(contentResponse)
			if err != nil {
				log.Println(err)
				w.WriteHeader(http.StatusInternalServerError)
				return
			}
			w.Header().Set("Content-Type", "application/json")
			_, err = io.WriteString(w, string(contentResponseJson))
			if err != nil {
				return
			}
		})
		http.HandleFunc("/api/v0/plumb/files", func(w http.ResponseWriter, r *http.Request) {

		})
		http.HandleFunc("/api/v0/plumb/car", func(w http.ResponseWriter, r *http.Request) {
			var contentResponse *util.ContentAddResponse
			var jsonResponse []byte
			var err error

			if r.Method == "POST" {

				file, handler, err := r.FormFile("file")
				if err != nil {
					return
				}

				defer file.Close()
				defer func() {

					os.Remove("upload/" + handler.Filename)
				}()

				fmt.Printf("Uploaded File: %+v\n", handler.Filename)
				fmt.Printf("File Size: %+v\n", handler.Size)
				fmt.Printf("MIME Header: %+v\n", handler.Header)

				fileBytes, err := ioutil.ReadAll(file)
				ioutil.WriteFile("./upload/"+handler.Filename, fileBytes, 0644)

				fmt.Println(r.FormValue("fpath"))
				contentResponse, err = PlumbAddCar(context, "./upload/"+handler.Filename, handler.Filename)
				if err != nil {
					log.Println(err)
					w.WriteHeader(http.StatusInternalServerError)

					jsonResponse, _ = json.Marshal(map[string]string{
						"status": fmt.Sprint(http.StatusBadRequest),
						"error":  err.Error(),
					})
					_, err = io.WriteString(w, string(jsonResponse))
				}
			}

			contentResponseJson, err := json.Marshal(contentResponse)
			if err != nil {
				log.Println(err)
				w.WriteHeader(http.StatusInternalServerError)
				return
			}
			w.Header().Set("Content-Type", "application/json")
			_, err = io.WriteString(w, string(contentResponseJson))
			if err != nil {
				return
			}
		})
		http.HandleFunc("/api/v0/get-files", func(w http.ResponseWriter, r *http.Request) {
			if r.Method == "POST" {

			}
			fmt.Println("get files")

		})
		log.Print("Listening on :3000...")
		err := http.ListenAndServe(":3000", nil)
		if err != nil {
			log.Fatal(err)
		}
		return err
	},
}

Functions

func LoadConfig added in v0.1.1

func LoadConfig() error

func PlumbAddCar added in v0.1.0

func PlumbAddCar(ctx *cli.Context, fpath string, fname string) (*util.ContentAddResponse, error)

func PlumbAddFile added in v0.1.0

func PlumbAddFile(ctx *cli.Context, fpath string, fname string) (*util.ContentAddResponse, error)

Types

type Config

type Config struct {
	Estuary EstuaryConfig `json:"estuary"`
}

type EstClient

type EstClient struct {
	Host       string
	Shuttle    string
	Tok        string
	DoProgress bool
	LogTimings bool
}

func LoadClient

func LoadClient(cctx *cli.Context) (*EstClient, error)

func (*EstClient) AddCar

func (c *EstClient) AddCar(fpath, _ string) (*util.ContentAddResponse, error)

func (*EstClient) AddFile

func (c *EstClient) AddFile(fpath, filename string) (*util.ContentAddResponse, error)

func (*EstClient) CollectionsCreate

func (c *EstClient) CollectionsCreate(_ context.Context, name, desc string) (*dbmgr.Collection, error)

func (*EstClient) CollectionsList

func (c *EstClient) CollectionsList(context.Context) ([]*dbmgr.Collection, error)

func (*EstClient) CollectionsListDir

func (c *EstClient) CollectionsListDir(_ context.Context, coluuid, path string) ([]collectionListResponse, error)

func (*EstClient) PinAdd

func (c *EstClient) PinAdd(_ context.Context, root cid.Cid, name string, origins []string, meta map[string]interface{}) (*types.IpfsPinStatusResponse, error)

func (*EstClient) PinStatus

func (c *EstClient) PinStatus(_ context.Context, reqid string) (*types.IpfsPinStatusResponse, error)

func (*EstClient) PinStatusByCid

func (c *EstClient) PinStatusByCid(_ context.Context, cids []string) (map[string]*types.IpfsPinStatusResponse, error)

func (*EstClient) PinStatuses

func (c *EstClient) PinStatuses(_ context.Context, reqids []string) (map[string]*types.IpfsPinStatusResponse, error)

func (*EstClient) Viewer

type EstuaryConfig

type EstuaryConfig struct {
	Token          string `json:"token"`
	Host           string `json:"host"`
	PrimaryShuttle string `json:"primaryShuttle"`
}

type File

type File struct {
	ID        uint `gorm:"primarykey"`
	CreatedAt time.Time
	Path      string `gorm:"index"`
	Cid       string
	Mtime     time.Time
}

type FileWithPin

type FileWithPin struct {
	FileID uint
	PinID  uint

	Cid       string
	Path      string
	Status    types.PinningStatus
	RequestID string
}

type FilestoreFile

type FilestoreFile struct {
	*os.File
	// contains filtered or unexported fields
}

func (*FilestoreFile) AbsPath

func (ff *FilestoreFile) AbsPath() string

func (*FilestoreFile) Read

func (ff *FilestoreFile) Read(b []byte) (int, error)

func (*FilestoreFile) Size

func (ff *FilestoreFile) Size() (int64, error)

func (*FilestoreFile) Stat

func (ff *FilestoreFile) Stat() os.FileInfo

type Pin

type Pin struct {
	ID        uint `gorm:"primarykey"`
	CreatedAt time.Time
	File      uint `gorm:"index"`
	Cid       string
	RequestID string `gorm:"index"`
	Status    types.PinningStatus
}

type PinClient

type PinClient struct {
	// contains filtered or unexported fields
}

type Repo

type Repo struct {
	DB        *gorm.DB
	Filestore *filestore.Filestore
	Dir       string

	Cfg *viper.Viper
	// contains filtered or unexported fields
}

func OpenRepo

func OpenRepo() (*Repo, error)

func (*Repo) Close

func (r *Repo) Close() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL