Documentation
Overview ¶
Package worker is the background job queue: the enqueue API job producers submit through, and (from a later milestone) the claim-and-run loop that drains it. M3 introduces only the enqueue seam, which the post-receive push sink uses to record push events and search reindexes; the run loop and the per-kind handlers land with the milestones that consume each job kind.
Index ¶
Constants ¶
const DeliveryRetention = 30 * 24 * time.Hour
DeliveryRetention is how long a webhook delivery record is kept. Thirty days matches GitHub's own delivery log window; a record carries the full request and response bodies, so keeping them forever grows the table without bound.
Variables ¶
This section is empty.
Functions ¶
func RunDeliveryRetention ¶ added in v0.1.3
RunDeliveryRetention deletes webhook deliveries older than DeliveryRetention, sweeping once immediately and then every retentionSweepInterval until ctx is canceled. The server starts it as a background goroutine alongside the job runtime.
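The sweep-now-then-on-a-timer shape described above can be sketched as follows. This is a minimal illustration, not the package's implementation: sweepFunc, runRetention, and demoRetention are hypothetical names, the interval is passed in because the value of retentionSweepInterval is not documented, and the sketch checks ctx before each sweep so it stops cleanly once canceled.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// DeliveryRetention mirrors the documented constant.
const DeliveryRetention = 30 * 24 * time.Hour

// sweepFunc is a hypothetical stand-in for the store call that deletes
// deliveries created before cutoff.
type sweepFunc func(ctx context.Context, cutoff time.Time) error

// runRetention sweeps right away, then once per interval, and returns
// when ctx is canceled.
func runRetention(ctx context.Context, interval time.Duration, sweep sweepFunc) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for ctx.Err() == nil {
		// A failed sweep is dropped in this sketch; the next tick retries.
		_ = sweep(ctx, time.Now().Add(-DeliveryRetention))
		select {
		case <-ctx.Done():
		case <-ticker.C:
		}
	}
}

// demoRetention runs the loop with a fake sweep that cancels after its
// third call and returns how many sweeps ran.
func demoRetention() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	sweeps := 0
	runRetention(ctx, time.Millisecond, func(context.Context, time.Time) error {
		sweeps++
		if sweeps == 3 {
			cancel()
		}
		return nil
	})
	return sweeps
}

func main() {
	fmt.Println("sweeps before shutdown:", demoRetention())
}
```

Computing the cutoff inside the loop, rather than once at startup, keeps the thirty-day window moving with the clock for as long as the goroutine lives.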
Types ¶
type CodeReindexer ¶ added in v0.1.3
CodeReindexer rebuilds a repository's code search index from its current head. The domain search service implements it.
type Enqueuer ¶
type Enqueuer interface {
	Enqueue(ctx context.Context, kind, payload, dedupeKey string) (deduped bool, err error)
}
Enqueuer accepts a background job. kind names the handler that will run it, payload is the job's JSON arguments (empty means an empty object), and dedupeKey, when non-empty, collapses jobs with the same key while one is still queued or running so a burst of triggers does not pile up redundant work. It reports deduped=true when an active job with the same key already existed and this submission was folded into it.
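To make the dedupe contract concrete, here is a small in-memory Enqueuer of the kind a test might use. It is a sketch under assumptions: memEnqueuer, finish, and demoDedupe are hypothetical, the push_event kind and the "reindex:7" key format are invented for illustration, and dedupe is keyed on dedupeKey alone, which is one reading of the description above.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Enqueuer is copied from the package documentation.
type Enqueuer interface {
	Enqueue(ctx context.Context, kind, payload, dedupeKey string) (deduped bool, err error)
}

// memEnqueuer is a hypothetical in-memory Enqueuer, not the store-backed one.
type memEnqueuer struct {
	mu     sync.Mutex
	active map[string]bool // dedupe keys that still have a queued or running job
	jobs   []string        // "kind payload" entries in submission order
}

var _ Enqueuer = (*memEnqueuer)(nil)

func (m *memEnqueuer) Enqueue(_ context.Context, kind, payload, dedupeKey string) (bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if dedupeKey != "" && m.active[dedupeKey] {
		return true, nil // folded into the job that is already active
	}
	if payload == "" {
		payload = "{}" // an empty payload means an empty JSON object
	}
	if dedupeKey != "" {
		m.active[dedupeKey] = true
	}
	m.jobs = append(m.jobs, kind+" "+payload)
	return false, nil
}

// finish simulates the job holding key completing, which reopens the key.
func (m *memEnqueuer) finish(key string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.active, key)
}

// demoDedupe submits a burst of triggers and reports each deduped result.
func demoDedupe() string {
	ctx := context.Background()
	q := &memEnqueuer{active: map[string]bool{}}
	first, _ := q.Enqueue(ctx, "reindex_search", `{"repo_pk":7}`, "reindex:7")
	second, _ := q.Enqueue(ctx, "reindex_search", `{"repo_pk":7}`, "reindex:7")
	q.finish("reindex:7")
	third, _ := q.Enqueue(ctx, "reindex_search", `{"repo_pk":7}`, "reindex:7")
	noKey, _ := q.Enqueue(ctx, "push_event", "", "")
	return fmt.Sprintf("%v %v %v %v jobs=%d", first, second, third, noKey, len(q.jobs))
}

func main() {
	fmt.Println(demoDedupe()) // prints "false true false false jobs=3"
}
```

The second submission is folded into the first while it is active; once that job finishes, the same key is accepted again, and a submission with no key is never deduped.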
type Handler ¶
Handler runs one job of a given kind. A nil error completes the job; a non-nil error fails it, scheduling a retry until its attempts run out.
func RecomputeMergeabilityHandler ¶
func RecomputeMergeabilityHandler(rec MergeabilityRecomputer) Handler
RecomputeMergeabilityHandler binds the recompute_mergeability kind to the recomputer. A payload missing its issue_pk is a permanent error, since no retry can repair a malformed job.
func RecomputeReviewDecisionHandler ¶
func RecomputeReviewDecisionHandler(rec ReviewDecisionRecomputer) Handler
RecomputeReviewDecisionHandler binds the recompute_review_decision kind to the recomputer. A payload missing its issue_pk is a permanent error, since no retry can repair a malformed job. It reuses recomputePayload, the shared issue_pk job body the mergeability handler also decodes.
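The permanent-error rule for a malformed payload might look like the sketch below. The Handler signature, the errPermanent marker, and the recorder fake are all assumptions made for illustration: the documentation shows neither how Handler is declared nor how a handler signals that a failure is permanent. Only the issue_pk field name and the recomputePayload name come from the text above.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// Handler's real signature is not shown in the docs; this one is assumed.
type Handler func(ctx context.Context, payload string) error

// errPermanent is a hypothetical marker the run loop could test with errors.Is.
var errPermanent = errors.New("permanent job error")

// MergeabilityRecomputer is copied from the package documentation.
type MergeabilityRecomputer interface {
	RecomputeMergeability(ctx context.Context, issuePK int64) error
}

// recomputePayload is the shared issue_pk job body.
type recomputePayload struct {
	IssuePK int64 `json:"issue_pk"`
}

// recomputeMergeabilityHandler decodes the payload and calls the recomputer.
// Undecodable JSON or a missing issue_pk is wrapped in errPermanent.
func recomputeMergeabilityHandler(rec MergeabilityRecomputer) Handler {
	return func(ctx context.Context, payload string) error {
		var p recomputePayload
		if err := json.Unmarshal([]byte(payload), &p); err != nil {
			return fmt.Errorf("%w: decode payload: %v", errPermanent, err)
		}
		if p.IssuePK == 0 {
			return fmt.Errorf("%w: payload has no issue_pk", errPermanent)
		}
		return rec.RecomputeMergeability(ctx, p.IssuePK)
	}
}

// recorder is a fake recomputer that remembers which issues it was asked about.
type recorder struct{ seen []int64 }

func (r *recorder) RecomputeMergeability(_ context.Context, pk int64) error {
	r.seen = append(r.seen, pk)
	return nil
}

// demoHandler runs one well-formed and one malformed job through the handler.
func demoHandler() string {
	rec := &recorder{}
	h := recomputeMergeabilityHandler(rec)
	okErr := h(context.Background(), `{"issue_pk":42}`)
	badErr := h(context.Background(), `{}`)
	return fmt.Sprintf("%v %v %v", okErr, errors.Is(badErr, errPermanent), rec.seen)
}

func main() {
	fmt.Println(demoHandler()) // prints "<nil> true [42]"
}
```

Treating a zero issue_pk as missing is a simplification; it works here only on the assumption that no real primary key is zero.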
func ReindexSearchHandler ¶ added in v0.1.3
func ReindexSearchHandler(rx CodeReindexer) Handler
ReindexSearchHandler binds the reindex_search kind to the reindexer. A payload missing its repo_pk is a permanent error, since no retry can repair a malformed job.
type JobStore ¶
JobStore is the slice of the store the enqueuer writes through: a single queue insert with dedupe handling. The store satisfies it directly.
type MergeabilityRecomputer ¶
type MergeabilityRecomputer interface {
	RecomputeMergeability(ctx context.Context, issuePK int64) error
}
MergeabilityRecomputer computes and persists a pull request's merge state for the issue that backs it. The domain pull request service implements it.
type ReviewDecisionRecomputer ¶
type ReviewDecisionRecomputer interface {
	RecomputeReviewDecision(ctx context.Context, issuePK int64) error
}
ReviewDecisionRecomputer resolves and caches a pull request's review decision and status check rollup for the issue that backs it. The domain review service implements it.
type RunStore ¶
type RunStore interface {
	ClaimJob(ctx context.Context) (*store.JobRow, error)
	CompleteJob(ctx context.Context, pk int64) error
	FailJob(ctx context.Context, pk int64, attempts, maxAttempts int, reason string, backoffSeconds int) error
}
RunStore is the slice of the store the run loop drives the queue through: the atomic claim, the completion delete, and the failure requeue.
type Runtime ¶
type Runtime struct {
	// contains filtered or unexported fields
}
Runtime is the queue's consumer: a handler registry plus the claim-run-settle loop over the store.
func NewRuntime ¶
NewRuntime builds a Runtime over the store. idle is how long Run waits before polling again when the queue is empty; a zero or negative value uses a one second default.
func (*Runtime) Register ¶
Register binds a handler to a job kind. A second registration for the same kind replaces the first, which keeps wiring order from mattering.
func (*Runtime) Run ¶
Run drains the queue until ctx is canceled, sleeping idle between polls when it finds nothing to do. It returns ctx.Err() on cancellation, the normal way a graceful shutdown ends it. With SetWorkers above one it drives that many claim loops; the store's atomic claim keeps them off each other's jobs.
func (*Runtime) RunOnce ¶
RunOnce claims and runs a single job. It reports worked=false when the queue held nothing runnable, the signal Run uses to switch to its idle sleep. A handler panic or error fails the job with backoff rather than killing the loop; a missing handler fails it permanently, since no retry will ever find one.
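The claim-run-settle step can be sketched as below. Everything here is illustrative: job stands in for store.JobRow (whose fields are not shown), the handler signature is assumed, a nil row from ClaimJob is assumed to mean "nothing runnable", the linear backoff is invented, and reporting attempts equal to maxAttempts is just one way a permanent failure could be expressed through FailJob.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// job is a hypothetical stand-in for store.JobRow.
type job struct {
	PK                    int64
	Kind, Payload         string
	Attempts, MaxAttempts int
}

// runStore has the documented RunStore shape, with job in place of store.JobRow.
type runStore interface {
	ClaimJob(ctx context.Context) (*job, error)
	CompleteJob(ctx context.Context, pk int64) error
	FailJob(ctx context.Context, pk int64, attempts, maxAttempts int, reason string, backoffSeconds int) error
}

// handler's signature is an assumption; the docs do not show it.
type handler func(ctx context.Context, payload string) error

type miniRuntime struct {
	st       runStore
	handlers map[string]handler
}

// safeRun converts a handler panic into an ordinary error.
func safeRun(ctx context.Context, h handler, payload string) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("handler panic: %v", r)
		}
	}()
	return h(ctx, payload)
}

// runOnce claims one job, runs it, and settles it as completed or failed.
func (r *miniRuntime) runOnce(ctx context.Context) (worked bool, err error) {
	j, err := r.st.ClaimJob(ctx)
	if err != nil || j == nil {
		return false, err // store error, or nothing runnable
	}
	h, ok := r.handlers[j.Kind]
	if !ok {
		// Permanent: report attempts as exhausted so no retry is scheduled.
		return true, r.st.FailJob(ctx, j.PK, j.MaxAttempts, j.MaxAttempts, "no handler for "+j.Kind, 0)
	}
	if runErr := safeRun(ctx, h, j.Payload); runErr != nil {
		attempts := j.Attempts + 1
		return true, r.st.FailJob(ctx, j.PK, attempts, j.MaxAttempts, runErr.Error(), 30*attempts)
	}
	return true, r.st.CompleteJob(ctx, j.PK)
}

// memStore is a single-threaded fake that records how each job was settled.
type memStore struct {
	queue []*job
	log   []string
}

func (m *memStore) ClaimJob(context.Context) (*job, error) {
	if len(m.queue) == 0 {
		return nil, nil
	}
	j := m.queue[0]
	m.queue = m.queue[1:]
	return j, nil
}

func (m *memStore) CompleteJob(_ context.Context, pk int64) error {
	m.log = append(m.log, fmt.Sprintf("done %d", pk))
	return nil
}

func (m *memStore) FailJob(_ context.Context, pk int64, attempts, maxAttempts int, _ string, _ int) error {
	m.log = append(m.log, fmt.Sprintf("fail %d (%d/%d)", pk, attempts, maxAttempts))
	return nil
}

// demoRunOnce drains three jobs: one succeeds, one panics, one has no handler.
func demoRunOnce() string {
	st := &memStore{queue: []*job{
		{PK: 1, Kind: "ok", MaxAttempts: 3},
		{PK: 2, Kind: "boom", MaxAttempts: 3},
		{PK: 3, Kind: "unregistered", MaxAttempts: 3},
	}}
	rt := &miniRuntime{st: st, handlers: map[string]handler{
		"ok":   func(context.Context, string) error { return nil },
		"boom": func(context.Context, string) error { panic("boom") },
	}}
	for {
		if worked, err := rt.runOnce(context.Background()); err != nil || !worked {
			break
		}
	}
	return strings.Join(st.log, "; ")
}

func main() {
	fmt.Println(demoRunOnce()) // prints "done 1; fail 2 (1/3); fail 3 (3/3)"
}
```

The panicking job is failed with one attempt used and stays eligible for retry, while the job with no registered handler is failed with its attempts exhausted; in neither case does the loop itself die.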
func (*Runtime) SetWorkers ¶ added in v0.1.3
SetWorkers sets how many claim loops Run drives. One slow handler (a webhook delivery waiting out a dead endpoint) used to head-of-line-block every other job; with n loops the queue keeps moving around it. Values below one are treated as one. Jobs may run concurrently, and so may two deliveries to the same hook; this matches the at-least-once, unordered contract the retry path already imposes on consumers.
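A minimal sketch of several claim loops sharing one queue follows. It assumes a runOnceFunc with the shape RunOnce is described as having; run and demoRun are hypothetical names, and the atomic counter merely imitates the store's atomic claim. The clamping of workers to one and of idle to a one second default follows the documented behavior of SetWorkers and NewRuntime.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runOnceFunc is a hypothetical stand-in for Runtime.RunOnce.
type runOnceFunc func(ctx context.Context) (worked bool, err error)

// run drives workers claim loops until ctx is canceled and returns ctx.Err().
func run(ctx context.Context, workers int, idle time.Duration, once runOnceFunc) error {
	if workers < 1 {
		workers = 1 // values below one are treated as one
	}
	if idle <= 0 {
		idle = time.Second // documented default for a zero or negative idle
	}
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for ctx.Err() == nil {
				worked, err := once(ctx)
				if worked && err == nil {
					continue // more work may be waiting; claim again at once
				}
				// Empty queue or store error: sleep idle, or wake on cancel.
				t := time.NewTimer(idle)
				select {
				case <-ctx.Done():
					t.Stop()
				case <-t.C:
				}
			}
		}()
	}
	wg.Wait()
	return ctx.Err()
}

// demoRun drains twenty fake jobs with four loops, then shuts down.
func demoRun() string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	var remaining, processed atomic.Int64
	remaining.Store(20)
	once := func(context.Context) (bool, error) {
		// The atomic decrement plays the role of the store's atomic claim:
		// each job is handed to exactly one loop.
		if remaining.Add(-1) < 0 {
			return false, nil // nothing runnable
		}
		if processed.Add(1) == 20 {
			cancel() // last job done: simulate a graceful shutdown
		}
		return true, nil
	}
	err := run(ctx, 4, time.Millisecond, once)
	return fmt.Sprintf("processed=%d err=%v", processed.Load(), err)
}

func main() {
	fmt.Println(demoRun()) // prints "processed=20 err=context canceled"
}
```

Because each loop sleeps only when its own claim comes back empty, a handler that blocks for a long time occupies one loop while the others keep claiming.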
type StoreEnqueuer ¶
type StoreEnqueuer struct {
	// contains filtered or unexported fields
}
StoreEnqueuer is the store-backed Enqueuer: it persists each job as a row in the jobs table so the work survives a restart and any process running the claim loop can pick it up.
func NewStoreEnqueuer ¶
func NewStoreEnqueuer(st JobStore) *StoreEnqueuer
NewStoreEnqueuer builds a StoreEnqueuer over the job store.