Documentation
¶
Index ¶
- Variables
- func IsPermanent(err error) bool
- func LoadAWSConfig(ctx context.Context, region string) (aws.Config, error)
- func PermanentError(err error) error
- func RetryableError(err error) error
- type CircuitBreaker
- type CircuitBreakerConfig
- type CircuitState
- type DLQConsumer
- type DLQConsumerConfig
- type DynamoIdempotencyStore
- func (s *DynamoIdempotencyStore) IsProcessed(ctx context.Context, messageID string) (bool, error)
- func (s *DynamoIdempotencyStore) MarkCompleted(ctx context.Context, messageID string) error
- func (s *DynamoIdempotencyStore) MarkFailed(ctx context.Context, messageID string) error
- func (s *DynamoIdempotencyStore) MarkProcessing(ctx context.Context, messageID string, workerID string, ttl time.Duration) (bool, error)
- type DynamoStore
- func (s *DynamoStore) ClaimJob(ctx context.Context, jobID, workerID string, lease time.Duration) (*Job, bool, error)
- func (s *DynamoStore) CompleteJob(ctx context.Context, jobID, result string) error
- func (s *DynamoStore) CreateJob(ctx context.Context, job *Job) error
- func (s *DynamoStore) ExtendLease(ctx context.Context, jobID, workerID string, lease time.Duration) error
- func (s *DynamoStore) FailJob(ctx context.Context, jobID, message string) error
- func (s *DynamoStore) GetJob(ctx context.Context, jobID string) (*Job, error)
- func (s *DynamoStore) RetryJob(ctx context.Context, jobID, message string) error
- type EnqueueOptions
- type HealthServer
- type HealthStatus
- type IdempotencyRecord
- type IdempotencyStore
- type InspectOverrides
- type InspectResourcePayload
- type Job
- type JobBatch
- type JobHandler
- type JobMessage
- type JobRegistry
- type JobType
- type Manager
- func (m *Manager) EnqueueInspectResources(ctx context.Context, resources []ResourceRef, opts EnqueueOptions) (*JobBatch, error)
- func (m *Manager) EnqueueNativeSync(ctx context.Context, payload NativeSyncPayload, opts EnqueueOptions) (*Job, error)
- func (m *Manager) WaitForJobs(ctx context.Context, jobIDs []string, pollInterval time.Duration) ([]*Job, error)
- type Metrics
- func (m *Metrics) Flush(ctx context.Context) error
- func (m *Metrics) RecordHeartbeat(succeeded bool)
- func (m *Metrics) RecordJobProcessed(duration time.Duration, succeeded bool, timedOut bool)
- func (m *Metrics) RecordMessagesDeleteFailed(count int)
- func (m *Metrics) RecordMessagesDeleted(count int)
- func (m *Metrics) RecordMessagesReceived(count int)
- func (m *Metrics) StartFlusher(ctx context.Context, interval time.Duration)
- type MetricsConfig
- type MetricsRecorder
- type NativeSyncPayload
- type NoOpIdempotencyStore
- func (s *NoOpIdempotencyStore) IsProcessed(ctx context.Context, messageID string) (bool, error)
- func (s *NoOpIdempotencyStore) MarkCompleted(ctx context.Context, messageID string) error
- func (s *NoOpIdempotencyStore) MarkFailed(ctx context.Context, messageID string) error
- func (s *NoOpIdempotencyStore) MarkProcessing(ctx context.Context, messageID string, workerID string, ttl time.Duration) (bool, error)
- type NoOpMetrics
- func (n *NoOpMetrics) Flush(ctx context.Context) error
- func (n *NoOpMetrics) RecordHeartbeat(succeeded bool)
- func (n *NoOpMetrics) RecordJobProcessed(duration time.Duration, succeeded bool, timedOut bool)
- func (n *NoOpMetrics) RecordMessagesDeleteFailed(count int)
- func (n *NoOpMetrics) RecordMessagesDeleted(count int)
- func (n *NoOpMetrics) RecordMessagesReceived(count int)
- func (n *NoOpMetrics) StartFlusher(ctx context.Context, interval time.Duration)
- type OrphanedJobScanner
- type Queue
- type QueueMessage
- type ResourceRef
- type SQSQueue
- func (q *SQSQueue) Delete(ctx context.Context, receiptHandle string) error
- func (q *SQSQueue) DeleteBatch(ctx context.Context, receiptHandles []string) (succeeded int, failed []string, err error)
- func (q *SQSQueue) Enqueue(ctx context.Context, msg JobMessage) error
- func (q *SQSQueue) EnqueueWithDelay(ctx context.Context, msg JobMessage, delay time.Duration) error
- func (q *SQSQueue) ExtendVisibility(ctx context.Context, receiptHandle string, timeout time.Duration) error
- func (q *SQSQueue) ExtendVisibilityBatch(ctx context.Context, receiptHandles []string, timeout time.Duration) (succeeded int, failed int, err error)
- func (q *SQSQueue) Receive(ctx context.Context, maxMessages int, waitTime time.Duration, ...) ([]QueueMessage, error)
- type SQSQueueConfig
- type Status
- type Store
- type Worker
- type WorkerOptions
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrPermanent indicates the job should not be retried
	ErrPermanent = errors.New("permanent error")
	// ErrRetryable indicates the job can be retried
	ErrRetryable = errors.New("retryable error")
)
Sentinel errors used to classify failures for retry handling.
var ErrCircuitOpen = errors.New("circuit breaker is open")
ErrCircuitOpen is returned when the circuit breaker is open.
Functions ¶
func IsPermanent ¶
func IsPermanent(err error) bool
IsPermanent reports whether an error is marked as permanent, i.e. the job should not be retried.
func PermanentError ¶
func PermanentError(err error) error
PermanentError wraps an error to indicate it should not be retried.
func RetryableError ¶
func RetryableError(err error) error
RetryableError wraps an error to indicate it can be retried.
Types ¶
type CircuitBreaker ¶
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker implements the circuit breaker pattern for handling failures.
func NewCircuitBreaker ¶
func NewCircuitBreaker(config CircuitBreakerConfig) *CircuitBreaker
NewCircuitBreaker creates a new circuit breaker.
func (*CircuitBreaker) Allow ¶
func (cb *CircuitBreaker) Allow() bool
Allow reports whether a request should be allowed through. It returns true if the request can proceed and false if the circuit is open.
func (*CircuitBreaker) RecordFailure ¶
func (cb *CircuitBreaker) RecordFailure()
RecordFailure records a failed operation.
func (*CircuitBreaker) RecordSuccess ¶
func (cb *CircuitBreaker) RecordSuccess()
RecordSuccess records a successful operation.
func (*CircuitBreaker) Reset ¶
func (cb *CircuitBreaker) Reset()
Reset resets the circuit breaker to closed state.
func (*CircuitBreaker) State ¶
func (cb *CircuitBreaker) State() CircuitState
State returns the current state of the circuit breaker.
func (*CircuitBreaker) Stats ¶
func (cb *CircuitBreaker) Stats() (state CircuitState, failures int, successes int)
Stats returns current circuit breaker statistics.
type CircuitBreakerConfig ¶
type CircuitBreakerConfig struct {
FailureThreshold int // Default: 5
SuccessThreshold int // Default: 2
Timeout time.Duration // Default: 30s
}
CircuitBreakerConfig configures the circuit breaker.
type CircuitState ¶
type CircuitState int
CircuitState represents the state of the circuit breaker.
const (
	CircuitClosed   CircuitState = iota // Normal operation
	CircuitOpen                         // Failing, rejecting requests
	CircuitHalfOpen                     // Testing if service recovered
)
func (CircuitState) String ¶
func (s CircuitState) String() string
type DLQConsumer ¶
type DLQConsumer struct {
// contains filtered or unexported fields
}
DLQConsumer processes messages from the dead letter queue.
func NewDLQConsumer ¶
func NewDLQConsumer(dlqQueue, mainQueue Queue, store Store, logger *slog.Logger, config DLQConsumerConfig) *DLQConsumer
NewDLQConsumer creates a new DLQ consumer.
func (*DLQConsumer) ReplayMessage ¶
func (c *DLQConsumer) ReplayMessage(ctx context.Context, msg QueueMessage) error
ReplayMessage re-enqueues a dead letter message to the main queue.
func (*DLQConsumer) Start ¶
func (c *DLQConsumer) Start(ctx context.Context)
Start begins consuming from the DLQ.
type DLQConsumerConfig ¶
type DLQConsumerConfig struct {
PollWait time.Duration
VisibilityTimeout time.Duration
HeartbeatInterval time.Duration
// OnDeadLetter is called for each dead letter. Can be used for alerting.
// If nil, messages are logged and deleted.
OnDeadLetter func(ctx context.Context, msg QueueMessage, job *Job) error
}
DLQConsumerConfig configures the DLQ consumer.
type DynamoIdempotencyStore ¶
type DynamoIdempotencyStore struct {
// contains filtered or unexported fields
}
DynamoIdempotencyStore implements IdempotencyStore using DynamoDB.
func NewDynamoIdempotencyStore ¶
func NewDynamoIdempotencyStore(cfg aws.Config, table string) *DynamoIdempotencyStore
NewDynamoIdempotencyStore creates a new DynamoDB-backed idempotency store. The table should have:
  - Partition key: message_id (String)
  - TTL attribute: expires_at
func (*DynamoIdempotencyStore) IsProcessed ¶
func (s *DynamoIdempotencyStore) IsProcessed(ctx context.Context, messageID string) (bool, error)
func (*DynamoIdempotencyStore) MarkCompleted ¶
func (s *DynamoIdempotencyStore) MarkCompleted(ctx context.Context, messageID string) error
func (*DynamoIdempotencyStore) MarkFailed ¶
func (s *DynamoIdempotencyStore) MarkFailed(ctx context.Context, messageID string) error
type DynamoStore ¶
type DynamoStore struct {
// contains filtered or unexported fields
}
func NewDynamoStore ¶
func NewDynamoStore(cfg aws.Config, table string) *DynamoStore
func (*DynamoStore) CompleteJob ¶
func (s *DynamoStore) CompleteJob(ctx context.Context, jobID, result string) error
func (*DynamoStore) CreateJob ¶
func (s *DynamoStore) CreateJob(ctx context.Context, job *Job) error
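func (*DynamoStore) ExtendLease ¶
func (s *DynamoStore) ExtendLease(ctx context.Context, jobID, workerID string, lease time.Duration) error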
func (*DynamoStore) FailJob ¶
func (s *DynamoStore) FailJob(ctx context.Context, jobID, message string) error
type EnqueueOptions ¶
type HealthServer ¶
type HealthServer struct {
// contains filtered or unexported fields
}
HealthServer provides HTTP health check endpoints for the worker.
func NewHealthServer ¶
func NewHealthServer(worker *Worker, addr string, logger *slog.Logger) *HealthServer
NewHealthServer creates a new health server.
func (*HealthServer) Shutdown ¶
func (hs *HealthServer) Shutdown(ctx context.Context) error
Shutdown gracefully shuts down the health server.
func (*HealthServer) Start ¶
func (hs *HealthServer) Start() error
Start starts the health server in a goroutine.
type HealthStatus ¶
type HealthStatus struct {
Healthy bool `json:"healthy"`
WorkerID string `json:"worker_id"`
ShuttingDown bool `json:"shutting_down"`
CircuitState string `json:"circuit_state"`
PanicCount int64 `json:"panic_count"`
LastActivity int64 `json:"last_activity_unix"`
}
HealthStatus represents the worker's health state.
type IdempotencyRecord ¶
type IdempotencyRecord struct {
MessageID string `dynamodbav:"message_id"`
Status string `dynamodbav:"status"` // "processing", "completed"
WorkerID string `dynamodbav:"worker_id"`
ProcessedAt int64 `dynamodbav:"processed_at"`
ExpiresAt int64 `dynamodbav:"expires_at"` // TTL attribute
}
IdempotencyRecord represents a processed message record.
type IdempotencyStore ¶
type IdempotencyStore interface {
// MarkProcessing attempts to mark a message as being processed.
// Returns true if this is the first time seeing this message.
// Returns false if the message was already processed or is being processed.
MarkProcessing(ctx context.Context, messageID string, workerID string, ttl time.Duration) (bool, error)
// MarkCompleted marks a message as successfully processed.
MarkCompleted(ctx context.Context, messageID string) error
// MarkFailed removes the processing lock so the message can be retried.
MarkFailed(ctx context.Context, messageID string) error
// IsProcessed checks if a message was already successfully processed.
IsProcessed(ctx context.Context, messageID string) (bool, error)
}
IdempotencyStore tracks processed message IDs to prevent duplicate processing. DynamoIdempotencyStore implements it on DynamoDB, using a TTL attribute for automatic cleanup; NoOpIdempotencyStore disables the check.
type InspectOverrides ¶
type InspectResourcePayload ¶
type InspectResourcePayload struct {
Resource ResourceRef `json:"resource"`
Overrides InspectOverrides `json:"overrides,omitempty"`
}
type Job ¶
type Job struct {
ID string `json:"id" dynamodbav:"job_id"`
Type JobType `json:"type" dynamodbav:"type"`
Status Status `json:"status" dynamodbav:"status"`
Payload string `json:"payload" dynamodbav:"payload"`
Result string `json:"result,omitempty" dynamodbav:"result,omitempty"`
Error string `json:"error,omitempty" dynamodbav:"error,omitempty"`
Attempt int `json:"attempt" dynamodbav:"attempt"`
MaxAttempts int `json:"max_attempts" dynamodbav:"max_attempts"`
GroupID string `json:"group_id,omitempty" dynamodbav:"group_id,omitempty"`
WorkerID string `json:"worker_id,omitempty" dynamodbav:"worker_id,omitempty"`
LeaseExpiresAt int64 `json:"lease_expires_at,omitempty" dynamodbav:"lease_expires_at,omitempty"`
CreatedAt int64 `json:"created_at" dynamodbav:"created_at"`
UpdatedAt int64 `json:"updated_at" dynamodbav:"updated_at"`
// Tracing fields
CorrelationID string `json:"correlation_id,omitempty" dynamodbav:"correlation_id,omitempty"`
ParentID string `json:"parent_id,omitempty" dynamodbav:"parent_id,omitempty"`
}
type JobBatch ¶
type JobBatch struct {
GroupID string `json:"group_id"`
JobIDs []string `json:"job_ids"`
QueuedAt time.Time `json:"queued_at"`
TotalJobs int `json:"total_jobs"`
MaxAttempts int `json:"max_attempts"`
RepoURL string `json:"repo_url,omitempty"`
FilesScanned int `json:"files_scanned,omitempty"`
Truncated bool `json:"truncated"`
}
type JobHandler ¶
JobHandler processes a job and returns the result.
func NewInspectResourceHandler ¶
func NewInspectResourceHandler(tools *agents.SecurityTools) JobHandler
NewInspectResourceHandler creates a handler for InspectResource jobs.
type JobMessage ¶
type JobMessage struct {
JobID string `json:"job_id"`
GroupID string `json:"group_id,omitempty"`
CorrelationID string `json:"correlation_id,omitempty"`
// Attempt number for retry tracking (used in deduplication ID generation)
Attempt int `json:"attempt,omitempty"`
// DeduplicationID for FIFO queues - if empty, generates unique ID from job_id:attempt:timestamp
DeduplicationID string `json:"deduplication_id,omitempty"`
}
type JobRegistry ¶
type JobRegistry struct {
// contains filtered or unexported fields
}
JobRegistry manages job type handlers.
func (*JobRegistry) Get ¶
func (r *JobRegistry) Get(jobType JobType) (JobHandler, bool)
Get returns the handler for a job type.
func (*JobRegistry) Register ¶
func (r *JobRegistry) Register(jobType JobType, handler JobHandler)
Register registers a handler for a job type.
func (*JobRegistry) RegisteredTypes ¶
func (r *JobRegistry) RegisteredTypes() []JobType
RegisteredTypes returns all registered job types.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
func (*Manager) EnqueueInspectResources ¶
func (m *Manager) EnqueueInspectResources(ctx context.Context, resources []ResourceRef, opts EnqueueOptions) (*JobBatch, error)
func (*Manager) EnqueueNativeSync ¶
func (m *Manager) EnqueueNativeSync(ctx context.Context, payload NativeSyncPayload, opts EnqueueOptions) (*Job, error)
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
Metrics collects worker metrics and logs them periodically.
func NewMetrics ¶
func NewMetrics(logger *slog.Logger, config MetricsConfig) *Metrics
NewMetrics creates a new metrics collector. Note: metrics are currently only written to the log; CloudWatch support can be added by vendoring github.com/aws/aws-sdk-go-v2/service/cloudwatch.
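func (*Metrics) RecordHeartbeat ¶
func (m *Metrics) RecordHeartbeat(succeeded bool)
RecordHeartbeat records a heartbeat attempt.
func (*Metrics) RecordJobProcessed ¶
func (m *Metrics) RecordJobProcessed(duration time.Duration, succeeded bool, timedOut bool)
RecordJobProcessed records a job completion with its duration and status.
func (*Metrics) RecordMessagesDeleteFailed ¶
func (m *Metrics) RecordMessagesDeleteFailed(count int)
RecordMessagesDeleteFailed records failed message delete attempts from SQS.
func (*Metrics) RecordMessagesDeleted ¶
func (m *Metrics) RecordMessagesDeleted(count int)
RecordMessagesDeleted records messages deleted from SQS.
func (*Metrics) RecordMessagesReceived ¶
func (m *Metrics) RecordMessagesReceived(count int)
RecordMessagesReceived records messages received from SQS.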
type MetricsConfig ¶
type MetricsConfig struct {
Namespace string // For future CloudWatch support
WorkerID string
Logger *slog.Logger
}
MetricsConfig configures the metrics collector.
type MetricsRecorder ¶
type MetricsRecorder interface {
RecordJobProcessed(duration time.Duration, succeeded bool, timedOut bool)
RecordHeartbeat(succeeded bool)
RecordMessagesReceived(count int)
RecordMessagesDeleted(count int)
RecordMessagesDeleteFailed(count int)
Flush(ctx context.Context) error
StartFlusher(ctx context.Context, interval time.Duration)
}
MetricsRecorder is the interface for recording worker metrics.
type NativeSyncPayload ¶
type NoOpIdempotencyStore ¶
type NoOpIdempotencyStore struct{}
NoOpIdempotencyStore is a no-op implementation for testing or when idempotency is disabled.
func (*NoOpIdempotencyStore) IsProcessed ¶
func (s *NoOpIdempotencyStore) IsProcessed(ctx context.Context, messageID string) (bool, error)
func (*NoOpIdempotencyStore) MarkCompleted ¶
func (s *NoOpIdempotencyStore) MarkCompleted(ctx context.Context, messageID string) error
func (*NoOpIdempotencyStore) MarkFailed ¶
func (s *NoOpIdempotencyStore) MarkFailed(ctx context.Context, messageID string) error
type NoOpMetrics ¶
type NoOpMetrics struct{}
NoOpMetrics is a metrics implementation that does nothing.
func (*NoOpMetrics) RecordHeartbeat ¶
func (n *NoOpMetrics) RecordHeartbeat(succeeded bool)
func (*NoOpMetrics) RecordJobProcessed ¶
func (n *NoOpMetrics) RecordJobProcessed(duration time.Duration, succeeded bool, timedOut bool)
func (*NoOpMetrics) RecordMessagesDeleteFailed ¶
func (n *NoOpMetrics) RecordMessagesDeleteFailed(count int)
func (*NoOpMetrics) RecordMessagesDeleted ¶
func (n *NoOpMetrics) RecordMessagesDeleted(count int)
func (*NoOpMetrics) RecordMessagesReceived ¶
func (n *NoOpMetrics) RecordMessagesReceived(count int)
func (*NoOpMetrics) StartFlusher ¶
func (n *NoOpMetrics) StartFlusher(ctx context.Context, interval time.Duration)
type OrphanedJobScanner ¶
type OrphanedJobScanner struct {
// contains filtered or unexported fields
}
OrphanedJobScanner finds and recovers jobs that are stuck in "running" state with expired leases. This handles cases where a worker crashed after claiming a job but before completing it.
func NewOrphanedJobScanner ¶
func NewOrphanedJobScanner(store *DynamoStore, queue Queue, logger *slog.Logger, interval time.Duration) *OrphanedJobScanner
NewOrphanedJobScanner creates a new scanner.
func (*OrphanedJobScanner) Start ¶
func (s *OrphanedJobScanner) Start(ctx context.Context)
Start begins scanning for orphaned jobs periodically.
type Queue ¶
type Queue interface {
Enqueue(ctx context.Context, msg JobMessage) error
EnqueueWithDelay(ctx context.Context, msg JobMessage, delay time.Duration) error
Receive(ctx context.Context, maxMessages int, waitTime time.Duration, visibilityTimeout time.Duration) ([]QueueMessage, error)
Delete(ctx context.Context, receiptHandle string) error
DeleteBatch(ctx context.Context, receiptHandles []string) (succeeded int, failed []string, err error)
ExtendVisibility(ctx context.Context, receiptHandle string, timeout time.Duration) error
ExtendVisibilityBatch(ctx context.Context, receiptHandles []string, timeout time.Duration) (succeeded int, failed int, err error)
}
type QueueMessage ¶
type ResourceRef ¶
type ResourceRef struct {
Provider string `json:"provider"`
Service string `json:"service,omitempty"`
ResourceType string `json:"resource_type,omitempty"`
Identifier string `json:"identifier"`
Resource string `json:"resource"`
File string `json:"file,omitempty"`
Line int `json:"line,omitempty"`
Snippet string `json:"snippet,omitempty"`
Confidence string `json:"confidence,omitempty"`
}
type SQSQueue ¶
type SQSQueue struct {
// contains filtered or unexported fields
}
func NewSQSQueueWithConfig ¶
func NewSQSQueueWithConfig(cfg aws.Config, config SQSQueueConfig) *SQSQueue
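func (*SQSQueue) DeleteBatch ¶
func (q *SQSQueue) DeleteBatch(ctx context.Context, receiptHandles []string) (succeeded int, failed []string, err error)
func (*SQSQueue) EnqueueWithDelay ¶
func (q *SQSQueue) EnqueueWithDelay(ctx context.Context, msg JobMessage, delay time.Duration) error
func (*SQSQueue) ExtendVisibility ¶
func (q *SQSQueue) ExtendVisibility(ctx context.Context, receiptHandle string, timeout time.Duration) error
func (*SQSQueue) ExtendVisibilityBatch ¶
func (q *SQSQueue) ExtendVisibilityBatch(ctx context.Context, receiptHandles []string, timeout time.Duration) (succeeded int, failed int, err error)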
type SQSQueueConfig ¶
type SQSQueueConfig struct {
QueueURL string
IsFIFO bool // Set to true for FIFO queues (.fifo suffix)
}
SQSQueueConfig configures the SQS queue.
type Store ¶
type Store interface {
CreateJob(ctx context.Context, job *Job) error
GetJob(ctx context.Context, jobID string) (*Job, error)
ClaimJob(ctx context.Context, jobID, workerID string, lease time.Duration) (*Job, bool, error)
ExtendLease(ctx context.Context, jobID, workerID string, lease time.Duration) error
CompleteJob(ctx context.Context, jobID, result string) error
FailJob(ctx context.Context, jobID, message string) error
RetryJob(ctx context.Context, jobID, message string) error
}
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker processes jobs from an SQS queue.
func NewWorker ¶
func NewWorker(queue Queue, store Store, registry *JobRegistry, opts WorkerOptions) *Worker
NewWorker creates a new job worker.
func (*Worker) Health ¶
func (w *Worker) Health() HealthStatus
Health returns the current health status of the worker.
func (*Worker) IsHealthy ¶
IsHealthy returns true if the worker is healthy and ready to process jobs.
type WorkerOptions ¶
type WorkerOptions struct {
Concurrency int
VisibilityTimeout time.Duration
HeartbeatInterval time.Duration
JobTimeout time.Duration
PollWait time.Duration
DrainTimeout time.Duration
RetryBaseDelay time.Duration
RetryMaxDelay time.Duration
WorkerID string
Logger *slog.Logger
Metrics MetricsRecorder
CircuitBreaker *CircuitBreaker
Idempotency IdempotencyStore
}
WorkerOptions configures the worker.