crawler

package
v0.0.0-...-73d2201 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 27, 2026 License: MIT Imports: 36 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func FinalizeTableAfterBulkLoad

func FinalizeTableAfterBulkLoad(ctx context.Context, conn driver.Conn, logger *slog.Logger) error

FinalizeTableAfterBulkLoad re-adds secondary indexes and resumes merges.

func PreloadPLCState

func PreloadPLCState(ctx context.Context, conn driver.Conn, logger *slog.Logger) (map[string]string, error)

PreloadPLCState loads the full DID→PDS mapping from plc_did_state into memory. Uses argMax instead of FINAL for better performance on large ReplacingMergeTree tables. PDS URL strings are interned to reduce memory since most accounts share a few PDS hosts.
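
The interning described above can be sketched with a plain map pool (the `intern` helper and hostnames here are illustrative, not part of this package's API):

```go
package main

import "fmt"

// intern returns a canonical copy of s so that repeated PDS URLs share
// one backing string instead of millions of duplicates.
// Illustrative helper; the package's internal implementation may differ.
func intern(pool map[string]string, s string) string {
	if canonical, ok := pool[s]; ok {
		return canonical
	}
	pool[s] = s
	return s
}

func main() {
	pool := make(map[string]string)
	state := make(map[string]string) // DID -> PDS URL
	// Most accounts live on a handful of PDS hosts, so interning
	// collapses the map values to a few unique string allocations.
	rows := [][2]string{
		{"did:plc:aaa", "https://pds1.example"},
		{"did:plc:bbb", "https://pds1.example"},
		{"did:plc:ccc", "https://pds2.example"},
	}
	for _, r := range rows {
		state[r[0]] = intern(pool, r[1])
	}
	fmt.Println(len(state), len(pool)) // 3 DIDs, 2 unique PDS strings
}
```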

func PrepareTableForBulkLoad

func PrepareTableForBulkLoad(ctx context.Context, conn driver.Conn, truncate bool, logger *slog.Logger) error

PrepareTableForBulkLoad stops merges and drops secondary indexes on crawl_records to maximize insert throughput during bulk loading.

func RunPrepare

func RunPrepare(ctx context.Context, config PrepareConfig) error

RunPrepare discovers PDSs from the relay, enumerates repos, verifies PDS claims, and writes the results to the crawl_repos table in ClickHouse.

func SetupBulkClickHouse

func SetupBulkClickHouse(address, username, password string) (driver.Conn, error)

SetupBulkClickHouse creates a ClickHouse connection optimized for bulk inserts.

Types

type CHWriter

type CHWriter struct {
	// contains filtered or unexported fields
}

CHWriter receives CrawledRepos and batch-inserts them into ClickHouse.

func NewCHWriter

func NewCHWriter(conn driver.Conn, input <-chan *repoarchive.CrawledRepo, logger *slog.Logger) *CHWriter

NewCHWriter creates a CHWriter that reads from the given channel.

func (*CHWriter) Run

func (w *CHWriter) Run(ctx context.Context)

Run starts the writer loop. It blocks until the input channel is closed or ctx is cancelled.

func (*CHWriter) Sent

func (w *CHWriter) Sent() int64

Sent returns the number of records successfully sent.

func (*CHWriter) Wait

func (w *CHWriter) Wait() error

Wait blocks until the writer has finished.

type Config

type Config struct {
	OutputDir     string
	Workers       int
	DefaultPDSRPS float64
	SegmentSize   int64
	ZstdLevel     int
	MaxRetries    int
	MaxRepoSizeMB int
	PageSize      int
	SkipPDS       []string

	// Direct mode: write directly to ClickHouse instead of .rca archives.
	Direct       bool
	DirectCHConn driver.Conn // Bulk-optimized connection for direct inserts.

	ClickHouseConn driver.Conn
	Logger         *slog.Logger
}

Config holds all configuration for a crawl run.

type CrawlTask

type CrawlTask struct {
	DID string
	PDS string
}

CrawlTask is a unit of work dispatched to a worker.

type Crawler

type Crawler struct {
	// contains filtered or unexported fields
}

Crawler orchestrates the full network crawl.

func NewCrawler

func NewCrawler(config Config) (*Crawler, error)

NewCrawler creates a new Crawler. Call Run() to start.

func (*Crawler) Run

func (c *Crawler) Run(ctx context.Context) error

Run starts the crawl. It blocks until the crawl is complete or the context is cancelled.

func (*Crawler) Shutdown

func (c *Crawler) Shutdown()

Shutdown gracefully stops the crawl.

type DiscoveredRepo

type DiscoveredRepo struct {
	DID    string
	PDS    string
	Head   string
	Rev    string
	Active bool
	Status string
}

DiscoveredRepo represents a repo found on a PDS via listRepos.

type Discoverer

type Discoverer struct {
	// contains filtered or unexported fields
}

Discoverer finds PDSs and enumerates repos from a relay.

func NewDiscoverer

func NewDiscoverer(config DiscoveryConfig) *Discoverer

NewDiscoverer creates a new Discoverer.

func (*Discoverer) EnumerateRepos

func (d *Discoverer) EnumerateRepos(ctx context.Context, hosts []HostInfo, output chan<- []DiscoveredRepo) error

EnumerateRepos concurrently pages through listRepos on each PDS and sends batches of discovered repos to the output channel. The channel is closed when all PDSs have been enumerated.

func (*Discoverer) ListHosts

func (d *Discoverer) ListHosts(ctx context.Context) ([]HostInfo, error)

ListHosts pages through the relay's listHosts endpoint and returns all non-junk PDS hosts, sorted by account count descending.
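
The descending sort on account count can be sketched with `sort.Slice` (the hostnames are made up; `sortHosts` is an illustrative helper, not part of this package):

```go
package main

import (
	"fmt"
	"sort"
)

// HostInfo mirrors the package's type.
type HostInfo struct {
	Hostname     string
	AccountCount int64
	URL          string
}

// sortHosts orders hosts by account count, largest first, so the
// biggest PDSs are enumerated earliest.
func sortHosts(hosts []HostInfo) {
	sort.Slice(hosts, func(i, j int) bool {
		return hosts[i].AccountCount > hosts[j].AccountCount
	})
}

func main() {
	hosts := []HostInfo{
		{Hostname: "small.example", AccountCount: 10},
		{Hostname: "big.example", AccountCount: 5000},
		{Hostname: "mid.example", AccountCount: 300},
	}
	sortHosts(hosts)
	for _, h := range hosts {
		fmt.Println(h.Hostname, h.AccountCount)
	}
}
```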

type DiscoveryConfig

type DiscoveryConfig struct {
	RelayHost      string
	Workers        int
	DiscoveryRPS   float64
	RelayRPS       float64
	ListReposLimit int64
	SkipPDS        []string
	Logger         *slog.Logger
}

DiscoveryConfig controls relay-based PDS and repo discovery.

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher runs continuously, receiving pages of work via Feed() and dispatching CrawlTasks to a work queue with per-PDS rate limiting. Workers report PDS health via ReportResult(); after enough consecutive failures a PDS is blocked and its remaining DIDs are skipped.
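
The block-after-consecutive-failures policy can be sketched with a per-PDS counter. The threshold and `pdsHealth` names are illustrative; the real dispatcher also handles rate limiting and skipping the blocked PDS's remaining DIDs:

```go
package main

import "fmt"

// pdsHealth tracks consecutive failures per PDS; after maxErrors
// failures in a row, the PDS is blocked.
// Illustrative sketch of the dispatcher's health policy.
type pdsHealth struct {
	errors    map[string]int
	blocked   map[string]bool
	maxErrors int
}

func newPDSHealth(maxErrors int) *pdsHealth {
	return &pdsHealth{
		errors:    map[string]int{},
		blocked:   map[string]bool{},
		maxErrors: maxErrors,
	}
}

// reportResult mirrors the shape of Dispatcher.ReportResult: a success
// resets the streak, a failure increments it and may block the host.
func (h *pdsHealth) reportResult(pds string, success bool) {
	if success {
		h.errors[pds] = 0
		return
	}
	h.errors[pds]++
	if h.errors[pds] >= h.maxErrors {
		h.blocked[pds] = true
	}
}

func main() {
	h := newPDSHealth(3)
	h.reportResult("pds.example", false)
	h.reportResult("pds.example", true) // success resets the streak
	for i := 0; i < 3; i++ {
		h.reportResult("pds.example", false)
	}
	fmt.Println(h.blocked["pds.example"]) // true: three consecutive failures
}
```

In the real Dispatcher these reports arrive on a buffered channel and are applied by the dispatcher loop, so workers never touch shared state directly.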

func NewDispatcher

func NewDispatcher(defaultRPS float64, logger *slog.Logger) *Dispatcher

NewDispatcher creates a new dispatcher.

func (*Dispatcher) ActivePDSCount

func (d *Dispatcher) ActivePDSCount() int32

ActivePDSCount returns the number of PDSs with remaining undispatched work.

func (*Dispatcher) Feed

func (d *Dispatcher) Feed(pdsGroups map[string][]string)

Feed sends a page of work to the dispatcher. It blocks if the dispatcher's internal buffer is full (backpressure). Safe to call from another goroutine.

func (*Dispatcher) Remaining

func (d *Dispatcher) Remaining() int64

Remaining returns the number of DIDs not yet dispatched.

func (*Dispatcher) ReportResult

func (d *Dispatcher) ReportResult(pds string, success bool)

ReportResult reports a PDS success or failure from a worker goroutine. Thread-safe — sends on a buffered channel processed by the dispatcher loop.

func (*Dispatcher) Run

func (d *Dispatcher) Run(ctx context.Context, workQueue chan<- *CrawlTask) error

Run dispatches CrawlTasks to the workQueue continuously until Stop() is called or the context is cancelled. It receives pages via Feed().

func (*Dispatcher) Stop

func (d *Dispatcher) Stop()

Stop signals the dispatcher to finish and return from Run().

type HostInfo

type HostInfo struct {
	Hostname     string
	AccountCount int64
	URL          string
}

HostInfo represents a PDS discovered from the relay.

type PDSState

type PDSState struct {
	URL     string
	Limiter *rate.Limiter
	DIDs    []string
	Cursor  int  // index of next DID to dispatch
	Errors  int  // consecutive errors (for adaptive backoff)
	Blocked bool // true when PDS is unreachable
}

PDSState tracks rate limiting and remaining work for a single PDS.

type PrepareConfig

type PrepareConfig struct {
	DiscoveryConfig
	ClickHouseConn driver.Conn
	VerifyPDS      bool
	BatchSize      int
	Logger         *slog.Logger
}

PrepareConfig holds configuration for the prepare step.

type Progress

type Progress struct {
	// contains filtered or unexported fields
}

Progress tracks crawl progress in memory using lock-free primitives.

Concurrency model:

  • completed: sync.Map — 50 workers write disjoint DIDs, main goroutine reads during FilterPage
  • failedCounts: pre-initialized atomic counters — workers increment, no reads on hot path
  • cursor: only touched by the main goroutine — no synchronization needed
  • segmentNum: atomic.Int32 — writer goroutine writes, main reads at startup
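
A minimal sketch of this model, using the same primitives (sync.Map for the completed set, pre-initialized atomic counters for failures) but none of the package's internals:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// progress is an illustrative tracker: workers call markCompleted and
// markFailed concurrently without any locks.
type progress struct {
	completed sync.Map                 // DID -> struct{}
	failed    map[string]*atomic.Int64 // read-only map after init
}

func newProgress(categories []string) *progress {
	p := &progress{failed: map[string]*atomic.Int64{}}
	for _, c := range categories {
		// Pre-initialize every counter so workers only Add(), never
		// write to the map itself.
		p.failed[c] = new(atomic.Int64)
	}
	return p
}

func (p *progress) markCompleted(did string)   { p.completed.Store(did, struct{}{}) }
func (p *progress) markFailed(category string) { p.failed[category].Add(1) }

func main() {
	p := newProgress([]string{"not_found", "timeout"})
	var wg sync.WaitGroup
	for i := 0; i < 50; i++ { // 50 workers writing disjoint DIDs
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			p.markCompleted(fmt.Sprintf("did:plc:%d", i))
			p.markFailed("timeout")
		}(i)
	}
	wg.Wait()
	n := 0
	p.completed.Range(func(_, _ any) bool { n++; return true })
	fmt.Println(n, p.failed["timeout"].Load()) // 50 50
}
```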

func NewProgress

func NewProgress(logger *slog.Logger) *Progress

NewProgress creates a new in-memory progress tracker.

func (*Progress) ClearCursor

func (p *Progress) ClearCursor()

ClearCursor resets the pagination cursor.

func (*Progress) FilterPage

func (p *Progress) FilterPage(pdsGroups map[string][]string) int

FilterPage removes already-completed DIDs from the pdsGroups map in place. Returns the count of DIDs filtered out.
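
An illustrative in-place filter (using a plain map for the completed set where the real tracker uses sync.Map):

```go
package main

import "fmt"

// filterPage removes DIDs present in completed from pdsGroups in place
// and returns how many were removed.
// Illustrative version of Progress.FilterPage.
func filterPage(pdsGroups map[string][]string, completed map[string]bool) int {
	filtered := 0
	for pds, dids := range pdsGroups {
		kept := dids[:0] // filter idiom: reuse the backing array
		for _, did := range dids {
			if completed[did] {
				filtered++
			} else {
				kept = append(kept, did)
			}
		}
		pdsGroups[pds] = kept
	}
	return filtered
}

func main() {
	groups := map[string][]string{
		"pds1.example": {"did:plc:a", "did:plc:b"},
		"pds2.example": {"did:plc:c"},
	}
	done := map[string]bool{"did:plc:b": true, "did:plc:c": true}
	fmt.Println(filterPage(groups, done), groups["pds1.example"]) // 2 [did:plc:a]
}
```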

func (*Progress) GetCursor

func (p *Progress) GetCursor() string

GetCursor returns the pagination cursor. Returns "" if not set.

func (*Progress) GetSegmentNum

func (p *Progress) GetSegmentNum() int

GetSegmentNum returns the current segment number.

func (*Progress) MarkCompleted

func (p *Progress) MarkCompleted(did string)

MarkCompleted records a DID as completed.

func (*Progress) MarkFailed

func (p *Progress) MarkFailed(did string, category string)

MarkFailed increments the failure counter for the given category. Permanent failures are also added to the completed set so FilterPage skips them.

func (*Progress) SetCursor

func (p *Progress) SetCursor(cursor string)

SetCursor stores the pagination cursor.

func (*Progress) SetSegmentNum

func (p *Progress) SetSegmentNum(n int)

SetSegmentNum stores the current segment number.

type RepoError

type RepoError struct {
	Code     int
	Category string // "not_found", "deactivated", "takendown", "rate_limited", "http_error", "dns_error", "unavailable", "parse_error", "timeout"
	DID      string
	PDS      string
	Err      error
}

RepoError represents a categorized error during repo processing.

func (*RepoError) Error

func (e *RepoError) Error() string

func (*RepoError) Unwrap

func (e *RepoError) Unwrap() error

type VerifiedRepo

type VerifiedRepo struct {
	DiscoveredRepo
	Verified bool
}

VerifiedRepo is a DiscoveredRepo with its verification status set.

func VerifyInMemory

func VerifyInMemory(repos []DiscoveredRepo, plcState map[string]string) []VerifiedRepo

VerifyInMemory verifies a batch of repos against a preloaded PLC state map. did:plc repos are checked via map lookup; did:web repos are included as unverified.
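
An illustrative version of this lookup, with simplified types. Whether a mismatched did:plc repo is kept or dropped is not specified here; this sketch keeps everything and just sets Verified:

```go
package main

import (
	"fmt"
	"strings"
)

// verifiedRepo is a simplified stand-in for VerifiedRepo.
type verifiedRepo struct {
	DID      string
	PDS      string
	Verified bool
}

// verifyInMemory checks did:plc repos against the preloaded PLC state
// (DID -> PDS URL) via a map lookup; other DID methods (e.g. did:web)
// pass through unverified. Illustrative sketch only.
func verifyInMemory(repos []verifiedRepo, plcState map[string]string) []verifiedRepo {
	out := make([]verifiedRepo, 0, len(repos))
	for _, r := range repos {
		if strings.HasPrefix(r.DID, "did:plc:") {
			r.Verified = plcState[r.DID] == r.PDS
		} else {
			r.Verified = false // e.g. did:web, kept but unverified
		}
		out = append(out, r)
	}
	return out
}

func main() {
	state := map[string]string{"did:plc:aaa": "https://pds1.example"}
	repos := []verifiedRepo{
		{DID: "did:plc:aaa", PDS: "https://pds1.example"},
		{DID: "did:plc:aaa", PDS: "https://evil.example"},
		{DID: "did:web:example.com", PDS: "https://pds1.example"},
	}
	for _, r := range verifyInMemory(repos, state) {
		fmt.Println(r.DID, r.Verified)
	}
}
```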

type Verifier

type Verifier struct {
	// contains filtered or unexported fields
}

Verifier checks PDS claims against DID documents.

func NewVerifier

func NewVerifier(chConn driver.Conn, logger *slog.Logger) *Verifier

NewVerifier creates a new Verifier.

func (*Verifier) VerifyBatch

func (v *Verifier) VerifyBatch(ctx context.Context, repos []DiscoveredRepo) []VerifiedRepo

VerifyBatch verifies a batch of discovered repos by checking their PDS claims. For did:plc DIDs, it checks against plc_did_state in ClickHouse. For did:web DIDs, it resolves the DID document over HTTP. Returns only repos that pass verification (or are unverified but kept).

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool manages a pool of crawl workers.

func NewWorkerPool

func NewWorkerPool(
	numWorkers int,
	repoQueue chan *repoarchive.SerializedRepo,
	crawledQueue chan *repoarchive.CrawledRepo,
	progress *Progress,
	dispatcher *Dispatcher,
	config Config,
	logger *slog.Logger,
) *WorkerPool

NewWorkerPool creates and starts a pool of crawl workers.

func (*WorkerPool) Stop

func (wp *WorkerPool) Stop()

Stop signals workers to drain and waits for them to finish.

func (*WorkerPool) WorkQueue

func (wp *WorkerPool) WorkQueue() chan<- *CrawlTask

WorkQueue returns the channel to send CrawlTasks to.
