watcher

package
v1.1.0 Latest
Published: Nov 21, 2025 License: MIT Imports: 22 Imported by: 0

Documentation

Overview


Package watcher provides a robust file system monitoring and processing pipeline.

Gordon Watcher monitors directories for new files, validates them, ensures idempotency through SHA256 hashing, and publishes them to a message queue for downstream processing.

Architecture

The watcher follows an event-driven architecture with the following components:

  • fsnotify integration for real-time file system events
  • Worker pool for concurrent file processing
  • Rate limiter to prevent system overload
  • Stability checker to ensure files are fully written
  • Circuit breaker for resilient queue publishing
  • Distributed locks via Redis for multi-instance coordination

Basic Usage

cfg := watcher.Config{
	Paths:             []string{"/data/incoming"},
	FilePatterns:      []string{"*.xml", "*.json"},
	MaxWorkers:        10,
	MaxFilesPerSecond: 100,
	WorkingDir:        "/data",
	Queue:             myQueue,
	Storage:           myStorage,
	Logger:            myLogger,
}

w, err := watcher.New(cfg)
if err != nil {
	log.Fatal(err)
}

ctx := context.Background()
if err := w.Start(ctx); err != nil {
	log.Fatal(err)
}

// Graceful shutdown
defer w.Stop(context.Background())

File Processing Flow

 1. File detected in monitored directory
 2. Stability check (waits for the file to stop changing)
 3. Pattern matching and size validation
 4. SHA256 hash calculation
 5. Idempotency check (skip if already processed)
 6. Distributed lock acquisition
 7. Move to processing directory
 8. Publish to message queue (with retry + circuit breaker)
 9. File remains in processing until an external worker completes
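The idempotency step (4–5) can be sketched as follows. This is a minimal illustration, not the package's actual implementation: the in-memory map stands in for whatever store the watcher uses to remember processed hashes.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// processed simulates the idempotency store, keyed by content hash.
var processed = map[string]bool{}

// alreadySeen hashes the content and reports whether an identical
// payload was handled before, recording it if not.
func alreadySeen(content []byte) bool {
	sum := sha256.Sum256(content)
	key := hex.EncodeToString(sum[:])
	if processed[key] {
		return true
	}
	processed[key] = true
	return false
}

func main() {
	fmt.Println(alreadySeen([]byte("invoice.xml contents"))) // false: first sighting
	fmt.Println(alreadySeen([]byte("invoice.xml contents"))) // true: duplicate, skipped
}
```

Hashing content rather than filenames means a re-uploaded file with a new name is still recognized as a duplicate.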

Resilience Features

  • Automatic retry with exponential backoff
  • Circuit breaker to prevent cascading failures
  • Orphan file reconciliation on startup
  • Dead Letter Queue (DLQ) for failed messages
  • Graceful shutdown with in-flight request completion

Observability

The watcher provides comprehensive observability:

  • Prometheus metrics for monitoring
  • OpenTelemetry tracing for distributed debugging
  • Structured logging with configurable levels
  • Health and readiness endpoints

See the Config type for all available configuration options.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ExtractZip added in v1.0.2

func ExtractZip(zipPath, destDir string) ([]string, error)

ExtractZip extracts a ZIP file to the specified destination directory

func IsZipFile added in v1.0.2

func IsZipFile(filename string) bool

IsZipFile checks if a file is a ZIP file based on extension
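Since IsZipFile is documented as an extension-based check, it presumably reduces to something like this sketch (the case-insensitive comparison is an assumption):

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// isZipFile reports whether the filename carries a .zip extension,
// mirroring what the package's IsZipFile presumably checks.
func isZipFile(filename string) bool {
	return strings.EqualFold(filepath.Ext(filename), ".zip")
}

func main() {
	fmt.Println(isZipFile("batch.ZIP")) // true
	fmt.Println(isZipFile("batch.xml")) // false
}
```

Note that an extension check says nothing about the file's actual contents; a corrupt or mislabeled archive would only fail later, in ExtractZip.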

func Retry

func Retry(ctx context.Context, cfg RetryConfig, fn func() error) error

Retry retries a function with exponential backoff

Types

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

func NewCircuitBreaker

func NewCircuitBreaker(maxFailures int, resetTimeout time.Duration) *CircuitBreaker

func (*CircuitBreaker) Call

func (cb *CircuitBreaker) Call(fn func() error) error

func (*CircuitBreaker) GetState

func (cb *CircuitBreaker) GetState() State

type Cleaner

type Cleaner struct {
	// contains filtered or unexported fields
}

Cleaner removes empty directories

func NewCleaner

func NewCleaner(workingDir string, protectedDirs []string, interval time.Duration, log *logger.Logger) *Cleaner

NewCleaner creates a new directory cleaner

func (*Cleaner) Start

func (c *Cleaner) Start()

Start starts the cleaner

func (*Cleaner) Stop

func (c *Cleaner) Stop()

Stop stops the cleaner

type CleanupConfig added in v1.0.2

type CleanupConfig struct {
	WorkingDir string
	SubDirs    SubDirectories
	Retention  map[string]int
	Schedule   string
	Logger     *logger.Logger
}

CleanupConfig holds cleanup configuration

type CleanupScheduler added in v1.0.2

type CleanupScheduler struct {
	// contains filtered or unexported fields
}

CleanupScheduler handles scheduled directory cleanup

func NewCleanupScheduler added in v1.0.2

func NewCleanupScheduler(cfg CleanupConfig) *CleanupScheduler

NewCleanupScheduler creates a new cleanup scheduler

func (*CleanupScheduler) Start added in v1.0.2

func (cs *CleanupScheduler) Start() error

Start starts the cleanup scheduler

func (*CleanupScheduler) Stop added in v1.0.2

func (cs *CleanupScheduler) Stop()

Stop stops the cleanup scheduler

type Config

type Config struct {
	// Paths to watch
	Paths []string

	// File patterns to match
	FilePatterns []string

	// Patterns to exclude
	ExcludePatterns []string

	// File size constraints (bytes)
	MinFileSize int64
	MaxFileSize int64

	// Stability check settings
	StableAttempts int
	StableDelay    time.Duration

	// Cleanup interval for empty directories
	CleanupInterval time.Duration

	// Worker pool settings
	MaxWorkers        int
	MaxFilesPerSecond int
	WorkerQueueSize   int

	// Working directory
	WorkingDir string

	// Subdirectories
	SubDirs SubDirectories

	// Dependencies
	Queue   queue.Queue
	Storage storage.Storage
	Logger  *logger.Logger
}

Config holds watcher configuration

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter limits the rate of file processing

func NewRateLimiter

func NewRateLimiter(maxFilesPerSecond int) *RateLimiter

NewRateLimiter creates a new rate limiter

func (*RateLimiter) Allow

func (r *RateLimiter) Allow() bool

Allow checks if an operation is allowed under the rate limit

func (*RateLimiter) Wait

func (r *RateLimiter) Wait()

Wait waits until the rate limit allows another operation
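The Allow/Wait pair suggests a limiter along these lines. This sketch uses a simple fixed one-second window; whether the real implementation uses a window or a token bucket is not stated in the docs.

```go
package main

import (
	"fmt"
	"time"
)

// RateLimiter admits at most maxPerSecond operations per one-second window.
type RateLimiter struct {
	maxPerSecond int
	count        int
	windowStart  time.Time
}

func NewRateLimiter(maxPerSecond int) *RateLimiter {
	return &RateLimiter{maxPerSecond: maxPerSecond, windowStart: time.Now()}
}

// Allow reports whether another operation fits in the current window.
func (r *RateLimiter) Allow() bool {
	if time.Since(r.windowStart) >= time.Second {
		r.windowStart = time.Now() // new window: reset the counter
		r.count = 0
	}
	if r.count < r.maxPerSecond {
		r.count++
		return true
	}
	return false
}

func main() {
	rl := NewRateLimiter(3)
	for i := 0; i < 4; i++ {
		fmt.Println(rl.Allow()) // true, true, true, then false
	}
}
```

Wait would then be a loop that sleeps until Allow returns true, which is why MaxFilesPerSecond caps throughput rather than dropping files.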

type RetryConfig

type RetryConfig struct {
	MaxAttempts  int
	InitialDelay time.Duration
	MaxDelay     time.Duration
	Multiplier   float64
}

RetryConfig configures retry behavior

func DefaultRetryConfig

func DefaultRetryConfig() RetryConfig

DefaultRetryConfig returns default retry configuration

type StabilityChecker

type StabilityChecker struct {
	// contains filtered or unexported fields
}

StabilityChecker checks if a file has stabilized (stopped changing)

func NewStabilityChecker

func NewStabilityChecker(attempts int, delay time.Duration) *StabilityChecker

NewStabilityChecker creates a new stability checker

func (*StabilityChecker) WaitForStability

func (s *StabilityChecker) WaitForStability(ctx context.Context, path string) bool

WaitForStability waits for a file to stabilize

type State

type State int
const (
	StateClosed   State = iota // Normal operation
	StateOpen                  // Tripped (too many errors)
	StateHalfOpen              // Testing recovery
)

type SubDirectories

type SubDirectories struct {
	Processing string
	Processed  string
	Failed     string
	Ignored    string
	Tmp        string
}

SubDirectories defines directory structure

type Watcher

type Watcher struct {
	// contains filtered or unexported fields
}

Watcher monitors file system events

func New

func New(cfg Config) (*Watcher, error)

New creates a new Watcher instance

func (*Watcher) Start

func (w *Watcher) Start(ctx context.Context) error

Start starts the watcher

func (*Watcher) Stop

func (w *Watcher) Stop(ctx context.Context) error

Stop stops the watcher gracefully

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool manages a pool of workers for processing files

func NewWorkerPool

func NewWorkerPool(maxWorkers, queueSize int, processor func(context.Context, string) error) *WorkerPool

NewWorkerPool creates a new worker pool

func (*WorkerPool) Start

func (p *WorkerPool) Start()

Start starts the worker pool

func (*WorkerPool) Stop

func (p *WorkerPool) Stop()

Stop stops the worker pool

func (*WorkerPool) Submit

func (p *WorkerPool) Submit(path string)

Submit submits a file path to the worker pool

func (*WorkerPool) SubmitBlocking added in v1.0.2

func (p *WorkerPool) SubmitBlocking(path string)

SubmitBlocking submits a file path to the worker pool, blocking if the queue is full
