queue

package
v0.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 14, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
const (
	StatusPending   = "pending"
	StatusRunning   = "running"
	StatusCompleted = "completed"
	StatusFailed    = "failed"
	StatusCanceled  = "canceled"
)
View Source
const (
	ScanStatusRunning   = "running"
	ScanStatusCompleted = "completed"
	ScanStatusFailed    = "failed"
	ScanStatusCanceled  = "canceled"
)

Variables

View Source
var (
	ErrProjectLocked     = errors.New("repository scan already in progress")
	ErrStackScanNotFound = errors.New("stack scan not found")
	ErrStackScanInflight = errors.New("stack scan already inflight")
)
View Source
var ErrAlreadyClaimed = errors.New("stack scan already claimed")

ErrAlreadyClaimed is returned when another worker has already claimed the stack scan.

View Source
var ErrCloneLockNotOwned = errors.New("clone lock not owned by caller")
View Source
var ErrScanNotFound = errors.New("scan not found")

Functions

func TriggerPriority

func TriggerPriority(trigger string) int

Types

type EnqueueBatchResult

type EnqueueBatchResult struct {
	Enqueued []*StackScan // successfully enqueued
	Skipped  int          // skipped because already inflight
	Errors   []string     // per-stack error messages
}

EnqueueBatchResult holds the outcome of a batch enqueue operation.

type ProjectEvent

type ProjectEvent struct {
	Type        string     `json:"type"`
	ProjectName string     `json:"project"`
	ScanID      string     `json:"scan_id,omitempty"`
	CommitSHA   string     `json:"commit_sha,omitempty"`
	StackPath   string     `json:"stack_path,omitempty"`
	Status      string     `json:"status,omitempty"`
	Drifted     *bool      `json:"drifted,omitempty"`
	Error       string     `json:"error,omitempty"`
	RunAt       *time.Time `json:"run_at,omitempty"`
	StartedAt   *time.Time `json:"started_at,omitempty"`
	EndedAt     *time.Time `json:"ended_at,omitempty"`
	Completed   int        `json:"completed,omitempty"`
	Failed      int        `json:"failed,omitempty"`
	Total       int        `json:"total,omitempty"`
	DriftedCnt  int        `json:"drifted_count,omitempty"`
	Timestamp   time.Time  `json:"timestamp"`
}

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func New

func New(addr, password string, db int, lockTTL time.Duration) (*Queue, error)

func (*Queue) AcquireCloneLock

func (q *Queue) AcquireCloneLock(ctx context.Context, urlHash, owner string, ttl time.Duration) (bool, error)

func (*Queue) AdjustScanCounters

func (q *Queue) AdjustScanCounters(ctx context.Context, scanID, projectName string, deltas ...any) error

AdjustScanCounters atomically updates scan counters and auto-finishes the scan if all stacks are done. Use this when you know the projectName and want to apply multiple counter deltas in a single call (e.g. batch enqueue skips/failures). Deltas are pairs of (field, delta): "queued", -3, "total", -3

func (*Queue) AttachStackScanToScan

func (q *Queue) AttachStackScanToScan(ctx context.Context, scanID, stackScanID string) error

func (*Queue) CancelAndStartScan

func (q *Queue) CancelAndStartScan(ctx context.Context, oldScanID, projectName, cancelReason, trigger, commit, actor string, total int) (*Scan, error)

CancelAndStartScan atomically cancels an existing scan and starts a new one. This prevents the race condition where another caller could acquire the lock between a separate CancelScan + StartScan.

func (*Queue) CancelScan

func (q *Queue) CancelScan(ctx context.Context, scanID, projectName, reason string) error

func (*Queue) CancelStackScan

func (q *Queue) CancelStackScan(ctx context.Context, stackScan *StackScan, reason string) error

func (*Queue) ClearInflightForScan

func (q *Queue) ClearInflightForScan(ctx context.Context, scanID string)

ClearInflightForScan removes inflight markers for all stack scans belonging to a scan.

func (*Queue) Client

func (q *Queue) Client() *redis.Client

Client returns the underlying Redis client for health checks.

func (*Queue) Close

func (q *Queue) Close() error

func (*Queue) Complete

func (q *Queue) Complete(ctx context.Context, stackScan *StackScan, drifted bool) error

Complete marks a stack scan as completed and releases the project lock.

