jobs

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 5, 2026 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors used to classify failures for retry handling. Both are
// plain errors.New values and carry no context of their own.
// NOTE(review): PermanentError and RetryableError presumably wrap these
// values so IsPermanent / errors.Is can detect them — confirm.
var (
	// ErrPermanent indicates the job should not be retried.
	ErrPermanent = errors.New("permanent error")
	// ErrRetryable indicates the job can be retried.
	ErrRetryable = errors.New("retryable error")
)

Error types for retry handling

View Source
var ErrCircuitOpen = errors.New("circuit breaker is open")

ErrCircuitOpen is returned when the circuit breaker is open.

Functions

func IsPermanent

func IsPermanent(err error) bool

IsPermanent checks if an error is marked as permanent (should not retry).

func LoadAWSConfig

func LoadAWSConfig(ctx context.Context, region string) (aws.Config, error)

func PermanentError

func PermanentError(err error) error

PermanentError wraps an error to indicate it should not be retried.

func RetryableError

func RetryableError(err error) error

RetryableError wraps an error to indicate it can be retried.

Types

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker implements the circuit breaker pattern for handling failures.

func NewCircuitBreaker

func NewCircuitBreaker(config CircuitBreakerConfig) *CircuitBreaker

NewCircuitBreaker creates a new circuit breaker.

func (*CircuitBreaker) Allow

func (cb *CircuitBreaker) Allow() bool

Allow checks if a request should be allowed through. Returns true if the request can proceed, false if circuit is open.

func (*CircuitBreaker) RecordFailure

func (cb *CircuitBreaker) RecordFailure()

RecordFailure records a failed operation.

func (*CircuitBreaker) RecordSuccess

func (cb *CircuitBreaker) RecordSuccess()

RecordSuccess records a successful operation.

func (*CircuitBreaker) Reset

func (cb *CircuitBreaker) Reset()

Reset resets the circuit breaker to closed state.

func (*CircuitBreaker) State

func (cb *CircuitBreaker) State() CircuitState

State returns the current state of the circuit breaker.

func (*CircuitBreaker) Stats

func (cb *CircuitBreaker) Stats() (state CircuitState, failures int, successes int)

Stats returns current circuit breaker statistics.

type CircuitBreakerConfig

// CircuitBreakerConfig is the configuration taken by NewCircuitBreaker.
// NOTE(review): the per-field defaults below are presumably applied when a
// field is left at its zero value — confirm in NewCircuitBreaker.
type CircuitBreakerConfig struct {
	FailureThreshold int           // Default: 5
	SuccessThreshold int           // Default: 2
	Timeout          time.Duration // Default: 30s
}

CircuitBreakerConfig configures the circuit breaker.

type CircuitState

type CircuitState int

CircuitState represents the state of the circuit breaker.

// Circuit breaker states. iota numbers them 0, 1 and 2 in declaration order,
// so CircuitClosed is the zero value of CircuitState.
const (
	CircuitClosed   CircuitState = iota // Normal operation
	CircuitOpen                         // Failing, rejecting requests
	CircuitHalfOpen                     // Testing if service recovered
)

func (CircuitState) String

func (s CircuitState) String() string

type DLQConsumer

type DLQConsumer struct {
	// contains filtered or unexported fields
}

DLQConsumer processes messages from the dead letter queue.

func NewDLQConsumer

func NewDLQConsumer(dlqQueue, mainQueue Queue, store Store, logger *slog.Logger, config DLQConsumerConfig) *DLQConsumer

NewDLQConsumer creates a new DLQ consumer.

func (*DLQConsumer) ReplayMessage

func (c *DLQConsumer) ReplayMessage(ctx context.Context, msg QueueMessage) error

ReplayMessage re-enqueues a dead letter message to the main queue.

func (*DLQConsumer) Start

func (c *DLQConsumer) Start(ctx context.Context)

Start begins consuming from the DLQ.

type DLQConsumerConfig

// DLQConsumerConfig is the configuration taken by NewDLQConsumer.
// NOTE(review): how zero-valued durations are treated is not visible in this
// listing — confirm whether defaults are applied.
type DLQConsumerConfig struct {
	PollWait          time.Duration
	VisibilityTimeout time.Duration
	HeartbeatInterval time.Duration
	// OnDeadLetter is called for each dead letter. Can be used for alerting.
	// If nil, messages are logged and deleted.
	// NOTE(review): whether job may be nil (e.g. when the job record cannot be
	// loaded) is not visible here — confirm before dereferencing it.
	OnDeadLetter func(ctx context.Context, msg QueueMessage, job *Job) error
}

DLQConsumerConfig configures the DLQ consumer.

type DynamoIdempotencyStore

type DynamoIdempotencyStore struct {
	// contains filtered or unexported fields
}

DynamoIdempotencyStore implements IdempotencyStore using DynamoDB.

func NewDynamoIdempotencyStore

func NewDynamoIdempotencyStore(cfg aws.Config, table string) *DynamoIdempotencyStore

NewDynamoIdempotencyStore creates a new DynamoDB-backed idempotency store. The table should have a partition key named message_id (String) and a TTL attribute named expires_at.

func (*DynamoIdempotencyStore) IsProcessed

func (s *DynamoIdempotencyStore) IsProcessed(ctx context.Context, messageID string) (bool, error)

func (*DynamoIdempotencyStore) MarkCompleted

func (s *DynamoIdempotencyStore) MarkCompleted(ctx context.Context, messageID string) error

func (*DynamoIdempotencyStore) MarkFailed

func (s *DynamoIdempotencyStore) MarkFailed(ctx context.Context, messageID string) error

func (*DynamoIdempotencyStore) MarkProcessing

func (s *DynamoIdempotencyStore) MarkProcessing(ctx context.Context, messageID string, workerID string, ttl time.Duration) (bool, error)

type DynamoStore

type DynamoStore struct {
	// contains filtered or unexported fields
}

func NewDynamoStore

func NewDynamoStore(cfg aws.Config, table string) *DynamoStore

func (*DynamoStore) ClaimJob

func (s *DynamoStore) ClaimJob(ctx context.Context, jobID, workerID string, lease time.Duration) (*Job, bool, error)

func (*DynamoStore) CompleteJob

func (s *DynamoStore) CompleteJob(ctx context.Context, jobID, result string) error

func (*DynamoStore) CreateJob

func (s *DynamoStore) CreateJob(ctx context.Context, job *Job) error

func (*DynamoStore) ExtendLease

func (s *DynamoStore) ExtendLease(ctx context.Context, jobID, workerID string, lease time.Duration) error

func (*DynamoStore) FailJob

func (s *DynamoStore) FailJob(ctx context.Context, jobID, message string) error

func (*DynamoStore) GetJob

func (s *DynamoStore) GetJob(ctx context.Context, jobID string) (*Job, error)

func (*DynamoStore) RetryJob

func (s *DynamoStore) RetryJob(ctx context.Context, jobID, message string) error

type EnqueueOptions

// EnqueueOptions carries the optional settings accepted by
// Manager.EnqueueInspectResources and Manager.EnqueueNativeSync.
//
// GroupID, MaxAttempts, RepoURL, Truncated and FilesScanned have the same
// names and types as fields of JobBatch.
// NOTE(review): presumably those values are copied into the returned
// JobBatch, and Overrides into each InspectResourcePayload — confirm against
// the Manager implementation.
type EnqueueOptions struct {
	GroupID      string
	MaxAttempts  int
	Overrides    InspectOverrides
	RepoURL      string
	Truncated    bool
	FilesScanned int
}

type HealthServer

type HealthServer struct {
	// contains filtered or unexported fields
}

HealthServer provides HTTP health check endpoints for the worker.

func NewHealthServer

func NewHealthServer(worker *Worker, addr string, logger *slog.Logger) *HealthServer

NewHealthServer creates a new health server.

func (*HealthServer) Shutdown

func (hs *HealthServer) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the health server.

func (*HealthServer) Start

func (hs *HealthServer) Start() error

Start starts the health server in a goroutine.

type HealthStatus

// HealthStatus is the value returned by Worker.Health. Every field is always
// present in the JSON encoding (no omitempty).
// CircuitState is a string here, not a CircuitState value.
// LastActivity is serialized under the key "last_activity_unix".
// NOTE(review): the key suggests a Unix timestamp; confirm the unit.
type HealthStatus struct {
	Healthy      bool   `json:"healthy"`
	WorkerID     string `json:"worker_id"`
	ShuttingDown bool   `json:"shutting_down"`
	CircuitState string `json:"circuit_state"`
	PanicCount   int64  `json:"panic_count"`
	LastActivity int64  `json:"last_activity_unix"`
}

HealthStatus represents the worker's health state.

type IdempotencyRecord

// IdempotencyRecord is the DynamoDB item shape for a tracked message. Its
// message_id and expires_at attribute names match the partition key and TTL
// attribute that NewDynamoIdempotencyStore's documentation requires.
// NOTE(review): ProcessedAt and ExpiresAt are int64 — presumably Unix
// seconds, as DynamoDB TTL expects; confirm.
type IdempotencyRecord struct {
	MessageID   string `dynamodbav:"message_id"`
	Status      string `dynamodbav:"status"` // "processing", "completed"
	WorkerID    string `dynamodbav:"worker_id"`
	ProcessedAt int64  `dynamodbav:"processed_at"`
	ExpiresAt   int64  `dynamodbav:"expires_at"` // TTL attribute
}

IdempotencyRecord represents a processed message record.

type IdempotencyStore

// IdempotencyStore tracks message IDs to prevent duplicate processing.
// DynamoIdempotencyStore and NoOpIdempotencyStore each declare all four
// methods with these signatures.
type IdempotencyStore interface {
	// MarkProcessing attempts to mark a message as being processed.
	// Returns true if this is the first time seeing this message.
	// Returns false if the message was already processed or is being processed.
	MarkProcessing(ctx context.Context, messageID string, workerID string, ttl time.Duration) (bool, error)

	// MarkCompleted marks a message as successfully processed.
	MarkCompleted(ctx context.Context, messageID string) error

	// MarkFailed removes the processing lock so the message can be retried.
	MarkFailed(ctx context.Context, messageID string) error

	// IsProcessed checks if a message was already successfully processed.
	IsProcessed(ctx context.Context, messageID string) (bool, error)
}

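IdempotencyStore tracks processed message IDs to prevent duplicate processing. DynamoIdempotencyStore is the DynamoDB-backed implementation and relies on a TTL attribute for automatic cleanup; NoOpIdempotencyStore can be used for testing or when idempotency is disabled.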

type InspectOverrides

// InspectOverrides holds optional AWS, GCP and cluster settings. It is used
// as EnqueueOptions.Overrides and InspectResourcePayload.Overrides.
// Every field is a string that is dropped from the JSON encoding when empty.
// NOTE(review): presumably an empty field means "use the default" — confirm
// in the inspect handler.
type InspectOverrides struct {
	AWSRegion  string `json:"aws_region,omitempty"`
	AWSAccount string `json:"aws_account,omitempty"`
	GCPProject string `json:"gcp_project,omitempty"`
	GCPZone    string `json:"gcp_zone,omitempty"`
	Cluster    string `json:"cluster,omitempty"`
}

type InspectResourcePayload

// InspectResourcePayload pairs a ResourceRef with InspectOverrides under the
// JSON keys "resource" and "overrides".
//
// Overrides is a struct value, and encoding/json's omitempty never treats a
// struct as empty, so the "overrides" key is always emitted (as an empty
// object when no override is set).
// NOTE(review): presumably this is the JSON stored in Job.Payload for
// JobTypeInspectResource jobs — confirm.
type InspectResourcePayload struct {
	Resource  ResourceRef      `json:"resource"`
	Overrides InspectOverrides `json:"overrides,omitempty"`
}

type Job

// Job is the record passed to Store.CreateJob and returned by Store.GetJob
// and Store.ClaimJob.
//
// Every field has both a json and a dynamodbav tag. The two names are the
// same for all fields except ID, which is "id" in JSON and "job_id" as a
// DynamoDB attribute. Result, Error, GroupID, WorkerID, LeaseExpiresAt,
// CorrelationID and ParentID are marked omitempty in both encodings.
//
// Payload and Result are plain strings, matching JobHandler's parameter and
// return types.
// NOTE(review): LeaseExpiresAt, CreatedAt and UpdatedAt are int64 —
// presumably Unix timestamps; confirm the unit (seconds vs. milliseconds).
type Job struct {
	ID             string  `json:"id" dynamodbav:"job_id"`
	Type           JobType `json:"type" dynamodbav:"type"`
	Status         Status  `json:"status" dynamodbav:"status"`
	Payload        string  `json:"payload" dynamodbav:"payload"`
	Result         string  `json:"result,omitempty" dynamodbav:"result,omitempty"`
	Error          string  `json:"error,omitempty" dynamodbav:"error,omitempty"`
	Attempt        int     `json:"attempt" dynamodbav:"attempt"`
	MaxAttempts    int     `json:"max_attempts" dynamodbav:"max_attempts"`
	GroupID        string  `json:"group_id,omitempty" dynamodbav:"group_id,omitempty"`
	WorkerID       string  `json:"worker_id,omitempty" dynamodbav:"worker_id,omitempty"`
	LeaseExpiresAt int64   `json:"lease_expires_at,omitempty" dynamodbav:"lease_expires_at,omitempty"`
	CreatedAt      int64   `json:"created_at" dynamodbav:"created_at"`
	UpdatedAt      int64   `json:"updated_at" dynamodbav:"updated_at"`

	// Tracing fields
	CorrelationID string `json:"correlation_id,omitempty" dynamodbav:"correlation_id,omitempty"`
	ParentID      string `json:"parent_id,omitempty" dynamodbav:"parent_id,omitempty"`
}

type JobBatch

// JobBatch is the value returned by Manager.EnqueueInspectResources. It
// holds the IDs of the jobs in the batch plus batch-level metadata.
// RepoURL and FilesScanned are omitted from JSON when empty; all other
// fields, including Truncated, are always emitted.
// NOTE(review): TotalJobs presumably equals len(JobIDs) — confirm.
type JobBatch struct {
	GroupID      string    `json:"group_id"`
	JobIDs       []string  `json:"job_ids"`
	QueuedAt     time.Time `json:"queued_at"`
	TotalJobs    int       `json:"total_jobs"`
	MaxAttempts  int       `json:"max_attempts"`
	RepoURL      string    `json:"repo_url,omitempty"`
	FilesScanned int       `json:"files_scanned,omitempty"`
	Truncated    bool      `json:"truncated"`
}

type JobHandler

type JobHandler func(ctx context.Context, payload string) (string, error)

JobHandler processes a job and returns the result.

func NewInspectResourceHandler

func NewInspectResourceHandler(tools *agents.SecurityTools) JobHandler

NewInspectResourceHandler creates a handler for InspectResource jobs.

type JobMessage

// JobMessage is the message type accepted by Queue.Enqueue and
// Queue.EnqueueWithDelay. JobID (JSON key "job_id") is always emitted; every
// other field is omitted from JSON when empty, so an Attempt of 0 does not
// appear in the encoded message.
// NOTE(review): presumably the JSON form of this struct is what arrives in
// QueueMessage.Body — confirm.
type JobMessage struct {
	JobID         string `json:"job_id"`
	GroupID       string `json:"group_id,omitempty"`
	CorrelationID string `json:"correlation_id,omitempty"`
	// Attempt number for retry tracking (used in deduplication ID generation)
	Attempt int `json:"attempt,omitempty"`
	// DeduplicationID for FIFO queues - if empty, generates unique ID from job_id:attempt:timestamp
	DeduplicationID string `json:"deduplication_id,omitempty"`
}

type JobRegistry

type JobRegistry struct {
	// contains filtered or unexported fields
}

JobRegistry manages job type handlers.

func NewJobRegistry

func NewJobRegistry() *JobRegistry

NewJobRegistry creates a new job registry.

func (*JobRegistry) Execute

func (r *JobRegistry) Execute(ctx context.Context, job *Job) (string, error)

Execute executes a job using the registered handler.

func (*JobRegistry) Get

func (r *JobRegistry) Get(jobType JobType) (JobHandler, bool)

Get returns the handler for a job type.

func (*JobRegistry) Register

func (r *JobRegistry) Register(jobType JobType, handler JobHandler)

Register registers a handler for a job type.

func (*JobRegistry) RegisteredTypes

func (r *JobRegistry) RegisteredTypes() []JobType

RegisteredTypes returns all registered job types.

type JobType

// JobType names a kind of job. It is the key under which JobRegistry
// registers and looks up a JobHandler, and the type of Job.Type.
type JobType string
// Job types declared by this package.
// NOTE(review): this listing shows a handler constructor only for
// JobTypeInspectResource (NewInspectResourceHandler); where the
// JobTypeNativeSync handler comes from is not visible here.
const (
	JobTypeInspectResource JobType = "inspect_resource"
	JobTypeNativeSync      JobType = "native_sync"
)

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func NewManager

func NewManager(queue Queue, store Store, logger *slog.Logger) *Manager

func (*Manager) EnqueueInspectResources

func (m *Manager) EnqueueInspectResources(ctx context.Context, resources []ResourceRef, opts EnqueueOptions) (*JobBatch, error)

func (*Manager) EnqueueNativeSync

func (m *Manager) EnqueueNativeSync(ctx context.Context, payload NativeSyncPayload, opts EnqueueOptions) (*Job, error)

func (*Manager) WaitForJobs

func (m *Manager) WaitForJobs(ctx context.Context, jobIDs []string, pollInterval time.Duration) ([]*Job, error)

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics collects worker metrics and logs them periodically.

func NewMetrics

func NewMetrics(logger *slog.Logger, config MetricsConfig) *Metrics

NewMetrics creates a new metrics collector. Note: the collector currently only logs metrics; CloudWatch support can be added by vendoring github.com/aws/aws-sdk-go-v2/service/cloudwatch.

func (*Metrics) Flush

func (m *Metrics) Flush(ctx context.Context) error

Flush logs accumulated metrics and resets counters.

func (*Metrics) RecordHeartbeat

func (m *Metrics) RecordHeartbeat(succeeded bool)

RecordHeartbeat records a heartbeat attempt.

func (*Metrics) RecordJobProcessed

func (m *Metrics) RecordJobProcessed(duration time.Duration, succeeded bool, timedOut bool)

RecordJobProcessed records a job completion with its duration and status.

func (*Metrics) RecordMessagesDeleteFailed

func (m *Metrics) RecordMessagesDeleteFailed(count int)

RecordMessagesDeleteFailed records failed message delete attempts from SQS.

func (*Metrics) RecordMessagesDeleted

func (m *Metrics) RecordMessagesDeleted(count int)

RecordMessagesDeleted records messages deleted from SQS.

func (*Metrics) RecordMessagesReceived

func (m *Metrics) RecordMessagesReceived(count int)

RecordMessagesReceived records messages received from SQS.

func (*Metrics) StartFlusher

func (m *Metrics) StartFlusher(ctx context.Context, interval time.Duration)

StartFlusher starts a background goroutine that flushes metrics periodically.

type MetricsConfig

// MetricsConfig is the configuration taken by NewMetrics.
// NOTE(review): NewMetrics also takes a *slog.Logger argument in addition to
// the Logger field below; which of the two takes precedence is not visible
// in this listing — confirm.
type MetricsConfig struct {
	Namespace string // For future CloudWatch support
	WorkerID  string
	Logger    *slog.Logger
}

MetricsConfig configures the metrics collector.

type MetricsRecorder

// MetricsRecorder is the metrics interface accepted in
// WorkerOptions.Metrics. Metrics and NoOpMetrics each declare all seven
// methods with these signatures.
type MetricsRecorder interface {
	RecordJobProcessed(duration time.Duration, succeeded bool, timedOut bool)
	RecordHeartbeat(succeeded bool)
	RecordMessagesReceived(count int)
	RecordMessagesDeleted(count int)
	RecordMessagesDeleteFailed(count int)
	Flush(ctx context.Context) error
	StartFlusher(ctx context.Context, interval time.Duration)
}

MetricsRecorder is the interface for recording worker metrics.

type NativeSyncPayload

// NativeSyncPayload is the payload accepted by Manager.EnqueueNativeSync.
// Provider is always present in the JSON encoding; Table and ScheduleName
// are omitted when empty.
// NOTE(review): presumably this is the JSON stored in Job.Payload for
// JobTypeNativeSync jobs — confirm.
type NativeSyncPayload struct {
	Provider     string `json:"provider"`
	Table        string `json:"table,omitempty"`
	ScheduleName string `json:"schedule_name,omitempty"`
}

type NoOpIdempotencyStore

type NoOpIdempotencyStore struct{}

NoOpIdempotencyStore is a no-op implementation for testing or when idempotency is disabled.

func (*NoOpIdempotencyStore) IsProcessed

func (s *NoOpIdempotencyStore) IsProcessed(ctx context.Context, messageID string) (bool, error)

func (*NoOpIdempotencyStore) MarkCompleted

func (s *NoOpIdempotencyStore) MarkCompleted(ctx context.Context, messageID string) error

func (*NoOpIdempotencyStore) MarkFailed

func (s *NoOpIdempotencyStore) MarkFailed(ctx context.Context, messageID string) error

func (*NoOpIdempotencyStore) MarkProcessing

func (s *NoOpIdempotencyStore) MarkProcessing(ctx context.Context, messageID string, workerID string, ttl time.Duration) (bool, error)

type NoOpMetrics

type NoOpMetrics struct{}

NoOpMetrics is a metrics implementation that does nothing.

func (*NoOpMetrics) Flush

func (n *NoOpMetrics) Flush(ctx context.Context) error

func (*NoOpMetrics) RecordHeartbeat

func (n *NoOpMetrics) RecordHeartbeat(succeeded bool)

func (*NoOpMetrics) RecordJobProcessed

func (n *NoOpMetrics) RecordJobProcessed(duration time.Duration, succeeded bool, timedOut bool)

func (*NoOpMetrics) RecordMessagesDeleteFailed

func (n *NoOpMetrics) RecordMessagesDeleteFailed(count int)

func (*NoOpMetrics) RecordMessagesDeleted

func (n *NoOpMetrics) RecordMessagesDeleted(count int)

func (*NoOpMetrics) RecordMessagesReceived

func (n *NoOpMetrics) RecordMessagesReceived(count int)

func (*NoOpMetrics) StartFlusher

func (n *NoOpMetrics) StartFlusher(ctx context.Context, interval time.Duration)

type OrphanedJobScanner

type OrphanedJobScanner struct {
	// contains filtered or unexported fields
}

OrphanedJobScanner finds and recovers jobs that are stuck in "running" state with expired leases. This handles cases where a worker crashed after claiming a job but before completing it.

func NewOrphanedJobScanner

func NewOrphanedJobScanner(store *DynamoStore, queue Queue, logger *slog.Logger, interval time.Duration) *OrphanedJobScanner

NewOrphanedJobScanner creates a new scanner.

func (*OrphanedJobScanner) Start

func (s *OrphanedJobScanner) Start(ctx context.Context)

Start begins scanning for orphaned jobs periodically.

type Queue

// Queue is the message-queue interface taken by NewManager, NewWorker,
// NewDLQConsumer and NewOrphanedJobScanner. SQSQueue declares a method with
// each of these signatures.
//
// The two batch methods report failures differently: DeleteBatch returns the
// failures as a []string, while ExtendVisibilityBatch returns only an int
// count.
// NOTE(review): the strings returned by DeleteBatch are presumably the
// receipt handles that could not be deleted — confirm.
type Queue interface {
	Enqueue(ctx context.Context, msg JobMessage) error
	EnqueueWithDelay(ctx context.Context, msg JobMessage, delay time.Duration) error
	Receive(ctx context.Context, maxMessages int, waitTime time.Duration, visibilityTimeout time.Duration) ([]QueueMessage, error)
	Delete(ctx context.Context, receiptHandle string) error
	DeleteBatch(ctx context.Context, receiptHandles []string) (succeeded int, failed []string, err error)
	ExtendVisibility(ctx context.Context, receiptHandle string, timeout time.Duration) error
	ExtendVisibilityBatch(ctx context.Context, receiptHandles []string, timeout time.Duration) (succeeded int, failed int, err error)
}

type QueueMessage

// QueueMessage is the element type returned by Queue.Receive; it is also
// what DLQConsumer.ReplayMessage and DLQConsumerConfig.OnDeadLetter receive.
// ReceiptHandle is named after the receiptHandle parameter of Queue.Delete
// and Queue.ExtendVisibility.
// NOTE(review): Body presumably holds a JSON-encoded JobMessage — confirm.
type QueueMessage struct {
	ID            string
	ReceiptHandle string
	Body          string
}

type ResourceRef

// ResourceRef describes one resource to inspect. It is the element type of
// the resources slice taken by Manager.EnqueueInspectResources and the type
// of InspectResourcePayload.Resource.
// Provider, Identifier and Resource are always present in the JSON encoding;
// the remaining fields are omitted when empty.
// NOTE(review): File, Line and Snippet presumably locate the reference in a
// scanned source file, and Confidence is a free-form string whose allowed
// values are not visible in this listing — confirm both.
type ResourceRef struct {
	Provider     string `json:"provider"`
	Service      string `json:"service,omitempty"`
	ResourceType string `json:"resource_type,omitempty"`
	Identifier   string `json:"identifier"`
	Resource     string `json:"resource"`
	File         string `json:"file,omitempty"`
	Line         int    `json:"line,omitempty"`
	Snippet      string `json:"snippet,omitempty"`
	Confidence   string `json:"confidence,omitempty"`
}

type SQSQueue

type SQSQueue struct {
	// contains filtered or unexported fields
}

func NewSQSQueue

func NewSQSQueue(cfg aws.Config, queueURL string) *SQSQueue

func NewSQSQueueWithConfig

func NewSQSQueueWithConfig(cfg aws.Config, config SQSQueueConfig) *SQSQueue

func (*SQSQueue) Delete

func (q *SQSQueue) Delete(ctx context.Context, receiptHandle string) error

func (*SQSQueue) DeleteBatch

func (q *SQSQueue) DeleteBatch(ctx context.Context, receiptHandles []string) (succeeded int, failed []string, err error)

func (*SQSQueue) Enqueue

func (q *SQSQueue) Enqueue(ctx context.Context, msg JobMessage) error

func (*SQSQueue) EnqueueWithDelay

func (q *SQSQueue) EnqueueWithDelay(ctx context.Context, msg JobMessage, delay time.Duration) error

func (*SQSQueue) ExtendVisibility

func (q *SQSQueue) ExtendVisibility(ctx context.Context, receiptHandle string, timeout time.Duration) error

func (*SQSQueue) ExtendVisibilityBatch

func (q *SQSQueue) ExtendVisibilityBatch(ctx context.Context, receiptHandles []string, timeout time.Duration) (succeeded int, failed int, err error)

func (*SQSQueue) Receive

func (q *SQSQueue) Receive(ctx context.Context, maxMessages int, waitTime time.Duration, visibilityTimeout time.Duration) ([]QueueMessage, error)

type SQSQueueConfig

// SQSQueueConfig is the configuration taken by NewSQSQueueWithConfig.
// NOTE(review): NewSQSQueue takes only a queue URL; whether it infers IsFIFO
// from a ".fifo" suffix is not visible in this listing — confirm.
type SQSQueueConfig struct {
	QueueURL string
	IsFIFO   bool // Set to true for FIFO queues (.fifo suffix)
}

SQSQueueConfig configures the SQS queue.

type Status

// Status is the type of Job.Status; its values are stored as plain strings.
type Status string
// Status values declared by this package.
// NOTE(review): Terminal presumably reports true for StatusSucceeded and
// StatusFailed only — confirm against its implementation.
const (
	StatusQueued    Status = "queued"
	StatusRunning   Status = "running"
	StatusSucceeded Status = "succeeded"
	StatusFailed    Status = "failed"
)

func (Status) Terminal

func (s Status) Terminal() bool

type Store

// Store is the job storage interface taken by NewManager, NewWorker and
// NewDLQConsumer. DynamoStore declares a method with each of these
// signatures. NewOrphanedJobScanner takes a *DynamoStore rather than a
// Store.
//
// NOTE(review): the bool returned by ClaimJob presumably reports whether the
// claim was won by workerID — confirm. How FailJob and RetryJob differ in
// the resulting Job.Status is not visible in this listing.
type Store interface {
	CreateJob(ctx context.Context, job *Job) error
	GetJob(ctx context.Context, jobID string) (*Job, error)
	ClaimJob(ctx context.Context, jobID, workerID string, lease time.Duration) (*Job, bool, error)
	ExtendLease(ctx context.Context, jobID, workerID string, lease time.Duration) error
	CompleteJob(ctx context.Context, jobID, result string) error
	FailJob(ctx context.Context, jobID, message string) error
	RetryJob(ctx context.Context, jobID, message string) error
}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker processes jobs received from a Queue, such as an SQSQueue.

func NewWorker

func NewWorker(queue Queue, store Store, registry *JobRegistry, opts WorkerOptions) *Worker

NewWorker creates a new job worker.

func (*Worker) Health

func (w *Worker) Health() HealthStatus

Health returns the current health status of the worker.

func (*Worker) IsHealthy

func (w *Worker) IsHealthy() bool

IsHealthy returns true if the worker is healthy and ready to process jobs.

func (*Worker) Shutdown

func (w *Worker) Shutdown()

Shutdown signals the worker to stop processing new jobs.

func (*Worker) Start

func (w *Worker) Start(ctx context.Context) error

Start begins processing jobs. Blocks until context is canceled or Shutdown is called.

type WorkerOptions

// WorkerOptions is the configuration taken by NewWorker.
// Metrics and Idempotency are interface-typed; this package provides
// NoOpMetrics and NoOpIdempotencyStore as do-nothing implementations.
// NOTE(review): how NewWorker treats zero-valued durations, a zero
// Concurrency, an empty WorkerID, or nil Logger / Metrics / CircuitBreaker /
// Idempotency is not visible in this listing — confirm whether defaults are
// substituted before relying on the zero value.
type WorkerOptions struct {
	Concurrency       int
	VisibilityTimeout time.Duration
	HeartbeatInterval time.Duration
	JobTimeout        time.Duration
	PollWait          time.Duration
	DrainTimeout      time.Duration
	RetryBaseDelay    time.Duration
	RetryMaxDelay     time.Duration
	WorkerID          string
	Logger            *slog.Logger
	Metrics           MetricsRecorder
	CircuitBreaker    *CircuitBreaker
	Idempotency       IdempotencyStore
}

WorkerOptions configures the worker.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL