README
SpiderFoot Fetcher
High-performance concurrent pipeline for processing SpiderFoot scan results with CVE enrichment and CISA data integration.
Features
- 3-Stage Concurrent Pipeline: Reader → Parser → Indexer with configurable worker pools
- High Performance: Concurrent processing with non-blocking channels
- CVE Enrichment: Automatic CVE scoring with CISA KEV and EPSS integration
- Real-time Monitoring: Suricata-style performance metrics and statistics
- Smart Error Handling: Detailed error logging with record traceability
- Safe Timestamp Management: Prevents duplicate processing during concurrent runs
- Performance Profiling: Built-in pprof support for optimization
Table of Contents
- Installation
- Quick Start
- Configuration
- Operation Modes
- Organization Data
- Pipeline Architecture
- CVE Enrichment
- Performance Monitoring
- Error Handling
- Testing
- Contributing
- License
Installation
Prerequisites
- Go 1.21 or higher
- SQLite3 (SpiderFoot database)
- Elasticsearch 7.x/8.x cluster
- Access to CVE data and EPSS indices
Install from Source
git clone https://github.com/luhtaf/spiderfoot-fetcher.git
cd spiderfoot-fetcher
go mod tidy
go build -o spiderfoot-fetcher
Docker Installation
docker pull luhtaf/spiderfoot-fetcher:latest
Quick Start
1. Configure the Pipeline
cp config.yaml.example config.yaml
# Edit config.yaml with your settings
2. Choose Operation Mode
The application supports two distinct operation modes:
Pipeline Mode
Processes new SpiderFoot scan results with full enrichment:
# Run pipeline mode (required argument)
./spiderfoot-fetcher pipeline
Migration Mode
Updates existing Elasticsearch records with new enrichment data:
# Run migration mode (required argument)
./spiderfoot-fetcher migrate
3. Command Line Interface
Available Commands
# Show help
./spiderfoot-fetcher help
# Run migration (update existing records)
./spiderfoot-fetcher migrate
# Run pipeline (process new records)
./spiderfoot-fetcher pipeline
Development vs Production
# Development mode (dry run - doesn't index to Elasticsearch)
# Edit config.yaml: app.type = "development"
./spiderfoot-fetcher migrate
# Production mode (indexes to Elasticsearch)
# Edit config.yaml: app.type = "production"
./spiderfoot-fetcher migrate
4. Schedule for Production
# Pipeline: Hourly processing of new scan results
0 * * * * /path/to/spiderfoot-fetcher pipeline >> /var/log/spiderfoot-pipeline.log 2>&1
# Migration: Weekly refresh of CVE/EPSS data (Sunday 2 AM)
0 2 * * 0 /path/to/spiderfoot-fetcher migrate >> /var/log/spiderfoot-migration.log 2>&1
# Or use systemd timer for pipeline
sudo systemctl enable --now spiderfoot-fetcher.timer
5. Monitor Performance
# Real-time stats
tail -f pipeline_stats.json
# Error monitoring
tail -f error.log
# Performance profiling
go tool pprof http://localhost:6060/debug/pprof/profile
Configuration
The pipeline is configured via config.yaml:
# Database Configuration
database:
  path: "spiderfoot.db"
# Pipeline Workers Configuration
workers:
  reader: 2   # SQL reader workers
  parser: 4   # Data parser workers
  indexer: 3  # Elasticsearch indexer workers
# Batch Configuration
batch:
  size: 100   # Records per batch
# Elasticsearch Configuration
elasticsearch:
  url: "http://localhost:9200"
  username: "elastic"
  password: "changeme"
  verify_certs: false
  index: "spiderfoot"
  cve_index: "go-list-cve-*"
  epss_index: "epss-scores"
# Application Configuration
# Note: Operation mode is specified via command line arguments
app:
  type: "development"  # "production" for live indexing
  version: 2
  timestamp_file: "timestamp_cron.txt"
  fallback_hours: 12
  error_log: "error.log"
  organization_data: "organization_data.csv"
# Statistics Configuration
stats:
  enabled: true
  interval: 30s
  file: "pipeline_stats.json"
Environment Variables
export SPIDERFOOT_DB_PATH="/path/to/spiderfoot.db"
export ELASTICSEARCH_URL="https://elasticsearch:9200"
export ELASTICSEARCH_USERNAME="elastic"
export ELASTICSEARCH_PASSWORD="your-password"
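How these variables combine with config.yaml is not spelled out above. The following is a minimal sketch that assumes a set, non-empty environment variable takes precedence over the file value. The Settings struct, applyEnv, and the injected lookup function are illustrative names, not the project's actual types.

```go
package main

import (
	"fmt"
	"os"
)

// Settings holds the subset of options that the variables above cover.
// The struct and its field names are illustrative only.
type Settings struct {
	DBPath     string
	ESURL      string
	ESUsername string
	ESPassword string
}

// applyEnv layers environment values on top of values loaded from config.yaml.
// lookup has the same shape as os.LookupEnv, so it can be swapped out in tests.
// Assumption: a variable overrides the file only when it is set and non-empty.
func applyEnv(s Settings, lookup func(string) (string, bool)) Settings {
	override := func(dst *string, key string) {
		if v, ok := lookup(key); ok && v != "" {
			*dst = v
		}
	}
	override(&s.DBPath, "SPIDERFOOT_DB_PATH")
	override(&s.ESURL, "ELASTICSEARCH_URL")
	override(&s.ESUsername, "ELASTICSEARCH_USERNAME")
	override(&s.ESPassword, "ELASTICSEARCH_PASSWORD")
	return s
}

func main() {
	fromFile := Settings{
		DBPath:     "spiderfoot.db",
		ESURL:      "http://localhost:9200",
		ESUsername: "elastic",
	}
	effective := applyEnv(fromFile, os.LookupEnv)
	fmt.Println("database:", effective.DBPath)
	fmt.Println("elasticsearch:", effective.ESURL)
}
```

Passing the lookup function in keeps the override logic free of global state, which makes it easy to check without touching the real environment.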
Operation Modes
Pipeline Mode
Purpose: Process new SpiderFoot scan results from the SQLite database
Use Case: Regular data ingestion and enrichment
# Run pipeline mode
./spiderfoot-fetcher pipeline
Features:
- Reads new records based on timestamp tracking
- 3-stage concurrent processing (Reader → Parser → Indexer)
- CVE enrichment with CISA and EPSS data
- Organization data mapping from CSV
- Bulk indexing for high performance
- Prevents duplicate processing
Best for:
- Scheduled cron jobs (hourly/daily)
- Continuous data ingestion
- Processing fresh SpiderFoot scans
Migration Mode
Purpose: Update existing Elasticsearch records with new enrichment data
Use Case: Applying new features to historical data
# Run migration mode (updates records with version < current_version)
./spiderfoot-fetcher migrate
Features:
- Uses Scroll API for large dataset processing
- Bulk update operations (500 records/batch)
- Progress bar with ETA calculations
- Fetches latest CISA and EPSS data
- Updates organization subsektor mappings
- Version-based targeting
- Graceful cancellation support
Best for:
- One-time data migrations
- Applying new enrichment to existing records
- Version upgrades and schema updates
Migration Process:
- Counts total records to migrate
- Uses Scroll API to process in batches of 5000
- Updates records in bulk chunks of 500
- Shows real-time progress with speed/ETA
- Updates version number to prevent re-processing
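The relationship between the two batch sizes above (scroll pages of 5000, bulk requests of 500) can be sketched as a simple chunking step. This is only an illustration under that assumption: the chunk helper and the placeholder document IDs are hypothetical, and no Elasticsearch client is involved.

```go
package main

import "fmt"

// chunk splits ids into consecutive slices of at most size elements.
// The returned slices share memory with ids; nothing is copied.
func chunk(ids []string, size int) [][]string {
	if size <= 0 {
		return nil
	}
	var out [][]string
	for start := 0; start < len(ids); start += size {
		end := start + size
		if end > len(ids) {
			end = len(ids)
		}
		out = append(out, ids[start:end])
	}
	return out
}

func main() {
	// One scroll page of 5000 hits (IDs are placeholders).
	page := make([]string, 5000)
	for i := range page {
		page[i] = fmt.Sprintf("doc-%d", i)
	}
	// Each chunk would become one bulk update request.
	for i, c := range chunk(page, 500) {
		fmt.Printf("bulk request %d: %d updates\n", i+1, len(c))
	}
}
```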
Why Migration is Important
CVE and EPSS Data is Dynamic: CVE scores and EPSS (Exploit Prediction Scoring System) data change frequently:
Day 1: CVE-2023-45802 → Score: 6.0, EPSS: 0.02
Day 2: CVE-2023-45802 → Score: 7.5, EPSS: 0.85 (higher risk)
The Problem with Static Records:
- Pipeline Mode: Only processes NEW SpiderFoot scan results
- Elasticsearch Records: Remain static after initial indexing
- CVE/EPSS Updates: Happen daily but don't automatically update existing records
Solution - Regular Migration:
# Weekly migration to refresh CVE/EPSS data (recommended)
0 2 * * 0 /path/to/spiderfoot-fetcher migrate # Sunday 2 AM
# Manual migration when needed
./spiderfoot-fetcher migrate
# Increment version in config.yaml to force refresh
app:
  version: 7  # Increment when you need to refresh all data
Real-world Example:
Jan 1: Record created with CVE-2023-45802, Score: 6.0 (MEDIUM)
Jan 5: CVE database updated, same CVE now Score: 7.5 (HIGH)
→ Your Elasticsearch record still shows 6.0 (stale)
Jan 7: Run migration → Record updated to 7.5
Migration Triggers:
- Weekly/monthly data refresh
- After major CVE database updates
- When EPSS scores change significantly
- Adding new enrichment features
Organization Data
CSV Format
Create organization_data.csv with organization to subsektor mapping:
organisasi,subsektor
Bank Bni,Perbankan
Bank Bri,Perbankan
Rumah Sakit Siloam Jakarta,Kesehatan
Telkom Indonesia,Telekomunikasi
Pln,Energi
Garuda Indonesia,Transportasi
Universitas Indonesia,Pendidikan
Configuration
app:
  organization_data: "organization_data.csv"  # Path to CSV file
Behavior
- Automatic Loading: CSV loaded at startup with logging
- Graceful Fallback: Missing file results in empty subsektor fields
- Case Sensitive: Exact match required between scan name and CSV data
- Flexible Format: Support for spaces and special characters in names
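The loading behavior listed above can be sketched with the standard encoding/csv package. This is a hedged illustration, not the project's loader: parseOrgs and loadOrgs are hypothetical names, and treating the first row as a header is an assumption based on the sample file.

```go
package main

import (
	"encoding/csv"
	"errors"
	"fmt"
	"os"
	"strings"
)

// parseOrgs turns "organisasi,subsektor" CSV text into a lookup map.
// Assumption: the first row is a header and is skipped.
func parseOrgs(data string) (map[string]string, error) {
	rows, err := csv.NewReader(strings.NewReader(data)).ReadAll()
	if err != nil {
		return nil, err
	}
	orgs := make(map[string]string)
	for i, row := range rows {
		if i == 0 || len(row) < 2 {
			continue // header or incomplete row
		}
		orgs[row[0]] = row[1] // keys are stored verbatim, so lookups are case sensitive
	}
	return orgs, nil
}

// loadOrgs reads the CSV at path. A missing file is not an error: it yields
// an empty map, so every subsektor lookup comes back empty.
func loadOrgs(path string) (map[string]string, error) {
	b, err := os.ReadFile(path)
	if errors.Is(err, os.ErrNotExist) {
		return map[string]string{}, nil
	}
	if err != nil {
		return nil, err
	}
	return parseOrgs(string(b))
}

func main() {
	orgs, err := parseOrgs("organisasi,subsektor\nBank Bni,Perbankan\nPln,Energi\n")
	if err != nil {
		panic(err)
	}
	fmt.Printf("exact match: %q\n", orgs["Bank Bni"])
	fmt.Printf("different case: %q\n", orgs["bank bni"])
}
```

Because the map key is the name exactly as written in the CSV, a scan name that differs only in letter case finds nothing, which matches the case-sensitive behavior described above.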
Deployment Patterns
Cron Job (Recommended)
# Every hour at minute 0 - pipeline mode
0 * * * * cd /opt/spiderfoot-fetcher && ./spiderfoot-fetcher pipeline >> /var/log/spiderfoot.log 2>&1
# Every 30 minutes - pipeline mode
*/30 * * * * cd /opt/spiderfoot-fetcher && ./spiderfoot-fetcher pipeline
# Weekly migration at Sunday 2 AM to refresh CVE/EPSS data
0 2 * * 0 cd /opt/spiderfoot-fetcher && ./spiderfoot-fetcher migrate
Docker Deployment
# One-time execution
docker run --rm -v /path/to/config:/config luhtaf/spiderfoot-fetcher
# With cron in container
docker run -d --name spiderfoot-cron \
-v /path/to/config:/config \
-v /path/to/db:/data \
luhtaf/spiderfoot-fetcher:latest
Systemd Service + Timer
# /etc/systemd/system/spiderfoot-fetcher.service
[Unit]
Description=SpiderFoot to Elasticsearch Pipeline
After=network.target
[Service]
Type=oneshot
User=spiderfoot
WorkingDirectory=/opt/spiderfoot-fetcher
ExecStart=/opt/spiderfoot-fetcher/spiderfoot-fetcher pipeline
StandardOutput=journal
StandardError=journal
# /etc/systemd/system/spiderfoot-fetcher.timer
[Unit]
Description=Run SpiderFoot Pipeline every hour
Requires=spiderfoot-fetcher.service
[Timer]
OnCalendar=hourly
Persistent=true
[Install]
WantedBy=timers.target
CI/CD Integration
# GitHub Actions example
name: SpiderFoot Data Pipeline
on:
schedule:
- cron: '0 */2 * * *' # Every 2 hours
jobs:
pipeline:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Run Pipeline
run: ./spiderfoot-fetcher
Pipeline Architecture
Short-Lived Worker Model
This pipeline uses a short-lived worker architecture designed for batch processing:
- Batch-Oriented: Processes data between timestamps (e.g., last 12 hours)
- Finite Execution: Workers exit when there is no more data to process
- Cron-Style: Designed to run periodically (via cron/scheduler)
- State Persistence: Saves the timestamp so the next run continues where this one stopped
vs Long-Running Workers:
| Aspect | Short-Lived (This Pipeline) | Long-Running Workers (nvd-fetcher) |
|---|---|---|
| Lifecycle | Start → Process → Exit | Start → Listen Forever |
| Worker Pattern | Spawn per batch → Exit when done | 1000+ workers always listening |
| Processing Model | Batch-oriented (100 records/batch) | Per-item (1 CVE/worker) |
| Memory Profile | Memory efficient (periodic) | CPU efficient (continuous) |
| Use Case | ETL jobs, scheduled processing | Stream processing, real-time |
| Resource Usage | Periodic, bounded | Continuous, high throughput |
| Failure Recovery | Restart from last timestamp | In-memory state loss |
| Concurrency | Finite workers per stage | Massive parallelism (1000+ workers) |
| Deployment | Cron job, batch scheduler | Always-on service |
Stage 1: Reader Workers (Short-Lived)
- Purpose: Extract records from SpiderFoot SQLite database
- Lifecycle: Query database → Send to channel → Exit when no more records
- Pattern: for { query() → send() → offset++ } until empty result
- Concurrency: Multiple workers with offset-based pagination (Worker 1: offset 0, Worker 2: offset 100, etc.)
- Completion: Workers naturally terminate when their query returns empty
- Safety: Timestamp range isolation (last_run < timestamp <= now)
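The reader behavior described above can be sketched without a database. In this illustration an in-memory slice stands in for the SQLite table, and fetchPage plays the role of the LIMIT/OFFSET query. The Record type, both function names, and the stride of workers × batch between pages are assumptions; the source only states the starting offsets.

```go
package main

import (
	"fmt"
	"sync"
)

// Record is a simplified scan row; Timestamp is Unix time.
type Record struct {
	ID        int
	Timestamp int64
}

// fetchPage stands in for a query of the form
//   ... WHERE ts > lastRun AND ts <= now LIMIT limit OFFSET offset
// evaluated over an in-memory table.
func fetchPage(table []Record, lastRun, now int64, limit, offset int) []Record {
	var window []Record
	for _, r := range table {
		if r.Timestamp > lastRun && r.Timestamp <= now {
			window = append(window, r)
		}
	}
	if offset >= len(window) {
		return nil
	}
	end := offset + limit
	if end > len(window) {
		end = len(window)
	}
	return window[offset:end]
}

// runReaders starts `workers` goroutines. Worker i begins at offset i*batch
// and advances by workers*batch, so no two workers read the same page.
// The returned channel is closed once every worker has hit an empty page.
func runReaders(table []Record, lastRun, now int64, workers, batch int) <-chan Record {
	out := make(chan Record, batch)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(offset int) {
			defer wg.Done()
			for {
				page := fetchPage(table, lastRun, now, batch, offset)
				if len(page) == 0 {
					return // no more data for this worker
				}
				for _, r := range page {
					out <- r
				}
				offset += workers * batch
			}
		}(w * batch)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	table := make([]Record, 250)
	for i := range table {
		table[i] = Record{ID: i, Timestamp: int64(i + 1)}
	}
	count := 0
	for range runReaders(table, 50, 200, 2, 100) {
		count++
	}
	fmt.Println("records read:", count)
}
```

Fixing both ends of the timestamp window before any worker starts is what keeps the pages stable: rows that arrive after `now` fall outside the window and are left for the next run.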
Stage 2: Parser Workers (Channel-Driven)
- Purpose: Parse and enrich scan data until channel closes
- Lifecycle: for record := range rawChan → Exit when channel closes
- Processing:
  - Grok pattern parsing for organization metadata
  - CVE enrichment with CISA KEV and EPSS data (with caching)
  - Data validation and transformation
- Pattern: Channel consumer that terminates when upstream closes
- Intelligence: Conditional processing based on scan type
Stage 3: Indexer Workers (Bounded)
- Purpose: Index processed data to Elasticsearch until completion
- Lifecycle: for record := range parsedChan → Exit when channel closes
- Features:
  - Dynamic index naming with date partitioning
  - Individual document indexing (not bulk)
  - Error resilience with structured logging
- Pattern: Channel consumer with finite data set
- Monitoring: Per-operation performance tracking
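The channel-driven shutdown shared by the parser and indexer stages can be sketched as one reusable helper. This is a simplified illustration that uses strings as records: the stage function and the stand-in transformations are hypothetical, not the project's code.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// stage runs `workers` goroutines that apply fn to every item read from in.
// Its output channel is closed only after in has been closed and all workers
// have drained it, which lets the next stage's range loop terminate.
func stage(in <-chan string, workers int, fn func(string) string) <-chan string {
	out := make(chan string, 100)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range in { // exits when the upstream channel closes
				out <- fn(item)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	raw := make(chan string)
	go func() {
		defer close(raw) // reader finished: shutdown cascades downstream
		for _, r := range []string{"cve-2021-44228", "cve-2023-45802"} {
			raw <- r
		}
	}()

	parsed := stage(raw, 4, strings.ToUpper) // stands in for parse + enrich
	indexed := stage(parsed, 3, func(s string) string {
		return "indexed " + s // stands in for the Elasticsearch write
	})
	for line := range indexed {
		fmt.Println(line)
	}
}
```

Closing each output channel from a separate goroutine that waits on the WaitGroup avoids the common mistake of one worker closing a channel that its siblings are still writing to.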
Short-Lived vs Long-Running Comparison
This Pipeline (Short-Lived):
// Reader: Query until no more data
// (query is shorthand for running the SQL and scanning the rows into records)
for {
    records := query("... LIMIT 100 OFFSET ?", offset)
    if len(records) == 0 {
        return // Worker exits naturally
    }
    // Process batch and increment offset
}
// Parser: Process until channel closes
for record := range rawChan {
    process(record)
} // Worker exits when channel closes
nvd-fetcher (Long-Running):
// 1000 workers always listening
for i := 0; i < 1000; i++ {
    go func() {
        for cveTask := range taskChan { // Never exits
            process(cveTask) // 1 CVE per worker
        }
    }()
}
// Workers run forever, waiting for next CVE
graph LR
A[SQLite DB] --> B[Reader Workers]
B --> C[Raw Channel]
C --> D[Parser Workers]
D --> E[Parsed Channel]
E --> F[Indexer Workers]
F --> G[Elasticsearch]
H[CVE Index] --> D
I[Stats Collector] --> J[Metrics File]
K[Error Logger] --> L[Error File]
CVE Enrichment
Automatic CVE Processing
- Detection: Identifies VULNERABILITY_CVE_* scan types
- Enrichment Source: Queries go-list-cve-* Elasticsearch indices
- Caching: In-memory LRU cache for performance optimization
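The caching bullet above names an in-memory LRU cache but gives no details. The following is a minimal sketch of that technique using the standard container/list package, assuming the cache maps a CVE ID to a score and is shared by concurrent parser workers (hence the mutex). The type and method names are illustrative.

```go
package main

import (
	"container/list"
	"fmt"
	"sync"
)

type entry struct {
	key   string
	score float64
}

// lruCache is a fixed-capacity cache that evicts the least recently used key.
type lruCache struct {
	mu       sync.Mutex
	capacity int
	order    *list.List               // front = most recently used
	items    map[string]*list.Element // key -> node in order
}

func newLRU(capacity int) *lruCache {
	return &lruCache{capacity: capacity, order: list.New(), items: make(map[string]*list.Element)}
}

// Get returns the cached score and marks the key as recently used.
func (c *lruCache) Get(key string) (float64, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	el, ok := c.items[key]
	if !ok {
		return 0, false
	}
	c.order.MoveToFront(el)
	return el.Value.(*entry).score, true
}

// Put stores or updates a score, evicting the oldest key when over capacity.
func (c *lruCache) Put(key string, score float64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if el, ok := c.items[key]; ok {
		el.Value.(*entry).score = score
		c.order.MoveToFront(el)
		return
	}
	c.items[key] = c.order.PushFront(&entry{key: key, score: score})
	if c.order.Len() > c.capacity {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(*entry).key)
	}
}

func main() {
	cache := newLRU(2)
	cache.Put("CVE-2021-44228", 10.0)
	cache.Put("CVE-2023-45802", 7.5)
	if score, ok := cache.Get("CVE-2021-44228"); ok {
		fmt.Println("cache hit:", score)
	}
}
```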
CISA KEV Integration
- CISA Data: Known Exploited Vulnerabilities catalog
- Fields Added:
{
  "hasCisa": true,
  "cisa": {
    "cisaActionDue": "2022-04-15",
    "cisaExploitAdd": "2022-03-25",
    "cisaRequiredAction": "Apply updates per vendor instructions.",
    "cisaVulnerabilityName": "HP OpenView Network Node Manager RCE"
  }
}
EPSS Integration
- EPSS Data: Exploit Prediction Scoring System from epss-scores index
- Fields Added:
{
  "hasEpss": true,
  "epss": {
    "cve": "CVE-2021-44228",
    "epss": "0.973730000",
    "percentile": "0.999940000",
    "date": "2024-10-01",
    "timestamp": "2024-10-02T10:00:59.442658735+07:00"
  }
}
Scoring Logic
- Prefer CVSS v3.1 over v2.0 when available
- Fallback hierarchy: v3 → v2 → base score
- Severity mapping: Numeric score to categorical severity
- EPSS enrichment: Adds exploit prediction probability
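The fallback hierarchy and severity mapping above can be sketched as two small functions. The Scores struct and function names are hypothetical. The bands are the standard CVSS v3.x qualitative ratings; applying them to every score, including v2 fallbacks, is an assumption made for this sketch.

```go
package main

import "fmt"

// Scores holds whichever CVSS values a CVE document provides; nil means absent.
type Scores struct {
	V3, V2, Base *float64
}

// pick returns the preferred score following the order v3 -> v2 -> base.
// The second result is false when no score is available at all.
func pick(s Scores) (float64, bool) {
	for _, p := range []*float64{s.V3, s.V2, s.Base} {
		if p != nil {
			return *p, true
		}
	}
	return 0, false
}

// severity maps a numeric score to the CVSS v3.x qualitative rating bands.
func severity(score float64) string {
	switch {
	case score >= 9.0:
		return "CRITICAL"
	case score >= 7.0:
		return "HIGH"
	case score >= 4.0:
		return "MEDIUM"
	case score > 0:
		return "LOW"
	default:
		return "NONE"
	}
}

func main() {
	v3, v2 := 7.5, 6.0
	if score, ok := pick(Scores{V3: &v3, V2: &v2}); ok {
		fmt.Println(score, severity(score))
	}
}
```

Pointers are used for the scores so that a missing value can be told apart from a genuine score of 0.0.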
Performance Monitoring
Real-time Statistics
{
  "timestamp": "2025-09-23T10:30:00Z",
  "reader": {
    "records_per_second": 150.5,
    "avg_processing_time_ms": 45.2,
    "active_workers": 2,
    "total_processed": 3010,
    "error_count": 0
  },
  "parser": {
    "records_per_second": 89.3,
    "avg_processing_time_ms": 112.7,
    "active_workers": 4,
    "queue_depth": 234,
    "error_count": 2
  },
  "indexer": {
    "records_per_second": 92.1,
    "avg_processing_time_ms": 67.8,
    "active_workers": 3,
    "error_count": 1
  },
  "uptime": "2h34m12s"
}
Performance Tuning Guide
- Reader Bottleneck: Increase database connections or optimize SQL
- Parser Bottleneck: Add parser workers or optimize CVE enrichment
- Indexer Bottleneck: Tune Elasticsearch bulk settings or add workers
- Memory Usage: Monitor queue depths and adjust batch sizes
Error Handling
Structured Error Logging
{
  "timestamp": "2025-09-23T10:30:00Z",
  "stage": "parser",
  "record_id": "abc123hash",
  "scan_name": "security_audit_sektor_finance_organisasi_bank_xyz_target_10.0.1.100",
  "error": "CVE enrichment timeout: connection to elasticsearch failed"
}
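A record like the one above maps naturally onto a Go struct with JSON tags. The sketch below assumes each entry is written to error.log as a single JSON line. The ErrorEntry type and formatEntry helper are illustrative names, not taken from the project.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ErrorEntry mirrors the fields of the log record shown above.
type ErrorEntry struct {
	Timestamp string `json:"timestamp"`
	Stage     string `json:"stage"`
	RecordID  string `json:"record_id"`
	ScanName  string `json:"scan_name"`
	Error     string `json:"error"`
}

// formatEntry renders one entry as a single JSON line.
func formatEntry(e ErrorEntry) (string, error) {
	b, err := json.Marshal(e)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	line, err := formatEntry(ErrorEntry{
		Timestamp: "2025-09-23T10:30:00Z",
		Stage:     "parser",
		RecordID:  "abc123hash",
		ScanName:  "security_audit_sektor_finance_organisasi_bank_xyz_target_10.0.1.100",
		Error:     "CVE enrichment timeout: connection to elasticsearch failed",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(line)
}
```

Keeping record_id and scan_name on every entry is what makes a failure traceable back to the originating SpiderFoot row.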
Error Recovery Strategies
- Transient Failures: Automatic retry with exponential backoff
- Data Validation: Skip malformed records with detailed logging
- Resource Constraints: Circuit breaker pattern for external services
- Graceful Degradation: Continue processing without enrichment when CVE service unavailable
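The first strategy above, retry with exponential backoff, can be sketched in a few lines. The attempt count, base delay, and the retry and errTransient names are assumptions chosen for illustration. The sketch has no jitter or context cancellation, which a production version would likely want.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTransient is a placeholder for a recoverable failure,
// such as Elasticsearch being briefly unreachable.
var errTransient = errors.New("transient failure")

// retry calls op up to attempts times, sleeping base, 2*base, 4*base, ...
// between failures. It returns nil on the first success, otherwise the
// error from the final attempt. Values of attempts below 1 are treated as 1.
func retry(attempts int, base time.Duration, op func() error) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	for i := 0; i < attempts; i++ {
		if err = op(); err == nil {
			return nil
		}
		if i < attempts-1 {
			time.Sleep(base << i) // double the delay after each failure
		}
	}
	return err
}

func main() {
	calls := 0
	err := retry(4, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	})
	fmt.Println("calls:", calls, "err:", err)
}
```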
Testing
Running Tests
# Run all tests
go test ./...
# Run tests with coverage
go test -cover ./...
# Run tests with detailed coverage report
go test -coverprofile=coverage.out ./...
go tool cover -html=coverage.out
# Run benchmarks
go test -bench=. ./...
# Run race condition detection
go test -race ./...
Test Categories
- Unit Tests: Individual component testing
- Integration Tests: Pipeline stage integration
- Performance Tests: Benchmark critical paths
- Mock Tests: External service simulation
Test Coverage Goals
- Overall Coverage: >85%
- Critical Path Coverage: >95%
- Error Path Coverage: >80%
Contributing
Development Setup
git clone https://github.com/luhtaf/spiderfoot-fetcher.git
cd spiderfoot-fetcher
go mod tidy
make setup
Code Standards
- Formatting: gofmt and goimports
- Linting: golangci-lint run
- Testing: Minimum 85% coverage
- Documentation: Godoc for all public APIs
Pull Request Process
- Fork the repository
- Create feature branch (git checkout -b feature/amazing-feature)
- Run tests (make test)
- Commit changes (git commit -m 'Add amazing feature')
- Push branch (git push origin feature/amazing-feature)
- Open Pull Request
License
This project is licensed under the MIT License - see the LICENSE file for details.
Acknowledgments
- SpiderFoot - Open source intelligence automation
- Elasticsearch - Search and analytics engine
- CISA KEV - Known Exploited Vulnerabilities catalog
Support
- Issues: GitHub Issues
- Discussions: GitHub Discussions
- Security: security@luhtaf.dev