Documentation
¶
Overview ¶
Package broker provides Redis-backed task scheduling, dispatch, and coordination primitives for the hover crawl execution pipeline.
Index ¶
- Constants
- func ConsumerGroup(jobID string) string
- func DomainConfigKey(domain string) string
- func DomainGateKey(domain string) string
- func DomainInflightKey(domain string) string
- func FormatScheduleEntry(taskID, jobID string, pageID int, host, path string, priority float64, ...) string
- func LighthouseConsumerGroup(jobID string) string
- func LighthouseStreamKey(jobID string) string
- func ScheduleKey(jobID string) string
- func StreamKey(jobID string) string
- type BatchError
- type Client
- func (c *Client) ClearAll(ctx context.Context) (int, error)
- func (c *Client) Close() error
- func (c *Client) Ping(ctx context.Context) error
- func (c *Client) PingWithRetry(ctx context.Context, total, perAttempt time.Duration) error
- func (c *Client) RDB() *redis.Client
- func (c *Client) ReclaimTerminalJobKeys(ctx context.Context, filter TerminalFilter) (ReclaimReport, error)
- func (c *Client) RemoveJobKeys(ctx context.Context, jobID string) error
- func (c *Client) SweepOrphanInflight(ctx context.Context, activeJobIDs []string) (int, error)
- type ConcurrencyChecker
- type Config
- type Consumer
- func (c *Consumer) Ack(ctx context.Context, jobID string, messageIDs ...string) error
- func (c *Consumer) PendingCount(ctx context.Context, jobID string) (int64, error)
- func (c *Consumer) Read(ctx context.Context, jobID string) ([]StreamMessage, error)
- func (c *Consumer) ReadNonBlocking(ctx context.Context, jobID string) ([]StreamMessage, error)
- func (c *Consumer) ReclaimStale(ctx context.Context, jobID string) (reclaimed []StreamMessage, deadLetter []StreamMessage, err error)
- type ConsumerOpts
- type DBSyncFunc
- type Dispatcher
- type DispatcherOpts
- type DomainPacer
- func (p *DomainPacer) DecrementInflight(ctx context.Context, domain, jobID string) error
- func (p *DomainPacer) EffectiveCap(ctx context.Context, domain string) (int, error)
- func (p *DomainPacer) FlushAdaptiveDelays(ctx context.Context) (int, error)
- func (p *DomainPacer) GetDomainInflight(ctx context.Context, domain string) (int64, error)
- func (p *DomainPacer) GetInflight(ctx context.Context, domain, jobID string) (int64, error)
- func (p *DomainPacer) IncrementInflight(ctx context.Context, domain, jobID string) error
- func (p *DomainPacer) Release(ctx context.Context, domain, jobID string, success, rateLimited bool) (int, error)
- func (p *DomainPacer) Seed(ctx context.Context, domain string, baseDelayMS, adaptiveDelayMS, floorMS int) error
- func (p *DomainPacer) TryAcquire(ctx context.Context, domain string) (PaceResult, error)
- type JobLister
- type OutboxSweeperOpts
- type PaceResult
- type PacerConfig
- type Probe
- type ProbeOpts
- type ReclaimReport
- type Reconciler
- type RunningCounters
- func (rc *RunningCounters) Decrement(ctx context.Context, jobID string) (int64, error)
- func (rc *RunningCounters) Get(ctx context.Context, jobID string) (int64, error)
- func (rc *RunningCounters) GetAll(ctx context.Context) (map[string]int64, error)
- func (rc *RunningCounters) Increment(ctx context.Context, jobID string) (int64, error)
- func (rc *RunningCounters) Reconcile(ctx context.Context, counts map[string]int64) error
- func (rc *RunningCounters) RemoveJob(ctx context.Context, jobID string) error
- func (rc *RunningCounters) StartDBSync(ctx context.Context, interval time.Duration, syncFn DBSyncFunc)
- type ScheduleEntry
- type Scheduler
- func (s *Scheduler) DueItems(ctx context.Context, jobID string, now time.Time, limit int64) ([]ScheduleEntry, error)
- func (s *Scheduler) PendingCount(ctx context.Context, jobID string) (int64, error)
- func (s *Scheduler) Remove(ctx context.Context, jobID, member string) error
- func (s *Scheduler) RemoveJobSchedule(ctx context.Context, jobID string) error
- func (s *Scheduler) Reschedule(ctx context.Context, entry ScheduleEntry, newRunAt time.Time) error
- func (s *Scheduler) RescheduleZSet(ctx context.Context, entry ScheduleEntry, newRunAt time.Time) error
- func (s *Scheduler) Schedule(ctx context.Context, entry ScheduleEntry) error
- func (s *Scheduler) ScheduleAndAck(ctx context.Context, entry ScheduleEntry, ackJobID, messageID string) error
- func (s *Scheduler) ScheduleBatch(ctx context.Context, entries []ScheduleEntry) error
- type StreamMessage
- type Sweeper
- type TerminalFilter
Constants ¶
const DefaultOutboxMaxAttempts = 10
DefaultOutboxMaxAttempts caps the worst-case stuck-row age at MaxAttempts × MaxBackoff (10 × 5 min = 50 min at defaults).
const RunningCountersKey = keyPrefix + "running"
HASH: jobID → in-flight task count.
Variables ¶
This section is empty.
Functions ¶
func ConsumerGroup ¶
func DomainConfigKey ¶
HASH: base_delay_ms, adaptive_delay_ms, floor_ms, success_streak, error_streak.
func DomainGateKey ¶
String key used as a per-domain time gate: SET NX PX {delay_ms} succeeds at most once per delay window, which caps the domain's dispatch rate.
func DomainInflightKey ¶
HASH: jobID → inflight task count for this domain+job pair.
func FormatScheduleEntry ¶
func FormatScheduleEntry(taskID, jobID string, pageID int, host, path string, priority float64, retryCount int, sourceType, sourceURL, taskType string, lighthouseRunID int64) string
Pipe-delimited (avoids JSON overhead on the scheduling hot path).
Current (11 fields): taskID|jobID|pageID|host|path|priority|retryCount|sourceType|sourceURL|taskType|lighthouseRunID
Legacy (9 fields): taskID|jobID|pageID|host|path|priority|retryCount|sourceType|sourceURL
ParseScheduleEntry accepts both so a deploy rolls forward without a ZSET flush; legacy entries default to taskType='crawl'.
func LighthouseConsumerGroup ¶
func LighthouseStreamKey ¶
Lighthouse tasks use a distinct stream so that crawl workers and Lighthouse analysis can scale independently.
Types ¶
type BatchError ¶
BatchError is returned by ScheduleBatch on partial pipeline failure. Type-assert via errors.As to retry FailedIndices.
func (*BatchError) Error ¶
func (e *BatchError) Error() string
func (*BatchError) Unwrap ¶
func (e *BatchError) Unwrap() error
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func (*Client) ClearAll ¶
ClearAll deletes every hover:* key the broker writes. Does NOT FLUSHDB — safe on shared Redis.
func (*Client) PingWithRetry ¶ added in v0.34.13
PingWithRetry retries Ping with capped exponential backoff until the total budget elapses. Each attempt uses its own perAttempt timeout, so a hung server can't stall the loop. It returns nil on the first success, or the last error if the budget is exhausted. It exists because Upstash-on-Fly review apps occasionally close the first PING with EOF during cold start, which previously made the binary exit fatally on boot (see HOVER-JX/MD/JZ).
func (*Client) ReclaimTerminalJobKeys ¶
func (c *Client) ReclaimTerminalJobKeys(ctx context.Context, filter TerminalFilter) (ReclaimReport, error)
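ReclaimTerminalJobKeys is a one-off backfill sweeper that reclaims broker keys for terminal jobs selected by filter and reports the result in a ReclaimReport. It is idempotent.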
func (*Client) RemoveJobKeys ¶
RemoveJobKeys clears all per-job broker state for a terminal job. XGroupDestroy errors are tolerated so a partially-cleaned or lighthouse-less job doesn't abort the rest of the cleanup.
func (*Client) SweepOrphanInflight ¶
SweepOrphanInflight removes dom:flight fields for jobs absent from activeJobIDs. The drift it corrects comes from SIGKILL: a killed worker bypasses the graceful drain, leaving dispatcher increments without a matching pacer.Release decrement. dom:flight has no dedicated reconciler.
type ConcurrencyChecker ¶
type Config ¶
type Config struct {
URL string
PoolSize int
TLSEnabled bool
ReadTimeout time.Duration
WriteTimeout time.Duration
MaxRetries int
}
func ConfigFromEnv ¶
func ConfigFromEnv() Config
ConfigFromEnv infers TLS from the URL scheme (rediss://) unless REDIS_TLS_ENABLED overrides.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer reads via XREADGROUP and reclaims stale messages via XAUTOCLAIM.
func NewConsumer ¶
func NewConsumer(client *Client, opts ConsumerOpts) *Consumer
func (*Consumer) PendingCount ¶
PendingCount returns the PEL size — the authoritative source of "currently running" for a job. RunningCounters mirrors this and can drift under partial failures.
func (*Consumer) Read ¶
Read blocks up to opts.BlockTimeout. Returns nil (not error) when no messages are ready.
func (*Consumer) ReadNonBlocking ¶
ReadNonBlocking returns immediately when no messages are ready.
func (*Consumer) ReclaimStale ¶
func (c *Consumer) ReclaimStale(ctx context.Context, jobID string) (reclaimed []StreamMessage, deadLetter []StreamMessage, err error)
ReclaimStale walks the XAUTOCLAIM cursor until "0-0" (bounded by AutoclaimMaxPerSweep) so a burst of stuck messages drains in one tick. Messages that exceed MaxDeliveries are returned as deadLetter candidates. The caller owns their final disposition and must ACK/NACK them, or they will be reclaimed again on the next sweep.
type ConsumerOpts ¶
type ConsumerOpts struct {
// ConsumerName: "worker-{machineID}-{goroutineID}".
ConsumerName string
BlockTimeout time.Duration
Count int64
ClaimInterval time.Duration
MinIdleTime time.Duration
MaxDeliveries int64
// AutoclaimCount is the per-call XAUTOCLAIM COUNT.
AutoclaimCount int64
// AutoclaimMaxPerSweep caps total reclaimed per sweep so one
// pathological job can't starve the rest of the reclaim loop.
AutoclaimMaxPerSweep int
}
func DefaultConsumerOpts ¶
func DefaultConsumerOpts(consumerName string) ConsumerOpts
type DBSyncFunc ¶
func DefaultDBSyncFunc ¶
func DefaultDBSyncFunc(sqlDB *sql.DB) DBSyncFunc
DefaultDBSyncFunc runs its per-job UPDATEs without an outer transaction. Wrapping them in one transaction held row locks that deadlocked with the update_job_queue_counters AFTER trigger and saturated the bulk DB pool. The skew metric therefore loses transaction-snapshot consistency. This is acceptable because Redis and Postgres already drift between ticks.
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher moves due items from per-job Redis ZSETs into per-job Redis Streams.
func NewDispatcher ¶
func NewDispatcher( client *Client, scheduler *Scheduler, pacer *DomainPacer, counters *RunningCounters, jobLister JobLister, concCheck ConcurrencyChecker, opts DispatcherOpts, ) *Dispatcher
func (*Dispatcher) Run ¶
func (d *Dispatcher) Run(ctx context.Context)
Run blocks until ctx is cancelled. Start as a goroutine.
func (*Dispatcher) SetOnFirstDispatch ¶
func (d *Dispatcher) SetOnFirstDispatch(fn func(ctx context.Context, jobID string) error)
SetOnFirstDispatch installs a hook fired the first time dispatchJob publishes a task for a jobID. The hook must be idempotent: a non-nil return causes it to be retried on the next dispatch.
func (*Dispatcher) SetReconciler ¶
func (d *Dispatcher) SetReconciler(r Reconciler)
SetReconciler installs the self-heal target. Nil disables self-heal and is tolerated throughout the hot path.
type DispatcherOpts ¶
type DispatcherOpts struct {
ScanInterval time.Duration
BatchSize int64
// ParallelJobs caps per-tick dispatch goroutines. Serial dispatch
// scaled O(N_jobs × batch) and produced a ~70× backlog under 100
// jobs. Default 32; override via REDIS_DISPATCH_PARALLEL_JOBS.
ParallelJobs int
// StuckThreshold gates the self-heal reconcile. Default 30s, env
// REDIS_DISPATCH_STUCK_THRESHOLD_S; rate-limited to one trigger
// per 2× threshold per job.
StuckThreshold time.Duration
}
DispatcherOpts controls the dispatcher's scan behaviour.
func DefaultDispatcherOpts ¶
func DefaultDispatcherOpts() DispatcherOpts
type DomainPacer ¶
type DomainPacer struct {
// contains filtered or unexported fields
}
func NewDomainPacer ¶
func NewDomainPacer(client *Client, cfg PacerConfig) *DomainPacer
func (*DomainPacer) DecrementInflight ¶
func (p *DomainPacer) DecrementInflight(ctx context.Context, domain, jobID string) error
func (*DomainPacer) EffectiveCap ¶ added in v0.34.10
EffectiveCap returns the maximum useful per-domain inflight count given the learned adaptive delay and the configured response-time estimate. Returns 0 when no cap applies (adaptive_delay_ms <= 0): the dispatcher then falls back to the per-job concurrency check alone.
func (*DomainPacer) FlushAdaptiveDelays ¶
func (p *DomainPacer) FlushAdaptiveDelays(ctx context.Context) (int, error)
FlushAdaptiveDelays wipes the learned adaptive delays, restoring pre-merge behaviour. The old in-memory limiter reset on every worker restart. The Redis-backed state has a 24h TTL, so a single 429 spike could pin a domain at the 60s floor for a full day. Call it on worker startup to wipe the slate.
func (*DomainPacer) GetDomainInflight ¶ added in v0.34.10
GetDomainInflight returns the sum of inflight counts across all jobs targeting this domain. The per-domain cap must compare against this total rather than any single job's slot, otherwise N concurrent jobs against the same host each get to dispatch their own copy of the cap and the burst-prevention path is defeated.
func (*DomainPacer) GetInflight ¶
func (*DomainPacer) IncrementInflight ¶
func (p *DomainPacer) IncrementInflight(ctx context.Context, domain, jobID string) error
func (*DomainPacer) Release ¶
func (p *DomainPacer) Release(ctx context.Context, domain, jobID string, success, rateLimited bool) (int, error)
Release decrements inflight and updates adaptive_delay_ms. Returns the post-update adaptive delay in ms (or -1 if Release did not touch it, e.g. neither success nor rateLimited). DecrementInflight runs unconditionally: if the adaptive update fails we still need to release the inflight slot because the worker has already ACKed the message, otherwise the domain inflight count drifts upward forever and the per-domain cap starts refusing dispatches for work that is no longer running.
func (*DomainPacer) Seed ¶
func (p *DomainPacer) Seed(ctx context.Context, domain string, baseDelayMS, adaptiveDelayMS, floorMS int) error
Seed writes the domain's pacing fields with HSETNX, which preserves existing values, so callers may re-seed safely.
func (*DomainPacer) TryAcquire ¶
func (p *DomainPacer) TryAcquire(ctx context.Context, domain string) (PaceResult, error)
TryAcquire runs as a single Lua EVALSHA. The prior three-call form (HMGET → SET NX PX → PTTL) was the dominant dispatcher round-trip cost under multi-job loads.
type OutboxSweeperOpts ¶
type OutboxSweeperOpts struct {
Interval time.Duration
BatchSize int
BaseBackoff time.Duration
MaxBackoff time.Duration
MaxAttempts int
// StatementTimeout bounds tick DB work; guards against a wedged
// sweeper tx holding locks indefinitely. 0 keeps DB default.
StatementTimeout time.Duration
}
func DefaultOutboxSweeperOpts ¶
func DefaultOutboxSweeperOpts() OutboxSweeperOpts
DefaultOutboxSweeperOpts uses a 500ms interval, because the previous 5s interval starved end-to-end latency on small jobs. It uses a 15s StatementTimeout (HOVER-K3), because pool acquisition ate several seconds of the tick budget under bulk-lane load.
type PaceResult ¶
type PacerConfig ¶
type PacerConfig struct {
SuccessThreshold int
DelayStepMS int
// Defaults to DelayStepMS. Higher = faster recovery than growth, so
// a 429 spike doesn't throttle a domain for 20 minutes.
DelayStepDownMS int
MaxDelayMS int
// Floor on RetryAfter so a near-zero gate TTL doesn't tight-loop
// the dispatcher (Dispatcher tick is 100ms).
MinPushbackMS int
// EstResponseMS bounds the per-domain inflight cap: when
// adaptive_delay_ms is non-zero, useful concurrency is
// ceil(EstResponseMS / adaptive_delay_ms). Holding a fixed 20-wide
// burst against a CF-fronted domain elevates Fly's egress score; the
// cap collapses the burst to whatever the learned rate can sustain.
EstResponseMS int
}
func DefaultPacerConfig ¶
func DefaultPacerConfig() PacerConfig
type Probe ¶
type Probe struct {
// contains filtered or unexported fields
}
type ProbeOpts ¶
type ProbeOpts struct {
Interval time.Duration
// Bounds a single tick so a slow Redis/DB call can't stall the loop.
TickTimeout time.Duration
}
func DefaultProbeOpts ¶
func DefaultProbeOpts() ProbeOpts
type ReclaimReport ¶
type Reconciler ¶
Reconciler is the dispatcher's self-heal target when CanDispatch keeps refusing dispatch despite due ZSET work — the signature of `hover:running` counter drift. Implementations must be safe for concurrent invocation and should debounce a flood of triggers to at most one in-flight reconcile.
type RunningCounters ¶
type RunningCounters struct {
// contains filtered or unexported fields
}
func NewRunningCounters ¶
func NewRunningCounters(client *Client) *RunningCounters
func (*RunningCounters) RemoveJob ¶
func (rc *RunningCounters) RemoveJob(ctx context.Context, jobID string) error
func (*RunningCounters) StartDBSync ¶
func (rc *RunningCounters) StartDBSync(ctx context.Context, interval time.Duration, syncFn DBSyncFunc)
type ScheduleEntry ¶
type ScheduleEntry struct {
TaskID string
JobID string
PageID int
Host string
Path string
Priority float64
RetryCount int
SourceType string
SourceURL string
RunAt time.Time
TaskType string
LighthouseRunID int64
}
ScheduleEntry is encoded as a pipe-delimited ZSET member with the run-at unix-ms as the score. TaskType routes to a stream: "crawl" → StreamKey, "lighthouse" → LighthouseStreamKey.
func ParseScheduleEntry ¶
func ParseScheduleEntry(member string, score float64) (ScheduleEntry, error)
ParseScheduleEntry accepts both the 9-field legacy format and the 11-field current format so a rolling deploy drains without a flush.
func (ScheduleEntry) Member ¶
func (e ScheduleEntry) Member() string
func (ScheduleEntry) Score ¶
func (e ScheduleEntry) Score() float64
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler manages delayed scheduling via Redis sorted sets. NewSchedulerWithDB enables dual-write of Reschedule's run-at into tasks.run_at so pacing push-backs survive a Redis flush.
func NewScheduler ¶
NewScheduler creates a Scheduler without Postgres mirroring.
func (*Scheduler) DueItems ¶
func (s *Scheduler) DueItems(ctx context.Context, jobID string, now time.Time, limit int64) ([]ScheduleEntry, error)
DueItems returns up to limit entries with score ≤ now. Items are not removed — caller ZREMs after successful dispatch.
func (*Scheduler) PendingCount ¶
func (*Scheduler) RemoveJobSchedule ¶
func (*Scheduler) Reschedule ¶
Reschedule pushes an existing entry's run-at later. With *sql.DB configured it dual-writes Postgres first so a crash between writes leaves the durable store with the newer time.
TODO(run-at-reconcile): once tasks have a dedicated 'scheduled' status, add a startup sweep that re-seeds ZSET from tasks.run_at for scheduled rows missing a ZSET member.
func (*Scheduler) RescheduleZSet ¶
func (s *Scheduler) RescheduleZSet(ctx context.Context, entry ScheduleEntry, newRunAt time.Time) error
RescheduleZSet is Reschedule without the Postgres mirror, for the hot pacer-pushback path. The synchronous UPDATE backed up the ZSET to 80k+ entries at 100 paced ops/sec on 2026-04-22. Safe because OutboxSweeper rehydrates from tasks.run_at if Redis loses state.
func (*Scheduler) Schedule ¶
func (s *Scheduler) Schedule(ctx context.Context, entry ScheduleEntry) error
func (*Scheduler) ScheduleAndAck ¶
func (s *Scheduler) ScheduleAndAck(ctx context.Context, entry ScheduleEntry, ackJobID, messageID string) error
ScheduleAndAck atomically enqueues the retry and ACKs the original in one MULTI/EXEC. Two-step would let XAUTOCLAIM redeliver a stuck PEL entry and double-crawl if Ack failed after Schedule succeeded.
func (*Scheduler) ScheduleBatch ¶
func (s *Scheduler) ScheduleBatch(ctx context.Context, entries []ScheduleEntry) error
ScheduleBatch returns *BatchError when the pipeline ran but individual ZADDs failed; a non-BatchError means the pipeline itself failed and all entries must be treated as failed.
type StreamMessage ¶
type StreamMessage struct {
MessageID string
TaskID string
JobID string
PageID int
Host string
Path string
Priority float64
RetryCount int
SourceType string
SourceURL string
}
StreamMessage is a parsed task envelope read from a Redis Stream.
type Sweeper ¶
type Sweeper struct {
// contains filtered or unexported fields
}
Sweeper drains task_outbox into Redis via Scheduler.ScheduleBatch. Multi-replica safe: FOR UPDATE SKIP LOCKED partitions due rows across sweepers.
func NewOutboxSweeper ¶
func NewOutboxSweeper(db *sql.DB, scheduler *Scheduler, opts OutboxSweeperOpts) *Sweeper