firehose

package
v0.0.0-...-c2fe4dc Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 18, 2023 License: MIT Imports: 25 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Firehose is the "firehose" CLI command. It dials the bsky.social
// com.atproto.sync.subscribeRepos websocket and, for every commit on the
// stream, prints created/updated post records via PrintPost. With --likes it
// also resolves each like record to the liked post and prints that. Every
// other stream event kind is dumped to stdout as JSON.
//
// Flags:
//   - authed: resolve author/reply profiles through an authenticated XRPC
//     client. On first use (no authFile on disk) the username and password
//     must be given as positional arguments; the session is cached in authFile.
//   - likes: also print liked posts. Requires --authed, because the liked
//     repo and profile are fetched through the XRPC client.
//   - mf, save: not read in this block; presumably consumed by PrintPost
//     through cctx (TODO confirm).
//
// The command runs until SIGINT is received or the stream handler returns.
var Firehose = &cli.Command{
	Name: "firehose",
	Flags: []cli.Flag{
		&cli.BoolFlag{
			Name: "authed",
		},
		&cli.Int64Flag{
			Name:  "mf",
			Value: 0,
		},
		&cli.BoolFlag{
			Name: "likes",
		},
		&cli.BoolFlag{
			Name: "save",
		},
	},
	Action: func(cctx *cli.Context) error {
		ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT)
		defer stop()

		authed := cctx.Bool("authed")
		showLikes := cctx.Bool("likes")

		// The like path calls the XRPC client unconditionally; without
		// --authed that client is nil, so fail fast with a clear message
		// instead of crashing on the first like event.
		if showLikes && !authed {
			return fmt.Errorf("--likes requires --authed")
		}

		// First authenticated run: create a session and cache it on disk.
		if !diskutil.FileExists(authFile) && authed {

			if cctx.Args().Len() < 2 {
				return fmt.Errorf("please provide username and password")
			}

			sess, err := createSession(cctx)
			if err != nil {
				return err
			}

			err = diskutil.WriteStructToDisk(sess, authFile)
			if err != nil {
				return err
			}

		}

		arg := "wss://bsky.social/xrpc/com.atproto.sync.subscribeRepos"

		if cctx.String("pds-host") == "" {
			cctx.Set("pds-host", "https://bsky.social")
		}
		var xrpcc *xrpc.Client
		var err error
		if authed {
			cctx.Set("auth", authFile)
			xrpcc, err = cliutil.GetXrpcClient(cctx, true)
			if err != nil {
				return err
			}
		}

		fmt.Println("dialing: ", arg)
		d := websocket.DefaultDialer
		con, _, err := d.Dial(arg, http.Header{})
		if err != nil {
			return fmt.Errorf("dial failure: %w", err)
		}

		fmt.Println("Stream Started", time.Now().Format(time.RFC3339))
		defer func() {
			fmt.Println("Stream Exited", time.Now().Format(time.RFC3339))
		}()

		// Closing the connection on cancellation unblocks the stream reader.
		go func() {
			<-ctx.Done()
			_ = con.Close()
		}()

		rscb := &events.RepoStreamCallbacks{
			RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {

				rr, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
				if err != nil {
					// Best effort: an unreadable commit is reported and skipped.
					fmt.Println(err)
					return nil
				}

				for _, op := range evt.Ops {
					ek := repomgr.EventKind(op.Action)
					if ek != repomgr.EvtKindCreateRecord && ek != repomgr.EvtKindUpdateRecord {
						// Deletes (and any other action) have nothing to print.
						continue
					}

					if op.Cid == nil {
						log.Error(fmt.Errorf("op %s within seq %d for %s has no cid", op.Path, evt.Seq, evt.Repo))
						continue
					}

					rc, rec, err := rr.GetRecord(ctx, op.Path)
					if err != nil {
						e := fmt.Errorf("getting record %s (%s) within seq %d for %s: %w", op.Path, *op.Cid, evt.Seq, evt.Repo, err)
						log.Error(e)
						// Abandons the remaining ops of this commit but keeps the stream alive.
						return nil
					}

					if lexutil.LexLink(rc) != *op.Cid {
						return fmt.Errorf("mismatch in record and op cid: %s != %s", rc, *op.Cid)
					}

					// Round-trip the record through JSON so it can be inspected
					// by its $type regardless of the concrete record struct.
					dec := lexutil.LexiconTypeDecoder{
						Val: rec,
					}
					b, err := dec.MarshalJSON()
					if err != nil {
						fmt.Println(err)
						continue
					}

					var pst appbsky.FeedPost
					if err := json.Unmarshal(b, &pst); err != nil {
						fmt.Println(err)
					}

					isPost := pst.LexiconTypeID == "app.bsky.feed.post"
					isLike := pst.LexiconTypeID == "app.bsky.feed.like" && showLikes
					if !isPost && !isLike {
						// Nothing will be printed for this record, so skip the
						// profile lookups entirely.
						continue
					}

					var userProfile, replyUserProfile *appbsky.ActorDefs_ProfileViewDetailed
					if authed {
						userProfile, err = appbsky.ActorGetProfile(ctx, xrpcc, evt.Repo)
						if err != nil {
							fmt.Println(err)

							// The failure may be an expired session: refresh it,
							// rebuild the client and retry the lookup once.
							sess, rerr := refreshSession(cctx)
							if rerr != nil {
								fmt.Println(rerr)
							} else {
								if err := diskutil.WriteStructToDisk(sess, authFile); err != nil {
									return err
								}

								xrpcc, err = cliutil.GetXrpcClient(cctx, true)
								if err != nil {
									return err
								}

								userProfile, err = appbsky.ActorGetProfile(ctx, xrpcc, evt.Repo)
								if err != nil {
									fmt.Println(err)
								}
							}
						}

						if pst.Reply != nil && pst.Reply.Parent != nil {
							// at://<did>/<collection>/<rkey> splits into
							// ["at:", "", did, collection, rkey].
							if parts := strings.Split(pst.Reply.Parent.Uri, "/"); len(parts) > 2 {
								replyUserProfile, err = appbsky.ActorGetProfile(ctx, xrpcc, parts[2])
								if err != nil {
									fmt.Println(err)
								}
							} else {
								fmt.Printf("malformed reply parent uri: %q\n", pst.Reply.Parent.Uri)
							}
						}
					}

					if isPost {
						PrintPost(cctx, pst, userProfile, replyUserProfile, nil, op.Path)
						continue
					}

					// Like record: fetch the liked repo and print the liked post.
					var like appbsky.FeedLike
					if err := json.Unmarshal(b, &like); err != nil {
						fmt.Println(err)
					}
					if like.Subject == nil {
						fmt.Printf("like %s for %s has no subject\n", op.Path, evt.Repo)
						continue
					}

					subjectURI := like.Subject.Uri
					parts := strings.Split(subjectURI, "/")
					if len(parts) < 3 {
						fmt.Printf("malformed like subject uri: %q\n", subjectURI)
						continue
					}
					likedDid := parts[2]
					// len(parts) >= 3 guarantees at least two slashes, so both
					// indexes below are non-negative.
					lastSlash := strings.LastIndex(subjectURI, "/")
					prevSlash := strings.LastIndex(subjectURI[:lastSlash], "/")
					likedRecPath := subjectURI[prevSlash+1:] // "<collection>/<rkey>"
					likedRkey := subjectURI[lastSlash+1:]

					likedCar, err := comatproto.SyncGetRepo(ctx, xrpcc, likedDid, "", "")
					if err != nil {
						fmt.Println(err)
						continue
					}

					likedRepo, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(likedCar))
					if err != nil {
						fmt.Println(err)
						continue
					}

					_, likedRec, err := likedRepo.GetRecord(ctx, likedRecPath)
					if err != nil {
						log.Error(err)
						// Abandons the remaining ops of this commit but keeps the stream alive.
						return nil
					}

					likedDec := lexutil.LexiconTypeDecoder{
						Val: likedRec,
					}
					likedJSON, err := likedDec.MarshalJSON()
					if err != nil {
						fmt.Println(err)
						continue
					}

					var likedPost appbsky.FeedPost
					if err := json.Unmarshal(likedJSON, &likedPost); err != nil {
						fmt.Println(err)
					}

					likedUserProfile, err := appbsky.ActorGetProfile(ctx, xrpcc, likedDid)
					if err != nil {
						fmt.Println(err)
					}

					PrintPost(cctx, likedPost, likedUserProfile, nil, userProfile, likedRkey)
				}

				return nil
			},
			RepoHandle: func(handle *comatproto.SyncSubscribeRepos_Handle) error {
				b, err := json.Marshal(handle)
				if err != nil {
					return err
				}
				fmt.Println("RepoHandle")
				fmt.Println(string(b))
				return nil
			},
			RepoInfo: func(info *comatproto.SyncSubscribeRepos_Info) error {

				b, err := json.Marshal(info)
				if err != nil {
					return err
				}
				fmt.Println("RepoInfo")
				fmt.Println(string(b))

				return nil
			},
			RepoMigrate: func(mig *comatproto.SyncSubscribeRepos_Migrate) error {
				b, err := json.Marshal(mig)
				if err != nil {
					return err
				}
				fmt.Println("RepoMigrate")
				fmt.Println(string(b))
				return nil
			},
			RepoTombstone: func(tomb *comatproto.SyncSubscribeRepos_Tombstone) error {
				b, err := json.Marshal(tomb)
				if err != nil {
					return err
				}
				fmt.Println("RepoTombstone")
				fmt.Println(string(b))
				return nil
			},
			LabelLabels: func(labels *label.SubscribeLabels_Labels) error {
				b, err := json.Marshal(labels)
				if err != nil {
					return err
				}
				fmt.Println("LabelLabels")
				fmt.Println(string(b))
				return nil
			},
			LabelInfo: func(info *label.SubscribeLabels_Info) error {
				b, err := json.Marshal(info)
				if err != nil {
					return err
				}
				fmt.Println("LabelInfo")
				fmt.Println(string(b))
				return nil
			},

			// An error frame from the server is surfaced as an error.
			Error: func(errf *events.ErrorFrame) error {
				return fmt.Errorf("error frame: %s: %s", errf.Error, errf.Message)
			},
		}

		seqScheduler := sequential.NewScheduler(con.RemoteAddr().String(), rscb.EventHandler)
		return events.HandleRepoStream(ctx, con, seqScheduler)
	},
}

Functions

func PrintPost

func PrintPost(cctx *cli.Context, pst appbsky.FeedPost, userProfile, replyUserProfile, likingUserProfile *appbsky.ActorDefs_ProfileViewDetailed, postPath string)

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL