Collect - System Inventory Collection Service

A high-performance inventory collection service that handles thousands of systems reporting inventory data every minute, with real-time change detection and notifications.

Quick Start

Prerequisites
  • Go 1.24+
  • PostgreSQL 15+
  • Redis 7+
  • Docker/Podman
Setup

Note: Collect shares the same PostgreSQL and Redis containers with the backend. If you already started them with cd backend && make dev-up, you can skip make dev-up here.

# Setup development environment
make dev-setup

# Start PostgreSQL and Redis containers (skip if already running from backend)
make dev-up

# Start the application (port 8081)
make run

# Stop PostgreSQL and Redis when done
make dev-down
Required Environment Variables
# Postgres URL
DATABASE_URL=postgresql://noc_user:noc_password@localhost:5432/noc?sslmode=disable

# Redis Configuration
REDIS_URL=redis://localhost:6379
REDIS_DB=1
REDIS_PASSWORD=
# Note: REDIS_PASSWORD can be empty for Redis without authentication
Optional Environment Variables
LISTEN_ADDRESS=127.0.0.1:8081
API_MAX_REQUEST_SIZE=10MB
HEARTBEAT_TIMEOUT_MINUTES=10
LOG_LEVEL=info

Architecture

Inventory Processing Flow

1. Data Collection

  • Systems POST inventory data to /api/systems/inventory
  • HTTP Basic Auth (system_id:system_secret)
  • Data queued in collect:inventory

2. Inventory Processing

  • Inventory Worker processes batches, stores in inventory_records
  • SHA-256 deduplication prevents duplicate processing
  • Triggers diff computation for systems with previous records
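
A minimal sketch of the dedup check, assuming a helper that stores each record's hash alongside the row (names here are hypothetical, not the actual worker code):

// Sketch only: SHA-256 dedup of raw inventory payloads.
package sketch

import (
	"crypto/sha256"
	"encoding/hex"
)

// payloadHash returns the SHA-256 hex digest of a raw inventory payload.
func payloadHash(raw []byte) string {
	sum := sha256.Sum256(raw)
	return hex.EncodeToString(sum[:])
}

// shouldProcess is true when the payload differs from the system's latest
// stored record (lastHash is empty for systems with no previous record).
func shouldProcess(raw []byte, lastHash string) bool {
	return payloadHash(raw) != lastHash
}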

3. Change Detection

  • Diff Worker computes JSON diffs between current and previous inventory
  • Categorizes changes (OS, hardware, network, features)
  • Stores results in inventory_diffs with severity levels
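
The diff computation can be pictured as a recursive walk over the decoded JSON; the differ/ package is the real implementation, this is only an illustrative sketch:

// Sketch only: recursive JSON diff producing (field_path, previous, current) tuples.
package sketch

import "fmt"

type Change struct {
	FieldPath     string
	PreviousValue any
	CurrentValue  any
}

// diff compares two decoded JSON values and appends one Change per leaf that
// was added, removed, or modified. Non-map values (scalars, arrays) are
// compared by their string form for brevity.
func diff(path string, prev, curr any, out *[]Change) {
	pm, pok := prev.(map[string]any)
	cm, cok := curr.(map[string]any)
	if pok && cok {
		for k, pv := range pm {
			diff(path+"."+k, pv, cm[k], out)
		}
		for k, cv := range cm {
			if _, seen := pm[k]; !seen {
				diff(path+"."+k, nil, cv, out)
			}
		}
		return
	}
	if fmt.Sprintf("%v", prev) != fmt.Sprintf("%v", curr) {
		*out = append(*out, Change{FieldPath: path, PreviousValue: prev, CurrentValue: curr})
	}
}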

4. Notifications

  • Notification Worker processes significant changes
  • Generates alerts based on configurable rules
  • Stores notifications in inventory_alerts

5. Retry & Cleanup

  • Delayed Message Worker handles failed jobs with exponential backoff
  • Cleanup Worker applies exponential retention to inventory_records (see below); inventory_diffs are never deleted
  • Queue Monitor Worker tracks system health and performance
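
The retry delay grows exponentially per attempt; the actual base and cap are internal to the worker, so treat these numbers as placeholders:

// Sketch only: exponential backoff with an assumed 30s base and 1h cap.
package sketch

import "time"

// retryDelay doubles per attempt: 30s, 1m, 2m, 4m, ... up to 1 hour.
func retryDelay(attempt int) time.Duration {
	if attempt > 6 { // 30s << 6 is already ~32m; later attempts hit the cap
		return time.Hour
	}
	return 30 * time.Second << attempt
}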

6. Heartbeat Monitoring

  • Heartbeat Monitor Cron runs every 60 seconds
  • Automatically updates system status based on heartbeat freshness:
    • unknown → active when the first heartbeat arrives
    • inactive → active when a fresh heartbeat arrives (< 10 minutes)
    • active → inactive when the heartbeat is stale (> 10 minutes)
  • Configurable timeout via HEARTBEAT_TIMEOUT_MINUTES (default: 10 minutes)
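
The transition rules above reduce to a small state function; a sketch with assumed status strings:

// Sketch only: heartbeat-driven status transitions.
package sketch

import "time"

// nextStatus applies the transitions listed above. lastSeen is the most
// recent heartbeat time; timeout comes from HEARTBEAT_TIMEOUT_MINUTES.
func nextStatus(current string, lastSeen time.Time, timeout time.Duration) string {
	fresh := !lastSeen.IsZero() && time.Since(lastSeen) < timeout
	switch {
	case current == "unknown" && !lastSeen.IsZero():
		return "active" // first heartbeat arrived
	case current == "inactive" && fresh:
		return "active"
	case current == "active" && !fresh:
		return "inactive"
	default:
		return current
	}
}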

7. LinkFailed Synchronization

  • LinkFailed Monitor Cron runs every 5 minutes
  • Fires the internal LinkFailed alert for inactive, non-deleted systems after HEARTBEAT_TIMEOUT_MINUTES (10 minutes by default)
  • Stops refreshing the alert once a system is active again, so Alertmanager resolves it after the 10-minute TTL from the last refresh
  • This can keep the alert visible for up to 10 minutes after heartbeat recovery
  • Reuses the same server-side label enrichment as the Mimir proxy so internal alerts carry the same authoritative system and organization labels
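
The fire-and-expire pattern relies on Alertmanager resolving an alert once its endsAt passes without a refresh. A sketch of one refresh cycle (minimal labels shown; the real alert carries the enriched label set):

// Sketch only: refresh LinkFailed; stopping refreshes lets the TTL resolve it.
package sketch

import (
	"bytes"
	"encoding/json"
	"net/http"
	"time"
)

func refreshLinkFailed(amURL, systemKey string, ttl time.Duration) error {
	alerts := []map[string]any{{
		"labels": map[string]string{"alertname": "LinkFailed", "system_key": systemKey},
		"endsAt": time.Now().Add(ttl).UTC().Format(time.RFC3339),
	}}
	body, err := json.Marshal(alerts)
	if err != nil {
		return err
	}
	resp, err := http.Post(amURL+"/api/v2/alerts", "application/json", bytes.NewReader(body))
	if err != nil {
		return err
	}
	return resp.Body.Close()
}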
Queue Architecture
  • collect:inventory → Raw inventory data
  • collect:processing → Diff computation jobs
  • collect:notifications → Alert notifications
  • {queue}:delayed → Failed jobs with retry delays
  • {queue}:dead → Jobs that exceeded max retry attempts
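
With a Redis client such as github.com/redis/go-redis/v9, the queue operations look roughly like this (payload format and key handling are internal, so this is a sketch):

// Sketch only: list-backed queues plus a sorted-set delay queue.
package sketch

import (
	"context"
	"time"

	"github.com/redis/go-redis/v9"
)

// enqueue pushes a job onto the inventory list; workers pop from the other end.
func enqueue(ctx context.Context, rdb *redis.Client, payload []byte) error {
	return rdb.LPush(ctx, "collect:inventory", payload).Err()
}

// delay parks a failed job in the sorted set, scored by its ready-at time;
// the delayed message worker moves due members back onto the list.
func delay(ctx context.Context, rdb *redis.Client, payload []byte, d time.Duration) error {
	return rdb.ZAdd(ctx, "collect:inventory:delayed", redis.Z{
		Score:  float64(time.Now().Add(d).Unix()),
		Member: payload,
	}).Err()
}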
Data Retention Policy

inventory_diffs are never deleted. They are the source of truth for the timeline feature (/inventory/timeline) and are self-contained (each diff stores field_path, previous_value, current_value).

inventory_records (full JSON snapshots) use exponential retention — more frequent near the present, progressively sparser further back:

Age                  Retention
Last 7 days          All records preserved
7 days – 1 month     1 per day
1 month – 3 months   1 per week
3 months – 1 year    1 per month
Older than 1 year    1 per quarter

The first record per system (inventory baseline) and the latest record (current state, used by /inventory/latest) are always preserved regardless of age.
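
One way to implement the thinning is to map each record's age to a bucket key and keep only the newest record per (system, bucket); a sketch:

// Sketch only: age-dependent bucket keys for exponential retention.
package sketch

import (
	"fmt"
	"time"
)

const day = 24 * time.Hour

// bucketKey returns the granularity bucket a record falls into; keeping one
// record per (system, bucket) yields the retention table above.
func bucketKey(recordedAt, now time.Time) string {
	switch age := now.Sub(recordedAt); {
	case age < 7*day:
		return recordedAt.Format(time.RFC3339) // keep every record
	case age < 30*day:
		return recordedAt.Format("2006-01-02") // 1 per day
	case age < 90*day:
		y, w := recordedAt.ISOWeek()
		return fmt.Sprintf("%d-W%02d", y, w) // 1 per week
	case age < 365*day:
		return recordedAt.Format("2006-01") // 1 per month
	default:
		q := (int(recordedAt.Month())-1)/3 + 1
		return fmt.Sprintf("%d-Q%d", recordedAt.Year(), q) // 1 per quarter
	}
}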

Development

Basic Commands
# Run tests
make test

# Format code
make fmt

# Run linter
make lint

# Build
make build

# Run server
make run

# Run QA server (uses .env.qa)
make run-qa

# Test coverage
make test-coverage
PostgreSQL Commands
# Start PostgreSQL container
make db-up

# Stop PostgreSQL container
make db-down

# Reset database
make db-reset

# Start full environment
make dev-up

# Stop full environment
make dev-down
Redis Commands
# Start Redis container (Docker/Podman auto-detected)
make redis-up

# Stop Redis container
make redis-down

# Flush Redis cache
make redis-flush

# Connect to Redis CLI
make redis-cli

# Queue monitoring
redis-cli llen collect:inventory          # Pending inventory jobs
redis-cli llen collect:processing         # Pending diff jobs
redis-cli llen collect:notifications      # Pending notifications
redis-cli zcard collect:inventory:delayed # Delayed jobs (sorted set)
redis-cli llen collect:inventory:dead     # Dead letter queue

# View queue contents
redis-cli lrange collect:inventory 0 9
redis-cli zrange collect:inventory:delayed 0 9 WITHSCORES

# Clear queues (development only)
redis-cli del collect:inventory collect:processing collect:notifications
Testing
# Submit heartbeat (requires system credentials from backend API)
# Authentication via HTTP Basic Auth: system_key:system_secret
curl -X POST http://localhost:8081/api/systems/heartbeat \
  -u "NOC-XXXX-XXXX-XXXX:your_system_secret"

# Submit inventory (requires system credentials from backend API)
curl -X POST http://localhost:8081/api/systems/inventory \
  -H "Content-Type: application/json" \
  -u "system_key:system_secret" \
  -d '{"system_id": "test", "timestamp": "2025-07-13T10:00:00Z", "data": {"os": {"name": "TestOS"}}}'

# Upload a configuration backup (streams the file body; GPG encryption is
# up to the appliance). X-Filename is the user-facing name shown in the UI.
curl -X POST http://localhost:8081/api/systems/backups \
  -u "system_key:system_secret" \
  -H "Content-Type: application/octet-stream" \
  -H "X-Filename: dump.json.gz.gpg" \
  --data-binary @/path/to/backup.gpg
Appliance integration (NS8 / NethSecurity)

Each authenticated system owns a prefix in the backup bucket ({org_id}/{system_key}/...) and the three endpoints below are all the appliance needs. Auth is always HTTP Basic with the system_key and the system_secret returned at registration:

Endpoint                       Purpose
POST /api/systems/backups      Stream a new backup. Returns 201 on success with {id, sha256, size, uploaded_at} in the body.
GET /api/systems/backups       List own backups with metadata.
GET /api/systems/backups/:id   Download a stored backup for restore.

Retention is enforced server-side: after each upload the oldest entries beyond BACKUP_MAX_PER_SYSTEM or BACKUP_MAX_SIZE_PER_SYSTEM are pruned. Client-side dedup via local MD5 is still useful — if the hash of the just-encrypted blob equals the one from the previous run the appliance can skip the upload entirely.
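
The server-side pruning reduces to a pass over the backup list sorted newest-first (hypothetical types; the real limits come from BACKUP_MAX_PER_SYSTEM and BACKUP_MAX_SIZE_PER_SYSTEM):

// Sketch only: pick the backups to delete after an upload.
package sketch

type backupMeta struct {
	ID   string
	Size int64
}

// pruneList returns the IDs to delete so that at most maxCount backups and
// maxBytes of total size remain; uploads must be sorted newest-first.
func pruneList(uploads []backupMeta, maxCount int, maxBytes int64) []string {
	var doomed []string
	var total int64
	for i, b := range uploads {
		total += b.Size
		if i >= maxCount || total > maxBytes {
			doomed = append(doomed, b.ID)
		}
	}
	return doomed
}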

Backup storage

Backup objects are stored on any S3-compatible bucket. In production the project targets DigitalOcean Spaces (the same account that backs Mimir); for local development point the BACKUP_S3_* env vars at your preferred emulator (MinIO, Garage, LocalStack, …) or at a dedicated dev bucket on your S3 provider of choice. Backend and collect must share the same bucket — collect writes, backend reads and issues presigned download URLs.

Separate credentials (optional). Nothing in the code forces backend and collect to share the same S3_ACCESS_KEY / S3_SECRET_KEY. If your S3 provider supports per-key policies (AWS S3 IAM, Cloudflare R2 scoped tokens, MinIO user policies), issue two keys: one for collect scoped to PutObject/CopyObject/DeleteObject/ListObjectsV2/HeadObject on the bucket prefix, and one for backend scoped to GetObject/ListObjectsV2/HeadObject/DeleteObject. Then set a different S3_ACCESS_KEY / S3_SECRET_KEY pair on each service. This contains the blast radius if one component is ever compromised.

Object keys follow {org_id}/{system_key}/{backup_id}.{ext} — the user-facing system_key (NETH-…) is preferred over the internal UUID so operators browsing a raw bucket listing can recognise each system at a glance. Per-object metadata is stored as standard x-amz-meta-* headers:

Header                  Source
x-amz-meta-sha256       Computed by collect via streaming tee at ingest
x-amz-meta-filename     From the appliance request header X-Filename (user-facing name)
x-amz-meta-uploader-ip  Client IP observed by collect
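
The streaming tee in the first row can be sketched as follows: hash and upload share one pass over the body, so nothing is buffered in memory (the upload callback stands in for the real S3 PutObject call):

// Sketch only: compute SHA-256 while streaming the request body to S3.
package sketch

import (
	"crypto/sha256"
	"encoding/hex"
	"io"
)

// uploadFn stands in for the S3 PutObject call, which consumes the reader.
type uploadFn func(body io.Reader) error

// ingest streams body to storage and returns its SHA-256, computed on the fly
// by teeing every read through the hash.
func ingest(body io.Reader, upload uploadFn) (string, error) {
	h := sha256.New()
	if err := upload(io.TeeReader(body, h)); err != nil {
		return "", err
	}
	return hex.EncodeToString(h.Sum(nil)), nil
}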
Testing the backup round-trip end-to-end

Any S3-compatible bucket works. Provision one on your preferred provider (DigitalOcean Spaces, Cloudflare R2, AWS S3, a self-hosted MinIO/Garage, …), issue an access key pair scoped to that bucket, and set the values in backend/.env and collect/.env:

S3_ENDPOINT=...                  # e.g. https://ams3.digitaloceanspaces.com
BACKUP_S3_REGION=ams3            # DO Spaces region code; any non-empty value works for MinIO/Garage
BACKUP_S3_BUCKET=my-backups-dev
S3_ACCESS_KEY=...
S3_SECRET_KEY=...
BACKUP_S3_USE_PATH_STYLE=false   # set to true only for MinIO/Garage

Restart backend and collect, then simulate an appliance upload — the GPG pipeline below matches exactly what NS8's send-cluster-backup and NethSecurity's send-backup produce today:

# Prepare a fake GPG-encrypted blob
echo '{"simulation": true}' | gzip > /tmp/dump.json.gz
gpg --batch --yes -c --pinentry-mode loopback --cipher-algo AES256 \
    --passphrase-fd 0 /tmp/dump.json.gz <<< "local-dev-passphrase"

# Upload as an authenticated appliance
curl --user "$SYSTEM_KEY:$SYSTEM_SECRET" \
     -H "X-Filename: dump.json.gz.gpg" \
     -H "Content-Type: application/octet-stream" \
     --data-binary @/tmp/dump.json.gz.gpg \
     http://localhost:8081/api/systems/backups

Verify the object shows up under {org_id}/{system_key}/<uuidv7>.gpg in the bucket (via the provider's console or the AWS CLI).

Project Structure

collect/
├── main.go                # Application entry point
├── alerting/              # Shared Alertmanager helpers
├── configuration/         # Environment configuration
├── cron/                  # Scheduled jobs
│   ├── heartbeat_monitor.go    # System status monitoring
│   └── linkfailed_monitor.go   # LinkFailed alert synchronization
├── database/              # PostgreSQL connection and models
├── methods/               # HTTP request handlers
├── middleware/            # Authentication middleware
├── models/                # Data structures
├── queue/                 # Redis queue management
├── storage/               # S3 client for backup ingest
├── workers/               # Background processing workers
│   ├── inventory_worker.go        # Batch inventory processing
│   ├── diff_worker.go             # Change detection
│   ├── notification_worker.go     # Alert notifications
│   ├── cleanup_worker.go          # Data maintenance
│   ├── queue_monitor_worker.go    # Queue health monitoring
│   ├── delayed_message_worker.go  # Retry handling
│   └── manager.go                 # Worker orchestration
└── differ/                # JSON diff computation engine

Machine-scoped Alertmanager access

The Mimir Alertmanager endpoints (/services/mimir/alertmanager/api/v2/*) implement strict per-machine scoping to ensure systems can only access their own alerts and silences.

How scoping works

Each system (identified by system_key) is automatically restricted to see and manage only its own data:

  • GET /alerts: The proxy injects a system_key filter into the query, limiting results to this system's alerts
  • POST /alerts: The system_key, system_id, and organization_id labels are injected and override any client values
  • GET /silences: The proxy injects a system_key filter to scope results to this system's silences
  • POST /silences: The proxy injects a system_key matcher, overwriting any client-supplied system_key matcher. This ensures the silence can only target this system's alerts
  • GET /silences/{id}: The proxy fetches the silence from Mimir and verifies it contains an exact system_key matcher matching this system. Denies access (403) if the silence does not belong to this system
  • DELETE /silences/{id}: Same ownership verification as GET, then deletes only if verified
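
The ownership check behind the last two routes reduces to one predicate over the silence's matchers (assumed struct shape; the real code reads Mimir's silence model):

// Sketch only: exact system_key matcher required for silence ownership.
package sketch

type matcher struct {
	Name    string
	Value   string
	IsEqual bool
	IsRegex bool
}

// ownsSilence reports whether the silence carries an exact equality matcher
// on system_key for this system; anything else results in a 403.
func ownsSilence(matchers []matcher, systemKey string) bool {
	for _, m := range matchers {
		if m.Name == "system_key" && m.Value == systemKey && m.IsEqual && !m.IsRegex {
			return true
		}
	}
	return false
}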
Security properties
  • Systems cannot view, create, or modify silences for other systems
  • Silence matchers cannot be bypassed — a system_key matcher is always enforced server-side
  • The system_key label in alerts is always server-sourced (injected via injectLabels in mimir.go), never trusted from client input
  • Failed ownership checks are logged with system and path details for audit purposes

Alert annotation templating

Alert annotations support Go text/template syntax. When alerts are posted to /api/services/mimir/alertmanager/api/v2/alerts, the proxy processes any template expressions in annotation values, substituting alert labels.

Template syntax

Use standard Go template syntax with alert labels as data:

{
  "labels": {
    "severity": "critical",
    "alertname": "DiskFull",
    "system_key": "SYS-001"
  },
  "annotations": {
    "summary": "Alert {{.alertname}} has severity {{.severity}}",
    "description": "System: {{.system_key}}"
  }
}

Result after templating:

{
  "annotations": {
    "summary": "Alert DiskFull has severity critical",
    "description": "System: SYS-001"
  }
}
Behavior
  • Only annotations containing {{ are processed
  • Labels are passed as template data (accessible via .fieldname)
  • Non-existent labels render as <no value>
  • Invalid template syntax is logged as a warning; the annotation remains unchanged
  • Non-string annotation values are preserved as-is
  • Static annotations (without template syntax) pass through unchanged
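
The behavior above maps onto Go's text/template almost directly; a sketch (the real proxy code may differ in detail):

// Sketch only: render {{.label}} expressions in annotations against alert labels.
package sketch

import (
	"bytes"
	"log"
	"strings"
	"text/template"
)

// renderAnnotations rewrites templated annotations in place. Labels are passed
// as map values, so a missing label renders as "<no value>".
func renderAnnotations(annotations map[string]string, labels map[string]any) {
	for k, v := range annotations {
		if !strings.Contains(v, "{{") {
			continue // static annotations pass through unchanged
		}
		tmpl, err := template.New(k).Parse(v)
		if err != nil {
			log.Printf("warn: invalid template in annotation %q: %v", k, err)
			continue // annotation kept unchanged
		}
		var buf bytes.Buffer
		if err := tmpl.Execute(&buf, labels); err != nil {
			log.Printf("warn: rendering annotation %q: %v", k, err)
			continue
		}
		annotations[k] = buf.String()
	}
}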
