jobs

package
v0.34.8

This package is not in the latest version of its module.

Published: May 9, 2026 License: MIT Imports: 33 Imported by: 0

Documentation

Index

Constants

View Source
const (
	TaskStaleTimeout = 3 * time.Minute
	MaxTaskRetries   = 5
)

Variables

View Source
var (
	ErrDomainDelay = errors.New("domain rate limit delay")
)

Functions

func ConstructTaskURL

func ConstructTaskURL(path, host, domainName string) string

func IsRateLimitError

func IsRateLimitError(err error) bool

IsRateLimitError matches a broader set than isBlockingError: pacer state updates fire even when the executor would not retry.
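
The broader-than-isBlockingError contract can be sketched as follows. Only ErrDomainDelay comes from this package; isBlockingError's body and the string matching here are assumptions for illustration:

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// ErrDomainDelay mirrors the package-level sentinel documented above.
var ErrDomainDelay = errors.New("domain rate limit delay")

// isBlockingError is a hypothetical narrow check: only errors the
// executor would retry as "blocking" (the matching here is assumed).
func isBlockingError(err error) bool {
	return err != nil && strings.Contains(err.Error(), "429")
}

// isRateLimitError sketches the documented contract: a superset of
// isBlockingError, so pacer state updates fire even when the executor
// would not retry.
func isRateLimitError(err error) bool {
	if err == nil {
		return false
	}
	return isBlockingError(err) ||
		errors.Is(err, ErrDomainDelay) ||
		strings.Contains(strings.ToLower(err.Error()), "rate limit")
}

func main() {
	fmt.Println(isRateLimitError(ErrDomainDelay)) // true: pacer cares
	fmt.Println(isBlockingError(ErrDomainDelay))  // false: executor would not retry
}
```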

func ProcessDiscoveredLinks(ctx context.Context, deps LinkDiscoveryDeps, task *Task, links map[string][]string, sourceURL string, robotsRules *crawler.RobotsRules)

Priority promotion is handled by EnqueueURLs via `priority_score = GREATEST(tasks.priority_score, EXCLUDED.priority_score)`, and CreatePageRecords' no-op DO UPDATE forces RETURNING to emit pre-existing rows too — so no caller-level updateTaskPriorities step is required.

Types

type CrawlerInterface

type CrawlerInterface interface {
	WarmURL(ctx context.Context, url string, findLinks bool) (*crawler.CrawlResult, error)
	DiscoverSitemapsAndRobots(ctx context.Context, domain string) (*crawler.SitemapDiscoveryResult, error)
	ParseSitemap(ctx context.Context, sitemapURL string) ([]string, error)
	FilterURLs(urls []string, includePaths, excludePaths []string) []string
	GetUserAgent() string
	// Probe issues a pre-flight WAF fingerprint request against the
	// homepage of a domain. Issue #365 row 1.
	Probe(ctx context.Context, domain string) (crawler.WAFDetection, error)
}

CrawlerInterface defines the methods the job system needs from the crawler.

type DbQueueInterface

type DbQueueInterface interface {
	UpdateTaskStatus(ctx context.Context, task *db.Task) error
	Execute(ctx context.Context, fn func(*sql.Tx) error) error
	ExecuteControl(ctx context.Context, fn func(*sql.Tx) error) error
	ExecuteWithContext(ctx context.Context, fn func(context.Context, *sql.Tx) error) error
	ExecuteControlWithContext(ctx context.Context, fn func(context.Context, *sql.Tx) error) error
	ExecuteMaintenance(ctx context.Context, fn func(*sql.Tx) error) error
	SetConcurrencyOverride(fn db.ConcurrencyOverrideFunc)
	UpdateDomainTechnologies(ctx context.Context, domainID int, technologies, headers []byte, htmlPath string) error
	UpdateTaskHTMLMetadata(ctx context.Context, taskID string, metadata db.TaskHTMLMetadata) error
	BatchUpsertTaskHTMLMetadata(ctx context.Context, rows []db.TaskHTMLMetadataRow) error
	PromoteWaitingToPending(ctx context.Context, jobID string, limit int) (int, error)
}

DbQueueInterface defines the database queue operations needed by the job system.

type DbQueueProvider

type DbQueueProvider interface {
	Execute(ctx context.Context, fn func(*sql.Tx) error) error
	EnqueueURLs(ctx context.Context, jobID string, pages []db.Page, sourceType string, sourceURL string) error
	CleanupStuckJobs(ctx context.Context) error
}

type ExecutorConfig

type ExecutorConfig struct {
	MaxBlockingRetries int
	MaxTaskRetries     int
	BaseDelayMS        int
	MaxDelayMS         int
}

func DefaultExecutorConfig

func DefaultExecutorConfig() ExecutorConfig

type HTMLPersister

type HTMLPersister struct {
	// contains filtered or unexported fields
}

HTMLPersister streams completed-task HTML payloads to R2 and stamps the resulting metadata onto the task row. See issue #332 for context.

func NewHTMLPersister

func NewHTMLPersister(cfg HTMLPersisterConfig, deps HTMLPersisterDeps) (*HTMLPersister, error)

func (*HTMLPersister) Enqueue

func (p *HTMLPersister) Enqueue(ctx context.Context, task *db.Task, upload *TaskHTMLUpload) bool

Enqueue is non-blocking: a full queue drops the payload and emits a "skipped" metric. Returns true when the payload was accepted.
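
A minimal sketch of the non-blocking behaviour, using a buffered channel with a select/default drop path (type and field names are illustrative, not the package's):

```go
package main

import "fmt"

// persister models the documented contract: a full queue drops the
// payload instead of blocking the caller.
type persister struct {
	queue   chan []byte
	skipped int // stands in for the "skipped" metric
}

func (p *persister) enqueue(payload []byte) bool {
	select {
	case p.queue <- payload:
		return true
	default:
		p.skipped++ // queue full: drop, never block the worker loop
		return false
	}
}

func main() {
	p := &persister{queue: make(chan []byte, 1)}
	fmt.Println(p.enqueue([]byte("a"))) // true: accepted
	fmt.Println(p.enqueue([]byte("b"))) // false: dropped, queue full
	fmt.Println(p.skipped)              // 1
}
```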

func (*HTMLPersister) QueueDepth

func (p *HTMLPersister) QueueDepth() int

func (*HTMLPersister) Start

func (p *HTMLPersister) Start(ctx context.Context)

func (*HTMLPersister) Stop

func (p *HTMLPersister) Stop()

Stop drains already-accepted uploads before exiting. Hand-off ordering: flip stopped → close queue → wait for uploads → close stopCh → cancel ctx. If the parent ctx is cancelled externally first, in-flight uploads abort via uploadCtx and the remaining queue is dropped.
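
The hand-off ordering can be sketched as below, ignoring the ctx-cancellation branch and the concurrent enqueue/stop race a real implementation must also handle (all names illustrative):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// stopper sketches the documented order:
// flip stopped → close queue → wait for workers → close stopCh.
type stopper struct {
	stopped atomic.Bool
	queue   chan int
	wg      sync.WaitGroup
	stopCh  chan struct{}
	done    atomic.Int64
}

func newStopper() *stopper {
	s := &stopper{queue: make(chan int, 8), stopCh: make(chan struct{})}
	s.wg.Add(1)
	go func() { // single worker draining already-accepted uploads
		defer s.wg.Done()
		for range s.queue {
			s.done.Add(1)
		}
	}()
	return s
}

func (s *stopper) enqueue(v int) bool {
	if s.stopped.Load() {
		return false
	}
	s.queue <- v
	return true
}

func (s *stopper) stop() {
	s.stopped.Store(true) // 1. refuse new work
	close(s.queue)        // 2. range loop exits once drained
	s.wg.Wait()           // 3. accepted items are flushed
	close(s.stopCh)       // 4. signal fully stopped
}

func main() {
	s := newStopper()
	s.enqueue(1)
	s.enqueue(2)
	s.stop()
	fmt.Println(s.done.Load()) // 2: both accepted items drained
	fmt.Println(s.enqueue(3))  // false: rejected after stop
}
```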

type HTMLPersisterConfig

type HTMLPersisterConfig struct {
	Workers int
	// When full, new enqueues drop the payload — HTML capture is best-effort
	// and must not block the stream worker loop.
	QueueSize     int
	BatchSize     int
	FlushInterval time.Duration
	// Without a per-call cap a hung R2 connection would occupy a worker
	// indefinitely and starve the queue.
	UploadTimeout time.Duration
	Bucket        string
	Provider      string
}

func DefaultHTMLPersisterConfig

func DefaultHTMLPersisterConfig() HTMLPersisterConfig

Defaults tuned for ~4k tasks/min: 256-deep queue absorbs transient R2 hiccups; 8 workers matches the historical pool that ran cleanly under load.
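
A sketch of those defaults; only Workers and QueueSize are stated above, the remaining values are placeholder guesses:

```go
package main

import (
	"fmt"
	"time"
)

type htmlPersisterConfig struct {
	Workers       int
	QueueSize     int
	BatchSize     int
	FlushInterval time.Duration
	UploadTimeout time.Duration
}

func defaultHTMLPersisterConfig() htmlPersisterConfig {
	return htmlPersisterConfig{
		Workers:       8,                // matches the historical pool size
		QueueSize:     256,              // absorbs transient R2 hiccups at ~4k tasks/min
		BatchSize:     32,               // assumed
		FlushInterval: 5 * time.Second,  // assumed
		UploadTimeout: 30 * time.Second, // assumed
	}
}

func main() {
	cfg := defaultHTMLPersisterConfig()
	fmt.Println(cfg.Workers, cfg.QueueSize) // 8 256
}
```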

type HTMLPersisterDeps

type HTMLPersisterDeps struct {
	Provider archive.ColdStorageProvider
	DBQueue  DbQueueInterface
}

type Job

type Job struct {
	ID                       string    `json:"id"`
	Domain                   string    `json:"domain"`
	UserID                   *string   `json:"user_id,omitempty"`
	OrganisationID           *string   `json:"organisation_id,omitempty"`
	Status                   JobStatus `json:"status"`
	Progress                 float64   `json:"progress"`
	TotalTasks               int       `json:"total_tasks"`
	CompletedTasks           int       `json:"completed_tasks"`
	FailedTasks              int       `json:"failed_tasks"`
	SkippedTasks             int       `json:"skipped_tasks"`
	FoundTasks               int       `json:"found_tasks"`
	SitemapTasks             int       `json:"sitemap_tasks"`
	CreatedAt                time.Time `json:"created_at"`
	StartedAt                time.Time `json:"started_at"`
	CompletedAt              time.Time `json:"completed_at"`
	Concurrency              int       `json:"concurrency"`
	FindLinks                bool      `json:"find_links"`
	MaxPages                 int       `json:"max_pages"`
	IncludePaths             []string  `json:"include_paths,omitempty"`
	ExcludePaths             []string  `json:"exclude_paths,omitempty"`
	RequiredWorkers          int       `json:"required_workers"`
	AllowCrossSubdomainLinks bool      `json:"allow_cross_subdomain_links"`
	SourceType               *string   `json:"source_type,omitempty"`
	SourceDetail             *string   `json:"source_detail,omitempty"`
	SourceInfo               *string   `json:"source_info,omitempty"`
	ErrorMessage             string    `json:"error_message,omitempty"`
	SchedulerID              *string   `json:"scheduler_id,omitempty"`
	DurationSeconds          *int      `json:"duration_seconds,omitempty"`
	AvgTimePerTaskSeconds    *float64  `json:"avg_time_per_task_seconds,omitempty"`
}

CHECK: Do all of these currently get utilised somewhere in the app?

type JobInfo

type JobInfo struct {
	DomainID                 int
	DomainName               string
	FindLinks                bool
	AllowCrossSubdomainLinks bool
	CrawlDelay               int
	Concurrency              int
	AdaptiveDelay            int
	AdaptiveDelayFloor       int
	RobotsRules              *crawler.RobotsRules
}

type JobManager

type JobManager struct {

	// Fire-and-forget; nil is allowed (legacy DB-queue mode, tests).
	OnTasksEnqueued TaskScheduleCallback

	// Fire-and-forget; must return promptly — runs on the batch flusher's goroutine.
	OnProgressMilestone ProgressMilestoneCallback

	// Fire-and-forget; nil allowed for tests and deploys without REDIS_URL.
	OnJobTerminated JobTerminatedCallback
	// contains filtered or unexported fields
}

func NewJobManager

func NewJobManager(db *sql.DB, dbQueue DbQueueProvider, crawler CrawlerInterface) *JobManager

func (*JobManager) BlockJob

func (jm *JobManager) BlockJob(ctx context.Context, jobID string, vendor, reason string) error

BlockJob transitions a job to JobStatusBlocked when the WAF detector (pre-flight probe or mid-job circuit breaker) recognises bot protection on the domain. The shape mirrors CancelJob: tasks UPDATE → jobs UPDATE → task_outbox DELETE, with the same ORDER BY id lock order to keep the AFTER STATEMENT counter trigger from deadlocking against worker batches. domains.waf_blocked is set in the same transaction so a follow-up CreateJob short-circuits without re-probing.

func (*JobManager) CalculateJobProgress

func (jm *JobManager) CalculateJobProgress(job *Job) float64

func (*JobManager) CancelJob

func (jm *JobManager) CancelJob(ctx context.Context, jobID string) error

func (*JobManager) CreateJob

func (jm *JobManager) CreateJob(ctx context.Context, options *JobOptions) (*Job, error)

func (*JobManager) EnqueueJobURLs

func (jm *JobManager) EnqueueJobURLs(ctx context.Context, jobID string, pages []db.Page, sourceType string, sourceURL string) error

Wraps dbQueue.EnqueueURLs with duplicate-page filtering against the in-process processedPages map.

func (*JobManager) GetJob

func (jm *JobManager) GetJob(ctx context.Context, jobID string) (*Job, error)

func (*JobManager) GetJobStatus

func (jm *JobManager) GetJobStatus(ctx context.Context, jobID string) (*Job, error)

func (*JobManager) GetRobotsRules

func (jm *JobManager) GetRobotsRules(ctx context.Context, domain string) (*crawler.RobotsRules, error)

Returns nil (no error) when the crawler is unavailable; callers treat nil rules as "no restriction".

func (*JobManager) IsJobComplete

func (jm *JobManager) IsJobComplete(job *Job) bool

func (*JobManager) MarkJobRunning

func (jm *JobManager) MarkJobRunning(ctx context.Context, jobID string) error

Idempotent. Guard accepts 'initializing' too: sitemap jobs spend a real window in that state before first dispatch, and a narrower match would strand them on the "Starting up" pill.

func (*JobManager) MaybeFireMilestones

func (jm *JobManager) MaybeFireMilestones(ctx context.Context, jobIDs []string)

Wired as the BatchFlushCallback on db.BatchManager. Multi-replica safe: downstream dedupe handles concurrent fires from sibling replicas.

func (*JobManager) UpdateJobStatus

func (jm *JobManager) UpdateJobStatus(ctx context.Context, jobID string, status JobStatus) error

func (*JobManager) ValidateStatusTransition

func (jm *JobManager) ValidateStatusTransition(from, to JobStatus) error

Allowed-edge rationale and case → action mapping: [CRAWL_HANDLING.md].

type JobManagerInterface

type JobManagerInterface interface {
	CreateJob(ctx context.Context, options *JobOptions) (*Job, error)
	CancelJob(ctx context.Context, jobID string) error
	BlockJob(ctx context.Context, jobID string, vendor, reason string) error
	GetJobStatus(ctx context.Context, jobID string) (*Job, error)

	GetJob(ctx context.Context, jobID string) (*Job, error)
	EnqueueJobURLs(ctx context.Context, jobID string, pages []db.Page, sourceType string, sourceURL string) error

	IsJobComplete(job *Job) bool
	CalculateJobProgress(job *Job) float64
	ValidateStatusTransition(from, to JobStatus) error
	UpdateJobStatus(ctx context.Context, jobID string, status JobStatus) error
	MarkJobRunning(ctx context.Context, jobID string) error

	// Returns nil rules (not error) when crawler is unavailable; callers treat that as "no restriction".
	GetRobotsRules(ctx context.Context, domain string) (*crawler.RobotsRules, error)
}

type JobOptions

type JobOptions struct {
	Domain                   string   `json:"domain"`
	UserID                   *string  `json:"user_id,omitempty"`
	OrganisationID           *string  `json:"organisation_id,omitempty"`
	UseSitemap               bool     `json:"use_sitemap"`
	Concurrency              int      `json:"concurrency"`
	FindLinks                bool     `json:"find_links"`
	AllowCrossSubdomainLinks bool     `json:"allow_cross_subdomain_links"`
	MaxPages                 int      `json:"max_pages"`
	IncludePaths             []string `json:"include_paths,omitempty"`
	ExcludePaths             []string `json:"exclude_paths,omitempty"`
	RequiredWorkers          int      `json:"required_workers"`
	SourceType               *string  `json:"source_type,omitempty"`
	SourceDetail             *string  `json:"source_detail,omitempty"`
	SourceInfo               *string  `json:"source_info,omitempty"`
	SchedulerID              *string  `json:"scheduler_id,omitempty"`
}

type JobStatus

type JobStatus string

New value: also update ValidateStatusTransition + (if terminal) terminalJobStatuses + trigger preserve list + [CRAWL_HANDLING.md] row.

const (
	JobStatusPending      JobStatus = "pending"
	JobStatusInitialising JobStatus = "initializing"
	JobStatusRunning      JobStatus = "running"
	JobStatusPaused       JobStatus = "paused"
	JobStatusCompleted    JobStatus = "completed"
	JobStatusFailed       JobStatus = "failed"
	JobStatusCancelled    JobStatus = "cancelled"
	JobStatusArchived     JobStatus = "archived"
	// JobStatusBlocked is set when the WAF detector flags a domain as
	// blocked (issue #365 row 1). The pre-flight probe and the mid-job
	// circuit breaker both transition jobs into this terminal state.
	JobStatusBlocked JobStatus = "blocked"
)

type JobTerminatedCallback

type JobTerminatedCallback func(ctx context.Context, jobID string)

type LinkDiscoveryDeps

type LinkDiscoveryDeps struct {
	DBQueue     DbQueueInterface
	JobManager  JobManagerInterface
	MinPriority float64
}

type ProgressMilestoneCallback

type ProgressMilestoneCallback func(ctx context.Context, jobID string, oldPct, newPct int)

type QuotaExceededError

type QuotaExceededError struct {
	Used     int       `json:"used"`
	Limit    int       `json:"limit"`
	ResetsAt time.Time `json:"resets_at"`
	PlanName string    `json:"plan_name"`
}

func (*QuotaExceededError) Error

func (e *QuotaExceededError) Error() string

type RetryDecision

type RetryDecision struct {
	ShouldRetry        bool
	NextRunAt          time.Time
	Reason             string // "blocking", "retryable", "domain_delay"
	IsPermanentFailure bool
}

type StreamWorkerDeps

type StreamWorkerDeps struct {
	Consumer     *broker.Consumer
	Scheduler    *broker.Scheduler
	Counters     *broker.RunningCounters
	Pacer        *broker.DomainPacer
	Executor     *TaskExecutor
	BatchManager *db.BatchManager
	DBQueue      DbQueueInterface
	JobManager   JobManagerInterface
	// HTMLPersister is nil when ARCHIVE_PROVIDER is unset (local dev
	// without R2); completed tasks then persist without HTML.
	HTMLPersister *HTMLPersister
	// WAFBreaker is nil-safe: when supplied, every task outcome runs
	// through it and a domain that produces N consecutive WAF
	// fingerprints trips a job-level block (issue #365 row 1).
	WAFBreaker *WAFCircuitBreaker
}

type StreamWorkerOpts

type StreamWorkerOpts struct {
	NumWorkers int
	// TasksPerWorker caps in-flight tasks per consumer goroutine.
	// Global ceiling = NumWorkers × TasksPerWorker.
	TasksPerWorker  int
	ReclaimInterval time.Duration
}
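
The in-flight ceiling is a simple product; for example, assuming 4 consumer goroutines at 25 tasks each (both values illustrative):

```go
package main

import "fmt"

// globalCeiling restates the documented invariant: at most
// NumWorkers × TasksPerWorker tasks are in flight at once.
func globalCeiling(numWorkers, tasksPerWorker int) int {
	return numWorkers * tasksPerWorker
}

func main() {
	fmt.Println(globalCeiling(4, 25)) // 100
}
```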

func DefaultStreamWorkerOpts

func DefaultStreamWorkerOpts() StreamWorkerOpts

type StreamWorkerPool

type StreamWorkerPool struct {
	// contains filtered or unexported fields
}

StreamWorkerPool consumes from Redis Streams, runs tasks via TaskExecutor, and acts on the TaskOutcome.

func NewStreamWorkerPool

func NewStreamWorkerPool(deps StreamWorkerDeps, opts StreamWorkerOpts) *StreamWorkerPool

func (*StreamWorkerPool) ActiveJobIDs

func (swp *StreamWorkerPool) ActiveJobIDs(ctx context.Context) ([]string, error)

func (*StreamWorkerPool) CanDispatch

func (swp *StreamWorkerPool) CanDispatch(ctx context.Context, jobID string) (bool, error)

CanDispatch implements broker.ConcurrencyChecker.

func (*StreamWorkerPool) SetHeartbeat

func (swp *StreamWorkerPool) SetHeartbeat(h interface{ Tick() })

func (*StreamWorkerPool) Start

func (swp *StreamWorkerPool) Start(ctx context.Context)

Start launches consumer goroutines and the reclaim loop. Blocks until Stop is called or ctx is cancelled.

func (*StreamWorkerPool) Stop

func (swp *StreamWorkerPool) Stop()

func (*StreamWorkerPool) TriggerReconcile

func (swp *StreamWorkerPool) TriggerReconcile(ctx context.Context)

TriggerReconcile runs an immediate reconcile, collapsing concurrent calls onto an in-flight pass via TryLock. Contract: at least one reconcile after the most recent trigger.
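
The collapse-onto-one-pass behaviour can be sketched with sync.Mutex.TryLock plus a pending flag that preserves the at-least-one-reconcile contract (names illustrative):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type reconciler struct {
	mu      sync.Mutex
	pending atomic.Bool
	runs    atomic.Int64
}

// trigger collapses concurrent callers: losers of TryLock return
// immediately, but the pending flag guarantees the in-flight holder
// runs at least one more pass after the latest trigger.
func (r *reconciler) trigger() {
	r.pending.Store(true)
	if !r.mu.TryLock() {
		return // an in-flight pass will observe pending and rerun
	}
	defer r.mu.Unlock()
	for r.pending.Swap(false) {
		r.runs.Add(1) // one reconcile pass
	}
}

func main() {
	r := &reconciler{}
	r.trigger()
	fmt.Println(r.runs.Load() >= 1) // true
}
```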

type Task

type Task struct {
	ID          string     `json:"id"`
	JobID       string     `json:"job_id"`
	PageID      int        `json:"page_id"`
	Host        string     `json:"host"`
	Path        string     `json:"path"`
	DomainID    int        `json:"domain_id"`
	DomainName  string     `json:"domain_name"`
	Status      TaskStatus `json:"status"`
	TaskType    TaskType   `json:"task_type,omitempty"`
	CreatedAt   time.Time  `json:"created_at"`
	StartedAt   time.Time  `json:"started_at"`
	CompletedAt time.Time  `json:"completed_at"`
	RetryCount  int        `json:"retry_count"`
	Error       string     `json:"error,omitempty"`

	SourceType string `json:"source_type"` // "sitemap", "link", "manual"
	SourceURL  string `json:"source_url,omitempty"`

	StatusCode         int    `json:"status_code,omitempty"`
	ResponseTime       int64  `json:"response_time,omitempty"`
	CacheStatus        string `json:"cache_status,omitempty"`
	ContentType        string `json:"content_type,omitempty"`
	SecondResponseTime int64  `json:"second_response_time,omitempty"`
	SecondCacheStatus  string `json:"second_cache_status,omitempty"`

	PriorityScore float64 `json:"priority_score"`

	FindLinks                bool `json:"-"`
	CrawlDelay               int  `json:"-"` // seconds, from robots.txt
	JobConcurrency           int  `json:"-"`
	AdaptiveDelay            int  `json:"-"`
	AdaptiveDelayFloor       int  `json:"-"`
	AllowCrossSubdomainLinks bool `json:"-"`
}

type TaskExecutor

type TaskExecutor struct {
	// contains filtered or unexported fields
}

TaskExecutor runs crawl tasks and returns outcomes; it has no side effects on counters, schedulers, or persistence — the caller must act on the outcome.

func NewTaskExecutor

func NewTaskExecutor(c CrawlerInterface, cfg ExecutorConfig) *TaskExecutor

func (*TaskExecutor) Execute

func (e *TaskExecutor) Execute(ctx context.Context, task *Task) *TaskOutcome

Execute does not modify external state; all decisions are returned in TaskOutcome.

type TaskHTMLUpload

type TaskHTMLUpload struct {
	Path                string
	ContentType         string
	UploadContentType   string
	ContentEncoding     string
	SizeBytes           int64
	CompressedSizeBytes int64
	SHA256              string
	CapturedAt          time.Time
	Payload             []byte
}

type TaskOutcome

type TaskOutcome struct {
	Task            *db.Task
	CrawlResult     *crawler.CrawlResult
	Retry           *RetryDecision
	DiscoveredLinks map[string][]string
	HTMLUpload      *TaskHTMLUpload
	RateLimited     bool
	Success         bool
}

type TaskScheduleCallback

type TaskScheduleCallback func(ctx context.Context, jobID string, entries []TaskScheduleEntry)

Nil callback means legacy DB-queue mode (no external broker scheduling).

type TaskScheduleEntry

type TaskScheduleEntry struct {
	TaskID     string
	PageID     int
	Host       string
	Path       string
	Status     string // "pending", "waiting", "skipped"
	Priority   float64
	RetryCount int
	SourceType string
	SourceURL  string
	RunAt      time.Time
}

RunAt is the earliest dispatch time; for waiting/retry rows it carries the backoff deadline so callbacks don't schedule them as ready-now.

type TaskStatus

type TaskStatus string

const (
	TaskStatusWaiting   TaskStatus = "waiting"
	TaskStatusPending   TaskStatus = "pending"
	TaskStatusRunning   TaskStatus = "running"
	TaskStatusCompleted TaskStatus = "completed"
	TaskStatusFailed    TaskStatus = "failed"
	TaskStatusSkipped   TaskStatus = "skipped"
)

type TaskType

type TaskType string

Mirrors tasks.task_type / task_outbox.task_type (migration 20260427000001).

const (
	TaskTypeCrawl      TaskType = "crawl"
	TaskTypeLighthouse TaskType = "lighthouse"
)

type WAFCircuitBreaker

type WAFCircuitBreaker struct {
	// contains filtered or unexported fields
}

WAFCircuitBreaker tracks per-job runs of consecutive WAF-flagged responses and trips a callback once the threshold is reached. The state lives in process memory — fine for the current single-pod worker deployment (fly.worker.toml). If the worker ever scales out horizontally the counter would undercount across replicas; in that case migrate it to Redis (INCR job:<id>:waf_consecutive with TTL) alongside the existing running-counters HASH.

func NewWAFCircuitBreaker

func NewWAFCircuitBreaker() *WAFCircuitBreaker

func (*WAFCircuitBreaker) Forget

func (b *WAFCircuitBreaker) Forget(jobID string)

Forget drops all per-job state. Called from OnJobTerminated so a long-running worker doesn't accumulate per-job map entries.

func (*WAFCircuitBreaker) MaybeTripFromOutcome

func (b *WAFCircuitBreaker) MaybeTripFromOutcome(ctx context.Context, jm JobManagerInterface, outcome *TaskOutcome)

MaybeTripFromOutcome is the convenience wrapper used from the stream worker hot path. It pulls the WAF detection from the outcome, calls Observe, and on a trip dispatches BlockJob in a detached goroutine with a bounded timeout. If the dispatch fails the breaker is re-armed for that job so a subsequent WAF response can retry.

The dispatch is asynchronous so the stream worker isn't held up by terminal-state DB lock contention — the per-task ACK / counter decrement / batch enqueue must not stall behind a multi-statement terminal transaction.

func (*WAFCircuitBreaker) Observe

func (b *WAFCircuitBreaker) Observe(jobID string, det *crawler.WAFDetection) (tripped bool, vendor crawler.WAFDetection)

Observe records the WAF status of a single task outcome. When det is nil or det.Blocked is false the per-job counter resets, ensuring the breaker only fires on truly consecutive blocks rather than a sparse sprinkle. Returns true exactly once per job — the moment the threshold is first crossed — so callers can fire BlockJob without guarding against duplicates.

func (*WAFCircuitBreaker) Rearm

func (b *WAFCircuitBreaker) Rearm(jobID string, lastVendor crawler.WAFDetection)

Rearm clears the single-fire tripped flag for a job AND seeds the consecutive-WAF counter to threshold-1 (with the previous trip's vendor preserved) so a single subsequent blocked outcome immediately retrips. Called when the dispatched BlockJob couldn't land — at that point we've already proven the domain is consistently walling us; making the retry re-establish the full streak would waste N-1 blocked observations. The counter still resets on any non-blocked response (Observe), so a site that recovers between attempts still gets a clean slate.
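
The Observe/Rearm interplay can be sketched as follows, assuming a threshold of 3 (the real trip count is configuration; all names illustrative, and vendor tracking is elided):

```go
package main

import "fmt"

type breaker struct {
	threshold   int
	consecutive map[string]int
	tripped     map[string]bool
}

func newBreaker() *breaker {
	return &breaker{threshold: 3, consecutive: map[string]int{}, tripped: map[string]bool{}}
}

// observe returns true exactly once per job, the moment the streak
// first reaches the threshold; any non-blocked outcome resets it.
func (b *breaker) observe(jobID string, blocked bool) bool {
	if !blocked {
		b.consecutive[jobID] = 0
		return false
	}
	b.consecutive[jobID]++
	if b.consecutive[jobID] >= b.threshold && !b.tripped[jobID] {
		b.tripped[jobID] = true
		return true
	}
	return false
}

// rearm clears the single-fire flag and seeds the streak to
// threshold-1 so one more blocked outcome retrips immediately.
func (b *breaker) rearm(jobID string) {
	b.tripped[jobID] = false
	b.consecutive[jobID] = b.threshold - 1
}

func main() {
	b := newBreaker()
	fmt.Println(b.observe("j", true)) // false (streak 1)
	fmt.Println(b.observe("j", true)) // false (streak 2)
	fmt.Println(b.observe("j", true)) // true  (trip)
	fmt.Println(b.observe("j", true)) // false (fires only once)
	b.rearm("j")                      // as if BlockJob dispatch failed
	fmt.Println(b.observe("j", true)) // true  (one blocked outcome retrips)
}
```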

func (*WAFCircuitBreaker) Threshold

func (b *WAFCircuitBreaker) Threshold() int

Threshold exposes the configured trip count for telemetry/logging.