func (*Queue) Dequeue

func (q *Queue) Dequeue(ctx context.Context, workerID string) (*StackScan, error)

Dequeue blocks until a stack scan is available, then returns it. The stack scan status is atomically claimed and updated to "running".

func (*Queue) Enqueue

func (q *Queue) Enqueue(ctx context.Context, stackScan *StackScan) error

Enqueue adds a stack scan to the queue.

func (*Queue) EnqueueBatch

func (q *Queue) EnqueueBatch(ctx context.Context, stacks []*StackScan) (*EnqueueBatchResult, error)

EnqueueBatch enqueues multiple stack scans using pipelined Redis commands. Inflight checks are batched into a single pipeline, and all enqueue operations are batched into a second pipeline — 2 roundtrips total regardless of stack count.

func (*Queue) Fail

func (q *Queue) Fail(ctx context.Context, stackScan *StackScan, errMsg string) error

Fail marks a stack scan as failed. If retries remain, re-queues it.

func (*Queue) FailScan

func (q *Queue) FailScan(ctx context.Context, scanID, projectName, errMsg string) error

func (*Queue) GetActiveScan

func (q *Queue) GetActiveScan(ctx context.Context, projectName string) (*Scan, error)

func (*Queue) GetLastScan

func (q *Queue) GetLastScan(ctx context.Context, projectName string) (*Scan, error)

func (*Queue) GetScan

func (q *Queue) GetScan(ctx context.Context, scanID string) (*Scan, error)

func (*Queue) GetStackScan

func (q *Queue) GetStackScan(ctx context.Context, stackScanID string) (*StackScan, error)

func (*Queue) IsProjectLocked

func (q *Queue) IsProjectLocked(ctx context.Context, projectName string) (bool, error)

IsProjectLocked checks if a project scan is in progress.

func (*Queue) ListProjectStackScans

func (q *Queue) ListProjectStackScans(ctx context.Context, projectName string, limit int) ([]*StackScan, error)

func (*Queue) MarkScanEnqueueFailed

func (q *Queue) MarkScanEnqueueFailed(ctx context.Context, scanID string) error

func (*Queue) MarkScanEnqueueSkipped

func (q *Queue) MarkScanEnqueueSkipped(ctx context.Context, scanID string) error

func (*Queue) OldestRunningScanAge

func (q *Queue) OldestRunningScanAge(ctx context.Context) (time.Duration, error)

func (*Queue) OldestRunningStackScanAge

func (q *Queue) OldestRunningStackScanAge(ctx context.Context) (time.Duration, error)

func (*Queue) PublishEvent

func (q *Queue) PublishEvent(ctx context.Context, projectName string, event ProjectEvent) error

func (*Queue) PublishScanEvent

func (q *Queue) PublishScanEvent(ctx context.Context, projectName string, event ScanEvent) error

func (*Queue) PublishStackEvent

func (q *Queue) PublishStackEvent(ctx context.Context, projectName string, event StackEvent) error

func (*Queue) QueueDepth

func (q *Queue) QueueDepth(ctx context.Context) (int64, error)

func (*Queue) RebuildRunningScansIndex

func (q *Queue) RebuildRunningScansIndex(ctx context.Context) (int, error)

RebuildRunningScansIndex scans for scan hashes with status "running" and re-populates the keyRunningScans ZSET. This handles the case where the ZSET was lost (e.g. Redis restart without persistence) but scan hashes survived.

func (*Queue) RecoverOrphanedStackScans

func (q *Queue) RecoverOrphanedStackScans(ctx context.Context) (int, error)

RecoverOrphanedStackScans finds stack scans with status "pending" that are no longer in the queue list (e.g. lost during a crash) and re-queues them. This should be called periodically, not on the dequeue hot path.

func (*Queue) RecoverStaleScans

func (q *Queue) RecoverStaleScans(ctx context.Context, maxAge time.Duration) (int, error)

RecoverStaleScans finds running scans older than maxAge and marks them failed.

func (*Queue) RecoverStaleStackScans

func (q *Queue) RecoverStaleStackScans(ctx context.Context, maxAge time.Duration) (int, error)

RecoverStaleStackScans finds running stack scans older than maxAge and marks them as failed (or re-queued if retries remain).

func (*Queue) ReleaseCloneLock

func (q *Queue) ReleaseCloneLock(ctx context.Context, urlHash, owner string) error

func (*Queue) ReleaseScanLock

func (q *Queue) ReleaseScanLock(ctx context.Context, projectName, scanID string) error

ReleaseScanLock releases the project lock if still owned by the scan.

func (*Queue) RenewCloneLock

func (q *Queue) RenewCloneLock(ctx context.Context, urlHash, owner string, ttl time.Duration) error

func (*Queue) RenewScanLock

func (q *Queue) RenewScanLock(ctx context.Context, scanID, projectName string, maxAge, renewEvery time.Duration)

func (*Queue) RunningScanCount

func (q *Queue) RunningScanCount(ctx context.Context) (int, error)

func (*Queue) RunningStackScanCount

func (q *Queue) RunningStackScanCount(ctx context.Context) (int, error)

func (*Queue) SetScanTotal

func (q *Queue) SetScanTotal(ctx context.Context, scanID string, total int) error

func (*Queue) SetScanVersions

func (q *Queue) SetScanVersions(ctx context.Context, scanID, tfVersion, tgVersion string, stackTF, stackTG map[string]string) error

func (*Queue) SetScanWorkspace

func (q *Queue) SetScanWorkspace(ctx context.Context, scanID, workspacePath, commitSHA string) error

func (*Queue) StartScan

func (q *Queue) StartScan(ctx context.Context, projectName, trigger, commit, actor string, total int) (*Scan, error)

type Scan

type Scan struct {
	ID          string    `json:"id"`
	ProjectName string    `json:"project_name"`
	Trigger     string    `json:"trigger,omitempty"`
	Commit      string    `json:"commit,omitempty"`
	Actor       string    `json:"actor,omitempty"`
	Status      string    `json:"status"`
	CreatedAt   time.Time `json:"created_at"`
	StartedAt   time.Time `json:"started_at"`
	EndedAt     time.Time `json:"ended_at,omitempty"`
	Error       string    `json:"error,omitempty"`

	TerraformVersion  string            `json:"terraform_version,omitempty"`
	TerragruntVersion string            `json:"terragrunt_version,omitempty"`
	StackTFVersions   map[string]string `json:"stack_tf_versions,omitempty"`
	StackTGVersions   map[string]string `json:"stack_tg_versions,omitempty"`
	WorkspacePath     string            `json:"workspace_path,omitempty"`
	CommitSHA         string            `json:"commit_sha,omitempty"`

	Total     int `json:"total"`
	Queued    int `json:"queued"`
	Running   int `json:"running"`
	Completed int `json:"completed"`
	Failed    int `json:"failed"`
	Drifted   int `json:"drifted"`
	Errored   int `json:"errored"`
}

type ScanEvent

type ScanEvent struct {
	ProjectName string
	ScanID      string
	CommitSHA   string
	Status      string
	Completed   int
	Failed      int
	Total       int
	DriftedCnt  int
	StartedAt   *time.Time
	EndedAt     *time.Time
}

func (ScanEvent) ToProjectEvent

func (e ScanEvent) ToProjectEvent() ProjectEvent

type StackEvent

type StackEvent struct {
	ProjectName string
	ScanID      string
	StackPath   string
	Status      string
	Drifted     *bool
	Error       string
	RunAt       *time.Time
}

func (StackEvent) ToProjectEvent

func (e StackEvent) ToProjectEvent() ProjectEvent

type StackScan

type StackScan struct {
	ID          string    `json:"id"`
	ScanID      string    `json:"scan_id"`
	ProjectName string    `json:"project_name"`
	ProjectURL  string    `json:"project_url"`
	StackPath   string    `json:"stack_path"`
	Status      string    `json:"status"`
	Retries     int       `json:"retries"`
	MaxRetries  int       `json:"max_retries"`
	CreatedAt   time.Time `json:"created_at"`
	StartedAt   time.Time `json:"started_at,omitempty"`
	CompletedAt time.Time `json:"completed_at,omitempty"`
	WorkerID    string    `json:"worker_id,omitempty"`
	Error       string    `json:"error,omitempty"`

	Trigger string `json:"trigger,omitempty"` // "scheduled", "manual", "post-apply"
	Commit  string `json:"commit,omitempty"`
	Actor   string `json:"actor,omitempty"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL