consumer

package
v0.0.0-...-1eb5c16 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 24, 2024 License: MIT Imports: 36 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BackfillRepoStatus

type BackfillRepoStatus struct {
	RepoDid      string
	Seq          int64
	State        string
	DeleteBuffer []*Delete
	// contains filtered or unexported fields
}

func (*BackfillRepoStatus) AddDelete

func (b *BackfillRepoStatus) AddDelete(repo, path string)

func (*BackfillRepoStatus) SetState

func (b *BackfillRepoStatus) SetState(state string)

type Consumer

type Consumer struct {
	SocketURL string
	Progress  *Progress
	Logger    *zap.SugaredLogger

	RedisClient *redis.Client
	ProgressKey string

	Store *store.Store

	BackfillStatus *xsync.MapOf[string, *BackfillRepoStatus]
	SyncLimiter    *rate.Limiter
	// contains filtered or unexported fields
}

Consumer consumes events from the firehose.

func NewConsumer

func NewConsumer(
	ctx context.Context,
	logger *zap.SugaredLogger,
	redisClient *redis.Client,
	redisPrefix string,
	store *store.Store,
	socketURL string,
	magicHeaderKey string,
	magicHeaderVal string,
	graphdRoot string,
	shardDBNodes []string,
) (*Consumer, error)

NewConsumer creates a new consumer

func (*Consumer) BackfillProcessor

func (c *Consumer) BackfillProcessor(ctx context.Context)

func (*Consumer) FanoutWrite

func (c *Consumer) FanoutWrite(
	ctx context.Context,
	repo string,
	path string,
) error

func (*Consumer) FlushBackfillBuffer

func (c *Consumer) FlushBackfillBuffer(ctx context.Context, bf *BackfillRepoStatus) int

func (*Consumer) HandleCreateRecord

func (c *Consumer) HandleCreateRecord(
	ctx context.Context,
	repo string,
	path string,
	rec typegen.CBORMarshaler,
) (*time.Time, error)

HandleCreateRecord handles a create record event from the firehose

func (*Consumer) HandleDeleteRecord

func (c *Consumer) HandleDeleteRecord(
	ctx context.Context,
	repo string,
	path string,
) error

HandleDeleteRecord handles a delete record event from the firehose

func (*Consumer) HandleRepoCommit

func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error

HandleRepoCommit handles a repo commit event from the firehose and processes the records

func (*Consumer) HandleStreamEvent

func (c *Consumer) HandleStreamEvent(ctx context.Context, xe *events.XRPCStreamEvent) error

HandleStreamEvent handles a stream event from the firehose

func (*Consumer) IntersectActivePosters

func (c *Consumer) IntersectActivePosters(
	ctx context.Context,
	dids []string,
) ([]string, error)

IntersectActivePosters returns the intersection of the active posters sorted set and the given set of poster DIDs.

func (*Consumer) MarkPosterActive

func (c *Consumer) MarkPosterActive(
	ctx context.Context,
	repo string,
	scoredAt time.Time,
) error

MarkPosterActive marks a poster as active in a redis sorted set

func (*Consumer) ProcessBackfill

func (c *Consumer) ProcessBackfill(ctx context.Context, repoDID string)

func (*Consumer) ReadCursor

func (c *Consumer) ReadCursor(ctx context.Context) error

ReadCursor reads the cursor from redis

func (*Consumer) TrimRecentPosts

func (c *Consumer) TrimRecentPosts(ctx context.Context, maxAge time.Duration) error

TrimRecentPosts trims recent posts from the recent_posts table and active posters from Redis.

func (*Consumer) WriteCursor

func (c *Consumer) WriteCursor(ctx context.Context) error

WriteCursor writes the cursor to redis

type Delete

type Delete struct {
	// contains filtered or unexported fields
}

type Progress

type Progress struct {
	LastSeq            int64     `json:"last_seq"`
	LastSeqProcessedAt time.Time `json:"last_seq_processed_at"`
	// contains filtered or unexported fields
}

Progress is the cursor for the consumer

func (*Progress) Get

func (p *Progress) Get() (int64, time.Time)

func (*Progress) Update

func (p *Progress) Update(seq int64, processedAt time.Time)

type RecordJob

type RecordJob struct {
	RecordPath string
	NodeCid    cid.Cid
}

type RecordResult

type RecordResult struct {
	RecordPath string
	Error      error
}

type TagTracker

type TagTracker struct {
	RedisClient *redis.Client
}

TagTracker is a service for tracking the use of tags

func NewTagTracker

func NewTagTracker(redisClient *redis.Client) (*TagTracker, error)

NewTagTracker creates a new TagTracker

func (*TagTracker) IncrementTagUseCounts

func (t *TagTracker) IncrementTagUseCounts(
	ctx context.Context,
	actorDID string,
	tags []string,
) error

IncrementTagUseCounts increments the use count of a set of tags in a redis sorted set

type URI

type URI struct {
	Did        string
	RKey       string
	Collection string
}

func GetURI

func GetURI(uri string) (*URI, error)

GetURI expects a URI of the form at://{did}/{namespace}/{rkey}.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL