Documentation ¶
Overview ¶
Package postgres owns the durability boundary for mqtt2db-go.
MQTT messages are acknowledged only after they land in PostgreSQL via this package. We use pgx directly (never database/sql) so we can lean on pgx.CopyFrom for batch inserts; this is the only reliable way to push 10K+ rows/second through PgBouncer without choking on per-statement round trips.
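Idempotency is handled by copying each batch into a temp table and then running INSERT ... SELECT ... ON CONFLICT DO NOTHING into the destination. The temp table is scoped to the transaction, not the session, because PgBouncer transaction pooling can hand each call a different server connection (see CopyMessages). COPY has no ON CONFLICT clause, so the staging hop is unavoidable.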
Index ¶
- Constants
- func Migrate(sourceURL, dsn string) error
- func MigrateDown(sourceURL, dsn string, steps int) error
- func NewPool(ctx context.Context, cfg config.PostgresConfig) (*pgxpool.Pool, error)
- func Version(sourceURL, dsn string) (uint, bool, error)
- type Copier
- type Inserter
- type Message
- type Unparseable
- type UnparseableInserter
- type UnparseableWriter
Constants ¶
const MaxErrorDetailLen = 256
MaxErrorDetailLen caps the size of ErrorDetail at the persistence layer. The parser may put the offending tenant or UUID segment into Detail; topics are already bounded to MaxTopicLen but this is an extra guard so a misbehaving payload cannot bloat the side-table.
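The cap described above could be applied with a helper along these lines. This is a minimal sketch, not the package's code: truncateDetail is a hypothetical name, and it assumes the limit is counted in bytes and that the result should remain valid UTF-8.

```go
package main

import (
	"fmt"
	"strings"
	"unicode/utf8"
)

// maxErrorDetailLen mirrors MaxErrorDetailLen from the docs above.
const maxErrorDetailLen = 256

// truncateDetail is a hypothetical helper. It caps s at limit bytes and
// backs up to the previous rune boundary so a multi-byte character is
// never split (assumption: the limit is measured in bytes).
func truncateDetail(s string, limit int) string {
	if limit <= 0 {
		return ""
	}
	if len(s) <= limit {
		return s
	}
	cut := limit
	for cut > 0 && !utf8.RuneStart(s[cut]) {
		cut--
	}
	return s[:cut]
}

func main() {
	long := strings.Repeat("x", 300)
	fmt.Println(len(truncateDetail(long, maxErrorDetailLen))) // 256
	fmt.Println(truncateDetail("abé", 3))                     // ab
}
```

Backing up to a rune boundary means the stored value can be a few bytes shorter than the cap, which avoids handing PostgreSQL an invalid UTF-8 sequence.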
const UnparseableTableName = "telemetry_unparseable"
UnparseableTableName is the side-table created by migration 0002. Hard-coded (not config-driven) because the subscriber writes a fixed row shape; an operator who renames it must also update the constant.
Functions ¶
func Migrate ¶
func Migrate(sourceURL, dsn string) error
Migrate applies all up migrations from sourceURL against dsn. Idempotent when the database is already at head. ErrNoChange is treated as success because operators expect "migrate up" to be a fence, not a guarantee that something happened.
CLAUDE.md is explicit that the ingest service does not run migrations on startup; this is a tool for the cmd/migrate binary and integration tests.
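The "ErrNoChange is success" rule for Migrate can be illustrated with a small wrapper. This is a sketch under stated assumptions: errNoChange is a local stand-in for the migration library's sentinel, and migrateUp and its run callback are hypothetical names, not this package's API.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoChange is a local stand-in for the migration library's
// "nothing to apply" sentinel (ErrNoChange in the docs above).
var errNoChange = errors.New("no change")

// errConn simulates an unrelated failure for the demo.
var errConn = errors.New("connection refused")

// migrateUp is a hypothetical wrapper: run applies pending migrations,
// and "already at head" is reported as success rather than as an error.
func migrateUp(run func() error) error {
	err := run()
	if err == nil || errors.Is(err, errNoChange) {
		return nil
	}
	return fmt.Errorf("migrate up: %w", err)
}

func main() {
	atHead := func() error { return errNoChange }
	broken := func() error { return errConn }

	fmt.Println(migrateUp(atHead)) // <nil>
	fmt.Println(migrateUp(broken)) // migrate up: connection refused
}
```

Using errors.Is rather than == keeps the check working if the sentinel ever arrives wrapped.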
func MigrateDown ¶
func MigrateDown(sourceURL, dsn string, steps int) error
MigrateDown rolls back steps migrations; pass 0 to roll back everything. Used by integration tests and operational rollback only.
Types ¶
type Copier ¶
type Copier struct {
	// contains filtered or unexported fields
}
Copier persists Messages via pgx.CopyFrom plus an ON CONFLICT staging hop, deduplicated on dedup_key.
func NewCopier ¶
NewCopier wires a Copier against an existing pool. It validates that the configured columns match the order in which Message fields are written to Postgres.
func (*Copier) CopyMessages ¶
CopyMessages writes msgs to telemetry, deduplicating on dedup_key. The returned count is the number of rows actually inserted (excluding conflicts). The implementation uses a transaction-scoped temp table so PgBouncer transaction-pooling does not orphan it between calls.
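The statements behind such a staging hop might look like the following. This is a sketch, not the package's actual SQL: stagingStatements and the telemetry_stage name are hypothetical, the column list is illustrative, and it assumes a unique index on dedup_key exists on the destination. The COPY step between the two statements is omitted.

```go
package main

import (
	"fmt"
	"strings"
)

// stagingStatements returns the SQL for one idempotent batch, meant to run
// inside a single transaction:
//  1. create a temp table shaped like dest that is dropped at COMMIT,
//  2. (not shown) COPY the batch into the temp table,
//  3. merge into dest, skipping rows whose dedup_key already exists.
//
// Identifiers are interpolated as-is, so they must come from trusted
// configuration, never from message content.
func stagingStatements(dest, staging string, cols []string) (create, merge string) {
	colList := strings.Join(cols, ", ")
	create = fmt.Sprintf(
		"CREATE TEMP TABLE %s (LIKE %s INCLUDING DEFAULTS) ON COMMIT DROP",
		staging, dest)
	merge = fmt.Sprintf(
		"INSERT INTO %s (%s) SELECT %s FROM %s ON CONFLICT (dedup_key) DO NOTHING",
		dest, colList, colList, staging)
	return create, merge
}

func main() {
	// Column names other than tenant_id, device_uuid and dedup_key are assumed.
	cols := []string{"tenant_id", "device_uuid", "topic", "payload", "received_at", "dedup_key"}
	create, merge := stagingStatements("telemetry", "telemetry_stage", cols)
	fmt.Println(create)
	fmt.Println(merge)
}
```

Because ON COMMIT DROP ties the temp table to the transaction, nothing is left behind when the pooler hands the server connection to another client. The row count PostgreSQL reports for the merge statement counts only rows actually inserted, which is one way to obtain the "excluding conflicts" figure.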
type Inserter ¶
Inserter is the narrow surface the flusher uses to write batches. It exists so callers can mock CopyMessages in tests without spinning up a real pool.
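A test double for this interface could be as small as the sketch below. The Inserter method set is not shown on this page, so the CopyMessages signature here is an assumption, Message is a trimmed stand-in, and fakeInserter and flush are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// Message is a trimmed stand-in for the package's Message type.
type Message struct {
	Topic    string
	DedupKey string
}

// Inserter is assumed to look like this; the real method set may differ.
type Inserter interface {
	CopyMessages(ctx context.Context, msgs []Message) (int64, error)
}

// fakeInserter mimics dedup on DedupKey in memory and counts calls,
// so flusher logic can be tested without a database.
type fakeInserter struct {
	seen    map[string]bool
	batches int
}

func (f *fakeInserter) CopyMessages(_ context.Context, msgs []Message) (int64, error) {
	if f.seen == nil {
		f.seen = make(map[string]bool)
	}
	f.batches++
	var inserted int64
	for _, m := range msgs {
		if !f.seen[m.DedupKey] {
			f.seen[m.DedupKey] = true
			inserted++
		}
	}
	return inserted, nil
}

// flush is a hypothetical caller that depends only on the interface.
func flush(ctx context.Context, ins Inserter, batch []Message) (int64, error) {
	return ins.CopyMessages(ctx, batch)
}

func main() {
	fake := &fakeInserter{}
	batch := []Message{
		{Topic: "a/b", DedupKey: "k1"},
		{Topic: "a/b", DedupKey: "k1"}, // duplicate within the batch
		{Topic: "a/c", DedupKey: "k2"},
	}
	n, err := flush(context.Background(), fake, batch)
	fmt.Println(n, err) // 2 <nil>
	n, err = flush(context.Background(), fake, batch)
	fmt.Println(n, err) // 0 <nil>
}
```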
type Message ¶
type Message struct {
	TenantID   string
	DeviceUUID uuid.UUID
	Topic      string
	Payload    []byte
	ReceivedAt time.Time
	DedupKey   string
}
Message is the row shape persisted to the telemetry table. The columns listed in PostgresConfig.Columns must match the order of fields written by Message.copyRow (see CopyMessages).
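The ordering constraint can be made concrete with a sketch like the one below. It is illustrative only: the column names beyond tenant_id, device_uuid and dedup_key are assumed, DeviceUUID is a plain string to keep the example dependency-free, and validateColumns is a hypothetical helper rather than the check NewCopier actually performs.

```go
package main

import (
	"fmt"
	"time"
)

// Message is a stand-in for the package's type; DeviceUUID is a string
// here so the example needs no external uuid package.
type Message struct {
	TenantID   string
	DeviceUUID string
	Topic      string
	Payload    []byte
	ReceivedAt time.Time
	DedupKey   string
}

// copyColumns is the assumed column order for COPY.
var copyColumns = []string{
	"tenant_id", "device_uuid", "topic", "payload", "received_at", "dedup_key",
}

// copyRow emits values in exactly the order of copyColumns. COPY matches
// values to columns by position, so a mismatch silently writes data into
// the wrong column (or fails on a type error).
func (m Message) copyRow() []any {
	return []any{m.TenantID, m.DeviceUUID, m.Topic, m.Payload, m.ReceivedAt, m.DedupKey}
}

// validateColumns rejects a configured list whose names or order differ.
func validateColumns(configured []string) error {
	if len(configured) != len(copyColumns) {
		return fmt.Errorf("expected %d columns, got %d", len(copyColumns), len(configured))
	}
	for i, want := range copyColumns {
		if configured[i] != want {
			return fmt.Errorf("column %d: expected %q, got %q", i, want, configured[i])
		}
	}
	return nil
}

func main() {
	fmt.Println(validateColumns(copyColumns)) // <nil>
	swapped := []string{"device_uuid", "tenant_id", "topic", "payload", "received_at", "dedup_key"}
	fmt.Println(validateColumns(swapped)) // column 0: expected "tenant_id", got "device_uuid"
}
```

Failing fast at construction time turns a positional mismatch into a startup error instead of corrupted rows.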
type Unparseable ¶ added in v0.1.2
type Unparseable struct {
	Topic       string
	Payload     []byte
	ErrorClass  string
	ErrorDetail string // empty -> persisted as NULL
	ReceivedAt  time.Time
}
Unparseable is the row shape persisted to telemetry_unparseable when the subscriber cannot extract (tenant_id, device_uuid) from an MQTT topic. See docs/adr/0006-unparseable-side-table.md for context.
type UnparseableInserter ¶ added in v0.1.2
type UnparseableInserter interface {
	InsertUnparseable(ctx context.Context, row Unparseable) error
}
UnparseableInserter is the narrow surface the subscriber uses to persist parse failures. Defined as an interface so tests can mock it without spinning up a real pool.
type UnparseableWriter ¶ added in v0.1.2
type UnparseableWriter struct {
	// contains filtered or unexported fields
}
UnparseableWriter writes single rows into telemetry_unparseable. Inserts are not batched: parse failures are rare by design, so one round trip per failure is acceptable and keeps this path off the hot flusher loop.
func NewUnparseableWriter ¶ added in v0.1.2
func NewUnparseableWriter(pool *pgxpool.Pool, cfg config.PostgresConfig) (*UnparseableWriter, error)
NewUnparseableWriter wires a writer against an existing pool. It targets the same PostgreSQL schema (namespace) as the main telemetry table; the row shape is that of Unparseable.
func (*UnparseableWriter) InsertUnparseable ¶ added in v0.1.2
func (w *UnparseableWriter) InsertUnparseable(ctx context.Context, row Unparseable) error
InsertUnparseable persists a single parse-failure row. ErrorDetail is truncated to MaxErrorDetailLen and stored as NULL when empty. It returns an error on any DB failure; callers should log the error, increment a failure metric, and still ack the MQTT message so that one bad message does not poison the queue (see ADR 0006).
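The caller-side policy for a failed insert might be sketched as follows. The interface and struct mirror the declarations above; handler, onParseFailure, failingInserter, and the integer failure counter are hypothetical stand-ins for the subscriber, its callback, and a real metrics client.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"time"
)

// Unparseable and UnparseableInserter mirror the declarations above.
type Unparseable struct {
	Topic       string
	Payload     []byte
	ErrorClass  string
	ErrorDetail string
	ReceivedAt  time.Time
}

type UnparseableInserter interface {
	InsertUnparseable(ctx context.Context, row Unparseable) error
}

// failingInserter simulates a database outage.
type failingInserter struct{}

func (failingInserter) InsertUnparseable(context.Context, Unparseable) error {
	return errors.New("db down")
}

// handler is a hypothetical subscriber-side component.
type handler struct {
	ins      UnparseableInserter
	failures int // stand-in for a metrics counter
}

// onParseFailure reports whether the MQTT message should be acked.
// It always returns true: a failed side-table write is logged and counted,
// but the message is still acked so it is not redelivered forever.
func (h *handler) onParseFailure(ctx context.Context, row Unparseable) bool {
	if err := h.ins.InsertUnparseable(ctx, row); err != nil {
		log.Printf("persist unparseable: topic=%q err=%v", row.Topic, err)
		h.failures++
	}
	return true
}

func main() {
	h := &handler{ins: failingInserter{}}
	ack := h.onParseFailure(context.Background(), Unparseable{
		Topic:      "bad/topic",
		ErrorClass: "topic_parse", // illustrative value
		ReceivedAt: time.Now(),
	})
	fmt.Println(ack, h.failures) // true 1
}
```

The trade-off is that a parse-failure row can be lost during a database outage; the failure counter is what makes that loss visible.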