worker

package
v0.1.3
Published: Jun 13, 2026 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Overview

Package worker is the background job queue: the enqueue API that job producers submit through, and (from a later milestone) the claim-and-run loop that drains it. M3 introduces only the enqueue seam, which the post-receive push sink uses to record push events and search reindexes; the run loop and the per-kind handlers land with the milestones that consume each job kind.

Constants

const DeliveryRetention = 30 * 24 * time.Hour

DeliveryRetention is how long a webhook delivery record is kept. Thirty days matches GitHub's own delivery log window; each record carries the full request and response bodies, so keeping records forever would grow the table without bound.

Variables

This section is empty.

Functions

func RunDeliveryRetention added in v0.1.3

func RunDeliveryRetention(ctx context.Context, st deliveryPruner, log *slog.Logger)

RunDeliveryRetention deletes webhook deliveries older than DeliveryRetention, sweeping once immediately and then every retentionSweepInterval until ctx is canceled. The server starts it as a background goroutine alongside the job runtime.

Types

type CodeReindexer added in v0.1.3

type CodeReindexer interface {
	ReindexRepoCode(ctx context.Context, repoPK int64) error
}

CodeReindexer rebuilds a repository's code search index from its current head. The domain search service implements it.

type Enqueuer

type Enqueuer interface {
	Enqueue(ctx context.Context, kind, payload, dedupeKey string) (deduped bool, err error)
}

Enqueuer accepts a background job. kind names the handler that will run it, payload is the job's JSON arguments (empty means an empty object), and dedupeKey, when non-empty, collapses jobs with the same key while one is still queued or running so a burst of triggers does not pile up redundant work. It reports deduped=true when an active job with the same key already existed and this submission was folded into it.

type Handler

type Handler func(ctx context.Context, job store.JobRow) error

Handler runs one job of a given kind. A nil error completes the job; a non-nil error fails it, scheduling a retry until its attempts run out.

func RecomputeMergeabilityHandler

func RecomputeMergeabilityHandler(rec MergeabilityRecomputer) Handler

RecomputeMergeabilityHandler binds the recompute_mergeability kind to the recomputer. A payload missing its issue_pk is a permanent error, since no retry can repair a malformed job.

func RecomputeReviewDecisionHandler

func RecomputeReviewDecisionHandler(rec ReviewDecisionRecomputer) Handler

RecomputeReviewDecisionHandler binds the recompute_review_decision kind to the recomputer. A payload missing its issue_pk is a permanent error, since no retry can repair a malformed job. It reuses recomputePayload, the shared issue_pk job body the mergeability handler also decodes.

func ReindexSearchHandler added in v0.1.3

func ReindexSearchHandler(rx CodeReindexer) Handler

ReindexSearchHandler binds the reindex_search kind to the reindexer. A payload missing its repo_pk is a permanent error, since no retry can repair a malformed job.

type JobStore

type JobStore interface {
	EnqueueJob(ctx context.Context, j *store.JobRow) (bool, error)
}

JobStore is the slice of the store the enqueuer writes through: a single queue insert with dedupe handling. The store satisfies it directly.

type MergeabilityRecomputer

type MergeabilityRecomputer interface {
	RecomputeMergeability(ctx context.Context, issuePK int64) error
}

MergeabilityRecomputer computes and persists a pull request's merge state for the issue that backs it. The domain pull request service implements it.

type ReviewDecisionRecomputer

type ReviewDecisionRecomputer interface {
	RecomputeReviewDecision(ctx context.Context, issuePK int64) error
}

ReviewDecisionRecomputer resolves and caches a pull request's review decision and status check rollup for the issue that backs it. The domain review service implements it.

type RunStore

type RunStore interface {
	ClaimJob(ctx context.Context) (*store.JobRow, error)
	CompleteJob(ctx context.Context, pk int64) error
	FailJob(ctx context.Context, pk int64, attempts, maxAttempts int, reason string, backoffSeconds int) error
}

RunStore is the slice of the store the run loop drives the queue through: the atomic claim, the completion delete, and the failure requeue.

type Runtime

type Runtime struct {
	// contains filtered or unexported fields
}

Runtime is the queue's consumer: a handler registry plus the claim-run-settle loop over the store.

func NewRuntime

func NewRuntime(st RunStore, log *slog.Logger, idle time.Duration) *Runtime

NewRuntime builds a Runtime over the store. idle is how long Run waits before polling again when the queue is empty; a zero or negative value uses a one second default.

func (*Runtime) Register

func (r *Runtime) Register(kind string, h Handler)

Register binds a handler to a job kind. A second registration for the same kind replaces the first, which keeps wiring order from mattering.

func (*Runtime) Run

func (r *Runtime) Run(ctx context.Context) error

Run drains the queue until ctx is canceled, sleeping idle between polls when it finds nothing to do. It returns ctx.Err() on cancellation, the normal way a graceful shutdown ends it. When SetWorkers has set a worker count above one, Run drives that many claim loops; the store's atomic claim keeps them off each other's jobs.

func (*Runtime) RunOnce

func (r *Runtime) RunOnce(ctx context.Context) (worked bool, err error)

RunOnce claims and runs a single job. It reports worked=false when the queue held nothing runnable, the signal Run uses to switch to its idle sleep. A handler panic or error fails the job with backoff rather than killing the loop; a missing handler fails it permanently, since no retry will ever find one.

func (*Runtime) SetWorkers added in v0.1.3

func (r *Runtime) SetWorkers(n int)

SetWorkers sets how many claim loops Run drives. One slow handler (a webhook delivery waiting out a dead endpoint) used to head-of-line-block every other job; with n loops the queue keeps moving around it. Values below one are treated as one. Jobs may run concurrently, and so may two deliveries to the same hook, which matches the at-least-once, unordered contract the retry path already imposes on consumers.
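The fan-out can be sketched as n goroutines sharing one claim source. In this illustration a mutex-guarded slice stands in for the store's atomic claim, the loops exit when the queue is empty rather than idling, and the names clampWorkers, claimQueue, runLoops, and drain are all hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// clampWorkers treats values below one as one, as documented for SetWorkers.
func clampWorkers(n int) int {
	if n < 1 {
		return 1
	}
	return n
}

// claimQueue hands each job to exactly one caller; the mutex plays the role
// of the store's atomic claim.
type claimQueue struct {
	mu   sync.Mutex
	jobs []int
}

func (q *claimQueue) claim() (int, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.jobs) == 0 {
		return 0, false
	}
	j := q.jobs[0]
	q.jobs = q.jobs[1:]
	return j, true
}

// runLoops starts the clamped number of copies of loop and waits for all of them.
func runLoops(ctx context.Context, n int, loop func(ctx context.Context)) {
	var wg sync.WaitGroup
	for i := 0; i < clampWorkers(n); i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			loop(ctx)
		}()
	}
	wg.Wait()
}

// drain runs every job exactly once across n loops and reports how many ran.
func drain(n int, jobs []int) int64 {
	q := &claimQueue{jobs: append([]int(nil), jobs...)}
	var ran int64
	runLoops(context.Background(), n, func(ctx context.Context) {
		for ctx.Err() == nil {
			if _, ok := q.claim(); !ok {
				return
			}
			atomic.AddInt64(&ran, 1)
		}
	})
	return ran
}

func main() {
	fmt.Println(drain(4, []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})) // prints "10"
}
```

Because any loop may claim any job, handlers written for this model cannot rely on ordering between jobs, which is the same constraint the retry path already places on them.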

type StoreEnqueuer

type StoreEnqueuer struct {
	// contains filtered or unexported fields
}

StoreEnqueuer is the store-backed Enqueuer: it persists each job as a row in the jobs table so the work survives a restart and any process running the claim loop can pick it up.

func NewStoreEnqueuer

func NewStoreEnqueuer(st JobStore) *StoreEnqueuer

NewStoreEnqueuer builds a StoreEnqueuer over the job store.

func (*StoreEnqueuer) Enqueue

func (e *StoreEnqueuer) Enqueue(ctx context.Context, kind, payload, dedupeKey string) (bool, error)

Enqueue inserts the job into the queue, folding it into an active job with the same dedupe key when one exists.
