Documentation ¶
Index ¶
- func FinalizeTableAfterBulkLoad(ctx context.Context, conn driver.Conn, logger *slog.Logger) error
- func PreloadPLCState(ctx context.Context, conn driver.Conn, logger *slog.Logger) (map[string]string, error)
- func PrepareTableForBulkLoad(ctx context.Context, conn driver.Conn, truncate bool, logger *slog.Logger) error
- func RunPrepare(ctx context.Context, config PrepareConfig) error
- func SetupBulkClickHouse(address, username, password string) (driver.Conn, error)
- type CHWriter
- type Config
- type CrawlTask
- type Crawler
- type DiscoveredRepo
- type Discoverer
- type DiscoveryConfig
- type Dispatcher
- func (d *Dispatcher) ActivePDSCount() int32
- func (d *Dispatcher) Feed(pdsGroups map[string][]string)
- func (d *Dispatcher) Remaining() int64
- func (d *Dispatcher) ReportResult(pds string, success bool)
- func (d *Dispatcher) Run(ctx context.Context, workQueue chan<- *CrawlTask) error
- func (d *Dispatcher) Stop()
- type HostInfo
- type PDSState
- type PrepareConfig
- type Progress
- func (p *Progress) ClearCursor()
- func (p *Progress) FilterPage(pdsGroups map[string][]string) int
- func (p *Progress) GetCursor() string
- func (p *Progress) GetSegmentNum() int
- func (p *Progress) MarkCompleted(did string)
- func (p *Progress) MarkFailed(did string, category string)
- func (p *Progress) SetCursor(cursor string)
- func (p *Progress) SetSegmentNum(n int)
- type RepoError
- type VerifiedRepo
- type Verifier
- type WorkerPool
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func FinalizeTableAfterBulkLoad ¶
func FinalizeTableAfterBulkLoad(ctx context.Context, conn driver.Conn, logger *slog.Logger) error
FinalizeTableAfterBulkLoad re-adds secondary indexes and resumes merges.
func PreloadPLCState ¶
func PreloadPLCState(ctx context.Context, conn driver.Conn, logger *slog.Logger) (map[string]string, error)
PreloadPLCState loads the full DID→PDS mapping from plc_did_state into memory. Uses argMax instead of FINAL for better performance on large ReplacingMergeTree tables. PDS URL strings are interned to reduce memory since most accounts share a few PDS hosts.
func PrepareTableForBulkLoad ¶
func PrepareTableForBulkLoad(ctx context.Context, conn driver.Conn, truncate bool, logger *slog.Logger) error
PrepareTableForBulkLoad stops merges and drops secondary indexes on crawl_records to maximize insert throughput during bulk loading.
func RunPrepare ¶
func RunPrepare(ctx context.Context, config PrepareConfig) error
RunPrepare discovers PDSs from the relay, enumerates repos, verifies PDS claims, and writes the results to the crawl_repos table in ClickHouse.
Types ¶
type CHWriter ¶
type CHWriter struct {
// contains filtered or unexported fields
}
CHWriter receives CrawledRepos and batch-inserts them into ClickHouse.
func NewCHWriter ¶
func NewCHWriter(conn driver.Conn, input <-chan *repoarchive.CrawledRepo, logger *slog.Logger) *CHWriter
NewCHWriter creates a CHWriter that reads from the given channel.
func (*CHWriter) Run ¶
Run starts the writer loop. It blocks until the input channel is closed or ctx is cancelled.
type Config ¶
type Config struct {
OutputDir string
Workers int
DefaultPDSRPS float64
SegmentSize int64
ZstdLevel int
MaxRetries int
MaxRepoSizeMB int
PageSize int
SkipPDS []string
// Direct mode: write directly to ClickHouse instead of .rca archives.
Direct bool
DirectCHConn driver.Conn // Bulk-optimized connection for direct inserts.
ClickHouseConn driver.Conn
Logger *slog.Logger
}
Config holds all configuration for a crawl run.
type Crawler ¶
type Crawler struct {
// contains filtered or unexported fields
}
Crawler orchestrates the full network crawl.
func NewCrawler ¶
NewCrawler creates a new Crawler. Call Run() to start.
type DiscoveredRepo ¶
type DiscoveredRepo struct {
DID string
PDS string
Head string
Rev string
Active bool
Status string
}
DiscoveredRepo represents a repo found on a PDS via listRepos.
type Discoverer ¶
type Discoverer struct {
// contains filtered or unexported fields
}
Discoverer finds PDSs and enumerates repos from a relay.
func NewDiscoverer ¶
func NewDiscoverer(config DiscoveryConfig) *Discoverer
NewDiscoverer creates a new Discoverer.
func (*Discoverer) EnumerateRepos ¶
func (d *Discoverer) EnumerateRepos(ctx context.Context, hosts []HostInfo, output chan<- []DiscoveredRepo) error
EnumerateRepos concurrently pages through listRepos on each PDS and sends batches of discovered repos to the output channel. The channel is closed when all PDSs have been enumerated.
type DiscoveryConfig ¶
type DiscoveryConfig struct {
RelayHost string
Workers int
DiscoveryRPS float64
RelayRPS float64
ListReposLimit int64
SkipPDS []string
Logger *slog.Logger
}
DiscoveryConfig controls relay-based PDS and repo discovery.
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher runs continuously, receiving pages of work via Feed() and dispatching CrawlTasks to a work queue with per-PDS rate limiting. Workers report PDS health via ReportResult(); after enough consecutive failures a PDS is blocked and its remaining DIDs are skipped.
func NewDispatcher ¶
func NewDispatcher(defaultRPS float64, logger *slog.Logger) *Dispatcher
NewDispatcher creates a new dispatcher.
func (*Dispatcher) ActivePDSCount ¶
func (d *Dispatcher) ActivePDSCount() int32
ActivePDSCount returns the number of PDSs with remaining undispatched work.
func (*Dispatcher) Feed ¶
func (d *Dispatcher) Feed(pdsGroups map[string][]string)
Feed sends a page of work to the dispatcher. It blocks if the dispatcher's internal buffer is full (backpressure). Safe to call from another goroutine.
func (*Dispatcher) Remaining ¶
func (d *Dispatcher) Remaining() int64
Remaining returns the number of DIDs not yet dispatched.
func (*Dispatcher) ReportResult ¶
func (d *Dispatcher) ReportResult(pds string, success bool)
ReportResult reports a PDS success or failure from a worker goroutine. Thread-safe — sends on a buffered channel processed by the dispatcher loop.
func (*Dispatcher) Run ¶
func (d *Dispatcher) Run(ctx context.Context, workQueue chan<- *CrawlTask) error
Run dispatches CrawlTasks to the workQueue continuously until Stop() is called or the context is cancelled. It receives pages via Feed().
func (*Dispatcher) Stop ¶
func (d *Dispatcher) Stop()
Stop signals the dispatcher to finish and return from Run().
type PDSState ¶
type PDSState struct {
URL string
Limiter *rate.Limiter
DIDs []string
Cursor int // index of next DID to dispatch
Errors int // consecutive errors (for adaptive backoff)
Blocked bool // true when PDS is unreachable
}
PDSState tracks rate limiting and remaining work for a single PDS.
type PrepareConfig ¶
type PrepareConfig struct {
DiscoveryConfig
ClickHouseConn driver.Conn
VerifyPDS bool
BatchSize int
Logger *slog.Logger
}
PrepareConfig holds configuration for the prepare step.
type Progress ¶
type Progress struct {
// contains filtered or unexported fields
}
Progress tracks crawl progress in memory using lock-free primitives.
Concurrency model:
- completed: sync.Map — 50 workers write disjoint DIDs, main goroutine reads during FilterPage
- failedCounts: pre-initialized atomic counters — workers increment, no reads on hot path
- cursor: only touched by the main goroutine — no synchronization needed
- segmentNum: atomic.Int32 — writer goroutine writes, main reads at startup
func NewProgress ¶
NewProgress creates a new in-memory progress tracker.
func (*Progress) ClearCursor ¶
func (p *Progress) ClearCursor()
ClearCursor resets the pagination cursor.
func (*Progress) FilterPage ¶
FilterPage removes already-completed DIDs from the pdsGroups map in place. Returns the count of DIDs filtered out.
func (*Progress) GetSegmentNum ¶
GetSegmentNum returns the current segment number.
func (*Progress) MarkCompleted ¶
MarkCompleted records a DID as completed.
func (*Progress) MarkFailed ¶
MarkFailed increments the failure counter for the given category. Permanent failures are also added to the completed set so FilterPage skips them.
func (*Progress) SetSegmentNum ¶
SetSegmentNum stores the current segment number.
type RepoError ¶
type RepoError struct {
Code int
Category string // "not_found", "deactivated", "takendown", "rate_limited", "http_error", "dns_error", "unavailable", "parse_error", "timeout"
DID string
PDS string
Err error
}
RepoError represents a categorized error during repo processing.
type VerifiedRepo ¶
type VerifiedRepo struct {
DiscoveredRepo
Verified bool
}
VerifiedRepo is a DiscoveredRepo with its verification status set.
func VerifyInMemory ¶
func VerifyInMemory(repos []DiscoveredRepo, plcState map[string]string) []VerifiedRepo
VerifyInMemory verifies a batch of repos against a preloaded PLC state map. did:plc repos are checked via map lookup; did:web repos are included as unverified.
type Verifier ¶
type Verifier struct {
// contains filtered or unexported fields
}
Verifier checks PDS claims against DID documents.
func NewVerifier ¶
NewVerifier creates a new Verifier.
func (*Verifier) VerifyBatch ¶
func (v *Verifier) VerifyBatch(ctx context.Context, repos []DiscoveredRepo) []VerifiedRepo
VerifyBatch verifies a batch of discovered repos by checking their PDS claims. For did:plc DIDs, it checks against plc_did_state in ClickHouse. For did:web DIDs, it resolves the DID document over HTTP. Returns only repos that pass verification (or are unverified but kept).
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool manages a pool of crawl workers.
func NewWorkerPool ¶
func NewWorkerPool(numWorkers int, repoQueue chan *repoarchive.SerializedRepo, crawledQueue chan *repoarchive.CrawledRepo, progress *Progress, dispatcher *Dispatcher, config Config, logger *slog.Logger) *WorkerPool
NewWorkerPool creates and starts a pool of crawl workers.
func (*WorkerPool) Stop ¶
func (wp *WorkerPool) Stop()
Stop signals workers to drain and waits for them to finish.
func (*WorkerPool) WorkQueue ¶
func (wp *WorkerPool) WorkQueue() chan<- *CrawlTask
WorkQueue returns the channel to send CrawlTasks to.