broker

package
v0.34.4 Latest
Warning

This package is not in the latest version of its module.

Published: May 5, 2026 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

Package broker provides Redis-backed task scheduling, dispatch, and coordination primitives for the hover crawl execution pipeline.

Constants

const DefaultOutboxMaxAttempts = 10

DefaultOutboxMaxAttempts caps the worst-case stuck-row age at MaxAttempts × MaxBackoff (10 × 5 min = 50 min at defaults).
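The bound above is a plain product, which a short sketch makes concrete. The helper name worstCaseStuckAge and the constant assumedMaxBackoff are hypothetical; the 5 minute value is taken from the "10 × 5 min" figure in the comment, not from the package source.

```go
package main

import (
	"fmt"
	"time"
)

const (
	// Mirrors the documented DefaultOutboxMaxAttempts value.
	defaultOutboxMaxAttempts = 10
	// Assumption: the default MaxBackoff, inferred from "10 × 5 min".
	assumedMaxBackoff = 5 * time.Minute
)

// worstCaseStuckAge bounds how long a row can keep failing before it
// exhausts its attempts, assuming every retry waits the full MaxBackoff.
func worstCaseStuckAge(maxAttempts int, maxBackoff time.Duration) time.Duration {
	return time.Duration(maxAttempts) * maxBackoff
}

func main() {
	fmt.Println(worstCaseStuckAge(defaultOutboxMaxAttempts, assumedMaxBackoff)) // 50m0s
}
```

Raising either MaxAttempts or MaxBackoff in OutboxSweeperOpts raises this bound proportionally.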

const RunningCountersKey = keyPrefix + "running"

HASH: jobID → in-flight task count.

Variables

This section is empty.

Functions

func ConsumerGroup

func ConsumerGroup(jobID string) string

func DomainConfigKey

func DomainConfigKey(domain string) string

HASH: base_delay_ms, adaptive_delay_ms, floor_ms, success_streak, error_streak.

func DomainGateKey

func DomainGateKey(domain string) string

String time-gate. SET NX PX {delay_ms} caps per-domain dispatch rate.

func DomainInflightKey

func DomainInflightKey(domain string) string

HASH: jobID → inflight task count for this domain+job pair.

func FormatScheduleEntry

func FormatScheduleEntry(taskID, jobID string, pageID int, host, path string, priority float64, retryCount int, sourceType, sourceURL, taskType string, lighthouseRunID int64) string

Pipe-delimited (avoids JSON overhead on the scheduling hot path).

Current: taskID|jobID|pageID|host|path|priority|retryCount|sourceType|sourceURL|taskType|lighthouseRunID

Legacy: taskID|jobID|pageID|host|path|priority|retryCount|sourceType|sourceURL

ParseScheduleEntry accepts both so a deploy rolls forward without a ZSET flush; legacy entries default to taskType='crawl'.
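As an illustration of the current 11-field layout, here is a minimal sketch of an encoder. formatEntry is a hypothetical stand-in for FormatScheduleEntry; the float formatting and the absence of any escaping for '|' inside fields are assumptions, not confirmed behaviour of the package.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// formatEntry joins the eleven fields with '|' in the documented order.
// Assumption: fields never contain '|' and priority uses the shortest
// decimal representation.
func formatEntry(taskID, jobID string, pageID int, host, path string,
	priority float64, retryCount int, sourceType, sourceURL, taskType string,
	lighthouseRunID int64) string {
	return strings.Join([]string{
		taskID, jobID, strconv.Itoa(pageID), host, path,
		strconv.FormatFloat(priority, 'f', -1, 64),
		strconv.Itoa(retryCount), sourceType, sourceURL, taskType,
		strconv.FormatInt(lighthouseRunID, 10),
	}, "|")
}

func main() {
	m := formatEntry("t1", "j1", 42, "example.com", "/about", 0.5, 0,
		"sitemap", "https://example.com/sitemap.xml", "crawl", 0)
	fmt.Println(m)
	fmt.Println(strings.Count(m, "|") + 1) // number of fields: 11
}
```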

func LighthouseConsumerGroup

func LighthouseConsumerGroup(jobID string) string

func LighthouseStreamKey

func LighthouseStreamKey(jobID string) string

Distinct stream so crawl workers and analysis can scale independently.

func ScheduleKey

func ScheduleKey(jobID string) string

ZSET; score = earliest runnable unix-ms.

func StreamKey

func StreamKey(jobID string) string

Types

type BatchError

type BatchError struct {
	FailedIndices []int
	Total         int
	Err           error
}

BatchError is returned by ScheduleBatch on partial pipeline failure. Type-assert via errors.As to retry FailedIndices.
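A minimal sketch of the errors.As pattern described above. The BatchError type is redeclared locally so the example is self-contained; its method bodies, the helper entriesToRetry, and errPipeline are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// BatchError mirrors the documented struct. Method bodies are assumptions.
type BatchError struct {
	FailedIndices []int
	Total         int
	Err           error
}

func (e *BatchError) Error() string {
	return fmt.Sprintf("%d of %d entries failed: %v", len(e.FailedIndices), e.Total, e.Err)
}

func (e *BatchError) Unwrap() error { return e.Err }

// errPipeline stands in for a whole-pipeline failure.
var errPipeline = errors.New("pipeline failed")

// entriesToRetry picks the entries worth retrying after a batch call.
// A *BatchError selects only FailedIndices; any other non-nil error
// means every entry must be treated as failed.
func entriesToRetry(entries []string, err error) []string {
	if err == nil {
		return nil
	}
	var be *BatchError
	if errors.As(err, &be) {
		retry := make([]string, 0, len(be.FailedIndices))
		for _, i := range be.FailedIndices {
			if i >= 0 && i < len(entries) {
				retry = append(retry, entries[i])
			}
		}
		return retry
	}
	return entries
}

func main() {
	entries := []string{"a", "b", "c", "d"}
	partial := fmt.Errorf("schedule: %w", &BatchError{
		FailedIndices: []int{1, 3}, Total: 4, Err: errors.New("zadd failed"),
	})
	fmt.Println(entriesToRetry(entries, partial))     // [b d]
	fmt.Println(entriesToRetry(entries, errPipeline)) // [a b c d]
}
```

Because errors.As walks the Unwrap chain, the check still works when a caller wraps the error with %w before returning it.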

func (*BatchError) Error

func (e *BatchError) Error() string

func (*BatchError) Unwrap

func (e *BatchError) Unwrap() error

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(cfg Config) (*Client, error)

func (*Client) ClearAll

func (c *Client) ClearAll(ctx context.Context) (int, error)

ClearAll deletes every hover:* key the broker writes. Does NOT FLUSHDB — safe on shared Redis.

func (*Client) Close

func (c *Client) Close() error

func (*Client) Ping

func (c *Client) Ping(ctx context.Context) error

func (*Client) RDB

func (c *Client) RDB() *redis.Client

func (*Client) ReclaimTerminalJobKeys

func (c *Client) ReclaimTerminalJobKeys(ctx context.Context, filter TerminalFilter) (ReclaimReport, error)

One-off backfill sweeper. Idempotent.

func (*Client) RemoveJobKeys

func (c *Client) RemoveJobKeys(ctx context.Context, jobID string) error

RemoveJobKeys clears all per-job broker state for a terminal job. XGroupDestroy errors are tolerated so a partially-cleaned or lighthouse-less job doesn't abort the rest of the cleanup.

func (*Client) SweepOrphanInflight

func (c *Client) SweepOrphanInflight(ctx context.Context, activeJobIDs []string) (int, error)

SweepOrphanInflight removes dom:flight fields for jobs absent from activeJobIDs. Drift source: SIGKILL bypasses the graceful drain so dispatcher increments without a matching pacer.Release decrement. dom:flight has no dedicated reconciler.
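The sweep rule can be illustrated in memory. In this sketch a Go map stands in for one dom:flight hash (jobID → count); sweepOrphans is a hypothetical helper, and the real method operates on Redis across all domains.

```go
package main

import "fmt"

// sweepOrphans deletes fields whose job ID is not in activeJobIDs and
// returns how many fields were removed.
func sweepOrphans(inflight map[string]int64, activeJobIDs []string) int {
	active := make(map[string]struct{}, len(activeJobIDs))
	for _, id := range activeJobIDs {
		active[id] = struct{}{}
	}
	removed := 0
	for jobID := range inflight {
		if _, ok := active[jobID]; !ok {
			delete(inflight, jobID) // deleting during range is safe in Go
			removed++
		}
	}
	return removed
}

func main() {
	inflight := map[string]int64{"job-1": 3, "job-2": 1, "job-9": 7}
	fmt.Println(sweepOrphans(inflight, []string{"job-1", "job-2"})) // 1
	fmt.Println(len(inflight))                                      // 2
}
```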

type ConcurrencyChecker

type ConcurrencyChecker interface {
	CanDispatch(ctx context.Context, jobID string) (bool, error)
}

type Config

type Config struct {
	URL          string
	PoolSize     int
	TLSEnabled   bool
	ReadTimeout  time.Duration
	WriteTimeout time.Duration
	MaxRetries   int
}

func ConfigFromEnv

func ConfigFromEnv() Config

ConfigFromEnv infers TLS from the URL scheme (rediss://) unless REDIS_TLS_ENABLED overrides.
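One way the scheme inference and override could combine is sketched below. tlsEnabled is a hypothetical helper; parsing the override with strconv.ParseBool, and ignoring an unset or unparsable value, are assumptions about how REDIS_TLS_ENABLED is read.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// tlsEnabled infers TLS from the URL scheme, then lets a parsable
// override value win. An empty override leaves the inference untouched.
func tlsEnabled(rawURL, override string) bool {
	enabled := false
	if u, err := url.Parse(rawURL); err == nil {
		enabled = u.Scheme == "rediss"
	}
	if v, err := strconv.ParseBool(override); err == nil {
		enabled = v
	}
	return enabled
}

func main() {
	fmt.Println(tlsEnabled("rediss://cache.internal:6379", ""))      // true
	fmt.Println(tlsEnabled("redis://localhost:6379", ""))            // false
	fmt.Println(tlsEnabled("rediss://cache.internal:6379", "false")) // false
}
```

In a real caller the second argument would come from os.Getenv("REDIS_TLS_ENABLED").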

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer reads via XREADGROUP and reclaims stale messages via XAUTOCLAIM.

func NewConsumer

func NewConsumer(client *Client, opts ConsumerOpts) *Consumer

func (*Consumer) Ack

func (c *Consumer) Ack(ctx context.Context, jobID string, messageIDs ...string) error

func (*Consumer) PendingCount

func (c *Consumer) PendingCount(ctx context.Context, jobID string) (int64, error)

PendingCount returns the PEL size — the authoritative source of "currently running" for a job. RunningCounters mirrors this and can drift under partial failures.

func (*Consumer) Read

func (c *Consumer) Read(ctx context.Context, jobID string) ([]StreamMessage, error)

Read blocks up to opts.BlockTimeout. Returns nil (not error) when no messages are ready.

func (*Consumer) ReadNonBlocking

func (c *Consumer) ReadNonBlocking(ctx context.Context, jobID string) ([]StreamMessage, error)

ReadNonBlocking returns immediately when no messages are ready.

func (*Consumer) ReclaimStale

func (c *Consumer) ReclaimStale(ctx context.Context, jobID string) (reclaimed []StreamMessage, deadLetter []StreamMessage, err error)

ReclaimStale walks the XAUTOCLAIM cursor until "0-0" so a burst of stuck messages drains in one tick. Messages over MaxDeliveries are returned as deadLetter candidates — caller owns final disposition and must ACK/NACK or they'll be reclaimed again next sweep.
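The cursor walk can be sketched without a Redis connection. Here claimPage is a hypothetical stand-in for a single XAUTOCLAIM call, and twoPages simulates a pending list that needs two calls to drain. Counting dead-letter candidates toward the per-sweep cap is an assumption.

```go
package main

import "fmt"

type pending struct {
	ID         string
	Deliveries int64
}

// claimPage models one XAUTOCLAIM call: a page of claimed messages plus
// the next cursor. A next cursor of "0-0" means the scan is complete.
type claimPage func(cursor string) (msgs []pending, next string)

// reclaim walks the cursor until "0-0" or until maxPerSweep messages
// have been seen, splitting out messages over maxDeliveries.
func reclaim(claim claimPage, maxDeliveries int64, maxPerSweep int) (reclaimed, deadLetter []pending) {
	cursor, total := "0-0", 0
	for {
		msgs, next := claim(cursor)
		for _, m := range msgs {
			if m.Deliveries > maxDeliveries {
				deadLetter = append(deadLetter, m)
			} else {
				reclaimed = append(reclaimed, m)
			}
		}
		total += len(msgs)
		if next == "0-0" || total >= maxPerSweep {
			return reclaimed, deadLetter
		}
		cursor = next
	}
}

// twoPages simulates a PEL that takes two calls to drain.
func twoPages(cursor string) ([]pending, string) {
	if cursor == "0-0" {
		return []pending{{"1-0", 1}, {"2-0", 6}}, "3-0"
	}
	return []pending{{"3-0", 2}}, "0-0"
}

func main() {
	reclaimed, dead := reclaim(twoPages, 5, 100)
	fmt.Println(len(reclaimed), len(dead)) // 2 1
}
```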

type ConsumerOpts

type ConsumerOpts struct {
	// ConsumerName: "worker-{machineID}-{goroutineID}".
	ConsumerName  string
	BlockTimeout  time.Duration
	Count         int64
	ClaimInterval time.Duration
	MinIdleTime   time.Duration
	MaxDeliveries int64
	// AutoclaimCount is the per-call XAUTOCLAIM COUNT.
	AutoclaimCount int64
	// AutoclaimMaxPerSweep caps total reclaimed per sweep so one
	// pathological job can't starve the rest of the reclaim loop.
	AutoclaimMaxPerSweep int
}

func DefaultConsumerOpts

func DefaultConsumerOpts(consumerName string) ConsumerOpts

type DBSyncFunc

type DBSyncFunc func(ctx context.Context, counts map[string]int64) error

func DefaultDBSyncFunc

func DefaultDBSyncFunc(sqlDB *sql.DB) DBSyncFunc

DefaultDBSyncFunc runs the per-job UPDATEs without an outer transaction. Wrapping them in one tx held row locks that deadlocked with the update_job_queue_counters AFTER trigger and saturated the bulk DB pool. The skew metric loses tx-snapshot consistency, but Redis and Postgres already drift between ticks.

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher moves due items from per-job Redis ZSETs into per-job Redis Streams.

func NewDispatcher

func NewDispatcher(
	client *Client,
	scheduler *Scheduler,
	pacer *DomainPacer,
	counters *RunningCounters,
	jobLister JobLister,
	concCheck ConcurrencyChecker,
	opts DispatcherOpts,
) *Dispatcher

func (*Dispatcher) Run

func (d *Dispatcher) Run(ctx context.Context)

Run blocks until ctx is cancelled. Start as a goroutine.

func (*Dispatcher) SetOnFirstDispatch

func (d *Dispatcher) SetOnFirstDispatch(fn func(ctx context.Context, jobID string) error)

SetOnFirstDispatch installs an idempotent hook fired the first time dispatchJob publishes a task for a jobID. A non-nil return triggers retry on the next dispatch.
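The retry-until-success contract can be sketched as follows. firstDispatch and demoHookCalls are hypothetical; the dispatcher's real bookkeeping is unexported and may differ. The point is that a job is marked as fired only after the hook returns nil.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

type firstDispatch struct {
	mu    sync.Mutex
	fired map[string]bool
	hook  func(ctx context.Context, jobID string) error
}

// notify runs the hook until it has succeeded once for jobID.
func (f *firstDispatch) notify(ctx context.Context, jobID string) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.hook == nil || f.fired[jobID] {
		return nil
	}
	if err := f.hook(ctx, jobID); err != nil {
		return err // not marked fired, so the next dispatch retries
	}
	f.fired[jobID] = true
	return nil
}

// demoHookCalls dispatches three times for one job with a hook that
// fails on its first call, and reports how often the hook ran.
func demoHookCalls() int {
	calls := 0
	f := &firstDispatch{
		fired: map[string]bool{},
		hook: func(ctx context.Context, jobID string) error {
			calls++
			if calls == 1 {
				return errors.New("transient failure")
			}
			return nil
		},
	}
	for i := 0; i < 3; i++ {
		_ = f.notify(context.Background(), "job-1")
	}
	return calls
}

func main() {
	fmt.Println(demoHookCalls()) // 2
}
```

Since the hook can run more than once for the same job, it needs to be idempotent, as the doc comment requires.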

func (*Dispatcher) SetReconciler

func (d *Dispatcher) SetReconciler(r Reconciler)

SetReconciler installs the self-heal target. Nil disables self-heal and is tolerated throughout the hot path.

type DispatcherOpts

type DispatcherOpts struct {
	ScanInterval time.Duration
	BatchSize    int64

	// ParallelJobs caps per-tick dispatch goroutines. Serial dispatch
	// scaled O(N_jobs × batch) and produced a ~70× backlog under 100
	// jobs. Default 32; override via REDIS_DISPATCH_PARALLEL_JOBS.
	ParallelJobs int

	// StuckThreshold gates the self-heal reconcile. Default 30s, env
	// REDIS_DISPATCH_STUCK_THRESHOLD_S; rate-limited to one trigger
	// per 2× threshold per job.
	StuckThreshold time.Duration
}

DispatcherOpts controls the dispatcher's scan behaviour.
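The StuckThreshold comment describes a per-job rate limit of one trigger per 2× threshold. A minimal sketch of that rule, assuming "stuck" means the job has been refused dispatch for at least the threshold; healLimiter and demoHeal are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

type healLimiter struct {
	threshold time.Duration
	last      map[string]time.Time // last trigger time per job
}

// shouldTrigger reports whether a self-heal reconcile may fire for
// jobID: the job must be stuck for at least threshold, and the previous
// trigger for that job must be at least 2× threshold in the past.
func (h *healLimiter) shouldTrigger(jobID string, stuckFor time.Duration, now time.Time) bool {
	if stuckFor < h.threshold {
		return false
	}
	if last, ok := h.last[jobID]; ok && now.Sub(last) < 2*h.threshold {
		return false
	}
	h.last[jobID] = now
	return true
}

// demoHeal replays four ticks for one job with a 30s threshold.
func demoHeal() []bool {
	h := &healLimiter{threshold: 30 * time.Second, last: map[string]time.Time{}}
	t0 := time.Unix(0, 0)
	return []bool{
		h.shouldTrigger("job-1", 10*time.Second, t0),                     // below threshold
		h.shouldTrigger("job-1", 31*time.Second, t0),                     // first trigger
		h.shouldTrigger("job-1", 61*time.Second, t0.Add(30*time.Second)), // inside 2× window
		h.shouldTrigger("job-1", 91*time.Second, t0.Add(60*time.Second)), // window elapsed
	}
}

func main() {
	fmt.Println(demoHeal()) // [false true false true]
}
```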

func DefaultDispatcherOpts

func DefaultDispatcherOpts() DispatcherOpts

type DomainPacer

type DomainPacer struct {
	// contains filtered or unexported fields
}

func NewDomainPacer

func NewDomainPacer(client *Client, cfg PacerConfig) *DomainPacer

func (*DomainPacer) DecrementInflight

func (p *DomainPacer) DecrementInflight(ctx context.Context, domain, jobID string) error

func (*DomainPacer) FlushAdaptiveDelays

func (p *DomainPacer) FlushAdaptiveDelays(ctx context.Context) (int, error)

FlushAdaptiveDelays restores pre-merge behaviour. The in-memory limiter reset on every worker restart, but the Redis-backed state has a 24h TTL, so a single 429 spike can pin a domain at the 60s floor for a full day. Call on worker startup to wipe the slate.

func (*DomainPacer) GetInflight

func (p *DomainPacer) GetInflight(ctx context.Context, domain, jobID string) (int64, error)

func (*DomainPacer) IncrementInflight

func (p *DomainPacer) IncrementInflight(ctx context.Context, domain, jobID string) error

func (*DomainPacer) Release

func (p *DomainPacer) Release(ctx context.Context, domain, jobID string, success, rateLimited bool) error

func (*DomainPacer) Seed

func (p *DomainPacer) Seed(ctx context.Context, domain string, baseDelayMS, adaptiveDelayMS, floorMS int) error

HSETNX preserves existing values, so callers may re-seed safely.

func (*DomainPacer) TryAcquire

func (p *DomainPacer) TryAcquire(ctx context.Context, domain string) (PaceResult, error)

Single Lua EVALSHA — the prior three-call form (HMGET → SET NX PX → PTTL) was the dominant dispatcher round-trip cost under multi-job loads.
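The decision that the script folds into one round trip can be modelled in memory: try to set the gate if absent, otherwise report the remaining TTL. The sketch below is not the Lua script and uses no Redis; memGate, paceResult, and demoGate are hypothetical, and applying the MinPushbackMS floor at this point is an assumption based on the PacerConfig comment.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type paceResult struct {
	Acquired   bool
	RetryAfter time.Duration
}

// memGate emulates a per-domain "SET NX PX" gate with expiry times.
type memGate struct {
	mu      sync.Mutex
	expires map[string]time.Time
}

// tryAcquire takes the gate if it is absent or expired. Otherwise it
// returns the remaining TTL, raised to minPushback when smaller.
func (g *memGate) tryAcquire(domain string, delay, minPushback time.Duration, now time.Time) paceResult {
	g.mu.Lock()
	defer g.mu.Unlock()
	if exp, ok := g.expires[domain]; ok && now.Before(exp) {
		wait := exp.Sub(now)
		if wait < minPushback {
			wait = minPushback
		}
		return paceResult{RetryAfter: wait}
	}
	g.expires[domain] = now.Add(delay)
	return paceResult{Acquired: true}
}

// demoGate replays three attempts against a 1s gate with a 100ms floor.
func demoGate() []paceResult {
	g := &memGate{expires: map[string]time.Time{}}
	t0 := time.Unix(0, 0)
	delay, floor := time.Second, 100*time.Millisecond
	return []paceResult{
		g.tryAcquire("example.com", delay, floor, t0),
		g.tryAcquire("example.com", delay, floor, t0.Add(990*time.Millisecond)),
		g.tryAcquire("example.com", delay, floor, t0.Add(time.Second)),
	}
}

func main() {
	for _, r := range demoGate() {
		fmt.Println(r.Acquired, r.RetryAfter)
	}
}
```

The second attempt has only 10ms left on the gate, so the floor lifts RetryAfter to 100ms rather than letting the caller spin.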

type JobLister

type JobLister interface {
	ActiveJobIDs(ctx context.Context) ([]string, error)
}

type OutboxSweeperOpts

type OutboxSweeperOpts struct {
	Interval    time.Duration
	BatchSize   int
	BaseBackoff time.Duration
	MaxBackoff  time.Duration
	MaxAttempts int
	// StatementTimeout bounds tick DB work; guards against a wedged
	// sweeper tx holding locks indefinitely. 0 keeps DB default.
	StatementTimeout time.Duration
}

func DefaultOutboxSweeperOpts

func DefaultOutboxSweeperOpts() OutboxSweeperOpts

DefaultOutboxSweeperOpts uses a 500ms interval (a 5s interval starved end-to-end latency on small jobs) and a 15s StatementTimeout (HOVER-K3: pool acquire ate several seconds of tick budget under bulk-lane load).

type PaceResult

type PaceResult struct {
	Acquired bool
	// Only meaningful when Acquired is false.
	RetryAfter time.Duration
}

type PacerConfig

type PacerConfig struct {
	SuccessThreshold int
	DelayStepMS      int
	// Defaults to DelayStepMS. Higher = faster recovery than growth, so
	// a 429 spike doesn't throttle a domain for 20 minutes.
	DelayStepDownMS int
	MaxDelayMS      int
	// Floor on RetryAfter so a near-zero gate TTL doesn't tight-loop
	// the dispatcher (Dispatcher tick is 100ms).
	MinPushbackMS int
}
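To show how these knobs might interact, here is a sketch of an asymmetric step-up and step-down rule. The update logic is an assumption inferred from the field names and the DomainConfigKey hash fields; the package's actual Release behaviour is not documented here. pacerConfig, domainState, and release are hypothetical.

```go
package main

import "fmt"

type pacerConfig struct {
	SuccessThreshold int
	DelayStepMS      int
	DelayStepDownMS  int
	MaxDelayMS       int
}

type domainState struct {
	AdaptiveDelayMS int
	FloorMS         int
	SuccessStreak   int
}

// release applies one outcome. Assumed rule: a rate-limited response
// grows the delay by DelayStepMS up to MaxDelayMS; SuccessThreshold
// consecutive successes shrink it by DelayStepDownMS down to FloorMS.
func (s *domainState) release(cfg pacerConfig, success, rateLimited bool) {
	switch {
	case rateLimited:
		s.SuccessStreak = 0
		s.AdaptiveDelayMS += cfg.DelayStepMS
		if s.AdaptiveDelayMS > cfg.MaxDelayMS {
			s.AdaptiveDelayMS = cfg.MaxDelayMS
		}
	case success:
		s.SuccessStreak++
		if s.SuccessStreak >= cfg.SuccessThreshold {
			s.SuccessStreak = 0
			s.AdaptiveDelayMS -= cfg.DelayStepDownMS
			if s.AdaptiveDelayMS < s.FloorMS {
				s.AdaptiveDelayMS = s.FloorMS
			}
		}
	default:
		s.SuccessStreak = 0 // assumption: other failures only break the streak
	}
}

func main() {
	cfg := pacerConfig{SuccessThreshold: 2, DelayStepMS: 500, DelayStepDownMS: 1000, MaxDelayMS: 60000}
	s := &domainState{}
	for i := 0; i < 3; i++ {
		s.release(cfg, false, true)
	}
	fmt.Println(s.AdaptiveDelayMS) // 1500
	for i := 0; i < 2; i++ {
		s.release(cfg, true, false)
	}
	fmt.Println(s.AdaptiveDelayMS) // 500
}
```

With DelayStepDownMS larger than DelayStepMS, the delay falls faster than it rose, which matches the stated goal of quick recovery after a 429 spike.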

func DefaultPacerConfig

func DefaultPacerConfig() PacerConfig

type Probe

type Probe struct {
	// contains filtered or unexported fields
}

func NewProbe

func NewProbe(client *Client, db *sql.DB, lister JobLister, opts ProbeOpts) *Probe

db may be nil on the API side. Zero opts fields fall back to defaults.

func (*Probe) Run

func (p *Probe) Run(ctx context.Context)

type ProbeOpts

type ProbeOpts struct {
	Interval time.Duration
	// Bounds a single tick so a slow Redis/DB call can't stall the loop.
	TickTimeout time.Duration
}

func DefaultProbeOpts

func DefaultProbeOpts() ProbeOpts

type ReclaimReport

type ReclaimReport struct {
	CandidatesScanned int
	TerminalJobs      int
	Cleaned           int
	Failed            int
	// First failure only, to avoid retaining one error per failed job.
	FirstError error
}

type Reconciler

type Reconciler interface {
	TriggerReconcile(ctx context.Context)
}

Reconciler is the dispatcher's self-heal target when CanDispatch keeps refusing dispatch despite due ZSET work — the signature of `hover:running` counter drift. Implementations must be safe for concurrent invocation and should debounce a flood of triggers to at most one in-flight reconcile.
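One way an implementation might meet the "at most one in-flight reconcile" guidance is a compare-and-swap guard, sketched below. debouncedReconciler and demoDebounce are hypothetical; triggers that arrive while a reconcile is running are simply dropped.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

type debouncedReconciler struct {
	running   atomic.Bool
	reconcile func(ctx context.Context)
}

// TriggerReconcile starts a reconcile unless one is already in flight.
// It is safe to call from many goroutines.
func (d *debouncedReconciler) TriggerReconcile(ctx context.Context) {
	if !d.running.CompareAndSwap(false, true) {
		return // a reconcile is already running; drop this trigger
	}
	go func() {
		defer d.running.Store(false)
		d.reconcile(ctx)
	}()
}

// demoDebounce fires five triggers while the first reconcile is still
// blocked and reports how many reconciles actually ran.
func demoDebounce() int32 {
	var runs atomic.Int32
	release := make(chan struct{})
	done := make(chan struct{})
	d := &debouncedReconciler{reconcile: func(ctx context.Context) {
		runs.Add(1)
		<-release
		close(done)
	}}
	for i := 0; i < 5; i++ {
		d.TriggerReconcile(context.Background())
	}
	close(release)
	<-done
	return runs.Load()
}

func main() {
	fmt.Println(demoDebounce()) // 1
}
```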

type RunningCounters

type RunningCounters struct {
	// contains filtered or unexported fields
}

func NewRunningCounters

func NewRunningCounters(client *Client) *RunningCounters

func (*RunningCounters) Decrement

func (rc *RunningCounters) Decrement(ctx context.Context, jobID string) (int64, error)

func (*RunningCounters) Get

func (rc *RunningCounters) Get(ctx context.Context, jobID string) (int64, error)

func (*RunningCounters) GetAll

func (rc *RunningCounters) GetAll(ctx context.Context) (map[string]int64, error)

func (*RunningCounters) Increment

func (rc *RunningCounters) Increment(ctx context.Context, jobID string) (int64, error)

func (*RunningCounters) Reconcile

func (rc *RunningCounters) Reconcile(ctx context.Context, counts map[string]int64) error

func (*RunningCounters) RemoveJob

func (rc *RunningCounters) RemoveJob(ctx context.Context, jobID string) error

func (*RunningCounters) StartDBSync

func (rc *RunningCounters) StartDBSync(ctx context.Context, interval time.Duration, syncFn DBSyncFunc)

type ScheduleEntry

type ScheduleEntry struct {
	TaskID          string
	JobID           string
	PageID          int
	Host            string
	Path            string
	Priority        float64
	RetryCount      int
	SourceType      string
	SourceURL       string
	RunAt           time.Time
	TaskType        string
	LighthouseRunID int64
}

ScheduleEntry is encoded as a pipe-delimited ZSET member with the run-at unix-ms as the score. TaskType routes to a stream: "crawl" → StreamKey, "lighthouse" → LighthouseStreamKey.

func ParseScheduleEntry

func ParseScheduleEntry(member string, score float64) (ScheduleEntry, error)

ParseScheduleEntry accepts both the 9-field legacy format and the 11-field current format so a rolling deploy drains without a flush.
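A minimal sketch of accepting both layouts. parseMember and the trimmed parsed struct are hypothetical and cover only a few fields; the sketch assumes no field contains '|'. The "crawl" default for legacy members follows the FormatScheduleEntry comment.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

type parsed struct {
	TaskID, JobID, Host, Path string
	PageID                    int
	TaskType                  string
	LighthouseRunID           int64
}

// parseMember accepts 9-field legacy and 11-field current members.
func parseMember(member string) (parsed, error) {
	f := strings.Split(member, "|")
	if len(f) != 9 && len(f) != 11 {
		return parsed{}, fmt.Errorf("schedule entry: want 9 or 11 fields, got %d", len(f))
	}
	pageID, err := strconv.Atoi(f[2])
	if err != nil {
		return parsed{}, fmt.Errorf("schedule entry: pageID: %w", err)
	}
	// Legacy members carry no task type, so start from the default.
	p := parsed{TaskID: f[0], JobID: f[1], PageID: pageID, Host: f[3], Path: f[4], TaskType: "crawl"}
	if len(f) == 11 {
		p.TaskType = f[9]
		p.LighthouseRunID, err = strconv.ParseInt(f[10], 10, 64)
		if err != nil {
			return parsed{}, fmt.Errorf("schedule entry: lighthouseRunID: %w", err)
		}
	}
	return p, nil
}

func main() {
	legacy := "t1|j1|42|example.com|/|0.5|0|sitemap|https://example.com/sitemap.xml"
	p, err := parseMember(legacy)
	fmt.Println(p.TaskType, err) // crawl <nil>

	p, err = parseMember(legacy + "|lighthouse|7")
	fmt.Println(p.TaskType, p.LighthouseRunID, err) // lighthouse 7 <nil>
}
```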

func (ScheduleEntry) Member

func (e ScheduleEntry) Member() string

func (ScheduleEntry) Score

func (e ScheduleEntry) Score() float64

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler manages delayed scheduling via Redis sorted sets. NewSchedulerWithDB enables dual-write of Reschedule's run-at into tasks.run_at so pacing push-backs survive a Redis flush.

func NewScheduler

func NewScheduler(client *Client) *Scheduler

NewScheduler creates a Scheduler without Postgres mirroring.

func NewSchedulerWithDB

func NewSchedulerWithDB(client *Client, db *sql.DB) *Scheduler

func (*Scheduler) DueItems

func (s *Scheduler) DueItems(ctx context.Context, jobID string, now time.Time, limit int64) ([]ScheduleEntry, error)

DueItems returns up to limit entries with score ≤ now. Items are not removed — caller ZREMs after successful dispatch.

func (*Scheduler) PendingCount

func (s *Scheduler) PendingCount(ctx context.Context, jobID string) (int64, error)

func (*Scheduler) Remove

func (s *Scheduler) Remove(ctx context.Context, jobID, member string) error

func (*Scheduler) RemoveJobSchedule

func (s *Scheduler) RemoveJobSchedule(ctx context.Context, jobID string) error

func (*Scheduler) Reschedule

func (s *Scheduler) Reschedule(ctx context.Context, entry ScheduleEntry, newRunAt time.Time) error

Reschedule pushes an existing entry's run-at later. With *sql.DB configured it dual-writes Postgres first so a crash between writes leaves the durable store with the newer time.

TODO(run-at-reconcile): once tasks have a dedicated 'scheduled' status, add a startup sweep that re-seeds ZSET from tasks.run_at for scheduled rows missing a ZSET member.

func (*Scheduler) RescheduleZSet

func (s *Scheduler) RescheduleZSet(ctx context.Context, entry ScheduleEntry, newRunAt time.Time) error

RescheduleZSet is Reschedule without the Postgres mirror, for the hot pacer-pushback path. The synchronous UPDATE backed up the ZSET to 80k+ entries at 100 paced ops/sec on 2026-04-22. Safe because OutboxSweeper rehydrates from tasks.run_at if Redis loses state.

func (*Scheduler) Schedule

func (s *Scheduler) Schedule(ctx context.Context, entry ScheduleEntry) error

func (*Scheduler) ScheduleAndAck

func (s *Scheduler) ScheduleAndAck(ctx context.Context, entry ScheduleEntry, ackJobID, messageID string) error

ScheduleAndAck atomically enqueues the retry and ACKs the original in one MULTI/EXEC. Two-step would let XAUTOCLAIM redeliver a stuck PEL entry and double-crawl if Ack failed after Schedule succeeded.

func (*Scheduler) ScheduleBatch

func (s *Scheduler) ScheduleBatch(ctx context.Context, entries []ScheduleEntry) error

ScheduleBatch returns *BatchError when the pipeline ran but individual ZADDs failed; a non-BatchError means the pipeline itself failed and all entries must be treated as failed.

type StreamMessage

type StreamMessage struct {
	MessageID  string
	TaskID     string
	JobID      string
	PageID     int
	Host       string
	Path       string
	Priority   float64
	RetryCount int
	SourceType string
	SourceURL  string
}

StreamMessage is a parsed task envelope read from a Redis Stream.

type Sweeper

type Sweeper struct {
	// contains filtered or unexported fields
}

Sweeper drains task_outbox into Redis via Scheduler.ScheduleBatch. Multi-replica safe: FOR UPDATE SKIP LOCKED partitions due rows across sweepers.

func NewOutboxSweeper

func NewOutboxSweeper(db *sql.DB, scheduler *Scheduler, opts OutboxSweeperOpts) *Sweeper

func (*Sweeper) Run

func (s *Sweeper) Run(ctx context.Context)

Run drives the sweeper loop until ctx is cancelled.

func (*Sweeper) Tick

func (s *Sweeper) Tick(ctx context.Context) error

Tick runs a single sweep iteration. Exported for tests.

type TerminalFilter

type TerminalFilter func(ctx context.Context, jobIDs []string) ([]string, error)

TerminalFilter returns the subset of jobIDs that are terminal in Postgres. Kept as a caller-supplied function type so this package stays free of SQL.
