Documentation
¶
Index ¶
- Constants
- Variables
- func TriggerPriority(trigger string) int
- type EnqueueBatchResult
- type ProjectEvent
- type Queue
- func (q *Queue) AcquireCloneLock(ctx context.Context, urlHash, owner string, ttl time.Duration) (bool, error)
- func (q *Queue) AdjustScanCounters(ctx context.Context, scanID, projectName string, deltas ...any) error
- func (q *Queue) AttachStackScanToScan(ctx context.Context, scanID, stackScanID string) error
- func (q *Queue) CancelAndStartScan(ctx context.Context, ...) (*Scan, error)
- func (q *Queue) CancelScan(ctx context.Context, scanID, projectName, reason string) error
- func (q *Queue) CancelStackScan(ctx context.Context, stackScan *StackScan, reason string) error
- func (q *Queue) ClearInflightForScan(ctx context.Context, scanID string)
- func (q *Queue) Client() *redis.Client
- func (q *Queue) Close() error
- func (q *Queue) Complete(ctx context.Context, stackScan *StackScan, drifted bool) error
- func (q *Queue) Dequeue(ctx context.Context, workerID string) (*StackScan, error)
- func (q *Queue) Enqueue(ctx context.Context, stackScan *StackScan) error
- func (q *Queue) EnqueueBatch(ctx context.Context, stacks []*StackScan) (*EnqueueBatchResult, error)
- func (q *Queue) Fail(ctx context.Context, stackScan *StackScan, errMsg string) error
- func (q *Queue) FailScan(ctx context.Context, scanID, projectName, errMsg string) error
- func (q *Queue) GetActiveScan(ctx context.Context, projectName string) (*Scan, error)
- func (q *Queue) GetLastScan(ctx context.Context, projectName string) (*Scan, error)
- func (q *Queue) GetScan(ctx context.Context, scanID string) (*Scan, error)
- func (q *Queue) GetStackScan(ctx context.Context, stackScanID string) (*StackScan, error)
- func (q *Queue) IsProjectLocked(ctx context.Context, projectName string) (bool, error)
- func (q *Queue) ListProjectStackScans(ctx context.Context, projectName string, limit int) ([]*StackScan, error)
- func (q *Queue) MarkScanEnqueueFailed(ctx context.Context, scanID string) error
- func (q *Queue) MarkScanEnqueueSkipped(ctx context.Context, scanID string) error
- func (q *Queue) OldestRunningScanAge(ctx context.Context) (time.Duration, error)
- func (q *Queue) OldestRunningStackScanAge(ctx context.Context) (time.Duration, error)
- func (q *Queue) PublishEvent(ctx context.Context, projectName string, event ProjectEvent) error
- func (q *Queue) PublishScanEvent(ctx context.Context, projectName string, event ScanEvent) error
- func (q *Queue) PublishStackEvent(ctx context.Context, projectName string, event StackEvent) error
- func (q *Queue) QueueDepth(ctx context.Context) (int64, error)
- func (q *Queue) RebuildRunningScansIndex(ctx context.Context) (int, error)
- func (q *Queue) RecoverOrphanedStackScans(ctx context.Context) (int, error)
- func (q *Queue) RecoverStaleScans(ctx context.Context, maxAge time.Duration) (int, error)
- func (q *Queue) RecoverStaleStackScans(ctx context.Context, maxAge time.Duration) (int, error)
- func (q *Queue) ReleaseCloneLock(ctx context.Context, urlHash, owner string) error
- func (q *Queue) ReleaseScanLock(ctx context.Context, projectName, scanID string) error
- func (q *Queue) RenewCloneLock(ctx context.Context, urlHash, owner string, ttl time.Duration) error
- func (q *Queue) RenewScanLock(ctx context.Context, scanID, projectName string, ...)
- func (q *Queue) RunningScanCount(ctx context.Context) (int, error)
- func (q *Queue) RunningStackScanCount(ctx context.Context) (int, error)
- func (q *Queue) SetScanTotal(ctx context.Context, scanID string, total int) error
- func (q *Queue) SetScanVersions(ctx context.Context, scanID, tfVersion, tgVersion string, ...) error
- func (q *Queue) SetScanWorkspace(ctx context.Context, scanID, workspacePath, commitSHA string) error
- func (q *Queue) StartScan(ctx context.Context, projectName, trigger, commit, actor string, total int) (*Scan, error)
- type Scan
- type ScanEvent
- type StackEvent
- type StackScan
Constants ¶
const ( StatusPending = "pending" StatusRunning = "running" StatusCompleted = "completed" StatusFailed = "failed" StatusCanceled = "canceled" )
const ( ScanStatusRunning = "running" ScanStatusCompleted = "completed" ScanStatusFailed = "failed" ScanStatusCanceled = "canceled" )
Variables ¶
var ( ErrProjectLocked = errors.New("repository scan already in progress") ErrStackScanNotFound = errors.New("stack scan not found") ErrStackScanInflight = errors.New("stack scan already inflight") )
var ErrAlreadyClaimed = errors.New("stack scan already claimed")
ErrAlreadyClaimed is returned when another worker has already claimed the stack scan.
var ErrCloneLockNotOwned = errors.New("clone lock not owned by caller")
var ErrScanNotFound = errors.New("scan not found")
Functions ¶
func TriggerPriority ¶
Types ¶
type EnqueueBatchResult ¶
type EnqueueBatchResult struct {
Enqueued []*StackScan // successfully enqueued
Skipped int // skipped because already inflight
Errors []string // per-stack error messages
}
EnqueueBatchResult holds the outcome of a batch enqueue operation.
type ProjectEvent ¶
type ProjectEvent struct {
Type string `json:"type"`
ProjectName string `json:"project"`
ScanID string `json:"scan_id,omitempty"`
CommitSHA string `json:"commit_sha,omitempty"`
StackPath string `json:"stack_path,omitempty"`
Status string `json:"status,omitempty"`
Drifted *bool `json:"drifted,omitempty"`
Error string `json:"error,omitempty"`
RunAt *time.Time `json:"run_at,omitempty"`
StartedAt *time.Time `json:"started_at,omitempty"`
EndedAt *time.Time `json:"ended_at,omitempty"`
Completed int `json:"completed,omitempty"`
Failed int `json:"failed,omitempty"`
Total int `json:"total,omitempty"`
DriftedCnt int `json:"drifted_count,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
func (*Queue) AcquireCloneLock ¶
func (*Queue) AdjustScanCounters ¶
func (q *Queue) AdjustScanCounters(ctx context.Context, scanID, projectName string, deltas ...any) error
AdjustScanCounters atomically updates scan counters and auto-finishes the scan if all stacks are done. Use this when you know the projectName and want to apply multiple counter deltas in a single call (e.g. batch enqueue skips/failures). Deltas are pairs of (field, delta): "queued", -3, "total", -3
func (*Queue) AttachStackScanToScan ¶
func (*Queue) CancelAndStartScan ¶
func (q *Queue) CancelAndStartScan(ctx context.Context, oldScanID, projectName, cancelReason, trigger, commit, actor string, total int) (*Scan, error)
CancelAndStartScan atomically cancels an existing scan and starts a new one. This prevents the race condition where another caller could acquire the lock between a separate CancelScan + StartScan.
func (*Queue) CancelScan ¶
func (*Queue) CancelStackScan ¶
func (*Queue) ClearInflightForScan ¶
ClearInflightForScan removes inflight markers for all stack scans belonging to a scan.
func (*Queue) Dequeue ¶
Dequeue blocks until a stack scan is available, then returns it. The stack scan status is atomically claimed and updated to "running".
func (*Queue) EnqueueBatch ¶
EnqueueBatch enqueues multiple stack scans using pipelined Redis commands. Inflight checks are batched into a single pipeline, and all enqueue operations are batched into a second pipeline — 2 roundtrips total regardless of stack count.
func (*Queue) GetActiveScan ¶
func (*Queue) GetLastScan ¶
func (*Queue) GetStackScan ¶
func (*Queue) IsProjectLocked ¶
IsProjectLocked checks if a project scan is in progress.
func (*Queue) ListProjectStackScans ¶
func (*Queue) MarkScanEnqueueFailed ¶
func (*Queue) MarkScanEnqueueSkipped ¶
func (*Queue) OldestRunningScanAge ¶
func (*Queue) OldestRunningStackScanAge ¶
func (*Queue) PublishEvent ¶
func (*Queue) PublishScanEvent ¶
func (*Queue) PublishStackEvent ¶
func (*Queue) RebuildRunningScansIndex ¶
RebuildRunningScansIndex scans for scan hashes with status "running" and re-populates the keyRunningScans ZSET. This handles the case where the ZSET was lost (e.g. Redis restart without persistence) but scan hashes survived.
func (*Queue) RecoverOrphanedStackScans ¶
RecoverOrphanedStackScans finds stack scans with status "pending" that are no longer in the queue list (e.g. lost during a crash) and re-queues them. This should be called periodically, not on the dequeue hot path.
func (*Queue) RecoverStaleScans ¶
RecoverStaleScans finds running scans older than maxAge and marks them failed.
func (*Queue) RecoverStaleStackScans ¶
RecoverStaleStackScans finds running stack scans older than maxAge and marks them as failed (or re-queued if retries remain).
func (*Queue) ReleaseCloneLock ¶
func (*Queue) ReleaseScanLock ¶
ReleaseScanLock releases the project lock if still owned by the scan.
func (*Queue) RenewCloneLock ¶
func (*Queue) RenewScanLock ¶
func (*Queue) RunningStackScanCount ¶
func (*Queue) SetScanTotal ¶
func (*Queue) SetScanVersions ¶
func (*Queue) SetScanWorkspace ¶
type Scan ¶
type Scan struct {
ID string `json:"id"`
ProjectName string `json:"project_name"`
Trigger string `json:"trigger,omitempty"`
Commit string `json:"commit,omitempty"`
Actor string `json:"actor,omitempty"`
Status string `json:"status"`
CreatedAt time.Time `json:"created_at"`
StartedAt time.Time `json:"started_at"`
EndedAt time.Time `json:"ended_at,omitempty"`
Error string `json:"error,omitempty"`
TerraformVersion string `json:"terraform_version,omitempty"`
TerragruntVersion string `json:"terragrunt_version,omitempty"`
StackTFVersions map[string]string `json:"stack_tf_versions,omitempty"`
StackTGVersions map[string]string `json:"stack_tg_versions,omitempty"`
WorkspacePath string `json:"workspace_path,omitempty"`
CommitSHA string `json:"commit_sha,omitempty"`
Total int `json:"total"`
Queued int `json:"queued"`
Running int `json:"running"`
Completed int `json:"completed"`
Failed int `json:"failed"`
Drifted int `json:"drifted"`
Errored int `json:"errored"`
}
type ScanEvent ¶
type ScanEvent struct {
ProjectName string
ScanID string
CommitSHA string
Status string
Completed int
Failed int
Total int
DriftedCnt int
StartedAt *time.Time
EndedAt *time.Time
}
func (ScanEvent) ToProjectEvent ¶
func (e ScanEvent) ToProjectEvent() ProjectEvent
type StackEvent ¶
type StackEvent struct {
ProjectName string
ScanID string
StackPath string
Status string
Drifted *bool
Error string
RunAt *time.Time
}
func (StackEvent) ToProjectEvent ¶
func (e StackEvent) ToProjectEvent() ProjectEvent
type StackScan ¶
type StackScan struct {
ID string `json:"id"`
ScanID string `json:"scan_id"`
ProjectName string `json:"project_name"`
ProjectURL string `json:"project_url"`
StackPath string `json:"stack_path"`
Status string `json:"status"`
Retries int `json:"retries"`
MaxRetries int `json:"max_retries"`
CreatedAt time.Time `json:"created_at"`
StartedAt time.Time `json:"started_at,omitempty"`
CompletedAt time.Time `json:"completed_at,omitempty"`
WorkerID string `json:"worker_id,omitempty"`
Error string `json:"error,omitempty"`
Trigger string `json:"trigger,omitempty"` // "scheduled", "manual", "post-apply"
Commit string `json:"commit,omitempty"`
Actor string `json:"actor,omitempty"`
}