store

package
v0.0.0-...-eaf5e47 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 16, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Job

type Job struct {
	ID             uuid.UUID
	Name           string
	Type           string
	Payload        []byte
	Status         Status
	MaxRetries     int
	RetryCount     int
	Priority       int // 1 (lowest) to 10 (highest), default 5
	Error          string
	IdempotencyKey string // optional; deduplicates non-terminal jobs
	ScheduledAt    time.Time
	CreatedAt      time.Time
	UpdatedAt      time.Time
}

Job is the in-memory representation of a job row.

type ListFilter

type ListFilter struct {
	Status    *Status // nil = all statuses
	PageSize  int
	PageToken string // opaque cursor (created_at:id)
}

ListFilter controls ListJobs queries.

type MetricsStore

type MetricsStore struct {
	// contains filtered or unexported fields
}

MetricsStore wraps a Store and transparently records Prometheus metrics. Callers interact with it through the Store interface — instrumentation is invisible.

Completion metrics (job duration, terminal status counts) are recorded without an extra DB query by caching (type, claimedAt) when jobs are claimed, then consuming the cache entry when they reach a terminal state. The cache is bounded: entries are removed on first terminal transition, so a job that completes exactly once never accumulates memory.

func NewMetricsStore

func NewMetricsStore(inner Store, m *metrics.Metrics) *MetricsStore

func (*MetricsStore) CancelJob

func (s *MetricsStore) CancelJob(ctx context.Context, id uuid.UUID) error

func (*MetricsStore) ClaimJobs

func (s *MetricsStore) ClaimJobs(ctx context.Context, n int) ([]*Job, error)

ClaimJobs delegates to inner and caches (type, claimedAt) for each claimed job. The cache is consumed by UpdateStatus when the job reaches a terminal state, eliminating the GetJob() call that was previously needed to look up job type.

func (*MetricsStore) CreateJob

func (s *MetricsStore) CreateJob(ctx context.Context, j *Job) (*Job, error)

func (*MetricsStore) GetJob

func (s *MetricsStore) GetJob(ctx context.Context, id uuid.UUID) (*Job, error)

func (*MetricsStore) GetJobByIdempotencyKey

func (s *MetricsStore) GetJobByIdempotencyKey(ctx context.Context, key string) (*Job, error)

func (*MetricsStore) IncrementRetry

func (s *MetricsStore) IncrementRetry(ctx context.Context, id uuid.UUID, nextRun time.Time) error

func (*MetricsStore) ListDeadJobs

func (s *MetricsStore) ListDeadJobs(ctx context.Context, pageSize int, pageToken string) ([]*Job, string, error)

func (*MetricsStore) ListJobs

func (s *MetricsStore) ListJobs(ctx context.Context, f ListFilter) ([]*Job, string, error)

func (*MetricsStore) QueueDepth

func (s *MetricsStore) QueueDepth(ctx context.Context) (int64, error)

func (*MetricsStore) RetryDeadJob

func (s *MetricsStore) RetryDeadJob(ctx context.Context, id uuid.UUID) error

func (*MetricsStore) Start

func (s *MetricsStore) Start(ctx context.Context)

Start launches the background cache-cleanup goroutine. It removes claimCache entries older than 1 hour, preventing memory leaks when a node restarts mid-job and terminal UpdateStatus calls never arrive. The goroutine runs every 15 minutes and stops when ctx is cancelled.

func (*MetricsStore) UpdateStatus

func (s *MetricsStore) UpdateStatus(ctx context.Context, id uuid.UUID, status Status, errMsg string) error

UpdateStatus intercepts terminal transitions to record completion metrics. Uses the claimCache to look up job type and execution start time without an extra DB round-trip.

type PGStore

type PGStore struct {
	// contains filtered or unexported fields
}

PGStore implements Store against PostgreSQL.

func NewPGStore

func NewPGStore(pool *pgxpool.Pool) *PGStore

NewPGStore creates a PGStore using an existing connection pool.

func (*PGStore) CancelJob

func (s *PGStore) CancelJob(ctx context.Context, id uuid.UUID) error

func (*PGStore) ClaimJobs

func (s *PGStore) ClaimJobs(ctx context.Context, n int) ([]*Job, error)

func (*PGStore) CreateJob

func (s *PGStore) CreateJob(ctx context.Context, j *Job) (*Job, error)

func (*PGStore) GetJob

func (s *PGStore) GetJob(ctx context.Context, id uuid.UUID) (*Job, error)

func (*PGStore) GetJobByIdempotencyKey

func (s *PGStore) GetJobByIdempotencyKey(ctx context.Context, key string) (*Job, error)

GetJobByIdempotencyKey returns an existing non-terminal job with the given idempotency key, or nil if none exists.

func (*PGStore) IncrementRetry

func (s *PGStore) IncrementRetry(ctx context.Context, id uuid.UUID, nextRun time.Time) error

func (*PGStore) ListDeadJobs

func (s *PGStore) ListDeadJobs(ctx context.Context, pageSize int, pageToken string) ([]*Job, string, error)

func (*PGStore) ListJobs

func (s *PGStore) ListJobs(ctx context.Context, f ListFilter) ([]*Job, string, error)

func (*PGStore) QueueDepth

func (s *PGStore) QueueDepth(ctx context.Context) (int64, error)

func (*PGStore) RetryDeadJob

func (s *PGStore) RetryDeadJob(ctx context.Context, id uuid.UUID) error

func (*PGStore) UpdateStatus

func (s *PGStore) UpdateStatus(ctx context.Context, id uuid.UUID, status Status, errMsg string) error

type Status

type Status string

Status mirrors the DB enum.

const (
	StatusPending   Status = "pending"
	StatusRunning   Status = "running"
	StatusCompleted Status = "completed"
	StatusFailed    Status = "failed"
	StatusCancelled Status = "cancelled"
	StatusDead      Status = "dead"
)

type Store

type Store interface {
	// CreateJob inserts a new job and returns it with ID/timestamps populated.
	CreateJob(ctx context.Context, j *Job) (*Job, error)

	// GetJob retrieves a job by ID.
	GetJob(ctx context.Context, id uuid.UUID) (*Job, error)

	// GetJobByIdempotencyKey returns an active (non-terminal) job matching the
	// given idempotency key, or nil if none exists.
	GetJobByIdempotencyKey(ctx context.Context, key string) (*Job, error)

	// ListJobs returns jobs matching the filter.
	ListJobs(ctx context.Context, f ListFilter) ([]*Job, string, error)

	// ClaimJobs atomically marks up to n pending/due jobs as running.
	// Uses SELECT FOR UPDATE SKIP LOCKED.
	ClaimJobs(ctx context.Context, n int) ([]*Job, error)

	// UpdateStatus sets status (and optionally error) on a job.
	UpdateStatus(ctx context.Context, id uuid.UUID, status Status, errMsg string) error

	// IncrementRetry bumps retry_count and sets status back to pending with next scheduled_at.
	IncrementRetry(ctx context.Context, id uuid.UUID, nextRun time.Time) error

	// CancelJob transitions a pending job to cancelled.
	CancelJob(ctx context.Context, id uuid.UUID) error

	// QueueDepth returns the count of pending jobs. Used for Prometheus metrics.
	QueueDepth(ctx context.Context) (int64, error)

	// RetryDeadJob transitions a dead job back to pending and resets its retry count.
	RetryDeadJob(ctx context.Context, id uuid.UUID) error

	// ListDeadJobs returns a paginated list of dead jobs.
	ListDeadJobs(ctx context.Context, pageSize int, pageToken string) ([]*Job, string, error)
}

Store is the persistence interface for jobs.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL