postgres

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 2, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package postgres owns the durability boundary for mqtt2db-go.

MQTT messages are acknowledged only after they land in PostgreSQL via this package. We use pgx directly (never database/sql) so we can lean on pgx.CopyFrom for batch inserts; this is the only reliable way to push 10K+ rows/second through PgBouncer without choking on per-statement round trips.

Idempotency is handled by writing each batch into a temp table (created once per session) and then INSERT ... SELECT ... ON CONFLICT DO NOTHING into the destination. CopyFrom does not honor ON CONFLICT directly, so the staging hop is unavoidable.

Index

Constants

View Source
const MaxErrorDetailLen = 256

MaxErrorDetailLen caps the size of ErrorDetail at the persistence layer. The parser may put the offending tenant or UUID segment into Detail; topics are already bounded to MaxTopicLen but this is an extra guard so a misbehaving payload cannot bloat the side-table.

View Source
const UnparseableTableName = "telemetry_unparseable"

UnparseableTableName is the side-table created by migration 0002. Hard-coded (not config-driven) because the subscriber writes a fixed row shape; an operator who renames it must also update the constant.

Variables

This section is empty.

Functions

func Migrate

func Migrate(sourceURL, dsn string) error

Migrate applies all up migrations from sourceURL against dsn. Idempotent when the database is already at head. ErrNoChange is treated as success because operators expect "migrate up" to be a fence, not a guarantee that something happened.

CLAUDE.md is explicit that the ingest service does not run migrations on startup; this is a tool for the cmd/migrate binary and integration tests.

func MigrateDown

func MigrateDown(sourceURL, dsn string, steps int) error

MigrateDown rolls back N steps; pass 0 to roll back everything. Used by integration tests and operational rollback only.

func NewPool

func NewPool(ctx context.Context, cfg config.PostgresConfig) (*pgxpool.Pool, error)

NewPool constructs a *pgxpool.Pool from PostgresConfig. The DSN is parsed once and pool sizing comes from config; everything else is left at pgx defaults so operators tune behavior via the DSN itself.

The caller owns the pool's lifetime and must call Close on shutdown.

func Version

func Version(sourceURL, dsn string) (uint, bool, error)

Version returns the current schema version and a "dirty" flag indicating that a previous migration crashed mid-run. Returns (0, false, nil) for a fresh database.

Types

type Copier

type Copier struct {
	// contains filtered or unexported fields
}

Copier persists Messages via pgx.CopyFrom plus an ON CONFLICT staging hop, dedup'd on dedup_key.

func NewCopier

func NewCopier(pool *pgxpool.Pool, cfg config.PostgresConfig) (*Copier, error)

NewCopier wires a Copier against an existing pool. It validates that the configured columns match the Message field order Postgres expects.

func (*Copier) CopyMessages

func (c *Copier) CopyMessages(ctx context.Context, msgs []Message) (int64, error)

CopyMessages writes msgs to telemetry, deduplicating on dedup_key. The returned count is the number of rows actually inserted (excluding conflicts). The implementation uses a transaction-scoped temp table so PgBouncer transaction-pooling does not orphan it between calls.

type Inserter

type Inserter interface {
	CopyMessages(ctx context.Context, msgs []Message) (int64, error)
}

Inserter is the narrow surface the flusher uses to write batches. It exists so callers can mock CopyMessages in tests without spinning up a real pool.

type Message

type Message struct {
	TenantID   string
	DeviceUUID uuid.UUID
	Topic      string
	Payload    []byte
	ReceivedAt time.Time
	DedupKey   string
}

Message is the row shape persisted to the telemetry table. The columns listed in PostgresConfig.Columns must match the order of fields written by Message.copyRow (see CopyMessages).

type Unparseable added in v0.1.2

type Unparseable struct {
	Topic       string
	Payload     []byte
	ErrorClass  string
	ErrorDetail string // empty -> persisted as NULL
	ReceivedAt  time.Time
}

Unparseable is the row shape persisted to telemetry_unparseable when the subscriber cannot extract (tenant_id, device_uuid) from an MQTT topic. See docs/adr/0006-unparseable-side-table.md for context.

type UnparseableInserter added in v0.1.2

type UnparseableInserter interface {
	InsertUnparseable(ctx context.Context, row Unparseable) error
}

UnparseableInserter is the narrow surface the subscriber uses to persist parse failures. Defined as an interface so tests can mock it without spinning up a real pool.

type UnparseableWriter added in v0.1.2

type UnparseableWriter struct {
	// contains filtered or unexported fields
}

UnparseableWriter writes single rows into telemetry_unparseable. Inserts are not batched: parse failures are by design rare, so the extra round trip per failure is fine and keeps this path off the hot flusher loop.

func NewUnparseableWriter added in v0.1.2

func NewUnparseableWriter(pool *pgxpool.Pool, cfg config.PostgresConfig) (*UnparseableWriter, error)

NewUnparseableWriter wires a writer against an existing pool. It uses the same schema as the main telemetry table.

func (*UnparseableWriter) InsertUnparseable added in v0.1.2

func (w *UnparseableWriter) InsertUnparseable(ctx context.Context, row Unparseable) error

InsertUnparseable persists a single parse-failure row. ErrorDetail is truncated to MaxErrorDetailLen and stored as NULL when empty. Returns an error on any DB failure; callers should log + increment a failure metric and ack the MQTT message anyway to avoid poisoning the queue (see ADR 0006).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL