postgres

package
v0.18.0 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 2, 2026 | License: MIT | Imports: 22 | Imported by: 0

Documentation

Constants

const LocalSyncTimestampLayout = "2006-01-02T15:04:05.000Z"

LocalSyncTimestampLayout uses millisecond precision to match SQLite's datetime resolution.

const MaxHeatmapDays = 366

MaxHeatmapDays is the maximum number of day entries in a heatmap.

Variables

This section is empty.

Functions

func CheckSSL

func CheckSSL(dsn string) error

CheckSSL returns an error when the PG connection string targets a non-loopback host without TLS encryption. It uses the pgx driver's own DSN parser to resolve the effective host and TLS configuration, avoiding bypasses from exotic DSN formats.

A connection is rejected when any path in the TLS negotiation chain (primary + fallbacks) permits plaintext for a non-loopback host. This rejects sslmode=disable, allow, and prefer.
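
The rule above can be approximated with the standard library alone. The following is a simplified sketch, not the package's implementation: it only understands URL-style DSNs, ignores fallback hosts, and assumes the libpq default of sslmode=prefer when the parameter is absent. The names isLoopbackHost and checkSSLSketch are hypothetical; the real CheckSSL relies on the pgx DSN parser instead.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"net/url"
)

// isLoopbackHost reports whether host is localhost or a loopback IP.
// An empty host is treated as local (assumption: socket or default host).
func isLoopbackHost(host string) bool {
	if host == "" || host == "localhost" {
		return true
	}
	ip := net.ParseIP(host)
	return ip != nil && ip.IsLoopback()
}

// checkSSLSketch rejects URL-style DSNs that allow plaintext to a
// non-loopback host. Hypothetical helper for illustration only.
func checkSSLSketch(dsn string) error {
	u, err := url.Parse(dsn)
	if err != nil {
		// Do not echo the DSN: it may contain credentials.
		return errors.New("invalid DSN")
	}
	host := u.Hostname()
	if isLoopbackHost(host) {
		return nil
	}
	mode := u.Query().Get("sslmode")
	if mode == "" {
		mode = "prefer" // libpq default when sslmode is not set
	}
	switch mode {
	case "disable", "allow", "prefer":
		return fmt.Errorf("sslmode=%s permits plaintext to non-loopback host %s", mode, host)
	}
	return nil
}

func main() {
	for _, dsn := range []string{
		"postgres://u:p@db.example.com/app?sslmode=prefer",
		"postgres://u:p@db.example.com/app?sslmode=require",
		"postgres://u:p@127.0.0.1/app?sslmode=disable",
	} {
		fmt.Println(checkSSLSketch(dsn))
	}
}
```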

func CheckSchemaCompat

func CheckSchemaCompat(
	ctx context.Context, db *sql.DB,
) error

CheckSchemaCompat verifies that the PG schema has all columns required by query paths. This is a read-only probe that works against any PG role. Returns nil if compatible, or an error describing what is missing.

func EnsureSchema

func EnsureSchema(
	ctx context.Context, db *sql.DB, schema string,
) error

EnsureSchema creates the schema (if needed), then runs idempotent CREATE TABLE / ALTER TABLE statements. The schema parameter is the unquoted schema name (e.g. "agentsview").

After CREATE SCHEMA, all table DDL uses unqualified names because Open() sets search_path to the target schema.

func FormatISO8601

func FormatISO8601(t time.Time) string

FormatISO8601 formats a time.Time as an ISO-8601 UTC string for JSON API responses.

func FormatSyncTimestamp

func FormatSyncTimestamp(t time.Time) string

FormatSyncTimestamp formats a time as a microsecond-precision UTC ISO-8601 string for PG sync watermarks.

func IsReadOnlyError

func IsReadOnlyError(err error) bool

IsReadOnlyError returns true when the error indicates a PG read-only or insufficient-privilege condition (SQLSTATE 25006 or 42501). Uses pgconn.PgError for reliable SQLSTATE matching.

func NormalizeLocalSyncStateTimestamps

func NormalizeLocalSyncStateTimestamps(
	local SyncStateStore,
) error

NormalizeLocalSyncStateTimestamps normalizes the last_push_at watermark in the local SQLite sync state to millisecond precision.

func NormalizeLocalSyncTimestamp

func NormalizeLocalSyncTimestamp(
	value string,
) (string, error)

NormalizeLocalSyncTimestamp parses an RFC3339Nano timestamp and re-formats it to millisecond precision for SQLite sync state.

func NormalizeSyncTimestamp

func NormalizeSyncTimestamp(value string) (string, error)

NormalizeSyncTimestamp parses an RFC3339Nano timestamp and re-formats it to microsecond precision for PG sync.
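
The parse-then-reformat step can be sketched as below. The microsecond layout string and the name normalizeMicro are assumptions chosen by analogy with LocalSyncTimestampLayout; the package's actual layout is not shown in this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// microLayout is an assumed microsecond-precision UTC layout.
const microLayout = "2006-01-02T15:04:05.000000Z"

// normalizeMicro is a hypothetical helper: it parses an RFC3339Nano string,
// converts it to UTC, and re-formats it with six fractional digits.
func normalizeMicro(value string) (string, error) {
	t, err := time.Parse(time.RFC3339Nano, value)
	if err != nil {
		return "", fmt.Errorf("parse timestamp %q: %w", value, err)
	}
	return t.UTC().Format(microLayout), nil
}

func main() {
	out, err := normalizeMicro("2026-04-02T14:30:45.123456789+02:00")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out) // prints 2026-04-02T12:30:45.123456Z
}
```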

func Open

func Open(
	dsn, schema string, allowInsecure bool,
) (*sql.DB, error)

Open opens a PG connection pool, validates SSL, and sets search_path to the given schema on every connection.

The schema name is validated and quoted to prevent injection. When allowInsecure is true, non-loopback connections without TLS produce a warning instead of failing.
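
One common way to validate and quote an identifier before interpolating it into a SET search_path statement is sketched here. The accepted character set, the quoteSchema name, and the exact statement text are assumptions; Open's real validation rules may differ.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// identRe is an assumed whitelist: letters, digits, and underscores,
// not starting with a digit.
var identRe = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]*$`)

// quoteSchema is a hypothetical helper that validates name and returns it
// as a double-quoted PostgreSQL identifier.
func quoteSchema(name string) (string, error) {
	if !identRe.MatchString(name) {
		return "", fmt.Errorf("invalid schema name %q", name)
	}
	// Doubling embedded quotes is redundant after validation, but keeps
	// the quoting correct if the whitelist is ever relaxed.
	return `"` + strings.ReplaceAll(name, `"`, `""`) + `"`, nil
}

func main() {
	q, err := quoteSchema("agentsview")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("SET search_path TO " + q) // prints SET search_path TO "agentsview"
}
```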

func ParseSQLiteTimestamp

func ParseSQLiteTimestamp(s string) (time.Time, bool)

ParseSQLiteTimestamp parses an ISO-8601 text timestamp from SQLite into a time.Time. Returns zero time and false for empty strings or unparseable values.
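
The (time.Time, bool) contract can be illustrated with a small multi-layout parser. The set of layouts tried here is an assumption (RFC 3339 plus the space-separated form that SQLite's datetime() emits), and parseTimestampSketch is a hypothetical name.

```go
package main

import (
	"fmt"
	"time"
)

// assumedLayouts lists formats this sketch accepts; the package may
// accept a different set.
var assumedLayouts = []string{
	time.RFC3339Nano,
	"2006-01-02 15:04:05",
}

// parseTimestampSketch returns the parsed UTC time and true, or the zero
// time and false for empty or unparseable input.
func parseTimestampSketch(s string) (time.Time, bool) {
	if s == "" {
		return time.Time{}, false
	}
	for _, layout := range assumedLayouts {
		if t, err := time.Parse(layout, s); err == nil {
			return t.UTC(), true
		}
	}
	return time.Time{}, false
}

func main() {
	for _, s := range []string{"2026-04-02T12:30:45.123Z", "2026-04-02 12:30:45", "", "garbage"} {
		t, ok := parseTimestampSketch(s)
		fmt.Println(ok, t.IsZero())
	}
}
```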

func PreviousLocalSyncTimestamp

func PreviousLocalSyncTimestamp(
	value string,
) (string, error)

PreviousLocalSyncTimestamp returns the timestamp 1ms before the given value, formatted at millisecond precision. This creates a non-overlapping boundary for incremental sync queries against SQLite.
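
The boundary arithmetic is small enough to show in full. This sketch assumes the input is RFC3339Nano, as with the normalize helpers, and uses the hypothetical name previousMillis.

```go
package main

import (
	"fmt"
	"time"
)

// Same layout as LocalSyncTimestampLayout.
const millisLayout = "2006-01-02T15:04:05.000Z"

// previousMillis is a hypothetical helper that returns the timestamp one
// millisecond before value, formatted at millisecond precision in UTC.
func previousMillis(value string) (string, error) {
	t, err := time.Parse(time.RFC3339Nano, value)
	if err != nil {
		return "", fmt.Errorf("parse timestamp %q: %w", value, err)
	}
	return t.UTC().Add(-time.Millisecond).Format(millisLayout), nil
}

func main() {
	prev, err := previousMillis("2026-04-02T12:30:45.000Z")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(prev) // prints 2026-04-02T12:30:44.999Z
}
```

Because the result is exactly one unit of the stored resolution earlier, a strictly-greater-than comparison against it selects rows at or after the original value without relying on sub-millisecond ordering.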

func RedactDSN

func RedactDSN(dsn string) string

RedactDSN returns the host portion of the DSN for diagnostics, stripping credentials, query parameters, and path components that may contain secrets.
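
For URL-style DSNs, host-only redaction can be sketched with net/url. Whether the real RedactDSN keeps the port, and what it returns for unparseable input, are not stated here; the choices below (keep host:port, return a fixed placeholder) and the name redactSketch are assumptions.

```go
package main

import (
	"fmt"
	"net/url"
)

// redactSketch is a hypothetical helper that returns only host[:port] from
// a URL-style DSN, dropping user info, path, and query parameters.
func redactSketch(dsn string) string {
	u, err := url.Parse(dsn)
	if err != nil || u.Host == "" {
		// Never echo the input: it may contain a password.
		return "<unparseable dsn>"
	}
	return u.Host
}

func main() {
	dsn := "postgres://alice:s3cret@db.example.com:5432/app?sslmode=require"
	fmt.Println(redactSketch(dsn)) // prints db.example.com:5432
}
```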

func WarnInsecureSSL

func WarnInsecureSSL(dsn string)

WarnInsecureSSL logs a warning when the PG connection string targets a non-loopback host without TLS encryption. Uses the pgx driver's DSN parser for accurate host/TLS resolution.

Types

type PushResult

type PushResult struct {
	SessionsPushed int
	MessagesPushed int
	Errors         int
	Duration       time.Duration
}

PushResult summarizes a push sync operation.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store wraps a PostgreSQL connection for read-only session queries.

func NewStore

func NewStore(
	pgURL, schema string, allowInsecure bool,
) (*Store, error)

NewStore opens a PostgreSQL connection using the shared Open() helper and returns a read-only Store. When allowInsecure is true, non-loopback connections without TLS produce a warning instead of failing.

func (*Store) BulkStarSessions

func (s *Store) BulkStarSessions(_ []string) error

BulkStarSessions is not supported in read-only mode.

func (*Store) Close

func (s *Store) Close() error

Close closes the underlying database connection.

func (*Store) DB

func (s *Store) DB() *sql.DB

DB returns the underlying *sql.DB for operations that need direct access (e.g. schema compatibility checks).

func (*Store) DecodeCursor

func (s *Store) DecodeCursor(
	raw string,
) (db.SessionCursor, error)

DecodeCursor parses a base64-encoded cursor string.

func (*Store) DeleteInsight

func (s *Store) DeleteInsight(_ int64) error

DeleteInsight is not supported in read-only mode.

func (*Store) DeleteSessionIfTrashed

func (s *Store) DeleteSessionIfTrashed(
	_ string,
) (int64, error)

DeleteSessionIfTrashed is not supported in read-only mode.

func (*Store) EmptyTrash

func (s *Store) EmptyTrash() (int, error)

EmptyTrash is not supported in read-only mode.

func (*Store) EncodeCursor

func (s *Store) EncodeCursor(
	endedAt, id string, total ...int,
) string

EncodeCursor returns a base64-encoded, HMAC-signed cursor.
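
To show what a base64-encoded, HMAC-signed cursor can look like, here is a self-contained sketch. The wire format (JSON payload, a dot separator, HMAC-SHA256, URL-safe base64) and all names are assumptions for illustration; the package's actual cursor encoding is not documented on this page.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// cursor is a hypothetical payload mirroring the endedAt and id arguments.
type cursor struct {
	EndedAt string `json:"ended_at"`
	ID      string `json:"id"`
}

var enc = base64.RawURLEncoding

// sign returns the HMAC-SHA256 of payload under secret.
func sign(secret, payload []byte) []byte {
	mac := hmac.New(sha256.New, secret)
	mac.Write(payload)
	return mac.Sum(nil)
}

// encodeCursor returns base64(payload) + "." + base64(signature).
func encodeCursor(secret []byte, c cursor) (string, error) {
	payload, err := json.Marshal(c)
	if err != nil {
		return "", err
	}
	return enc.EncodeToString(payload) + "." + enc.EncodeToString(sign(secret, payload)), nil
}

// decodeCursor verifies the signature in constant time before trusting
// the payload.
func decodeCursor(secret []byte, raw string) (cursor, error) {
	p, s, ok := strings.Cut(raw, ".")
	if !ok {
		return cursor{}, errors.New("malformed cursor")
	}
	payload, err := enc.DecodeString(p)
	if err != nil {
		return cursor{}, errors.New("malformed cursor payload")
	}
	sig, err := enc.DecodeString(s)
	if err != nil {
		return cursor{}, errors.New("malformed cursor signature")
	}
	if !hmac.Equal(sig, sign(secret, payload)) {
		return cursor{}, errors.New("cursor signature mismatch")
	}
	var c cursor
	if err := json.Unmarshal(payload, &c); err != nil {
		return cursor{}, err
	}
	return c, nil
}

func main() {
	secret := []byte("example-secret")
	raw, _ := encodeCursor(secret, cursor{EndedAt: "2026-04-02T12:30:45.123Z", ID: "abc"})
	c, err := decodeCursor(secret, raw)
	fmt.Println(c.ID, err) // prints abc <nil>
	_, err = decodeCursor([]byte("other-secret"), raw)
	fmt.Println(err) // prints cursor signature mismatch
}
```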

func (*Store) GetAgents

func (s *Store) GetAgents(
	ctx context.Context,
	excludeOneShot, excludeAutomated bool,
) ([]db.AgentInfo, error)

GetAgents returns distinct agent names with session counts.

func (*Store) GetAllMessages

func (s *Store) GetAllMessages(
	ctx context.Context, sessionID string,
) ([]db.Message, error)

GetAllMessages returns all messages for a session ordered by ordinal.

func (*Store) GetAnalyticsActivity

func (s *Store) GetAnalyticsActivity(
	ctx context.Context, f db.AnalyticsFilter,
	granularity string,
) (db.ActivityResponse, error)

GetAnalyticsActivity returns session/message counts grouped by time bucket.

func (*Store) GetAnalyticsHeatmap

func (s *Store) GetAnalyticsHeatmap(
	ctx context.Context, f db.AnalyticsFilter,
	metric string,
) (db.HeatmapResponse, error)

GetAnalyticsHeatmap returns daily counts with intensity levels.

func (*Store) GetAnalyticsHourOfWeek

func (s *Store) GetAnalyticsHourOfWeek(
	ctx context.Context, f db.AnalyticsFilter,
) (db.HourOfWeekResponse, error)

GetAnalyticsHourOfWeek returns message counts bucketed by day-of-week and hour-of-day.

func (*Store) GetAnalyticsProjects

func (s *Store) GetAnalyticsProjects(
	ctx context.Context, f db.AnalyticsFilter,
) (db.ProjectsAnalyticsResponse, error)

GetAnalyticsProjects returns per-project analytics.

func (*Store) GetAnalyticsSessionShape

func (s *Store) GetAnalyticsSessionShape(
	ctx context.Context, f db.AnalyticsFilter,
) (db.SessionShapeResponse, error)

GetAnalyticsSessionShape returns distribution histograms for session length, duration, and autonomy ratio.

func (*Store) GetAnalyticsSummary

func (s *Store) GetAnalyticsSummary(
	ctx context.Context, f db.AnalyticsFilter,
) (db.AnalyticsSummary, error)

GetAnalyticsSummary returns aggregate statistics.

func (*Store) GetAnalyticsTools

func (s *Store) GetAnalyticsTools(
	ctx context.Context, f db.AnalyticsFilter,
) (db.ToolsAnalyticsResponse, error)

GetAnalyticsTools returns tool usage analytics.

func (*Store) GetAnalyticsTopSessions

func (s *Store) GetAnalyticsTopSessions(
	ctx context.Context, f db.AnalyticsFilter,
	metric string,
) (db.TopSessionsResponse, error)

GetAnalyticsTopSessions returns the top 10 sessions by the given metric.

func (*Store) GetAnalyticsVelocity

func (s *Store) GetAnalyticsVelocity(
	ctx context.Context, f db.AnalyticsFilter,
) (db.VelocityResponse, error)

GetAnalyticsVelocity computes turn cycle, first response, and throughput metrics.

func (*Store) GetChildSessions

func (s *Store) GetChildSessions(
	ctx context.Context, parentID string,
) ([]db.Session, error)

GetChildSessions returns sessions whose parent_session_id matches the given parentID.

func (*Store) GetInsight

func (s *Store) GetInsight(
	_ context.Context, _ int64,
) (*db.Insight, error)

GetInsight returns nil.

func (*Store) GetMachines

func (s *Store) GetMachines(
	ctx context.Context,
	excludeOneShot, excludeAutomated bool,
) ([]string, error)

GetMachines returns distinct machine names.

func (*Store) GetMessages

func (s *Store) GetMessages(
	ctx context.Context,
	sessionID string, from, limit int, asc bool,
) ([]db.Message, error)

GetMessages returns paginated messages for a session.

func (*Store) GetProjects

func (s *Store) GetProjects(
	ctx context.Context,
	excludeOneShot, excludeAutomated bool,
) ([]db.ProjectInfo, error)

GetProjects returns project names with session counts.

func (*Store) GetSession

func (s *Store) GetSession(
	ctx context.Context, id string,
) (*db.Session, error)

GetSession returns a single session by ID, excluding soft-deleted sessions.

func (*Store) GetSessionActivity added in v0.17.0

func (s *Store) GetSessionActivity(
	ctx context.Context, sessionID string,
) (*db.SessionActivityResponse, error)

GetSessionActivity returns time-bucketed message counts for a session, using PostgreSQL-specific timestamp functions.

func (*Store) GetSessionFull

func (s *Store) GetSessionFull(
	ctx context.Context, id string,
) (*db.Session, error)

GetSessionFull returns a single session by ID including soft-deleted sessions.

func (*Store) GetSessionVersion

func (s *Store) GetSessionVersion(
	id string,
) (int, int64, bool)

GetSessionVersion returns the message count and a hash of updated_at for SSE change detection.

func (*Store) GetStats

func (s *Store) GetStats(
	ctx context.Context,
	excludeOneShot, excludeAutomated bool,
) (db.Stats, error)

GetStats returns database statistics, counting only root sessions with messages.

func (*Store) HasFTS

func (s *Store) HasFTS() bool

HasFTS returns true because ILIKE search is available.

func (*Store) InsertInsight

func (s *Store) InsertInsight(
	_ db.Insight,
) (int64, error)

InsertInsight is not supported in read-only mode.

func (*Store) ListInsights

func (s *Store) ListInsights(
	_ context.Context, _ db.InsightFilter,
) ([]db.Insight, error)

ListInsights returns an empty slice.

func (*Store) ListPinnedMessages

func (s *Store) ListPinnedMessages(
	_ context.Context, _ string,
) ([]db.PinnedMessage, error)

ListPinnedMessages returns an empty slice.

func (*Store) ListSessions

func (s *Store) ListSessions(
	ctx context.Context, f db.SessionFilter,
) (db.SessionPage, error)

ListSessions returns a cursor-paginated list of sessions.

func (*Store) ListStarredSessionIDs

func (s *Store) ListStarredSessionIDs(
	_ context.Context,
) ([]string, error)

ListStarredSessionIDs returns an empty slice.

func (*Store) ListTrashedSessions

func (s *Store) ListTrashedSessions(
	_ context.Context,
) ([]db.Session, error)

ListTrashedSessions returns an empty slice.

func (*Store) PinMessage

func (s *Store) PinMessage(
	_ string, _ int64, _ *string,
) (int64, error)

PinMessage is not supported in read-only mode.

func (*Store) ReadOnly

func (s *Store) ReadOnly() bool

ReadOnly returns true; this is a read-only data source.

func (*Store) RenameSession

func (s *Store) RenameSession(
	_ string, _ *string,
) error

RenameSession is not supported in read-only mode.

func (*Store) ReplaceSessionMessages

func (s *Store) ReplaceSessionMessages(
	_ string, _ []db.Message,
) error

ReplaceSessionMessages is not supported in read-only mode.

func (*Store) RestoreSession

func (s *Store) RestoreSession(_ string) (int64, error)

RestoreSession is not supported in read-only mode.

func (*Store) Search

func (s *Store) Search(
	ctx context.Context, f db.SearchFilter,
) (db.SearchPage, error)

Search performs an ILIKE-based search across messages, grouped to one result per session via DISTINCT ON, and UNIONs it with a branch that matches the session name (display_name / first_message).

func (*Store) SearchSession

func (s *Store) SearchSession(
	ctx context.Context, sessionID, query string,
) ([]int, error)

SearchSession performs ILIKE substring search within a single session's messages, returning matching ordinals.
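
When user input is used in an ILIKE substring match, the LIKE wildcards in it usually need escaping so they match literally. The sketch below shows one way to build such a pattern; whether SearchSession escapes its input this way is not stated here, and the table and column names in the query text are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// exampleQuery uses hypothetical table and column names.
const exampleQuery = `SELECT ordinal FROM messages
WHERE session_id = $1 AND content ILIKE $2
ORDER BY ordinal`

// likePattern is a hypothetical helper that escapes backslash, percent,
// and underscore (PostgreSQL's default LIKE escape character is a
// backslash) and wraps the query in % for a substring match.
func likePattern(query string) string {
	r := strings.NewReplacer(`\`, `\\`, `%`, `\%`, `_`, `\_`)
	return "%" + r.Replace(query) + "%"
}

func main() {
	fmt.Println(likePattern("50%_off")) // prints %50\%\_off%
	fmt.Println(exampleQuery)
}
```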

func (*Store) SetCursorSecret

func (s *Store) SetCursorSecret(secret []byte)

SetCursorSecret sets the HMAC key used for cursor signing.

func (*Store) SoftDeleteSession

func (s *Store) SoftDeleteSession(_ string) error

SoftDeleteSession is not supported in read-only mode.

func (*Store) StarSession

func (s *Store) StarSession(_ string) (bool, error)

StarSession is not supported in read-only mode.

func (*Store) UnpinMessage

func (s *Store) UnpinMessage(_ string, _ int64) error

UnpinMessage is not supported in read-only mode.

func (*Store) UnstarSession

func (s *Store) UnstarSession(_ string) error

UnstarSession is not supported in read-only mode.

func (*Store) UpsertSession

func (s *Store) UpsertSession(_ db.Session) error

UpsertSession is not supported in read-only mode.

type Sync

type Sync struct {
	// contains filtered or unexported fields
}

Sync manages push-only sync from local SQLite to a remote PostgreSQL database.

func New

func New(
	pgURL, schema string, local *db.DB,
	machine string, allowInsecure bool,
) (*Sync, error)

New creates a Sync instance and verifies the PG connection. The machine name must not be "local", which is reserved as the SQLite sentinel for sessions that originated on this machine. When allowInsecure is true, non-loopback connections without TLS produce a warning instead of failing.

func (*Sync) Close

func (s *Sync) Close() error

Close closes the PostgreSQL connection pool. Callers must ensure no Push operations are in-flight before calling Close; otherwise those operations will fail with connection errors.

func (*Sync) DB

func (s *Sync) DB() *sql.DB

DB returns the underlying PostgreSQL connection pool.

func (*Sync) EnsureSchema

func (s *Sync) EnsureSchema(ctx context.Context) error

EnsureSchema creates the schema and tables in PG if they don't already exist. It also marks the schema as initialized so subsequent Push calls skip redundant checks.

func (*Sync) Push

func (s *Sync) Push(
	ctx context.Context, full bool,
) (PushResult, error)

Push syncs local sessions and messages to PostgreSQL. Only sessions modified since the last push are processed. When full is true, the per-message content heuristic is bypassed and every candidate session's messages are re-pushed unconditionally.

Known limitation: sessions that are permanently deleted from SQLite (via prune) are not propagated as deletions to PG because the local rows no longer exist at push time. Sessions soft-deleted with deleted_at are synced correctly. Use a direct PG DELETE to remove permanently pruned sessions from PG if needed.

func (*Sync) Status

func (s *Sync) Status(
	ctx context.Context,
) (SyncStatus, error)

Status returns sync status information. Sync state reads (last_push_at) are non-fatal because these are informational watermarks stored in SQLite. PG query failures are fatal because they indicate a connectivity problem that the caller needs to know about.

type SyncStateStore

type SyncStateStore interface {
	GetSyncState(key string) (string, error)
	SetSyncState(key, value string) error
}

SyncStateStore is the interface needed for normalizing local sync timestamps stored in SQLite.
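
Because the interface has only two methods, an in-memory implementation is enough to exercise timestamp normalization in tests. The sketch below is an assumption-laden illustration: memStore, normalizeState, and the "last_push_at" key string are hypothetical, and the real NormalizeLocalSyncStateTimestamps may behave differently for missing keys.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// SyncStateStore is copied from the interface documented above.
type SyncStateStore interface {
	GetSyncState(key string) (string, error)
	SetSyncState(key, value string) error
}

// memStore is a hypothetical in-memory store; a missing key reads as "".
type memStore struct {
	mu sync.Mutex
	m  map[string]string
}

func newMemStore() *memStore { return &memStore{m: map[string]string{}} }

func (s *memStore) GetSyncState(key string) (string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.m[key], nil
}

func (s *memStore) SetSyncState(key, value string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.m[key] = value
	return nil
}

// normalizeState rewrites the assumed "last_push_at" key at millisecond
// precision, leaving an empty value untouched.
func normalizeState(store SyncStateStore) error {
	const key = "last_push_at"
	const layout = "2006-01-02T15:04:05.000Z"
	v, err := store.GetSyncState(key)
	if err != nil || v == "" {
		return err
	}
	t, err := time.Parse(time.RFC3339Nano, v)
	if err != nil {
		return fmt.Errorf("parse %s: %w", key, err)
	}
	return store.SetSyncState(key, t.UTC().Format(layout))
}

func main() {
	s := newMemStore()
	_ = s.SetSyncState("last_push_at", "2026-04-02T12:30:45.123456Z")
	if err := normalizeState(s); err != nil {
		fmt.Println("error:", err)
		return
	}
	v, _ := s.GetSyncState("last_push_at")
	fmt.Println(v) // prints 2026-04-02T12:30:45.123Z
}
```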

type SyncStatus

type SyncStatus struct {
	Machine    string `json:"machine"`
	LastPushAt string `json:"last_push_at"`
	PGSessions int    `json:"pg_sessions"`
	PGMessages int    `json:"pg_messages"`
}

SyncStatus holds summary information about the sync state.
