postgres

package
v0.7.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 13, 2026 License: Apache-2.0 Imports: 35 Imported by: 0

Documentation

Overview

Package postgres — embedded.go

Thin wrapper around fergusstrange/embedded-postgres that spawns a real postgres process as a child of the duragraph engine. Used by the "embedded" binary mode (binary-modes.yml § embedded_components.postgres).

The wrapper exists for three reasons:

  1. Keep the library's fluent-builder API away from the cmd/serve.go wiring — serve.go consumes a flat EmbeddedConfig struct populated from internal/config, not a chain of method calls.
  2. Map the config package's plain string version ("15") to the library's typed embeddedpostgres.PostgresVersion. This isolates the heavy library dep from the config package.
  3. Give us a stable seam for future test substitution (interface extraction is deferred until we actually need it — Phase 4).

Lifecycle expectations (per spec):

  • Start blocks until the postgres process is accepting connections.
  • Stop sends SIGTERM equivalent and waits for graceful shutdown (the library handles the fsync/checkpoint/exit dance).
  • The data directory (cfg.DataDir) is preserved across Start/Stop pairs. Runtime path and binary cache use the library defaults (~/.embedded-postgres-go/) so first start fetches the binary; subsequent starts reuse the cache.

Package postgres — migrator.go

Migrator is the runtime component that owns DB-level provisioning (CREATE DATABASE / DROP DATABASE) and schema rollout (golang-migrate).

It supports three execution modes that map onto the v1.0-platform plan:

  • Single-DB / drop-in mode (default for existing deployments): `MigrateMainDB(ctx, dbName)` runs the tenant migrations against the engine's primary DB (env DB_NAME, e.g. `appdb`). This preserves the pre-multi-tenant flow where one DB holds everything.

  • Platform DB (`duragraph_platform`): holds users, tenants, audit log. Created by `Bootstrap` if absent; schema applied via `MigratePlatform`. Singleton, shared across all tenants.

  • Per-tenant DB (`tenant_<32hex>`): one DB per approved tenant. Created via `ProvisionTenant` (CREATE DATABASE + tenant migrations up). Removed via `DropTenant`.

Idempotency:

  • `golang-migrate` maintains a version table per DB; re-running past the current version is a no-op (returns `migrate.ErrNoChange`, which the helpers below swallow).
  • `CREATE DATABASE` is wrapped in a `pg_database` existence check (Postgres has no `CREATE DATABASE IF NOT EXISTS`).
  • `DROP DATABASE` uses `IF EXISTS`.

Embedding:

  • SQL migrations live next to this file (`migrations/tenant/`, `migrations/platform/`) because `go:embed` only reads files inside or below the package directory. The spec convention placed them at `deploy/sql/{tenant,platform}/`; a follow-up update PR to `Duragraph/duragraph-spec` will revise the spec to match.
  • The `migrations/platform/` directory is intentionally near-empty today (just a README); platform schema lands in `feat/platform-db-init`. The empty path is detected up front (no `<version>_*.sql` files in the embed.FS) and short-circuited to a no-op — see hasMigrations.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Close

func Close(pool *pgxpool.Pool)

Close closes the connection pool

func ClosePools

func ClosePools(pools *Pools)

ClosePools closes both write and read pools.

func NewPool

func NewPool(ctx context.Context, config Config) (*pgxpool.Pool, error)

NewPool creates a new PostgreSQL connection pool

Types

type AssistantRepository

type AssistantRepository struct {
	// contains filtered or unexported fields
}

AssistantRepository implements the workflow.AssistantRepository interface

func NewAssistantRepository

func NewAssistantRepository(pool *pgxpool.Pool, eventStore *EventStore) *AssistantRepository

NewAssistantRepository creates a new assistant repository

func NewAssistantRepositoryWithPools

func NewAssistantRepositoryWithPools(writePool, readPool *pgxpool.Pool, eventStore *EventStore) *AssistantRepository

NewAssistantRepositoryWithPools creates an assistant repository with separate read/write pools

func (*AssistantRepository) Count

Count returns the number of assistants matching the given filters

func (*AssistantRepository) Delete

func (r *AssistantRepository) Delete(ctx context.Context, id string) error

Delete removes an assistant

func (*AssistantRepository) FindByID

FindByID retrieves an assistant by ID

func (*AssistantRepository) FindVersions

func (r *AssistantRepository) FindVersions(ctx context.Context, assistantID string, limit int) ([]workflow.AssistantVersionInfo, error)

FindVersions retrieves version history for an assistant

func (*AssistantRepository) List

func (r *AssistantRepository) List(ctx context.Context, limit, offset int) ([]*workflow.Assistant, error)

List retrieves assistants with pagination

func (*AssistantRepository) Save

func (r *AssistantRepository) Save(ctx context.Context, assistant *workflow.Assistant) error

Save persists an assistant aggregate and its events

func (*AssistantRepository) SaveVersion

SaveVersion saves a new version of an assistant

func (*AssistantRepository) Search

Search retrieves assistants matching the given filters

func (*AssistantRepository) SetLatestVersion

func (r *AssistantRepository) SetLatestVersion(ctx context.Context, assistantID string, version int) error

SetLatestVersion updates the assistant to point to a specific version

func (*AssistantRepository) Update

func (r *AssistantRepository) Update(ctx context.Context, assistant *workflow.Assistant) error

Update updates an existing assistant

type CheckpointRepository

type CheckpointRepository struct {
	// contains filtered or unexported fields
}

CheckpointRepository implements checkpoint.Repository using PostgreSQL

func NewCheckpointRepository

func NewCheckpointRepository(pool *pgxpool.Pool) *CheckpointRepository

NewCheckpointRepository creates a new checkpoint repository

func NewCheckpointRepositoryWithPools

func NewCheckpointRepositoryWithPools(writePool, readPool *pgxpool.Pool) *CheckpointRepository

NewCheckpointRepositoryWithPools creates a checkpoint repository with separate read/write pools

func (*CheckpointRepository) Delete

func (r *CheckpointRepository) Delete(ctx context.Context, id string) error

Delete removes a checkpoint

func (*CheckpointRepository) FindByCheckpointID

func (r *CheckpointRepository) FindByCheckpointID(ctx context.Context, threadID, checkpointNS, checkpointID string) (*checkpoint.Checkpoint, error)

FindByCheckpointID retrieves a checkpoint by thread_id, checkpoint_ns, and checkpoint_id

func (*CheckpointRepository) FindByID

FindByID retrieves a checkpoint by ID

func (*CheckpointRepository) FindHistory

func (r *CheckpointRepository) FindHistory(ctx context.Context, threadID string, checkpointNS string, limit int, before string) ([]*checkpoint.Checkpoint, error)

FindHistory retrieves checkpoint history for a thread

func (*CheckpointRepository) FindLatest

func (r *CheckpointRepository) FindLatest(ctx context.Context, threadID string, checkpointNS string) (*checkpoint.Checkpoint, error)

FindLatest retrieves the latest checkpoint for a thread

func (*CheckpointRepository) FindWritesByCheckpoint

func (r *CheckpointRepository) FindWritesByCheckpoint(ctx context.Context, threadID, checkpointNS, checkpointID string) ([]*checkpoint.CheckpointWrite, error)

FindWritesByCheckpoint retrieves all writes for a checkpoint

func (*CheckpointRepository) Save

Save persists a checkpoint

func (*CheckpointRepository) SaveWrite

SaveWrite persists a checkpoint write

type Config

type Config struct {
	Host     string
	Port     int
	User     string
	Password string
	Database string
	SSLMode  string
}

Config holds database configuration

type CronJob

type CronJob struct {
	CronID         string
	AssistantID    string
	ThreadID       *string
	Schedule       string
	Timezone       string
	Payload        map[string]interface{}
	Metadata       map[string]interface{}
	Enabled        bool
	OnRunCompleted string
	EndTime        *time.Time
	NextRunDate    *time.Time
	UserID         *string
	CreatedAt      time.Time
	UpdatedAt      time.Time
}

CronJob represents a scheduled cron job.

type CronRepository

type CronRepository struct {
	// contains filtered or unexported fields
}

CronRepository provides CRUD operations for cron jobs.

func NewCronRepository

func NewCronRepository(pool *pgxpool.Pool) *CronRepository

func NewCronRepositoryWithPools

func NewCronRepositoryWithPools(writePool, readPool *pgxpool.Pool) *CronRepository

func (*CronRepository) Count

func (r *CronRepository) Count(ctx context.Context, assistantID, threadID *string) (int, error)

Count returns the number of crons matching optional filters.

func (*CronRepository) Create

func (r *CronRepository) Create(ctx context.Context, c *CronJob) (string, error)

Create inserts a new cron job and returns its ID.

func (*CronRepository) Delete

func (r *CronRepository) Delete(ctx context.Context, cronID string) error

Delete removes a cron job by ID.

func (*CronRepository) GetByID

func (r *CronRepository) GetByID(ctx context.Context, cronID string) (*CronJob, error)

GetByID retrieves a cron job by ID.

func (*CronRepository) GetDueJobs

func (r *CronRepository) GetDueJobs(ctx context.Context, now time.Time) ([]CronJob, error)

GetDueJobs returns enabled cron jobs whose next_run_date has passed.

func (*CronRepository) Search

func (r *CronRepository) Search(ctx context.Context, assistantID, threadID *string, enabled *bool, limit, offset int, sortBy, sortOrder string) ([]CronJob, error)

Search finds cron jobs matching optional filters with pagination.

func (*CronRepository) Update

func (r *CronRepository) Update(ctx context.Context, cronID string, updates map[string]interface{}) (*CronJob, error)

Update patches a cron job. Only non-nil fields are updated.

func (*CronRepository) UpdateNextRun

func (r *CronRepository) UpdateNextRun(ctx context.Context, cronID string, nextRun time.Time) error

UpdateNextRun sets the next_run_date for a cron job.

type EmbeddedConfig added in v0.7.0

type EmbeddedConfig struct {
	Port     uint32
	DataDir  string // persistent data path, NOT the runtime path
	Username string
	Password string
	Database string
	Version  string // postgres major version string, e.g. "15"
	Logger   io.Writer

	// StartTimeout caps how long Start() waits for the postgres process
	// to become healthy. Zero means "use library default" (15s, which is
	// enough for cached binaries but tight for a first-run download on
	// slow links). Operators bump via DB_EMBEDDED_START_TIMEOUT
	// (Go duration syntax, e.g. "90s") — wired in internal/config.Load
	// and plumbed through serve.go.
	StartTimeout time.Duration
}

EmbeddedConfig is the flat config struct populated by serve.go from internal/config.DatabaseConfig. Fields mirror the library's relevant builder methods 1:1.

type EmbeddedPostgres added in v0.7.0

type EmbeddedPostgres struct {
	// contains filtered or unexported fields
}

EmbeddedPostgres wraps the library's *EmbeddedPostgres so we can give the surrounding code a context-friendly Start/Stop signature without fighting the library's blocking calls.

func NewEmbedded added in v0.7.0

func NewEmbedded(cfg EmbeddedConfig) (*EmbeddedPostgres, error)

NewEmbedded constructs an EmbeddedPostgres ready to Start. We create cfg.DataDir up front (with restrictive 0o700 perms — postgres initdb rejects looser permissions on a pre-existing data dir) so the downstream library does not have to recurse parent paths itself.

Library quirk: on first run the library calls os.RemoveAll(dataPath) then runs initdb, which creates the dir fresh. On subsequent runs (reused dir) the library does NOT touch perms, so an operator who upgraded from a pre-fix build with a 0o755 data dir would still fail initdb's permission check — the chmod step below is the only thing that recovers them.

func (*EmbeddedPostgres) Config added in v0.7.0

func (e *EmbeddedPostgres) Config() EmbeddedConfig

Config returns the resolved EmbeddedConfig. Mostly useful for tests and for the loud-startup logging in serve.go.

func (*EmbeddedPostgres) Start added in v0.7.0

func (e *EmbeddedPostgres) Start(ctx context.Context) error

Start launches the embedded postgres process and blocks until it is accepting connections. The ctx parameter is currently honoured only for symmetry with the rest of the codebase — the library's own Start() does not take a context. If the caller's ctx is already done when Start is invoked we short-circuit with ctx.Err().

func (*EmbeddedPostgres) Stop added in v0.7.0

func (e *EmbeddedPostgres) Stop(ctx context.Context) error

Stop gracefully terminates the embedded postgres process. The library's Stop() blocks until pg_ctl reports a clean shutdown (fsync + checkpoint + exit). The ctx parameter is honoured the same way as in Start — short-circuit if already cancelled.

type EventStore

type EventStore struct {
	// contains filtered or unexported fields
}

EventStore implements event sourcing storage

func NewEventStore

func NewEventStore(pool *pgxpool.Pool) *EventStore

NewEventStore creates a new event store

func (*EventStore) CreateSnapshot

func (s *EventStore) CreateSnapshot(ctx context.Context, streamID, aggregateType, aggregateID string, version int, state map[string]interface{}) error

CreateSnapshot creates a snapshot of aggregate state

func (*EventStore) LoadEvents

func (s *EventStore) LoadEvents(ctx context.Context, aggregateType, aggregateID string) ([]map[string]interface{}, error)

LoadEvents loads all events for an aggregate

func (*EventStore) LoadSnapshot

func (s *EventStore) LoadSnapshot(ctx context.Context, aggregateType, aggregateID string) (map[string]interface{}, int, error)

LoadSnapshot loads the latest snapshot for an aggregate

func (*EventStore) SaveEvents

func (s *EventStore) SaveEvents(ctx context.Context, streamID, aggregateType, aggregateID string, events []eventbus.Event) error

SaveEvents saves events to the event store and outbox in its own transaction.

func (*EventStore) SaveEventsInTx

func (s *EventStore) SaveEventsInTx(ctx context.Context, tx pgx.Tx, streamID, aggregateType, aggregateID string, events []eventbus.Event) error

SaveEventsInTx saves events within an existing transaction for atomic writes.

type GraphRepository

type GraphRepository struct {
	// contains filtered or unexported fields
}

GraphRepository implements the workflow.GraphRepository interface

func NewGraphRepository

func NewGraphRepository(pool *pgxpool.Pool, eventStore *EventStore) *GraphRepository

NewGraphRepository creates a new graph repository

func NewGraphRepositoryWithPools

func NewGraphRepositoryWithPools(writePool, readPool *pgxpool.Pool, eventStore *EventStore) *GraphRepository

NewGraphRepositoryWithPools creates a graph repository with separate read/write pools

func (*GraphRepository) Delete

func (r *GraphRepository) Delete(ctx context.Context, id string) error

Delete removes a graph

func (*GraphRepository) FindByAssistantID

func (r *GraphRepository) FindByAssistantID(ctx context.Context, assistantID string) ([]*workflow.Graph, error)

FindByAssistantID retrieves graphs for a specific assistant

func (*GraphRepository) FindByAssistantIDAndVersion

func (r *GraphRepository) FindByAssistantIDAndVersion(ctx context.Context, assistantID, version string) (*workflow.Graph, error)

FindByAssistantIDAndVersion retrieves a specific graph version

func (*GraphRepository) FindByID

func (r *GraphRepository) FindByID(ctx context.Context, id string) (*workflow.Graph, error)

FindByID retrieves a graph by ID

func (*GraphRepository) Save

func (r *GraphRepository) Save(ctx context.Context, graph *workflow.Graph) error

Save persists a graph aggregate and its events

func (*GraphRepository) Update

func (r *GraphRepository) Update(ctx context.Context, graph *workflow.Graph) error

Update updates an existing graph

type InterruptRepository

type InterruptRepository struct {
	// contains filtered or unexported fields
}

InterruptRepository implements the humanloop.Repository interface

func NewInterruptRepository

func NewInterruptRepository(pool *pgxpool.Pool, eventStore *EventStore) *InterruptRepository

NewInterruptRepository creates a new interrupt repository

func NewInterruptRepositoryWithPools

func NewInterruptRepositoryWithPools(writePool, readPool *pgxpool.Pool, eventStore *EventStore) *InterruptRepository

NewInterruptRepositoryWithPools creates an interrupt repository with separate read/write pools

func (*InterruptRepository) Delete

func (r *InterruptRepository) Delete(ctx context.Context, id string) error

Delete removes an interrupt

