Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var Firehose = &cli.Command{ Name: "firehose", Flags: []cli.Flag{ &cli.BoolFlag{ Name: "authed", }, &cli.Int64Flag{ Name: "mf", Value: 0, }, &cli.BoolFlag{ Name: "likes", }, &cli.BoolFlag{ Name: "save", }, }, Action: func(cctx *cli.Context) error { ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT) defer stop() if !diskutil.FileExists(authFile) && cctx.Bool("authed") { if cctx.Args().Len() < 2 { return fmt.Errorf("please provide username and password") } sess, err := createSession(cctx) if err != nil { return err } err = diskutil.WriteStructToDisk(sess, authFile) if err != nil { return err } } arg := "wss://bsky.social/xrpc/com.atproto.sync.subscribeRepos" if cctx.String("pds-host") == "" { cctx.Set("pds-host", "https://bsky.social") } var xrpcc *xrpc.Client var err error if cctx.Bool("authed") { cctx.Set("auth", authFile) xrpcc, err = cliutil.GetXrpcClient(cctx, true) if err != nil { return err } } fmt.Println("dialing: ", arg) d := websocket.DefaultDialer con, _, err := d.Dial(arg, http.Header{}) if err != nil { return fmt.Errorf("dial failure: %w", err) } fmt.Println("Stream Started", time.Now().Format(time.RFC3339)) defer func() { fmt.Println("Stream Exited", time.Now().Format(time.RFC3339)) }() go func() { <-ctx.Done() _ = con.Close() }() rscb := &events.RepoStreamCallbacks{ RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { rr, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) if err != nil { fmt.Println(err) } else { for _, op := range evt.Ops { ek := repomgr.EventKind(op.Action) switch ek { case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: rc, rec, err := rr.GetRecord(ctx, op.Path) if err != nil { e := fmt.Errorf("getting record %s (%s) within seq %d for %s: %w", op.Path, *op.Cid, evt.Seq, evt.Repo, err) log.Error(e) return nil } if lexutil.LexLink(rc) != *op.Cid { return fmt.Errorf("mismatch in record and op cid: %s != %s", rc, *op.Cid) } banana := lexutil.LexiconTypeDecoder{ Val: rec, } var pst = appbsky.FeedPost{} b, err := banana.MarshalJSON() if err != nil { fmt.Println(err) } err = json.Unmarshal(b, &pst) if err != nil { fmt.Println(err) } var userProfile *appbsky.ActorDefs_ProfileViewDetailed var replyUserProfile *appbsky.ActorDefs_ProfileViewDetailed if cctx.Bool("authed") { userProfile, err = appbsky.ActorGetProfile(context.TODO(), xrpcc, evt.Repo) if err != nil { fmt.Println(err) sess, err := refreshSession(cctx) if err == nil { err = diskutil.WriteStructToDisk(sess, authFile) if err != nil { return err } xrpcc, err = cliutil.GetXrpcClient(cctx, true) if err != nil { return err } } } if pst.Reply != nil { replyUserProfile, err = appbsky.ActorGetProfile(context.TODO(), xrpcc, strings.Split(pst.Reply.Parent.Uri, "/")[2]) if err != nil { fmt.Println(err) } } } if pst.LexiconTypeID == "app.bsky.feed.post" { PrintPost(cctx, pst, userProfile, replyUserProfile, nil, op.Path) } else if pst.LexiconTypeID == "app.bsky.feed.like" && cctx.Bool("likes") { var like = appbsky.FeedLike{} err = json.Unmarshal(b, &like) if err != nil { fmt.Println(err) } likedDid := strings.Split(like.Subject.Uri, "/")[2] rrb, err := comatproto.SyncGetRepo(ctx, xrpcc, likedDid, "", "") if err != nil { fmt.Println(err) continue } rr, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(rrb)) if err != nil { fmt.Println(err) continue } _, rec, err := rr.GetRecord(ctx, like.Subject.Uri[strings.LastIndex(like.Subject.Uri[:strings.LastIndex(like.Subject.Uri, "/")], "/")+1:]) if err != nil { log.Error(err) return nil } banana := lexutil.LexiconTypeDecoder{ Val: rec, } var pst = appbsky.FeedPost{} b, err := banana.MarshalJSON() if err != nil { fmt.Println(err) } err = json.Unmarshal(b, &pst) if err != nil { fmt.Println(err) } likedUserProfile, err := appbsky.ActorGetProfile(context.TODO(), xrpcc, likedDid) if err != nil { fmt.Println(err) } PrintPost(cctx, pst, likedUserProfile, nil, userProfile, like.Subject.Uri[strings.LastIndex(like.Subject.Uri, "/")+1:]) } case repomgr.EvtKindDeleteRecord: } } } return nil }, RepoHandle: func(handle *comatproto.SyncSubscribeRepos_Handle) error { b, err := json.Marshal(handle) if err != nil { return err } fmt.Println("RepoHandle") fmt.Println(string(b)) return nil }, RepoInfo: func(info *comatproto.SyncSubscribeRepos_Info) error { b, err := json.Marshal(info) if err != nil { return err } fmt.Println("RepoInfo") fmt.Println(string(b)) return nil }, RepoMigrate: func(mig *comatproto.SyncSubscribeRepos_Migrate) error { b, err := json.Marshal(mig) if err != nil { return err } fmt.Println("RepoMigrate") fmt.Println(string(b)) return nil }, RepoTombstone: func(tomb *comatproto.SyncSubscribeRepos_Tombstone) error { b, err := json.Marshal(tomb) if err != nil { return err } fmt.Println("RepoTombstone") fmt.Println(string(b)) return nil }, LabelLabels: func(labels *label.SubscribeLabels_Labels) error { b, err := json.Marshal(labels) if err != nil { return err } fmt.Println("LabelLabels") fmt.Println(string(b)) return nil }, LabelInfo: func(info *label.SubscribeLabels_Info) error { b, err := json.Marshal(info) if err != nil { return err } fmt.Println("LabelInfo") fmt.Println(string(b)) return nil }, Error: func(errf *events.ErrorFrame) error { return fmt.Errorf("error frame: %s: %s", errf.Error, errf.Message) }, } seqScheduler := sequential.NewScheduler(con.RemoteAddr().String(), rscb.EventHandler) return events.HandleRepoStream(ctx, con, seqScheduler) }, }
Functions ¶
Types ¶
This section is empty.
Click to show internal directories.
Click to hide internal directories.