Documentation
¶
Index ¶
- Constants
- Variables
- func ConstructTaskURL(path, host, domainName string) string
- func IsRateLimitError(err error) bool
- func ProcessDiscoveredLinks(ctx context.Context, deps LinkDiscoveryDeps, task *Task, ...)
- type CrawlerInterface
- type DbQueueInterface
- type DbQueueProvider
- type ExecutorConfig
- type HTMLPersister
- type HTMLPersisterConfig
- type HTMLPersisterDeps
- type Job
- type JobInfo
- type JobManager
- func (jm *JobManager) BlockJob(ctx context.Context, jobID string, vendor, reason string) error
- func (jm *JobManager) CalculateJobProgress(job *Job) float64
- func (jm *JobManager) CancelJob(ctx context.Context, jobID string) error
- func (jm *JobManager) CreateJob(ctx context.Context, options *JobOptions) (*Job, error)
- func (jm *JobManager) EnqueueJobURLs(ctx context.Context, jobID string, pages []db.Page, sourceType string, ...) error
- func (jm *JobManager) GetJob(ctx context.Context, jobID string) (*Job, error)
- func (jm *JobManager) GetJobStatus(ctx context.Context, jobID string) (*Job, error)
- func (jm *JobManager) GetRobotsRules(ctx context.Context, domain string) (*crawler.RobotsRules, error)
- func (jm *JobManager) IsJobComplete(job *Job) bool
- func (jm *JobManager) MarkJobRunning(ctx context.Context, jobID string) error
- func (jm *JobManager) MaybeFireMilestones(ctx context.Context, jobIDs []string)
- func (jm *JobManager) UpdateJobStatus(ctx context.Context, jobID string, status JobStatus) error
- func (jm *JobManager) ValidateStatusTransition(from, to JobStatus) error
- type JobManagerInterface
- type JobOptions
- type JobStatus
- type JobTerminatedCallback
- type LinkDiscoveryDeps
- type ProgressMilestoneCallback
- type QuotaExceededError
- type RetryDecision
- type StreamWorkerDeps
- type StreamWorkerOpts
- type StreamWorkerPool
- func (swp *StreamWorkerPool) ActiveJobIDs(ctx context.Context) ([]string, error)
- func (swp *StreamWorkerPool) CanDispatch(ctx context.Context, jobID string) (bool, error)
- func (swp *StreamWorkerPool) SetHeartbeat(h interface{ ... })
- func (swp *StreamWorkerPool) Start(ctx context.Context)
- func (swp *StreamWorkerPool) Stop()
- func (swp *StreamWorkerPool) TriggerReconcile(ctx context.Context)
- type Task
- type TaskExecutor
- type TaskHTMLUpload
- type TaskOutcome
- type TaskScheduleCallback
- type TaskScheduleEntry
- type TaskStatus
- type TaskType
- type WAFCircuitBreaker
- func (b *WAFCircuitBreaker) Forget(jobID string)
- func (b *WAFCircuitBreaker) MaybeTripFromOutcome(ctx context.Context, jm JobManagerInterface, outcome *TaskOutcome)
- func (b *WAFCircuitBreaker) Observe(jobID string, det *crawler.WAFDetection) (tripped bool, vendor crawler.WAFDetection)
- func (b *WAFCircuitBreaker) Rearm(jobID string, lastVendor crawler.WAFDetection)
- func (b *WAFCircuitBreaker) Threshold() int
Constants ¶
const ( TaskStaleTimeout = 3 * time.Minute MaxTaskRetries = 5 )
Variables ¶
var (
ErrDomainDelay = errors.New("domain rate limit delay")
)
Functions ¶
func ConstructTaskURL ¶
func IsRateLimitError ¶
IsRateLimitError matches a broader set than isBlockingError: pacer state updates fire even when the executor would not retry.
func ProcessDiscoveredLinks ¶
func ProcessDiscoveredLinks(ctx context.Context, deps LinkDiscoveryDeps, task *Task, links map[string][]string, sourceURL string, robotsRules *crawler.RobotsRules)
Priority promotion is handled by EnqueueURLs via `priority_score = GREATEST(tasks.priority_score, EXCLUDED.priority_score)`, and CreatePageRecords' no-op DO UPDATE forces RETURNING to emit pre-existing rows too — so no caller-level updateTaskPriorities step is required.
Types ¶
type CrawlerInterface ¶
type CrawlerInterface interface {
WarmURL(ctx context.Context, url string, findLinks bool) (*crawler.CrawlResult, error)
DiscoverSitemapsAndRobots(ctx context.Context, domain string) (*crawler.SitemapDiscoveryResult, error)
ParseSitemap(ctx context.Context, sitemapURL string) ([]string, error)
FilterURLs(urls []string, includePaths, excludePaths []string) []string
GetUserAgent() string
// Probe issues a pre-flight WAF fingerprint request against the
// homepage of a domain. Issue #365 row 1.
Probe(ctx context.Context, domain string) (crawler.WAFDetection, error)
}
CrawlerInterface defines the methods we need from the crawler
type DbQueueInterface ¶
type DbQueueInterface interface {
UpdateTaskStatus(ctx context.Context, task *db.Task) error
Execute(ctx context.Context, fn func(*sql.Tx) error) error
ExecuteControl(ctx context.Context, fn func(*sql.Tx) error) error
ExecuteWithContext(ctx context.Context, fn func(context.Context, *sql.Tx) error) error
ExecuteControlWithContext(ctx context.Context, fn func(context.Context, *sql.Tx) error) error
ExecuteMaintenance(ctx context.Context, fn func(*sql.Tx) error) error
SetConcurrencyOverride(fn db.ConcurrencyOverrideFunc)
UpdateDomainTechnologies(ctx context.Context, domainID int, technologies, headers []byte, htmlPath string) error
UpdateTaskHTMLMetadata(ctx context.Context, taskID string, metadata db.TaskHTMLMetadata) error
BatchUpsertTaskHTMLMetadata(ctx context.Context, rows []db.TaskHTMLMetadataRow) error
PromoteWaitingToPending(ctx context.Context, jobID string, limit int) (int, error)
}
DbQueueInterface defines the database queue operations needed by the job system.
type DbQueueProvider ¶
type ExecutorConfig ¶
type ExecutorConfig struct {
MaxBlockingRetries int
MaxTaskRetries int
BaseDelayMS int
MaxDelayMS int
}
func DefaultExecutorConfig ¶
func DefaultExecutorConfig() ExecutorConfig
type HTMLPersister ¶
type HTMLPersister struct {
// contains filtered or unexported fields
}
HTMLPersister streams completed-task HTML payloads to R2 and stamps the resulting metadata onto the task row. See issue #332 for context.
func NewHTMLPersister ¶
func NewHTMLPersister(cfg HTMLPersisterConfig, deps HTMLPersisterDeps) (*HTMLPersister, error)
func (*HTMLPersister) Enqueue ¶
func (p *HTMLPersister) Enqueue(ctx context.Context, task *db.Task, upload *TaskHTMLUpload) bool
Enqueue is non-blocking: a full queue drops the payload and emits a "skipped" metric. Returns true when the payload was accepted.
func (*HTMLPersister) QueueDepth ¶
func (p *HTMLPersister) QueueDepth() int
func (*HTMLPersister) Start ¶
func (p *HTMLPersister) Start(ctx context.Context)
func (*HTMLPersister) Stop ¶
func (p *HTMLPersister) Stop()
Stop drains already-accepted uploads before exiting. Hand-off ordering: flip stopped → close queue → wait for uploads → close stopCh → cancel ctx. If the parent ctx is cancelled externally first, in-flight uploads abort via uploadCtx and the remaining queue is dropped.
type HTMLPersisterConfig ¶
type HTMLPersisterConfig struct {
Workers int
// When full, new enqueues drop the payload — HTML capture is best-effort
// and must not block the stream worker loop.
QueueSize int
BatchSize int
FlushInterval time.Duration
// Without a per-call cap a hung R2 connection would occupy a worker
// indefinitely and starve the queue.
UploadTimeout time.Duration
Bucket string
Provider string
}
func DefaultHTMLPersisterConfig ¶
func DefaultHTMLPersisterConfig() HTMLPersisterConfig
Defaults tuned for ~4k tasks/min: 256-deep queue absorbs transient R2 hiccups; 8 workers matches the historical pool that ran cleanly under load.
type HTMLPersisterDeps ¶
type HTMLPersisterDeps struct {
Provider archive.ColdStorageProvider
DBQueue DbQueueInterface
}
type Job ¶
type Job struct {
ID string `json:"id"`
Domain string `json:"domain"`
UserID *string `json:"user_id,omitempty"`
OrganisationID *string `json:"organisation_id,omitempty"`
Status JobStatus `json:"status"`
Progress float64 `json:"progress"`
TotalTasks int `json:"total_tasks"`
CompletedTasks int `json:"completed_tasks"`
FailedTasks int `json:"failed_tasks"`
SkippedTasks int `json:"skipped_tasks"`
FoundTasks int `json:"found_tasks"`
SitemapTasks int `json:"sitemap_tasks"`
CreatedAt time.Time `json:"created_at"`
StartedAt time.Time `json:"started_at"`
CompletedAt time.Time `json:"completed_at"`
Concurrency int `json:"concurrency"`
FindLinks bool `json:"find_links"`
MaxPages int `json:"max_pages"`
IncludePaths []string `json:"include_paths,omitempty"`
ExcludePaths []string `json:"exclude_paths,omitempty"`
RequiredWorkers int `json:"required_workers"`
AllowCrossSubdomainLinks bool `json:"allow_cross_subdomain_links"`
SourceType *string `json:"source_type,omitempty"`
SourceDetail *string `json:"source_detail,omitempty"`
SourceInfo *string `json:"source_info,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
SchedulerID *string `json:"scheduler_id,omitempty"`
DurationSeconds *int `json:"duration_seconds,omitempty"`
AvgTimePerTaskSeconds *float64 `json:"avg_time_per_task_seconds,omitempty"`
}
TODO(review): confirm that every one of these fields is still consumed somewhere in the app; remove any that are no longer read.
type JobManager ¶
type JobManager struct {
// Fire-and-forget; nil is allowed (legacy DB-queue mode, tests).
OnTasksEnqueued TaskScheduleCallback
// Fire-and-forget; must return promptly — runs on the batch flusher's goroutine.
OnProgressMilestone ProgressMilestoneCallback
// Fire-and-forget; nil allowed for tests and deploys without REDIS_URL.
OnJobTerminated JobTerminatedCallback
// contains filtered or unexported fields
}
func NewJobManager ¶
func NewJobManager(db *sql.DB, dbQueue DbQueueProvider, crawler CrawlerInterface) *JobManager
func (*JobManager) BlockJob ¶
BlockJob transitions a job to JobStatusBlocked when the WAF detector (pre-flight probe or mid-job circuit breaker) recognises bot protection on the domain. The shape mirrors CancelJob: tasks UPDATE → jobs UPDATE → task_outbox DELETE, with the same ORDER BY id lock order to keep the AFTER STATEMENT counter trigger from deadlocking against worker batches. domains.waf_blocked is set in the same transaction so a follow-up CreateJob short-circuits without re-probing.
func (*JobManager) CalculateJobProgress ¶
func (jm *JobManager) CalculateJobProgress(job *Job) float64
func (*JobManager) CancelJob ¶
func (jm *JobManager) CancelJob(ctx context.Context, jobID string) error
func (*JobManager) CreateJob ¶
func (jm *JobManager) CreateJob(ctx context.Context, options *JobOptions) (*Job, error)
func (*JobManager) EnqueueJobURLs ¶
func (jm *JobManager) EnqueueJobURLs(ctx context.Context, jobID string, pages []db.Page, sourceType string, sourceURL string) error
Wraps dbQueue.EnqueueURLs with duplicate-page filtering against the in-process processedPages map.
func (*JobManager) GetJobStatus ¶
func (*JobManager) GetRobotsRules ¶
func (jm *JobManager) GetRobotsRules(ctx context.Context, domain string) (*crawler.RobotsRules, error)
Returns nil (no error) when the crawler is unavailable; callers treat nil rules as "no restriction".
func (*JobManager) IsJobComplete ¶
func (jm *JobManager) IsJobComplete(job *Job) bool
func (*JobManager) MarkJobRunning ¶
func (jm *JobManager) MarkJobRunning(ctx context.Context, jobID string) error
Idempotent. Guard accepts 'initializing' too: sitemap jobs spend a real window in that state before first dispatch, and a narrower match would strand them on the "Starting up" pill.
func (*JobManager) MaybeFireMilestones ¶
func (jm *JobManager) MaybeFireMilestones(ctx context.Context, jobIDs []string)
Wired as the BatchFlushCallback on db.BatchManager. Multi-replica safe: downstream dedupe handles concurrent fires from sibling replicas.
func (*JobManager) UpdateJobStatus ¶
func (*JobManager) ValidateStatusTransition ¶
func (jm *JobManager) ValidateStatusTransition(from, to JobStatus) error
Allowed-edge rationale and case → action mapping: [CRAWL_HANDLING.md].
type JobManagerInterface ¶
type JobManagerInterface interface {
CreateJob(ctx context.Context, options *JobOptions) (*Job, error)
CancelJob(ctx context.Context, jobID string) error
BlockJob(ctx context.Context, jobID string, vendor, reason string) error
GetJobStatus(ctx context.Context, jobID string) (*Job, error)
GetJob(ctx context.Context, jobID string) (*Job, error)
EnqueueJobURLs(ctx context.Context, jobID string, pages []db.Page, sourceType string, sourceURL string) error
IsJobComplete(job *Job) bool
CalculateJobProgress(job *Job) float64
ValidateStatusTransition(from, to JobStatus) error
UpdateJobStatus(ctx context.Context, jobID string, status JobStatus) error
MarkJobRunning(ctx context.Context, jobID string) error
// Returns nil rules (not error) when crawler is unavailable; callers treat that as "no restriction".
GetRobotsRules(ctx context.Context, domain string) (*crawler.RobotsRules, error)
}
type JobOptions ¶
type JobOptions struct {
Domain string `json:"domain"`
UserID *string `json:"user_id,omitempty"`
OrganisationID *string `json:"organisation_id,omitempty"`
UseSitemap bool `json:"use_sitemap"`
Concurrency int `json:"concurrency"`
FindLinks bool `json:"find_links"`
AllowCrossSubdomainLinks bool `json:"allow_cross_subdomain_links"`
MaxPages int `json:"max_pages"`
IncludePaths []string `json:"include_paths,omitempty"`
ExcludePaths []string `json:"exclude_paths,omitempty"`
RequiredWorkers int `json:"required_workers"`
SourceType *string `json:"source_type,omitempty"`
SourceDetail *string `json:"source_detail,omitempty"`
SourceInfo *string `json:"source_info,omitempty"`
SchedulerID *string `json:"scheduler_id,omitempty"`
}
type JobStatus ¶
type JobStatus string
When adding a new value, also update ValidateStatusTransition, terminalJobStatuses (if the new status is terminal), the trigger preserve list, and the corresponding [CRAWL_HANDLING.md] row.
const ( JobStatusPending JobStatus = "pending" JobStatusInitialising JobStatus = "initializing" JobStatusRunning JobStatus = "running" JobStatusPaused JobStatus = "paused" JobStatusCompleted JobStatus = "completed" JobStatusFailed JobStatus = "failed" JobStatusCancelled JobStatus = "cancelled" JobStatusArchived JobStatus = "archived" // JobStatusBlocked is set when the WAF detector flags a domain as // blocked (issue #365 row 1). The pre-flight probe and the mid-job // circuit breaker both transition jobs into this terminal state. JobStatusBlocked JobStatus = "blocked" )
type JobTerminatedCallback ¶
type LinkDiscoveryDeps ¶
type LinkDiscoveryDeps struct {
DBQueue DbQueueInterface
JobManager JobManagerInterface
MinPriority float64
}
type QuotaExceededError ¶
type QuotaExceededError struct {
Used int `json:"used"`
Limit int `json:"limit"`
ResetsAt time.Time `json:"resets_at"`
PlanName string `json:"plan_name"`
}
func (*QuotaExceededError) Error ¶
func (e *QuotaExceededError) Error() string
type RetryDecision ¶
type StreamWorkerDeps ¶
type StreamWorkerDeps struct {
Consumer *broker.Consumer
Scheduler *broker.Scheduler
Counters *broker.RunningCounters
Pacer *broker.DomainPacer
Executor *TaskExecutor
BatchManager *db.BatchManager
DBQueue DbQueueInterface
JobManager JobManagerInterface
// HTMLPersister is nil when ARCHIVE_PROVIDER is unset (local dev
// without R2); completed tasks then persist without HTML.
HTMLPersister *HTMLPersister
// WAFBreaker is nil-safe: when supplied, every task outcome runs
// through it and a domain that produces N consecutive WAF
// fingerprints trips a job-level block (issue #365 row 1).
WAFBreaker *WAFCircuitBreaker
}
type StreamWorkerOpts ¶
type StreamWorkerOpts struct {
NumWorkers int
// TasksPerWorker caps in-flight tasks per consumer goroutine.
// Global ceiling = NumWorkers × TasksPerWorker.
TasksPerWorker int
ReclaimInterval time.Duration
}
func DefaultStreamWorkerOpts ¶
func DefaultStreamWorkerOpts() StreamWorkerOpts
type StreamWorkerPool ¶
type StreamWorkerPool struct {
// contains filtered or unexported fields
}
StreamWorkerPool consumes from Redis Streams, runs tasks via TaskExecutor, and acts on the TaskOutcome.
func NewStreamWorkerPool ¶
func NewStreamWorkerPool(deps StreamWorkerDeps, opts StreamWorkerOpts) *StreamWorkerPool
func (*StreamWorkerPool) ActiveJobIDs ¶
func (swp *StreamWorkerPool) ActiveJobIDs(ctx context.Context) ([]string, error)
func (*StreamWorkerPool) CanDispatch ¶
CanDispatch implements broker.ConcurrencyChecker.
func (*StreamWorkerPool) SetHeartbeat ¶
func (swp *StreamWorkerPool) SetHeartbeat(h interface{ Tick() })
func (*StreamWorkerPool) Start ¶
func (swp *StreamWorkerPool) Start(ctx context.Context)
Start launches consumer goroutines and the reclaim loop. Blocks until Stop is called or ctx is cancelled.
func (*StreamWorkerPool) Stop ¶
func (swp *StreamWorkerPool) Stop()
func (*StreamWorkerPool) TriggerReconcile ¶
func (swp *StreamWorkerPool) TriggerReconcile(ctx context.Context)
TriggerReconcile runs an immediate reconcile, collapsing concurrent calls onto an in-flight pass via TryLock. Contract: at least one reconcile after the most recent trigger.
type Task ¶
type Task struct {
ID string `json:"id"`
JobID string `json:"job_id"`
PageID int `json:"page_id"`
Host string `json:"host"`
Path string `json:"path"`
DomainID int `json:"domain_id"`
DomainName string `json:"domain_name"`
Status TaskStatus `json:"status"`
TaskType TaskType `json:"task_type,omitempty"`
CreatedAt time.Time `json:"created_at"`
StartedAt time.Time `json:"started_at"`
CompletedAt time.Time `json:"completed_at"`
RetryCount int `json:"retry_count"`
Error string `json:"error,omitempty"`
SourceType string `json:"source_type"` // "sitemap", "link", "manual"
SourceURL string `json:"source_url,omitempty"`
StatusCode int `json:"status_code,omitempty"`
ResponseTime int64 `json:"response_time,omitempty"`
CacheStatus string `json:"cache_status,omitempty"`
ContentType string `json:"content_type,omitempty"`
SecondResponseTime int64 `json:"second_response_time,omitempty"`
SecondCacheStatus string `json:"second_cache_status,omitempty"`
PriorityScore float64 `json:"priority_score"`
FindLinks bool `json:"-"`
CrawlDelay int `json:"-"` // seconds, from robots.txt
JobConcurrency int `json:"-"`
AdaptiveDelay int `json:"-"`
AdaptiveDelayFloor int `json:"-"`
AllowCrossSubdomainLinks bool `json:"-"`
}
type TaskExecutor ¶
type TaskExecutor struct {
// contains filtered or unexported fields
}
TaskExecutor runs crawl tasks and returns outcomes; it has no side effects on counters, schedulers, or persistence — the caller must act on the outcome.
func NewTaskExecutor ¶
func NewTaskExecutor(c CrawlerInterface, cfg ExecutorConfig) *TaskExecutor
func (*TaskExecutor) Execute ¶
func (e *TaskExecutor) Execute(ctx context.Context, task *Task) *TaskOutcome
Execute does not modify external state; all decisions are returned in TaskOutcome.
type TaskHTMLUpload ¶
type TaskOutcome ¶
type TaskOutcome struct {
Task *db.Task
CrawlResult *crawler.CrawlResult
Retry *RetryDecision
DiscoveredLinks map[string][]string
HTMLUpload *TaskHTMLUpload
RateLimited bool
Success bool
}
type TaskScheduleCallback ¶
type TaskScheduleCallback func(ctx context.Context, jobID string, entries []TaskScheduleEntry)
Nil callback means legacy DB-queue mode (no external broker scheduling).
type TaskScheduleEntry ¶
type TaskScheduleEntry struct {
TaskID string
PageID int
Host string
Path string
Status string // "pending", "waiting", "skipped"
Priority float64
RetryCount int
SourceType string
SourceURL string
RunAt time.Time
}
RunAt is the earliest dispatch time; for waiting/retry rows it carries the backoff deadline so callbacks don't schedule them as ready-now.
type TaskStatus ¶
type TaskStatus string
const ( TaskStatusWaiting TaskStatus = "waiting" TaskStatusPending TaskStatus = "pending" TaskStatusRunning TaskStatus = "running" TaskStatusCompleted TaskStatus = "completed" TaskStatusFailed TaskStatus = "failed" TaskStatusSkipped TaskStatus = "skipped" )
type TaskType ¶
type TaskType string
Mirrors tasks.task_type / task_outbox.task_type (migration 20260427000001).
type WAFCircuitBreaker ¶
type WAFCircuitBreaker struct {
// contains filtered or unexported fields
}
WAFCircuitBreaker tracks per-job runs of consecutive WAF-flagged responses and trips a callback once the threshold is reached. The state lives in process memory — fine for the current single-pod worker deployment (fly.worker.toml). If the worker ever scales out horizontally the counter would undercount across replicas; in that case migrate it to Redis (INCR job:<id>:waf_consecutive with TTL) alongside the existing running-counters HASH.
func NewWAFCircuitBreaker ¶
func NewWAFCircuitBreaker() *WAFCircuitBreaker
func (*WAFCircuitBreaker) Forget ¶
func (b *WAFCircuitBreaker) Forget(jobID string)
Forget drops all per-job state. Called from OnJobTerminated so a long-running worker doesn't accumulate per-job map entries.
func (*WAFCircuitBreaker) MaybeTripFromOutcome ¶
func (b *WAFCircuitBreaker) MaybeTripFromOutcome(ctx context.Context, jm JobManagerInterface, outcome *TaskOutcome)
MaybeTripFromOutcome is the convenience wrapper used from the stream worker hot path. It pulls the WAF detection from the outcome, calls Observe, and on a trip dispatches BlockJob in a detached goroutine with a bounded timeout. If the dispatch fails the breaker is re-armed for that job so a subsequent WAF response can retry.
The dispatch is asynchronous so the stream worker isn't held up by terminal-state DB lock contention — the per-task ACK / counter decrement / batch enqueue must not stall behind a multi-statement terminal transaction.
func (*WAFCircuitBreaker) Observe ¶
func (b *WAFCircuitBreaker) Observe(jobID string, det *crawler.WAFDetection) (tripped bool, vendor crawler.WAFDetection)
Observe records the WAF status of a single task outcome. When det is nil or det.Blocked is false the per-job counter resets, ensuring the breaker only fires on truly consecutive blocks rather than a sparse sprinkle. Returns true exactly once per job — the moment the threshold is first crossed — so callers can fire BlockJob without guarding against duplicates.
func (*WAFCircuitBreaker) Rearm ¶
func (b *WAFCircuitBreaker) Rearm(jobID string, lastVendor crawler.WAFDetection)
Rearm clears the single-fire tripped flag for a job AND seeds the consecutive-WAF counter to threshold-1 (with the previous trip's vendor preserved) so a single subsequent blocked outcome immediately re-trips the breaker. It is called when the dispatched BlockJob couldn't land — at that point we've already proven the domain is consistently walling us, so forcing the retry to re-establish the full streak would waste N-1 blocked observations. The counter still resets on any non-blocked response (see Observe), so a site that recovers between attempts still gets a clean slate.
func (*WAFCircuitBreaker) Threshold ¶
func (b *WAFCircuitBreaker) Threshold() int
Threshold exposes the configured trip count for telemetry/logging.