func (*InterruptRepository) FindByID

FindByID retrieves an interrupt by ID

func (*InterruptRepository) FindByRunID

func (r *InterruptRepository) FindByRunID(ctx context.Context, runID string) ([]*humanloop.Interrupt, error)

FindByRunID retrieves interrupts for a specific run

func (*InterruptRepository) FindUnresolvedByRunID

func (r *InterruptRepository) FindUnresolvedByRunID(ctx context.Context, runID string) ([]*humanloop.Interrupt, error)

FindUnresolvedByRunID retrieves unresolved interrupts for a run

func (*InterruptRepository) Save

func (r *InterruptRepository) Save(ctx context.Context, interrupt *humanloop.Interrupt) error

Save persists an interrupt aggregate and its events

func (*InterruptRepository) Update

func (r *InterruptRepository) Update(ctx context.Context, interrupt *humanloop.Interrupt) error

Update updates an existing interrupt

type Logger added in v0.7.0

type Logger interface {
	Printf(format string, args ...any)
}

Logger is the minimal logging surface the migrator needs. Both `*log.Logger` and `log.Default()` satisfy it, so the engine's stdlib-`log` style integrates without extra glue.

type Migrator added in v0.7.0

type Migrator struct {
	// contains filtered or unexported fields
}

Migrator owns DB provisioning + schema rollout.

A single Migrator is safe for concurrent use across goroutines; the helpers each open short-lived admin connections and close them before returning. There is no long-lived shared pool inside the migrator, because migrations run only at startup + on tenant approval — both rare events.

func NewMigrator added in v0.7.0

func NewMigrator(pgAdminDSN string, opts ...MigratorOption) (*Migrator, error)

NewMigrator constructs a Migrator. pgAdminDSN must be a postgres URL of the form `postgres://user:pass@host:port/<anydb>?sslmode=...`. The DB component is overwritten per call; "postgres" or "appdb" both work as the placeholder.

Returns an error only if pgAdminDSN cannot be parsed as a URL.

func (*Migrator) Bootstrap added in v0.7.0

func (m *Migrator) Bootstrap(ctx context.Context) error

Bootstrap ensures the platform DB exists and runs platform migrations. Safe to call repeatedly. Returns nil on success even when the platform migrations directory is empty (handled inside MigratePlatform).

func (*Migrator) DropTenant added in v0.7.0

func (m *Migrator) DropTenant(ctx context.Context, tenantID string) error

DropTenant drops the tenant DB. Safe to call when the DB doesn't exist (uses DROP DATABASE IF EXISTS). Active connections to the DB will block the drop in Postgres ≥13; callers should ensure the pgxpool for this tenant has been closed before invoking.

func (*Migrator) MigrateAllTenants added in v0.7.0

func (m *Migrator) MigrateAllTenants(ctx context.Context) []TenantMigrationResult

MigrateAllTenants iterates approved tenants from `platform.tenants` and runs MigrateTenant against each. Bounded concurrency (5 in flight).

Graceful empty paths:

  • If the platform DB doesn't exist (SQLSTATE 3D000) → empty result, warning logged. This fires when MIGRATOR_PLATFORM_ENABLED=false and Bootstrap has never been called.
  • If `platform.tenants` doesn't exist (SQLSTATE 42P01) → empty result, warning logged. This fires before `feat/platform-db-init` adds the platform schema.

Per-tenant migration failures are surfaced as Err on individual results; this method itself returns no error.

func (*Migrator) MigrateMainDB added in v0.7.0

func (m *Migrator) MigrateMainDB(ctx context.Context, dbName string) error

MigrateMainDB runs the tenant migrations against the engine's primary DB. Used during the transition before multi-tenant routing lands; this preserves the existing single-DB dev/test/prod flow that previously relied on docker-entrypoint-initdb.d.

func (*Migrator) MigratePlatform added in v0.7.0

func (m *Migrator) MigratePlatform(ctx context.Context) error

MigratePlatform runs platform migrations against the platform DB. Assumes the DB exists. Empty migration set short-circuits to a no-op; ErrNoChange (already at head) is treated as success.

func (*Migrator) MigrateTenant added in v0.7.0

func (m *Migrator) MigrateTenant(ctx context.Context, tenantID string) (uint, error)

MigrateTenant runs tenant migrations against an existing tenant DB and returns the resulting version. ErrNoChange (already at head) is treated as success; the version reflects the post-migration state.

func (*Migrator) ProvisionTenant added in v0.7.0

func (m *Migrator) ProvisionTenant(ctx context.Context, tenantID string) error

ProvisionTenant creates the tenant DB if absent and applies all tenant migrations. Safe to call repeatedly: existing DB → skip create; existing schema → ErrNoChange swallowed.

tenantID must be a UUID string; the DB name is derived via tenant.DBName (which validates the UUID).

Discards the resulting schema version; callers that need it should use ProvisionTenantWithVersion instead (avoids a second migrate.Up() round-trip just to read the version table).

func (*Migrator) ProvisionTenantWithVersion added in v0.7.0

func (m *Migrator) ProvisionTenantWithVersion(ctx context.Context, tenantID string) (uint, error)

ProvisionTenantWithVersion is ProvisionTenant that also returns the resulting schema version, avoiding a second MigrateTenant call just to read the version. Used by the tenant_provisioner subscriber where the version is recorded on tenant.Approve.

type MigratorOption added in v0.7.0

type MigratorOption func(*Migrator)

MigratorOption configures a Migrator.

func WithLogger added in v0.7.0

func WithLogger(l Logger) MigratorOption

WithLogger overrides the default logger (log.Default()).

func WithPlatformDBName added in v0.7.0

func WithPlatformDBName(name string) MigratorOption

WithPlatformDBName overrides the default platform DB name. Default is "duragraph_platform".

type Option added in v0.7.0

type Option func(*PoolManager)

Option configures a PoolManager.

func WithEvictInterval added in v0.7.0

func WithEvictInterval(d time.Duration) Option

WithEvictInterval sets how often the eviction goroutine scans for idle pools. Default is 1 minute.

func WithIdleTimeout added in v0.7.0

func WithIdleTimeout(d time.Duration) Option

WithIdleTimeout sets how long a tenant pool may sit idle before being evicted. Default is 10 minutes.

func WithMaxConns added in v0.7.0

func WithMaxConns(n int32) Option

WithMaxConns sets the per-tenant pgxpool MaxConns. Default is 2.

type Outbox

type Outbox struct {
	// contains filtered or unexported fields
}

Outbox implements the outbox pattern for reliable event publishing

func NewOutbox

func NewOutbox(pool *pgxpool.Pool) *Outbox

NewOutbox creates a new outbox

func (*Outbox) Cleanup

func (o *Outbox) Cleanup(ctx context.Context, retentionDays int) (int64, error)

Cleanup removes old published messages

func (*Outbox) GetUnpublished

func (o *Outbox) GetUnpublished(ctx context.Context, limit int) ([]*OutboxMessage, error)

GetUnpublished retrieves unpublished messages from the outbox. Uses FOR UPDATE SKIP LOCKED so multiple relay instances don't process the same messages.

func (*Outbox) MarkAsFailed

func (o *Outbox) MarkAsFailed(ctx context.Context, id int64, errorMsg string) error

MarkAsFailed marks a message as failed and schedules retry

func (*Outbox) MarkAsPublished

func (o *Outbox) MarkAsPublished(ctx context.Context, id int64) error

MarkAsPublished marks a message as published

type OutboxMessage

type OutboxMessage struct {
	ID            int64
	EventID       string
	AggregateType string
	AggregateID   string
	EventType     string
	Payload       map[string]interface{}
	Metadata      map[string]interface{}
	CreatedAt     time.Time
	Published     bool
	PublishedAt   *time.Time
	Attempts      int
	LastError     *string
	NextRetryAt   *time.Time
}

OutboxMessage represents a message in the outbox

type PoolManager added in v0.7.0

type PoolManager struct {
	// contains filtered or unexported fields
}

PoolManager owns one *pgxpool.Pool per tenant id, lazily created on first ForTenant call and reaped after idle. All tenants share a baseConfig (host/port/credentials/SSL); only `Database` and `MaxConns` are mutated per tenant onto a copy.

The manager is safe for concurrent use. Eviction runs in a single background goroutine, started by Start and stopped by Close (or parent ctx cancellation).

func NewPoolManager added in v0.7.0

func NewPoolManager(baseConfig *pgxpool.Config, opts ...Option) *PoolManager

NewPoolManager constructs a PoolManager. The caller is responsible for parsing the base DSN via `pgxpool.ParseConfig`; this matches the pattern in db.go where the application pre-parses config and passes a ready-to-use *pgxpool.Config. The Database field on baseConfig is ignored (overwritten per tenant).

func (*PoolManager) Close added in v0.7.0

func (m *PoolManager) Close() error

Close stops eviction, closes every cached pool, and clears the map. Safe to call from any goroutine; subsequent ForTenant calls return an error. Always returns nil today; the signature returns error so future implementations can surface partial-close failures without a breaking change.

func (*PoolManager) ForTenant added in v0.7.0

func (m *PoolManager) ForTenant(ctx context.Context, tenantID string) (*pgxpool.Pool, error)

ForTenant returns the pgxpool for the given tenant, creating it on first request. tenantID is a UUID string ("xxxxxxxx-xxxx-..."); it is parsed and re-rendered to the canonical lowercase form before derivation.

func (*PoolManager) Start added in v0.7.0

func (m *PoolManager) Start(ctx context.Context)

Start launches the idle-eviction goroutine. Calling Start more than once is a no-op. Eviction stops when ctx is cancelled or Close is called.

type Pools

type Pools struct {
	Write *pgxpool.Pool
	Read  *pgxpool.Pool
}

Pools holds write and read connection pools for CQRS split.

func NewPools

func NewPools(ctx context.Context, writeConfig Config, readConfig *Config) (*Pools, error)

NewPools creates write and read connection pools. If readConfig is nil, both pools point to the same (write) database.

type RunRepository

type RunRepository struct {
	// contains filtered or unexported fields
}

RunRepository implements the run.Repository interface

func NewRunRepository

func NewRunRepository(pool *pgxpool.Pool, eventStore *EventStore) *RunRepository

NewRunRepository creates a new run repository

func NewRunRepositoryWithPools

func NewRunRepositoryWithPools(writePool, readPool *pgxpool.Pool, eventStore *EventStore) *RunRepository

NewRunRepositoryWithPools creates a run repository with separate read/write pools

func (*RunRepository) Delete

func (r *RunRepository) Delete(ctx context.Context, id string) error

Delete removes a run

func (*RunRepository) FindActiveByThreadID

func (r *RunRepository) FindActiveByThreadID(ctx context.Context, threadID string) ([]*run.Run, error)

FindActiveByThreadID retrieves active (non-terminal) runs for a thread

func (*RunRepository) FindAll

func (r *RunRepository) FindAll(ctx context.Context, limit, offset int) ([]*run.Run, error)

FindAll retrieves all runs with pagination

func (*RunRepository) FindByAssistantID

func (r *RunRepository) FindByAssistantID(ctx context.Context, assistantID string, limit, offset int) ([]*run.Run, error)

FindByAssistantID retrieves runs for a specific assistant

func (*RunRepository) FindByID

func (r *RunRepository) FindByID(ctx context.Context, id string) (*run.Run, error)

FindByID retrieves a run by ID (reads from read pool)

func (*RunRepository) FindByIDConsistent

func (r *RunRepository) FindByIDConsistent(ctx context.Context, id string) (*run.Run, error)

FindByIDConsistent reads from write pool for strong consistency after writes

func (*RunRepository) FindByStatus

func (r *RunRepository) FindByStatus(ctx context.Context, status run.Status, limit, offset int) ([]*run.Run, error)

FindByStatus retrieves runs by status

func (*RunRepository) FindByThreadID

func (r *RunRepository) FindByThreadID(ctx context.Context, threadID string, limit, offset int) ([]*run.Run, error)

FindByThreadID retrieves runs for a specific thread

func (*RunRepository) FindExpiredLeases

func (r *RunRepository) FindExpiredLeases(ctx context.Context) ([]*run.Run, error)

FindExpiredLeases finds runs with expired worker leases. Uses FOR UPDATE SKIP LOCKED to prevent multiple instances from processing the same expired lease.

func (*RunRepository) LoadFromEvents

func (r *RunRepository) LoadFromEvents(ctx context.Context, id string) (*run.Run, error)

LoadFromEvents rebuilds a run from event store

func (*RunRepository) Save

func (r *RunRepository) Save(ctx context.Context, runAgg *run.Run) error

Save persists a run aggregate and its events atomically in a single transaction.

func (*RunRepository) Update

func (r *RunRepository) Update(ctx context.Context, runAgg *run.Run) error

Update updates an existing run atomically (projection + events in single transaction). Uses optimistic concurrency control: the update only succeeds if the version matches.

type StoreItem

type StoreItem struct {
	Namespace []string
	Key       string
	Value     map[string]interface{}
	CreatedAt time.Time
	UpdatedAt time.Time
}

StoreItem represents a namespaced key-value item.

type StoreRepository

type StoreRepository struct {
	// contains filtered or unexported fields
}

StoreRepository provides CRUD operations for the namespaced key-value store.

func NewStoreRepository

func NewStoreRepository(pool *pgxpool.Pool) *StoreRepository

func NewStoreRepositoryWithPools

func NewStoreRepositoryWithPools(writePool, readPool *pgxpool.Pool) *StoreRepository

func (*StoreRepository) Delete

func (r *StoreRepository) Delete(ctx context.Context, namespace []string, key string) error

Delete removes an item by namespace and key.

func (*StoreRepository) Get

func (r *StoreRepository) Get(ctx context.Context, namespace []string, key string, refreshTTL bool) (*StoreItem, error)

Get retrieves a single item by namespace and key. Returns nil if not found or expired.

func (*StoreRepository) ListNamespaces

func (r *StoreRepository) ListNamespaces(ctx context.Context, prefix, suffix []string, maxDepth, limit, offset int) ([][]string, error)

ListNamespaces returns distinct namespaces matching optional prefix/suffix and depth constraints.

func (*StoreRepository) Put

func (r *StoreRepository) Put(ctx context.Context, namespace []string, key string, value map[string]interface{}, ttlMinutes int) error

Put inserts or updates an item. ttlMinutes <= 0 means no expiration.

func (*StoreRepository) Search

func (r *StoreRepository) Search(ctx context.Context, namespacePrefix []string, filter map[string]interface{}, limit, offset int) ([]StoreItem, error)

Search finds items within a namespace prefix, with optional value filter, limit, and offset.

type TaskAssignmentRepository

type TaskAssignmentRepository struct {
	// contains filtered or unexported fields
}

TaskAssignmentRepository implements worker.TaskRepository using PostgreSQL.

func NewTaskAssignmentRepository

func NewTaskAssignmentRepository(pool *pgxpool.Pool) *TaskAssignmentRepository

NewTaskAssignmentRepository creates a new task assignment repository.

func (*TaskAssignmentRepository) Claim

func (r *TaskAssignmentRepository) Claim(ctx context.Context, workerID string, graphIDs []string, leaseDuration time.Duration, maxTasks int) ([]*worker.TaskAssignment, error)

Claim atomically claims pending tasks for a worker using FOR UPDATE SKIP LOCKED.

func (*TaskAssignmentRepository) Complete

func (r *TaskAssignmentRepository) Complete(ctx context.Context, id int64) error

func (*TaskAssignmentRepository) Create

func (*TaskAssignmentRepository) Fail

func (r *TaskAssignmentRepository) Fail(ctx context.Context, id int64, errMsg string) error

func (*TaskAssignmentRepository) FindByRunID

func (r *TaskAssignmentRepository) FindByRunID(ctx context.Context, runID string) (*worker.TaskAssignment, error)

func (*TaskAssignmentRepository) FindExpiredLeases

func (r *TaskAssignmentRepository) FindExpiredLeases(ctx context.Context) ([]*worker.TaskAssignment, error)

func (*TaskAssignmentRepository) RetryOrFail

func (r *TaskAssignmentRepository) RetryOrFail(ctx context.Context, id int64) error

RetryOrFail requeues a task if retries remain, otherwise marks it failed.

type TenantMigrationResult added in v0.7.0

type TenantMigrationResult struct {
	TenantID string
	Version  uint
	Err      error
}

TenantMigrationResult captures the outcome of MigrateAllTenants for a single tenant. Err is nil on success.

type TenantRepository added in v0.7.0

type TenantRepository struct {
	// contains filtered or unexported fields
}

TenantRepository persists the Tenant aggregate against the platform DB (`platform.tenants`). All queries are schema-qualified per the platform schema convention (PR #151).

Pure projection writer — no event store / outbox interaction. The audit-log mirror lives in a later PR.

Optimistic concurrency token: updated_at (no version column on the table). Same approach as UserRepository — see that file for the full rationale.

func NewTenantRepository added in v0.7.0

func NewTenantRepository(pool *pgxpool.Pool) *TenantRepository

NewTenantRepository constructs a TenantRepository against the given platform DB pool. The pool must be connected to the platform DB (schema-qualified queries against `platform.tenants` fail with `42P01 undefined_table` otherwise).

func (*TenantRepository) GetByID added in v0.7.0

func (r *TenantRepository) GetByID(ctx context.Context, id string) (*tenant.Tenant, error)

GetByID retrieves a tenant by ID. Returns errors.NotFound when no row matches.

func (*TenantRepository) GetByUserID added in v0.7.0

func (r *TenantRepository) GetByUserID(ctx context.Context, userID string) (*tenant.Tenant, error)

GetByUserID retrieves the tenant owned by the given user. The 1:1 relationship is enforced by tenants_user_id_unique at the schema level, so at most one row matches. Returns errors.NotFound when the user has no tenant yet.

func (*TenantRepository) ListByStatus added in v0.7.0

func (r *TenantRepository) ListByStatus(ctx context.Context, status tenant.Status, limit, offset int) ([]*tenant.Tenant, error)

ListByStatus retrieves tenants in a particular status with pagination. Ordered by created_at ascending — same fairness convention as UserRepository.ListByStatus. `id` is appended as a deterministic tiebreaker so LIMIT/OFFSET pagination is stable when two tenants share a created_at timestamp (microsecond collisions on batch inserts would otherwise let pages drop or duplicate rows).

func (*TenantRepository) Save added in v0.7.0

Save persists a Tenant aggregate's projection state. New aggregates (LoadedUpdatedAt zero) are inserted; loaded aggregates are updated with optimistic-concurrency check on updated_at.

The table-level CHECK constraints (tenants_db_name_derived_from_id, tenants_approved_requires_schema_version, tenants_approved_requires_provisioned_at, tenants_failure_reason_only_when_failed) propagate through unchanged — Save does not attempt to suppress them; they are the schema layer's last line of defense and a validation bug surface.

Successful Save calls SetPersistedState (refreshes loadedUpdatedAt from the RETURNING row) and ClearEvents.

type ThreadRepository

type ThreadRepository struct {
	// contains filtered or unexported fields
}

ThreadRepository implements the workflow.ThreadRepository interface

func NewThreadRepository

func NewThreadRepository(pool *pgxpool.Pool, eventStore *EventStore) *ThreadRepository

NewThreadRepository creates a new thread repository

func NewThreadRepositoryWithPools

func NewThreadRepositoryWithPools(writePool, readPool *pgxpool.Pool, eventStore *EventStore) *ThreadRepository

NewThreadRepositoryWithPools creates a thread repository with separate read/write pools

func (*ThreadRepository) Count

Count returns the number of threads matching the given filters

func (*ThreadRepository) Delete

func (r *ThreadRepository) Delete(ctx context.Context, id string) error

Delete removes a thread

func (*ThreadRepository) FindByID

func (r *ThreadRepository) FindByID(ctx context.Context, id string) (*workflow.Thread, error)

FindByID retrieves a thread by ID

func (*ThreadRepository) List

func (r *ThreadRepository) List(ctx context.Context, limit, offset int) ([]*workflow.Thread, error)

List retrieves threads with pagination

func (*ThreadRepository) Save

func (r *ThreadRepository) Save(ctx context.Context, thread *workflow.Thread) error

Save persists a thread aggregate and its events

func (*ThreadRepository) Search

Search retrieves threads matching the given filters

func (*ThreadRepository) Update

func (r *ThreadRepository) Update(ctx context.Context, thread *workflow.Thread) error

Update updates an existing thread

type UserRepository added in v0.7.0

type UserRepository struct {
	// contains filtered or unexported fields
}

UserRepository persists the User aggregate against the platform DB (`platform.users`). All queries are schema-qualified per the platform schema convention introduced in PR #151 — the migrator queries `platform.users` schema-qualified, so this repository must do the same.

This is a pure projection writer: the User aggregate's uncommitted events (Events() / ClearEvents()) are NOT persisted by this repository. The audit-log projection that mirrors those events into `platform.audit_log` is delivered via a NATS subscriber in a later PR (Wave 2). Save() therefore never writes to the event store or the outbox.

Optimistic concurrency: the platform.users table has no `version` column, so `updated_at` is the OCC token. ReconstructFromData captures the column value as the aggregate's LoadedUpdatedAt(); Save() compares against it on UPDATE and returns errors.ErrConcurrency on a stale token. The fresh-vs-loaded discrimination is `LoadedUpdatedAt().IsZero()`.

func NewUserRepository added in v0.7.0

func NewUserRepository(pool *pgxpool.Pool) *UserRepository

NewUserRepository constructs a UserRepository against the given platform DB pool. The pool must be connected to the platform DB (the singleton `duragraph_platform` in production, or a per-test platform DB in integration tests) — the schema-qualified queries fail with `42P01 undefined_table` if pointed at any other DB.

func (*UserRepository) CountAll added in v0.7.0

func (r *UserRepository) CountAll(ctx context.Context) (int, error)

CountAll returns the total number of users in the platform DB. Used by the OAuth callback to detect the bootstrap-first-user branch (count==0 ⇒ auto-elevate to admin per auth/oauth.yml).

func (*UserRepository) CountByStatus added in v0.7.0

func (r *UserRepository) CountByStatus(ctx context.Context, status *user.Status) (int, error)

CountByStatus returns the number of users matching the given status, or all users when status is nil. Used by the admin handler to populate AdminUserListResponse.total independently of pagination.

func (*UserRepository) GetByEmail added in v0.7.3

func (r *UserRepository) GetByEmail(ctx context.Context, email string) (*user.User, error)

GetByEmail retrieves a user by email, case-insensitive. Backed by the idx_users_lower_email functional index added in migration 005, so the lookup is O(log n) on a busy platform.users.

Used by the password-login flow (auth/password.yml § endpoints.login): the client submits {email, password}, we lookup the user by email, then VerifyPassword against the stored bcrypt hash. The case-insensitive match is intentional — addresses are case-insensitive per RFC 5321 and users routinely re-type their address with different casing across signup vs. login. Pre-005 schema relied on byte-for-byte UNIQUE(email) for OAuth callbacks (where the provider always returns the same canonical address) so case-insensitive lookup is a password-flow addition, not a behavior change for OAuth.

Returns errors.NotFound when no row matches.

func (*UserRepository) GetByID added in v0.7.0

func (r *UserRepository) GetByID(ctx context.Context, id string) (*user.User, error)

GetByID retrieves a user by aggregate ID. Returns errors.NotFound when no row matches.

func (*UserRepository) GetByOAuth added in v0.7.0

func (r *UserRepository) GetByOAuth(ctx context.Context, provider, oauthID string) (*user.User, error)

GetByOAuth retrieves a user by the immutable (oauth_provider, oauth_id) pair. Returns errors.NotFound when no row matches.

func (*UserRepository) List added in v0.7.0

func (r *UserRepository) List(ctx context.Context, status *user.Status, limit, offset int) ([]*user.User, error)

List retrieves users with optional status filter and pagination. Mirrors ListByStatus's ORDER BY (created_at ASC, id ASC) so the admin UI sees the same fairness ordering — and the same stable pagination — whether it scopes by status or not. The `id` tiebreaker is required: without it, two rows that share a created_at timestamp can be reordered across LIMIT/OFFSET pages, leaving rows duplicated or skipped.

A nil status applies no filter. Branching at the SQL layer rather than building dynamic WHERE clauses keeps query plans cacheable on the PG side.

func (*UserRepository) ListByStatus added in v0.7.0

func (r *UserRepository) ListByStatus(ctx context.Context, status user.Status, limit, offset int) ([]*user.User, error)

ListByStatus retrieves users matching the given status with pagination. Ordered by created_at ascending so the admin UI sees pending users in the order they signed up (oldest first — fairness on the approval queue). `id` breaks ties: created_at is timestamptz to microsecond resolution, but a single INSERT batch can give two rows the exact same timestamp, and without a deterministic tiebreaker LIMIT/OFFSET pagination can drop or duplicate rows across pages.

func (*UserRepository) Save added in v0.7.0

func (r *UserRepository) Save(ctx context.Context, u *user.User) error

Save persists a User aggregate's projection state. New aggregates (LoadedUpdatedAt zero) are inserted; loaded aggregates are updated with optimistic-concurrency check on updated_at.

Successful Save calls SetPersistedState on the aggregate (refreshes loadedUpdatedAt with the value PG actually wrote via RETURNING and bumps the in-memory version) and ClearEvents (per the Repository interface contract — the events are not persisted by this layer today, but discarding them keeps subsequent Save calls from re-publishing in a later PR that wires the audit log subscriber).

type WorkerRepository

type WorkerRepository struct {
	// contains filtered or unexported fields
}

WorkerRepository implements worker.Repository using PostgreSQL.

func NewWorkerRepository

func NewWorkerRepository(pool *pgxpool.Pool) *WorkerRepository

NewWorkerRepository creates a new PostgreSQL-backed worker repository.

func (*WorkerRepository) CleanupStale

func (r *WorkerRepository) CleanupStale(ctx context.Context, threshold time.Duration) (int, error)

func (*WorkerRepository) Delete

func (r *WorkerRepository) Delete(ctx context.Context, id string) error

func (*WorkerRepository) FindAll

func (r *WorkerRepository) FindAll(ctx context.Context) ([]*worker.Worker, error)

func (*WorkerRepository) FindByID

func (r *WorkerRepository) FindByID(ctx context.Context, id string) (*worker.Worker, error)

func (*WorkerRepository) FindForGraph

func (r *WorkerRepository) FindForGraph(ctx context.Context, graphID string, threshold time.Duration) (*worker.Worker, error)

func (*WorkerRepository) FindGraphDefinition

func (r *WorkerRepository) FindGraphDefinition(ctx context.Context, graphID string) (*worker.GraphDefinition, error)

func (*WorkerRepository) FindHealthy

func (r *WorkerRepository) FindHealthy(ctx context.Context, threshold time.Duration) ([]*worker.Worker, error)

func (*WorkerRepository) Heartbeat

func (r *WorkerRepository) Heartbeat(ctx context.Context, id string, status worker.Status, activeRuns, totalRuns, failedRuns int) error

func (*WorkerRepository) Save

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL