Documentation
¶
Index ¶
- type Job
- type ListFilter
- type MetricsStore
- func (s *MetricsStore) CancelJob(ctx context.Context, id uuid.UUID) error
- func (s *MetricsStore) ClaimJobs(ctx context.Context, n int) ([]*Job, error)
- func (s *MetricsStore) CreateJob(ctx context.Context, j *Job) (*Job, error)
- func (s *MetricsStore) GetJob(ctx context.Context, id uuid.UUID) (*Job, error)
- func (s *MetricsStore) GetJobByIdempotencyKey(ctx context.Context, key string) (*Job, error)
- func (s *MetricsStore) IncrementRetry(ctx context.Context, id uuid.UUID, nextRun time.Time) error
- func (s *MetricsStore) ListDeadJobs(ctx context.Context, pageSize int, pageToken string) ([]*Job, string, error)
- func (s *MetricsStore) ListJobs(ctx context.Context, f ListFilter) ([]*Job, string, error)
- func (s *MetricsStore) QueueDepth(ctx context.Context) (int64, error)
- func (s *MetricsStore) RetryDeadJob(ctx context.Context, id uuid.UUID) error
- func (s *MetricsStore) Start(ctx context.Context)
- func (s *MetricsStore) UpdateStatus(ctx context.Context, id uuid.UUID, status Status, errMsg string) error
- type PGStore
- func (s *PGStore) CancelJob(ctx context.Context, id uuid.UUID) error
- func (s *PGStore) ClaimJobs(ctx context.Context, n int) ([]*Job, error)
- func (s *PGStore) CreateJob(ctx context.Context, j *Job) (*Job, error)
- func (s *PGStore) GetJob(ctx context.Context, id uuid.UUID) (*Job, error)
- func (s *PGStore) GetJobByIdempotencyKey(ctx context.Context, key string) (*Job, error)
- func (s *PGStore) IncrementRetry(ctx context.Context, id uuid.UUID, nextRun time.Time) error
- func (s *PGStore) ListDeadJobs(ctx context.Context, pageSize int, pageToken string) ([]*Job, string, error)
- func (s *PGStore) ListJobs(ctx context.Context, f ListFilter) ([]*Job, string, error)
- func (s *PGStore) QueueDepth(ctx context.Context) (int64, error)
- func (s *PGStore) RetryDeadJob(ctx context.Context, id uuid.UUID) error
- func (s *PGStore) UpdateStatus(ctx context.Context, id uuid.UUID, status Status, errMsg string) error
- type Status
- type Store
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Job ¶
type Job struct {
ID uuid.UUID
Name string
Type string
Payload []byte
Status Status
MaxRetries int
RetryCount int
Priority int // 1 (lowest) to 10 (highest), default 5
Error string
IdempotencyKey string // optional; deduplicates non-terminal jobs
ScheduledAt time.Time
CreatedAt time.Time
UpdatedAt time.Time
}
Job is the in-memory representation of a job row.
type ListFilter ¶
type ListFilter struct {
Status *Status // nil = all statuses
PageSize int
PageToken string // opaque cursor (created_at:id)
}
ListFilter controls ListJobs queries.
type MetricsStore ¶
type MetricsStore struct {
// contains filtered or unexported fields
}
MetricsStore wraps a Store and transparently records Prometheus metrics. Callers interact with it through the Store interface — instrumentation is invisible.
Completion metrics (job duration, terminal status counts) are recorded without an extra DB query: ClaimJobs caches (type, claimedAt) for each claimed job, and UpdateStatus consumes the cache entry when the job reaches a terminal state. The cache stays bounded because an entry is removed on the job's first terminal transition, so a job that completes exactly once never accumulates memory; entries whose terminal transition never arrives are evicted by the cleanup goroutine launched by Start.
func NewMetricsStore ¶
func NewMetricsStore(inner Store, m *metrics.Metrics) *MetricsStore
func (*MetricsStore) ClaimJobs ¶
ClaimJobs delegates to the wrapped Store and caches (type, claimedAt) for each claimed job. UpdateStatus consumes the cached entry when the job reaches a terminal state, which eliminates the GetJob call that was previously needed to look up the job type.
func (*MetricsStore) GetJobByIdempotencyKey ¶
func (*MetricsStore) IncrementRetry ¶
func (*MetricsStore) ListDeadJobs ¶
func (*MetricsStore) ListJobs ¶
func (s *MetricsStore) ListJobs(ctx context.Context, f ListFilter) ([]*Job, string, error)
func (*MetricsStore) QueueDepth ¶
func (s *MetricsStore) QueueDepth(ctx context.Context) (int64, error)
func (*MetricsStore) RetryDeadJob ¶
func (*MetricsStore) Start ¶
func (s *MetricsStore) Start(ctx context.Context)
Start launches the background cache-cleanup goroutine. Every 15 minutes the goroutine removes claimCache entries older than 1 hour; it stops when ctx is cancelled. This prevents memory leaks when a node restarts mid-job and the terminal UpdateStatus call for a cached job never arrives.
func (*MetricsStore) UpdateStatus ¶
func (s *MetricsStore) UpdateStatus(ctx context.Context, id uuid.UUID, status Status, errMsg string) error
UpdateStatus intercepts terminal transitions to record completion metrics. It uses the claimCache to look up the job type and execution start time without an extra DB round-trip.
type PGStore ¶
type PGStore struct {
// contains filtered or unexported fields
}
PGStore implements Store against PostgreSQL.
func NewPGStore ¶
NewPGStore creates a PGStore using an existing connection pool.
func (*PGStore) GetJobByIdempotencyKey ¶
GetJobByIdempotencyKey returns an existing non-terminal job with the given idempotency key, or nil if none exists.
func (*PGStore) IncrementRetry ¶
func (*PGStore) ListDeadJobs ¶
func (*PGStore) RetryDeadJob ¶
type Store ¶
type Store interface {
// CreateJob inserts a new job and returns it with ID/timestamps populated.
CreateJob(ctx context.Context, j *Job) (*Job, error)
// GetJob retrieves a job by ID.
GetJob(ctx context.Context, id uuid.UUID) (*Job, error)
// GetJobByIdempotencyKey returns an active (non-terminal) job matching the
// given idempotency key, or nil if none exists.
GetJobByIdempotencyKey(ctx context.Context, key string) (*Job, error)
// ListJobs returns jobs matching the filter.
ListJobs(ctx context.Context, f ListFilter) ([]*Job, string, error)
// ClaimJobs atomically marks up to n pending/due jobs as running.
// Uses SELECT FOR UPDATE SKIP LOCKED.
ClaimJobs(ctx context.Context, n int) ([]*Job, error)
// UpdateStatus sets status (and optionally error) on a job.
UpdateStatus(ctx context.Context, id uuid.UUID, status Status, errMsg string) error
// IncrementRetry bumps retry_count and sets status back to pending with next scheduled_at.
IncrementRetry(ctx context.Context, id uuid.UUID, nextRun time.Time) error
// CancelJob transitions a pending job to cancelled.
CancelJob(ctx context.Context, id uuid.UUID) error
// QueueDepth returns the count of pending jobs. Used for Prometheus metrics.
QueueDepth(ctx context.Context) (int64, error)
// RetryDeadJob transitions a dead job back to pending and resets its retry count.
RetryDeadJob(ctx context.Context, id uuid.UUID) error
// ListDeadJobs returns a paginated list of dead jobs.
ListDeadJobs(ctx context.Context, pageSize int, pageToken string) ([]*Job, string, error)
}
Store is the persistence interface for jobs.