rqueue

package
v1.10.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 8, 2026 License: MIT Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const QueueMaintenance = "maintenance"

QueueMaintenance is the queue name for maintenance jobs like deletion.

Variables

ValidJobStates are the valid states for filtering jobs.

Functions

func CreateRiverUIHandler

func CreateRiverUIHandler(_ context.Context, client *Client) (*riverui.Handler, error)

Types

type CleanupArgs

type CleanupArgs struct{}

CleanupArgs holds the arguments for a periodic job that cleans up expired sessions, stale rate limits, and old scrape results.

func (CleanupArgs) InsertOpts

func (CleanupArgs) InsertOpts() river.InsertOpts

func (CleanupArgs) Kind

func (CleanupArgs) Kind() string

type CleanupWorker

type CleanupWorker struct {
	river.WorkerDefaults[CleanupArgs]
	// contains filtered or unexported fields
}

CleanupWorker performs periodic database cleanup.

func (*CleanupWorker) Work

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(dbPool *pgxpool.Pool, encryptionKey []byte) (*Client, error)

NewClient creates a new Client for the API server. It processes maintenance jobs (worker provisioning/deletion) and can insert scrape jobs. encryptionKey is needed to decrypt secrets from app_config at job execution time.

func NewWorkerClient

func NewWorkerClient(dbPool *pgxpool.Pool, manager ScrapeManager) (*Client, error)

NewWorkerClient creates a new Client for worker mode. This client only processes scrape jobs. Maintenance jobs (worker provisioning, deletion) are handled by the server's River client. Worker-mode concurrency is intentionally fixed to one River worker per process.

func (*Client) DeleteJob

func (c *Client) DeleteJob(ctx context.Context, encodedJobID string) error

DeleteJob queues a background job to delete a scrape job and its results. Returns immediately after validation; actual deletion happens async.

func (*Client) GetDashboardStats

func (c *Client) GetDashboardStats(ctx context.Context) (*DashboardStats, error)

GetDashboardStats fetches job and result counts for the dashboard.

func (*Client) GetJobResults

func (c *Client) GetJobResults(ctx context.Context, encodedJobID string) (json.RawMessage, string, error)

GetJobResults fetches the raw JSON results and keyword for a job by its encoded ID.

func (*Client) GetJobStatus

func (c *Client) GetJobStatus(ctx context.Context, jobID string) (*JobStatus, error)

func (*Client) InsertJob

func (c *Client) InsertJob(ctx context.Context, args ScrapeJobArgs) (string, error)

func (*Client) InsertWorkerDeleteJob

func (c *Client) InsertWorkerDeleteJob(ctx context.Context, args WorkerDeleteArgs) error

InsertWorkerDeleteJob queues a background job to delete a cloud worker.

func (*Client) InsertWorkerProvisionJob

func (c *Client) InsertWorkerProvisionJob(ctx context.Context, args WorkerProvisionArgs) error

InsertWorkerProvisionJob queues a background job to provision a cloud worker.

func (*Client) ListJobs

func (c *Client) ListJobs(ctx context.Context, state string, limit int, cursor string) (*JobListResult, error)

ListJobs returns a paginated list of jobs with optional state filtering.

func (*Client) RiverClient

func (c *Client) RiverClient() *river.Client[pgx.Tx]

RiverClient returns the underlying River client for use with River UI.

func (*Client) Start

func (c *Client) Start(ctx context.Context) error

func (*Client) StartRetryPromoter

func (c *Client) StartRetryPromoter(ctx context.Context)

StartRetryPromoter runs a background goroutine that periodically promotes retryable scrape jobs for immediate retry.

func (*Client) Stop

func (c *Client) Stop(ctx context.Context) error

type DashboardStats

type DashboardStats struct {
	JobsToday    int
	TotalResults int
}

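DashboardStats holds summary statistics for the admin dashboard.

Note: the Client.InsertWorkerDeleteJob method, listed earlier on this page, queues a background job to delete a cloud worker; that description belongs to the method, not to this type.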

type JobDeleteArgs

type JobDeleteArgs struct {
	JobID int64 `json:"job_id"`
}

JobDeleteArgs contains the arguments for deleting a scrape job.

func (JobDeleteArgs) InsertOpts

func (JobDeleteArgs) InsertOpts() river.InsertOpts

InsertOpts returns the insert options for this job type.

func (JobDeleteArgs) Kind

func (JobDeleteArgs) Kind() string

Kind returns the job type identifier.

type JobDeleteWorker

type JobDeleteWorker struct {
	river.WorkerDefaults[JobDeleteArgs]
	// contains filtered or unexported fields
}

JobDeleteWorker handles job deletion in the background.

func (*JobDeleteWorker) Work

Work deletes the job and its associated results.

type JobListItem

type JobListItem struct {
	JobID       string
	Status      string
	Keyword     string
	CreatedAt   time.Time
	StartedAt   *time.Time
	CompletedAt *time.Time
	ResultCount int
	Error       string
}

JobListItem represents a job in the list (without full results).

type JobListResult

type JobListResult struct {
	Jobs       []JobListItem
	NextCursor string
	HasMore    bool
}

JobListResult represents the result of listing jobs.

type JobStatus

type JobStatus struct {
	JobID       string
	Status      string
	Keyword     string
	CreatedAt   time.Time
	StartedAt   *time.Time
	CompletedAt *time.Time
	Results     json.RawMessage
	Error       string
	ResultCount int
}

type ScrapeJobArgs

type ScrapeJobArgs struct {
	Keyword        string  `json:"keyword"`
	Lang           string  `json:"lang"`
	MaxDepth       int     `json:"max_depth"`
	Email          bool    `json:"email"`
	GeoCoordinates string  `json:"geo_coordinates"`
	Zoom           int     `json:"zoom"`
	Radius         float64 `json:"radius"`
	FastMode       bool    `json:"fast_mode"`
	ExtraReviews   bool    `json:"extra_reviews"`
	TimeoutSecs    int     `json:"timeout"` // timeout in seconds
}

func (ScrapeJobArgs) InsertOpts

func (ScrapeJobArgs) InsertOpts() river.InsertOpts

func (ScrapeJobArgs) Kind

func (ScrapeJobArgs) Kind() string

type ScrapeManager

type ScrapeManager interface {
	JobDone()
	SubmitJob(ctx context.Context, job scrapemate.IJob) error
	RegisterJob(jobID string, riverJobID int64, keyword string) <-chan scraper.FlushResult
	MarkDone(jobID string)
	ForceFlush(jobID string)
}

ScrapeManager is the interface that ScrapeWorker uses to interact with the scraper lifecycle and result collection.

type ScrapeWatchdogMetrics

type ScrapeWatchdogMetrics struct {
	FlushWaitWarnTotal   int64
	LongRuntimeWarnTotal int64
}

func GetScrapeWatchdogMetrics

func GetScrapeWatchdogMetrics() ScrapeWatchdogMetrics

GetScrapeWatchdogMetrics returns cumulative watchdog counters for scrape jobs.

type ScrapeWorker

type ScrapeWorker struct {
	river.WorkerDefaults[ScrapeJobArgs]
	Manager ScrapeManager
}

func (*ScrapeWorker) NextRetryAt

func (w *ScrapeWorker) NextRetryAt(_ *river.Job[ScrapeJobArgs]) time.Time

NextRetryAt returns a 2-minute fallback retry delay. In practice, the retry promoter goroutine periodically calls JobRetry() much sooner.

func (*ScrapeWorker) Timeout

func (w *ScrapeWorker) Timeout(job *river.Job[ScrapeJobArgs]) time.Duration

Timeout controls River's job context deadline for scrape jobs. Keep it above Work()'s internal timeout to allow force-flush and result save.

func (*ScrapeWorker) Work

func (w *ScrapeWorker) Work(ctx context.Context, job *river.Job[ScrapeJobArgs]) error

type WorkerDeleteArgs

type WorkerDeleteArgs struct {
	ResourceID         int    `json:"resource_id"`
	Provider           string `json:"provider"`
	ProviderResourceID string `json:"provider_resource_id"`
}

WorkerDeleteArgs contains only non-sensitive arguments for deletion.

func (WorkerDeleteArgs) InsertOpts

func (WorkerDeleteArgs) InsertOpts() river.InsertOpts

func (WorkerDeleteArgs) Kind

func (WorkerDeleteArgs) Kind() string

type WorkerDeleteWorker

type WorkerDeleteWorker struct {
	river.WorkerDefaults[WorkerDeleteArgs]
	// contains filtered or unexported fields
}

WorkerDeleteWorker handles worker deletion in the background.

func (*WorkerDeleteWorker) Work

type WorkerHealthCheckArgs

type WorkerHealthCheckArgs struct{}

WorkerHealthCheckArgs holds the arguments for a periodic job that checks the health of all active workers.

func (WorkerHealthCheckArgs) InsertOpts

func (WorkerHealthCheckArgs) InsertOpts() river.InsertOpts

func (WorkerHealthCheckArgs) Kind

type WorkerHealthCheckWorker

type WorkerHealthCheckWorker struct {
	river.WorkerDefaults[WorkerHealthCheckArgs]
	// contains filtered or unexported fields
}

WorkerHealthCheckWorker checks the health of all active provisioned workers.

func (*WorkerHealthCheckWorker) Work

type WorkerProvisionArgs

type WorkerProvisionArgs struct {
	ResourceID      int    `json:"resource_id"`
	Provider        string `json:"provider"`
	Name            string `json:"name"`
	Region          string `json:"region"`
	Size            string `json:"size"`
	Concurrency     int    `json:"concurrency"` // Number of worker containers on the host.
	MaxJobsPerCycle int    `json:"max_jobs_per_cycle"`
	FastMode        bool   `json:"fast_mode"`
	Proxies         string `json:"proxies"`
}

WorkerProvisionArgs contains only non-sensitive arguments for provisioning. Secrets (API tokens, DB URL, registry creds) are fetched at runtime from app_config.

func (WorkerProvisionArgs) InsertOpts

func (WorkerProvisionArgs) InsertOpts() river.InsertOpts

func (WorkerProvisionArgs) Kind

func (WorkerProvisionArgs) Kind() string

type WorkerProvisionWorker

type WorkerProvisionWorker struct {
	river.WorkerDefaults[WorkerProvisionArgs]
	// contains filtered or unexported fields
}

WorkerProvisionWorker handles worker provisioning in the background.

func (*WorkerProvisionWorker) Timeout

func (*WorkerProvisionWorker) Work

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL