Documentation ¶
Index ¶
- type BackfillRepoStatus
- type Consumer
- func (c *Consumer) BackfillProcessor(ctx context.Context)
- func (c *Consumer) FanoutWrite(ctx context.Context, repo string, path string) error
- func (c *Consumer) FlushBackfillBuffer(ctx context.Context, bf *BackfillRepoStatus) int
- func (c *Consumer) HandleCreateRecord(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler) (*time.Time, error)
- func (c *Consumer) HandleDeleteRecord(ctx context.Context, repo string, path string) error
- func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error
- func (c *Consumer) HandleStreamEvent(ctx context.Context, xe *events.XRPCStreamEvent) error
- func (c *Consumer) IntersectActivePosters(ctx context.Context, dids []string) ([]string, error)
- func (c *Consumer) MarkPosterActive(ctx context.Context, repo string, scoredAt time.Time) error
- func (c *Consumer) ProcessBackfill(ctx context.Context, repoDID string)
- func (c *Consumer) ReadCursor(ctx context.Context) error
- func (c *Consumer) TrimRecentPosts(ctx context.Context, maxAge time.Duration) error
- func (c *Consumer) WriteCursor(ctx context.Context) error
- type Delete
- type Progress
- type RecordJob
- type RecordResult
- type TagTracker
- type URI
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BackfillRepoStatus ¶
type BackfillRepoStatus struct {
	RepoDid      string
	Seq          int64
	State        string
	DeleteBuffer []*Delete
	// contains filtered or unexported fields
}
func (*BackfillRepoStatus) AddDelete ¶
func (b *BackfillRepoStatus) AddDelete(repo, path string)
func (*BackfillRepoStatus) SetState ¶
func (b *BackfillRepoStatus) SetState(state string)
type Consumer ¶
type Consumer struct {
	SocketURL      string
	Progress       *Progress
	Logger         *zap.SugaredLogger
	RedisClient    *redis.Client
	ProgressKey    string
	Store          *store.Store
	BackfillStatus *xsync.MapOf[string, *BackfillRepoStatus]
	SyncLimiter    *rate.Limiter
	// contains filtered or unexported fields
}
Consumer consumes and processes events from the firehose.
func NewConsumer ¶
func NewConsumer( ctx context.Context, logger *zap.SugaredLogger, redisClient *redis.Client, redisPrefix string, store *store.Store, socketURL string, magicHeaderKey string, magicHeaderVal string, graphdRoot string, shardDBNodes []string, ) (*Consumer, error)
NewConsumer creates a new consumer
func (*Consumer) BackfillProcessor ¶
func (*Consumer) FanoutWrite ¶
func (*Consumer) FlushBackfillBuffer ¶
func (c *Consumer) FlushBackfillBuffer(ctx context.Context, bf *BackfillRepoStatus) int
func (*Consumer) HandleCreateRecord ¶
func (c *Consumer) HandleCreateRecord( ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, ) (*time.Time, error)
HandleCreateRecord handles a create record event from the firehose
func (*Consumer) HandleDeleteRecord ¶
HandleDeleteRecord handles a delete record event from the firehose
func (*Consumer) HandleRepoCommit ¶
func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error
HandleRepoCommit handles a repo commit event from the firehose and processes the records
func (*Consumer) HandleStreamEvent ¶
HandleStreamEvent handles a stream event from the firehose
func (*Consumer) IntersectActivePosters ¶
IntersectActivePosters returns the intersection of the active posters sorted set and the given list of poster DIDs.
func (*Consumer) MarkPosterActive ¶
MarkPosterActive marks a poster as active in a Redis sorted set.
func (*Consumer) ProcessBackfill ¶
func (*Consumer) ReadCursor ¶
ReadCursor reads the cursor from Redis.
func (*Consumer) TrimRecentPosts ¶
TrimRecentPosts trims the recent posts from the recent_posts table and the active posters from Redis.
type Progress ¶
type Progress struct {
	LastSeq            int64     `json:"last_seq"`
	LastSeqProcessedAt time.Time `json:"last_seq_processed_at"`
	// contains filtered or unexported fields
}
Progress is the cursor for the consumer
type RecordResult ¶
type TagTracker ¶
type TagTracker struct {
RedisClient *redis.Client
}
TagTracker is a service for tracking the use of tags
func NewTagTracker ¶
func NewTagTracker(redisClient *redis.Client) (*TagTracker, error)
NewTagTracker creates a new TagTracker
func (*TagTracker) IncrementTagUseCounts ¶
func (t *TagTracker) IncrementTagUseCounts( ctx context.Context, actorDID string, tags []string, ) error
IncrementTagUseCounts increments the use count of a set of tags in a Redis sorted set.