Documentation ¶
Overview ¶
Package postgres — embedded.go
Thin wrapper around fergusstrange/embedded-postgres that spawns a real postgres process as a child of the duragraph engine. Used by the "embedded" binary mode (binary-modes.yml § embedded_components.postgres).
The wrapper exists for three reasons:
- Keep the library's fluent-builder API away from the cmd/serve.go wiring — serve.go consumes a flat EmbeddedConfig struct populated from internal/config, not a chain of method calls.
- Map the config package's plain string version ("15") to the library's typed embeddedpostgres.PostgresVersion. This isolates the heavy library dep from the config package.
- Give us a stable seam for future test substitution (interface extraction is deferred until we actually need it — Phase 4).
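As a rough illustration of the second point, the string-to-typed-version mapping could look like the sketch below. The `postgresVersion` type, the `supportedVersions` table and `resolveVersion` are hypothetical stand-ins defined locally so the example compiles without the library; the wrapper's real table and the library's actual constants may differ.

```go
package main

import "fmt"

// postgresVersion is a local stand-in for the library's typed
// embeddedpostgres.PostgresVersion (assumption: a string-based type).
type postgresVersion string

// supportedVersions maps config-level major versions to the typed value.
// The entries are illustrative only, not the wrapper's actual table.
var supportedVersions = map[string]postgresVersion{
	"14": "14",
	"15": "15",
	"16": "16",
}

// resolveVersion converts the plain config string into the typed version,
// rejecting anything the table does not know about.
func resolveVersion(major string) (postgresVersion, error) {
	v, ok := supportedVersions[major]
	if !ok {
		return "", fmt.Errorf("unsupported postgres version %q", major)
	}
	return v, nil
}

func main() {
	v, err := resolveVersion("15")
	fmt.Println(v, err)
}
```

Keeping the lookup in one function means the config package only ever handles plain strings.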
Lifecycle expectations (per spec):
- Start blocks until the postgres process is accepting connections.
- Stop sends the equivalent of SIGTERM and waits for a graceful shutdown (the library handles the fsync/checkpoint/exit sequence).
- The data directory (cfg.DataDir) is preserved across Start/Stop pairs. Runtime path and binary cache use the library defaults (~/.embedded-postgres-go/) so first start fetches the binary; subsequent starts reuse the cache.
Package postgres — migrator.go ¶
Migrator is the runtime component that owns DB-level provisioning (CREATE DATABASE / DROP DATABASE) and schema rollout (golang-migrate).
It supports three execution modes that map onto the v1.0-platform plan:
Single-DB / drop-in mode (default for existing deployments): `MigrateMainDB(ctx, dbName)` runs the tenant migrations against the engine's primary DB (env DB_NAME, e.g. `appdb`). This preserves the pre-multi-tenant flow where one DB holds everything.
Platform DB (`duragraph_platform`): holds users, tenants, audit log. Created by `Bootstrap` if absent; schema applied via `MigratePlatform`. Singleton, shared across all tenants.
Per-tenant DB (`tenant_<32hex>`): one DB per approved tenant. Created via `ProvisionTenant` (CREATE DATABASE + tenant migrations up). Removed via `DropTenant`.
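The `tenant_<32hex>` naming can be sketched as follows. This is a minimal stand-in, not the real `tenant.DBName`: the helper name `tenantDBName` is hypothetical and the validation is deliberately simple (hyphens are stripped wherever they appear).

```go
package main

import (
	"fmt"
	"strings"
)

// tenantDBName derives "tenant_<32hex>" from a UUID string by dropping the
// hyphens and lowercasing. Hypothetical helper; the real tenant.DBName may
// validate more strictly.
func tenantDBName(tenantID string) (string, error) {
	hex := strings.ToLower(strings.ReplaceAll(tenantID, "-", ""))
	if len(hex) != 32 {
		return "", fmt.Errorf("tenant id %q: want 32 hex digits, got %d", tenantID, len(hex))
	}
	for _, c := range hex {
		if !strings.ContainsRune("0123456789abcdef", c) {
			return "", fmt.Errorf("tenant id %q: invalid character %q", tenantID, c)
		}
	}
	return "tenant_" + hex, nil
}

func main() {
	name, err := tenantDBName("123E4567-E89B-12D3-A456-426614174000")
	fmt.Println(name, err)
}
```

Validation matters here because database names cannot be passed as bind parameters, so whatever name is derived ends up interpolated into the `CREATE DATABASE` statement.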
Idempotency:
- `golang-migrate` maintains a version table per DB; re-running past the current version is a no-op (returns `migrate.ErrNoChange`, which the helpers below swallow).
- `CREATE DATABASE` is wrapped in a `pg_database` existence check (Postgres has no `CREATE DATABASE IF NOT EXISTS`).
- `DROP DATABASE` uses `IF EXISTS`.
Embedding:
- SQL migrations live next to this file (`migrations/tenant/`, `migrations/platform/`) because `go:embed` only reads files inside or below the package directory. The spec convention placed them at `deploy/sql/{tenant,platform}/`; a follow-up PR to `Duragraph/duragraph-spec` will revise the spec to match.
- The `migrations/platform/` directory is intentionally near-empty today (just a README); platform schema lands in `feat/platform-db-init`. The empty path is detected up front (no `<version>_*.sql` files in the embed.FS) and short-circuited to a no-op — see hasMigrations.
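The up-front empty-directory check might look roughly like this. It is a sketch under assumptions: the real `hasMigrations` signature and file pattern are not shown in this documentation, so the `<version>_*.sql` regular expression and the `(bool, error)` return are guesses, and `fstest.MapFS` stands in for the package's `embed.FS`.

```go
package main

import (
	"fmt"
	"io/fs"
	"regexp"
	"testing/fstest"
)

// migrationFile matches "<version>_<anything>.sql" (assumed pattern).
var migrationFile = regexp.MustCompile(`^\d+_.*\.sql$`)

// hasMigrations reports whether dir contains at least one migration file.
func hasMigrations(fsys fs.FS, dir string) (bool, error) {
	entries, err := fs.ReadDir(fsys, dir)
	if err != nil {
		return false, err
	}
	for _, e := range entries {
		if !e.IsDir() && migrationFile.MatchString(e.Name()) {
			return true, nil
		}
	}
	return false, nil
}

// demoFS mimics the layout described above: a README-only platform
// directory and a tenant directory with one migration.
func demoFS() fs.FS {
	return fstest.MapFS{
		"migrations/platform/README.md":     {Data: []byte("placeholder")},
		"migrations/tenant/001_init.up.sql": {Data: []byte("SELECT 1;")},
	}
}

func main() {
	for _, dir := range []string{"migrations/platform", "migrations/tenant"} {
		ok, err := hasMigrations(demoFS(), dir)
		fmt.Println(dir, ok, err)
	}
}
```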
Index ¶
- func Close(pool *pgxpool.Pool)
- func ClosePools(pools *Pools)
- func NewPool(ctx context.Context, config Config) (*pgxpool.Pool, error)
- type AssistantRepository
- func (r *AssistantRepository) Count(ctx context.Context, filters workflow.AssistantSearchFilters) (int, error)
- func (r *AssistantRepository) Delete(ctx context.Context, id string) error
- func (r *AssistantRepository) FindByID(ctx context.Context, id string) (*workflow.Assistant, error)
- func (r *AssistantRepository) FindVersions(ctx context.Context, assistantID string, limit int) ([]workflow.AssistantVersionInfo, error)
- func (r *AssistantRepository) List(ctx context.Context, limit, offset int) ([]*workflow.Assistant, error)
- func (r *AssistantRepository) Save(ctx context.Context, assistant *workflow.Assistant) error
- func (r *AssistantRepository) SaveVersion(ctx context.Context, version workflow.AssistantVersionInfo) error
- func (r *AssistantRepository) Search(ctx context.Context, filters workflow.AssistantSearchFilters) ([]*workflow.Assistant, error)
- func (r *AssistantRepository) SetLatestVersion(ctx context.Context, assistantID string, version int) error
- func (r *AssistantRepository) Update(ctx context.Context, assistant *workflow.Assistant) error
- type CheckpointRepository
- func (r *CheckpointRepository) Delete(ctx context.Context, id string) error
- func (r *CheckpointRepository) FindByCheckpointID(ctx context.Context, threadID, checkpointNS, checkpointID string) (*checkpoint.Checkpoint, error)
- func (r *CheckpointRepository) FindByID(ctx context.Context, id string) (*checkpoint.Checkpoint, error)
- func (r *CheckpointRepository) FindHistory(ctx context.Context, threadID string, checkpointNS string, limit int, ...) ([]*checkpoint.Checkpoint, error)
- func (r *CheckpointRepository) FindLatest(ctx context.Context, threadID string, checkpointNS string) (*checkpoint.Checkpoint, error)
- func (r *CheckpointRepository) FindWritesByCheckpoint(ctx context.Context, threadID, checkpointNS, checkpointID string) ([]*checkpoint.CheckpointWrite, error)
- func (r *CheckpointRepository) Save(ctx context.Context, cp *checkpoint.Checkpoint) error
- func (r *CheckpointRepository) SaveWrite(ctx context.Context, write *checkpoint.CheckpointWrite) error
- type Config
- type CronJob
- type CronRepository
- func (r *CronRepository) Count(ctx context.Context, assistantID, threadID *string) (int, error)
- func (r *CronRepository) Create(ctx context.Context, c *CronJob) (string, error)
- func (r *CronRepository) Delete(ctx context.Context, cronID string) error
- func (r *CronRepository) GetByID(ctx context.Context, cronID string) (*CronJob, error)
- func (r *CronRepository) GetDueJobs(ctx context.Context, now time.Time) ([]CronJob, error)
- func (r *CronRepository) Search(ctx context.Context, assistantID, threadID *string, enabled *bool, ...) ([]CronJob, error)
- func (r *CronRepository) Update(ctx context.Context, cronID string, updates map[string]interface{}) (*CronJob, error)
- func (r *CronRepository) UpdateNextRun(ctx context.Context, cronID string, nextRun time.Time) error
- type EmbeddedConfig
- type EmbeddedPostgres
- type EventStore
- func (s *EventStore) CreateSnapshot(ctx context.Context, streamID, aggregateType, aggregateID string, version int, ...) error
- func (s *EventStore) LoadEvents(ctx context.Context, aggregateType, aggregateID string) ([]map[string]interface{}, error)
- func (s *EventStore) LoadSnapshot(ctx context.Context, aggregateType, aggregateID string) (map[string]interface{}, int, error)
- func (s *EventStore) SaveEvents(ctx context.Context, streamID, aggregateType, aggregateID string, ...) error
- func (s *EventStore) SaveEventsInTx(ctx context.Context, tx pgx.Tx, streamID, aggregateType, aggregateID string, ...) error
- type GraphRepository
- func (r *GraphRepository) Delete(ctx context.Context, id string) error
- func (r *GraphRepository) FindByAssistantID(ctx context.Context, assistantID string) ([]*workflow.Graph, error)
- func (r *GraphRepository) FindByAssistantIDAndVersion(ctx context.Context, assistantID, version string) (*workflow.Graph, error)
- func (r *GraphRepository) FindByID(ctx context.Context, id string) (*workflow.Graph, error)
- func (r *GraphRepository) Save(ctx context.Context, graph *workflow.Graph) error
- func (r *GraphRepository) Update(ctx context.Context, graph *workflow.Graph) error
- type InterruptRepository
- func (r *InterruptRepository) Delete(ctx context.Context, id string) error
- func (r *InterruptRepository) FindByID(ctx context.Context, id string) (*humanloop.Interrupt, error)
- func (r *InterruptRepository) FindByRunID(ctx context.Context, runID string) ([]*humanloop.Interrupt, error)
- func (r *InterruptRepository) FindUnresolvedByRunID(ctx context.Context, runID string) ([]*humanloop.Interrupt, error)
- func (r *InterruptRepository) Save(ctx context.Context, interrupt *humanloop.Interrupt) error
- func (r *InterruptRepository) Update(ctx context.Context, interrupt *humanloop.Interrupt) error
- type Logger
- type Migrator
- func (m *Migrator) Bootstrap(ctx context.Context) error
- func (m *Migrator) DropTenant(ctx context.Context, tenantID string) error
- func (m *Migrator) MigrateAllTenants(ctx context.Context) []TenantMigrationResult
- func (m *Migrator) MigrateMainDB(ctx context.Context, dbName string) error
- func (m *Migrator) MigratePlatform(ctx context.Context) error
- func (m *Migrator) MigrateTenant(ctx context.Context, tenantID string) (uint, error)
- func (m *Migrator) ProvisionTenant(ctx context.Context, tenantID string) error
- func (m *Migrator) ProvisionTenantWithVersion(ctx context.Context, tenantID string) (uint, error)
- type MigratorOption
- type Option
- type Outbox
- func (o *Outbox) Cleanup(ctx context.Context, retentionDays int) (int64, error)
- func (o *Outbox) GetUnpublished(ctx context.Context, limit int) ([]*OutboxMessage, error)
- func (o *Outbox) MarkAsFailed(ctx context.Context, id int64, errorMsg string) error
- func (o *Outbox) MarkAsPublished(ctx context.Context, id int64) error
- type OutboxMessage
- type PoolManager
- type Pools
- type RunRepository
- func (r *RunRepository) Delete(ctx context.Context, id string) error
- func (r *RunRepository) FindActiveByThreadID(ctx context.Context, threadID string) ([]*run.Run, error)
- func (r *RunRepository) FindAll(ctx context.Context, limit, offset int) ([]*run.Run, error)
- func (r *RunRepository) FindByAssistantID(ctx context.Context, assistantID string, limit, offset int) ([]*run.Run, error)
- func (r *RunRepository) FindByID(ctx context.Context, id string) (*run.Run, error)
- func (r *RunRepository) FindByIDConsistent(ctx context.Context, id string) (*run.Run, error)
- func (r *RunRepository) FindByStatus(ctx context.Context, status run.Status, limit, offset int) ([]*run.Run, error)
- func (r *RunRepository) FindByThreadID(ctx context.Context, threadID string, limit, offset int) ([]*run.Run, error)
- func (r *RunRepository) FindExpiredLeases(ctx context.Context) ([]*run.Run, error)
- func (r *RunRepository) LoadFromEvents(ctx context.Context, id string) (*run.Run, error)
- func (r *RunRepository) Save(ctx context.Context, runAgg *run.Run) error
- func (r *RunRepository) Update(ctx context.Context, runAgg *run.Run) error
- type StoreItem
- type StoreRepository
- func (r *StoreRepository) Delete(ctx context.Context, namespace []string, key string) error
- func (r *StoreRepository) Get(ctx context.Context, namespace []string, key string, refreshTTL bool) (*StoreItem, error)
- func (r *StoreRepository) ListNamespaces(ctx context.Context, prefix, suffix []string, maxDepth, limit, offset int) ([][]string, error)
- func (r *StoreRepository) Put(ctx context.Context, namespace []string, key string, ...) error
- func (r *StoreRepository) Search(ctx context.Context, namespacePrefix []string, filter map[string]interface{}, ...) ([]StoreItem, error)
- type TaskAssignmentRepository
- func (r *TaskAssignmentRepository) Claim(ctx context.Context, workerID string, graphIDs []string, ...) ([]*worker.TaskAssignment, error)
- func (r *TaskAssignmentRepository) Complete(ctx context.Context, id int64) error
- func (r *TaskAssignmentRepository) Create(ctx context.Context, task *worker.TaskAssignment) error
- func (r *TaskAssignmentRepository) Fail(ctx context.Context, id int64, errMsg string) error
- func (r *TaskAssignmentRepository) FindByRunID(ctx context.Context, runID string) (*worker.TaskAssignment, error)
- func (r *TaskAssignmentRepository) FindExpiredLeases(ctx context.Context) ([]*worker.TaskAssignment, error)
- func (r *TaskAssignmentRepository) RetryOrFail(ctx context.Context, id int64) error
- type TenantMigrationResult
- type TenantRepository
- func (r *TenantRepository) GetByID(ctx context.Context, id string) (*tenant.Tenant, error)
- func (r *TenantRepository) GetByUserID(ctx context.Context, userID string) (*tenant.Tenant, error)
- func (r *TenantRepository) ListByStatus(ctx context.Context, status tenant.Status, limit, offset int) ([]*tenant.Tenant, error)
- func (r *TenantRepository) Save(ctx context.Context, t *tenant.Tenant) error
- type ThreadRepository
- func (r *ThreadRepository) Count(ctx context.Context, filters workflow.ThreadSearchFilters) (int, error)
- func (r *ThreadRepository) Delete(ctx context.Context, id string) error
- func (r *ThreadRepository) FindByID(ctx context.Context, id string) (*workflow.Thread, error)
- func (r *ThreadRepository) List(ctx context.Context, limit, offset int) ([]*workflow.Thread, error)
- func (r *ThreadRepository) Save(ctx context.Context, thread *workflow.Thread) error
- func (r *ThreadRepository) Search(ctx context.Context, filters workflow.ThreadSearchFilters) ([]*workflow.Thread, error)
- func (r *ThreadRepository) Update(ctx context.Context, thread *workflow.Thread) error
- type UserRepository
- func (r *UserRepository) CountAll(ctx context.Context) (int, error)
- func (r *UserRepository) CountByStatus(ctx context.Context, status *user.Status) (int, error)
- func (r *UserRepository) GetByEmail(ctx context.Context, email string) (*user.User, error)
- func (r *UserRepository) GetByID(ctx context.Context, id string) (*user.User, error)
- func (r *UserRepository) GetByOAuth(ctx context.Context, provider, oauthID string) (*user.User, error)
- func (r *UserRepository) List(ctx context.Context, status *user.Status, limit, offset int) ([]*user.User, error)
- func (r *UserRepository) ListByStatus(ctx context.Context, status user.Status, limit, offset int) ([]*user.User, error)
- func (r *UserRepository) Save(ctx context.Context, u *user.User) error
- type WorkerRepository
- func (r *WorkerRepository) CleanupStale(ctx context.Context, threshold time.Duration) (int, error)
- func (r *WorkerRepository) Delete(ctx context.Context, id string) error
- func (r *WorkerRepository) FindAll(ctx context.Context) ([]*worker.Worker, error)
- func (r *WorkerRepository) FindByID(ctx context.Context, id string) (*worker.Worker, error)
- func (r *WorkerRepository) FindForGraph(ctx context.Context, graphID string, threshold time.Duration) (*worker.Worker, error)
- func (r *WorkerRepository) FindGraphDefinition(ctx context.Context, graphID string) (*worker.GraphDefinition, error)
- func (r *WorkerRepository) FindHealthy(ctx context.Context, threshold time.Duration) ([]*worker.Worker, error)
- func (r *WorkerRepository) Heartbeat(ctx context.Context, id string, status worker.Status, ...) error
- func (r *WorkerRepository) Save(ctx context.Context, w *worker.Worker) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type AssistantRepository ¶
type AssistantRepository struct {
// contains filtered or unexported fields
}
AssistantRepository implements the workflow.AssistantRepository interface
func NewAssistantRepository ¶
func NewAssistantRepository(pool *pgxpool.Pool, eventStore *EventStore) *AssistantRepository
NewAssistantRepository creates a new assistant repository
func NewAssistantRepositoryWithPools ¶
func NewAssistantRepositoryWithPools(writePool, readPool *pgxpool.Pool, eventStore *EventStore) *AssistantRepository
NewAssistantRepositoryWithPools creates an assistant repository with separate read/write pools
func (*AssistantRepository) Count ¶
func (r *AssistantRepository) Count(ctx context.Context, filters workflow.AssistantSearchFilters) (int, error)
Count returns the number of assistants matching the given filters
func (*AssistantRepository) Delete ¶
func (r *AssistantRepository) Delete(ctx context.Context, id string) error
Delete removes an assistant
func (*AssistantRepository) FindVersions ¶
func (r *AssistantRepository) FindVersions(ctx context.Context, assistantID string, limit int) ([]workflow.AssistantVersionInfo, error)
FindVersions retrieves version history for an assistant
func (*AssistantRepository) List ¶
func (r *AssistantRepository) List(ctx context.Context, limit, offset int) ([]*workflow.Assistant, error)
List retrieves assistants with pagination
func (*AssistantRepository) SaveVersion ¶
func (r *AssistantRepository) SaveVersion(ctx context.Context, version workflow.AssistantVersionInfo) error
SaveVersion saves a new version of an assistant
func (*AssistantRepository) Search ¶
func (r *AssistantRepository) Search(ctx context.Context, filters workflow.AssistantSearchFilters) ([]*workflow.Assistant, error)
Search retrieves assistants matching the given filters
func (*AssistantRepository) SetLatestVersion ¶
func (r *AssistantRepository) SetLatestVersion(ctx context.Context, assistantID string, version int) error
SetLatestVersion updates the assistant to point to a specific version
type CheckpointRepository ¶
type CheckpointRepository struct {
// contains filtered or unexported fields
}
CheckpointRepository implements checkpoint.Repository using PostgreSQL
func NewCheckpointRepository ¶
func NewCheckpointRepository(pool *pgxpool.Pool) *CheckpointRepository
NewCheckpointRepository creates a new checkpoint repository
func NewCheckpointRepositoryWithPools ¶
func NewCheckpointRepositoryWithPools(writePool, readPool *pgxpool.Pool) *CheckpointRepository
NewCheckpointRepositoryWithPools creates a checkpoint repository with separate read/write pools
func (*CheckpointRepository) Delete ¶
func (r *CheckpointRepository) Delete(ctx context.Context, id string) error
Delete removes a checkpoint
func (*CheckpointRepository) FindByCheckpointID ¶
func (r *CheckpointRepository) FindByCheckpointID(ctx context.Context, threadID, checkpointNS, checkpointID string) (*checkpoint.Checkpoint, error)
FindByCheckpointID retrieves a checkpoint by thread_id, checkpoint_ns, and checkpoint_id
func (*CheckpointRepository) FindByID ¶
func (r *CheckpointRepository) FindByID(ctx context.Context, id string) (*checkpoint.Checkpoint, error)
FindByID retrieves a checkpoint by ID
func (*CheckpointRepository) FindHistory ¶
func (r *CheckpointRepository) FindHistory(ctx context.Context, threadID string, checkpointNS string, limit int, before string) ([]*checkpoint.Checkpoint, error)
FindHistory retrieves checkpoint history for a thread
func (*CheckpointRepository) FindLatest ¶
func (r *CheckpointRepository) FindLatest(ctx context.Context, threadID string, checkpointNS string) (*checkpoint.Checkpoint, error)
FindLatest retrieves the latest checkpoint for a thread
func (*CheckpointRepository) FindWritesByCheckpoint ¶
func (r *CheckpointRepository) FindWritesByCheckpoint(ctx context.Context, threadID, checkpointNS, checkpointID string) ([]*checkpoint.CheckpointWrite, error)
FindWritesByCheckpoint retrieves all writes for a checkpoint
func (*CheckpointRepository) Save ¶
func (r *CheckpointRepository) Save(ctx context.Context, cp *checkpoint.Checkpoint) error
Save persists a checkpoint
func (*CheckpointRepository) SaveWrite ¶
func (r *CheckpointRepository) SaveWrite(ctx context.Context, write *checkpoint.CheckpointWrite) error
SaveWrite persists a checkpoint write
type Config ¶
type Config struct {
Host string
Port int
User string
Password string
Database string
SSLMode string
}
Config holds database configuration
type CronJob ¶
type CronJob struct {
CronID string
AssistantID string
ThreadID *string
Schedule string
Timezone string
Payload map[string]interface{}
Metadata map[string]interface{}
Enabled bool
OnRunCompleted string
EndTime *time.Time
NextRunDate *time.Time
UserID *string
CreatedAt time.Time
UpdatedAt time.Time
}
CronJob represents a scheduled cron job.
type CronRepository ¶
type CronRepository struct {
// contains filtered or unexported fields
}
CronRepository provides CRUD operations for cron jobs.
func NewCronRepository ¶
func NewCronRepository(pool *pgxpool.Pool) *CronRepository
func NewCronRepositoryWithPools ¶
func NewCronRepositoryWithPools(writePool, readPool *pgxpool.Pool) *CronRepository
func (*CronRepository) Delete ¶
func (r *CronRepository) Delete(ctx context.Context, cronID string) error
Delete removes a cron job by ID.
func (*CronRepository) GetDueJobs ¶
func (r *CronRepository) GetDueJobs(ctx context.Context, now time.Time) ([]CronJob, error)
GetDueJobs returns enabled cron jobs whose next_run_date has passed.
func (*CronRepository) Search ¶
func (r *CronRepository) Search(ctx context.Context, assistantID, threadID *string, enabled *bool, limit, offset int, sortBy, sortOrder string) ([]CronJob, error)
Search finds cron jobs matching optional filters with pagination.
func (*CronRepository) Update ¶
func (r *CronRepository) Update(ctx context.Context, cronID string, updates map[string]interface{}) (*CronJob, error)
Update patches a cron job. Only non-nil fields are updated.
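func (*CronRepository) UpdateNextRun ¶
func (r *CronRepository) UpdateNextRun(ctx context.Context, cronID string, nextRun time.Time) error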
UpdateNextRun sets the next_run_date for a cron job.
type EmbeddedConfig ¶ added in v0.7.0
type EmbeddedConfig struct {
Port uint32
DataDir string // persistent data path, NOT the runtime path
Username string
Password string
Database string
Version string // postgres major version string, e.g. "15"
Logger io.Writer
// StartTimeout caps how long Start() waits for the postgres process
// to become healthy. Zero means "use library default" (15s, which is
// enough for cached binaries but tight for a first-run download on
// slow links). Operators bump via DB_EMBEDDED_START_TIMEOUT
// (Go duration syntax, e.g. "90s") — wired in internal/config.Load
// and plumbed through serve.go.
StartTimeout time.Duration
}
EmbeddedConfig is the flat config struct populated by serve.go from internal/config.DatabaseConfig. Fields mirror the library's relevant builder methods 1:1.
type EmbeddedPostgres ¶ added in v0.7.0
type EmbeddedPostgres struct {
// contains filtered or unexported fields
}
EmbeddedPostgres wraps the library's *EmbeddedPostgres so we can give the surrounding code a context-friendly Start/Stop signature without fighting the library's blocking calls.
func NewEmbedded ¶ added in v0.7.0
func NewEmbedded(cfg EmbeddedConfig) (*EmbeddedPostgres, error)
NewEmbedded constructs an EmbeddedPostgres ready to Start. We create cfg.DataDir up front (with restrictive 0o700 perms — postgres initdb rejects looser permissions on a pre-existing data dir) so the downstream library does not have to recurse parent paths itself.
Library quirk: on first run the library calls os.RemoveAll(dataPath) then runs initdb, which creates the dir fresh. On subsequent runs (reused dir) the library does NOT touch perms, so an operator who upgraded from a pre-fix build with a 0o755 data dir would still fail initdb's permission check — the chmod step below is the only thing that recovers them.
func (*EmbeddedPostgres) Config ¶ added in v0.7.0
func (e *EmbeddedPostgres) Config() EmbeddedConfig
Config returns the resolved EmbeddedConfig. Mostly useful for tests and for the loud-startup logging in serve.go.
func (*EmbeddedPostgres) Start ¶ added in v0.7.0
func (e *EmbeddedPostgres) Start(ctx context.Context) error
Start launches the embedded postgres process and blocks until it is accepting connections. The ctx parameter exists mainly for symmetry with the rest of the codebase: the library's own Start() does not take a context, so the only check happens up front. If the caller's ctx is already done when Start is invoked, we short-circuit with ctx.Err().
func (*EmbeddedPostgres) Stop ¶ added in v0.7.0
func (e *EmbeddedPostgres) Stop(ctx context.Context) error
Stop gracefully terminates the embedded postgres process. The library's Stop() blocks until pg_ctl reports a clean shutdown (fsync + checkpoint + exit). The ctx parameter is honoured the same way as in Start — short-circuit if already cancelled.
type EventStore ¶
type EventStore struct {
// contains filtered or unexported fields
}
EventStore implements event sourcing storage
func NewEventStore ¶
func NewEventStore(pool *pgxpool.Pool) *EventStore
NewEventStore creates a new event store
func (*EventStore) CreateSnapshot ¶
func (s *EventStore) CreateSnapshot(ctx context.Context, streamID, aggregateType, aggregateID string, version int, state map[string]interface{}) error
CreateSnapshot creates a snapshot of aggregate state
func (*EventStore) LoadEvents ¶
func (s *EventStore) LoadEvents(ctx context.Context, aggregateType, aggregateID string) ([]map[string]interface{}, error)
LoadEvents loads all events for an aggregate
func (*EventStore) LoadSnapshot ¶
func (s *EventStore) LoadSnapshot(ctx context.Context, aggregateType, aggregateID string) (map[string]interface{}, int, error)
LoadSnapshot loads the latest snapshot for an aggregate
func (*EventStore) SaveEvents ¶
func (s *EventStore) SaveEvents(ctx context.Context, streamID, aggregateType, aggregateID string, events []eventbus.Event) error
SaveEvents saves events to the event store and outbox in its own transaction.
type GraphRepository ¶
type GraphRepository struct {
// contains filtered or unexported fields
}
GraphRepository implements the workflow.GraphRepository interface
func NewGraphRepository ¶
func NewGraphRepository(pool *pgxpool.Pool, eventStore *EventStore) *GraphRepository
NewGraphRepository creates a new graph repository
func NewGraphRepositoryWithPools ¶
func NewGraphRepositoryWithPools(writePool, readPool *pgxpool.Pool, eventStore *EventStore) *GraphRepository
NewGraphRepositoryWithPools creates a graph repository with separate read/write pools
func (*GraphRepository) Delete ¶
func (r *GraphRepository) Delete(ctx context.Context, id string) error
Delete removes a graph
func (*GraphRepository) FindByAssistantID ¶
func (r *GraphRepository) FindByAssistantID(ctx context.Context, assistantID string) ([]*workflow.Graph, error)
FindByAssistantID retrieves graphs for a specific assistant
func (*GraphRepository) FindByAssistantIDAndVersion ¶
func (r *GraphRepository) FindByAssistantIDAndVersion(ctx context.Context, assistantID, version string) (*workflow.Graph, error)
FindByAssistantIDAndVersion retrieves a specific graph version
type InterruptRepository ¶
type InterruptRepository struct {
// contains filtered or unexported fields
}
InterruptRepository implements the humanloop.Repository interface
func NewInterruptRepository ¶
func NewInterruptRepository(pool *pgxpool.Pool, eventStore *EventStore) *InterruptRepository
NewInterruptRepository creates a new interrupt repository
func NewInterruptRepositoryWithPools ¶
func NewInterruptRepositoryWithPools(writePool, readPool *pgxpool.Pool, eventStore *EventStore) *InterruptRepository
NewInterruptRepositoryWithPools creates an interrupt repository with separate read/write pools
func (*InterruptRepository) Delete ¶
func (r *InterruptRepository) Delete(ctx context.Context, id string) error
Delete removes an interrupt
func (*InterruptRepository) FindByID ¶
func (r *InterruptRepository) FindByID(ctx context.Context, id string) (*humanloop.Interrupt, error)
FindByID retrieves an interrupt by ID
func (*InterruptRepository) FindByRunID ¶
func (r *InterruptRepository) FindByRunID(ctx context.Context, runID string) ([]*humanloop.Interrupt, error)
FindByRunID retrieves interrupts for a specific run
func (*InterruptRepository) FindUnresolvedByRunID ¶
func (r *InterruptRepository) FindUnresolvedByRunID(ctx context.Context, runID string) ([]*humanloop.Interrupt, error)
FindUnresolvedByRunID retrieves unresolved interrupts for a run
type Logger ¶ added in v0.7.0
Logger is the minimal logging surface the migrator needs. Both `*log.Logger` and `log.Default()` satisfy it, so the engine's stdlib-`log` style integrates without extra glue.
type Migrator ¶ added in v0.7.0
type Migrator struct {
// contains filtered or unexported fields
}
Migrator owns DB provisioning + schema rollout.
A single Migrator is safe for concurrent use across goroutines; the helpers each open short-lived admin connections and close them before returning. There is no long-lived shared pool inside the migrator, because migrations run only at startup and on tenant approval, both of which are rare events.
func NewMigrator ¶ added in v0.7.0
func NewMigrator(pgAdminDSN string, opts ...MigratorOption) (*Migrator, error)
NewMigrator constructs a Migrator. pgAdminDSN must be a postgres URL of the form `postgres://user:pass@host:port/<anydb>?sslmode=...`. The DB component is overwritten per call; "postgres" or "appdb" both work as the placeholder.
Returns an error only if pgAdminDSN cannot be parsed as a URL.
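Overwriting the database component of the admin DSN per call can be sketched with net/url. The helper name `dsnForDatabase` and the extra scheme/host check are assumptions for illustration; the migrator's own implementation is not shown here.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
)

// dsnForDatabase returns a copy of adminDSN whose path (the database name)
// is replaced by dbName. Credentials, host and query parameters are kept.
func dsnForDatabase(adminDSN, dbName string) (string, error) {
	u, err := url.Parse(adminDSN)
	if err != nil {
		// Avoid echoing the DSN, which may contain a password.
		return "", errors.New("admin DSN is not a valid URL")
	}
	if u.Scheme == "" || u.Host == "" {
		return "", errors.New("admin DSN must look like postgres://user:pass@host:port/db")
	}
	u.Path = "/" + dbName
	return u.String(), nil
}

func main() {
	dsn, err := dsnForDatabase("postgres://user:pass@localhost:5432/postgres?sslmode=disable", "appdb")
	fmt.Println(dsn, err)
}
```

Because only the path is rewritten, whichever placeholder database the operator put in the DSN has no effect on the result.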
func (*Migrator) Bootstrap ¶ added in v0.7.0
func (m *Migrator) Bootstrap(ctx context.Context) error
Bootstrap ensures the platform DB exists and runs platform migrations. Safe to call repeatedly. Returns nil on success even when the platform migrations directory is empty (handled inside MigratePlatform).
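func (*Migrator) DropTenant ¶ added in v0.7.0
func (m *Migrator) DropTenant(ctx context.Context, tenantID string) error
DropTenant drops the tenant DB. Safe to call when the DB doesn't exist (uses DROP DATABASE IF EXISTS). Postgres refuses to drop a database that still has active connections (the FORCE option that terminates them exists only in Postgres ≥13), so callers should ensure the pgxpool for this tenant has been closed before invoking.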
func (*Migrator) MigrateAllTenants ¶ added in v0.7.0
func (m *Migrator) MigrateAllTenants(ctx context.Context) []TenantMigrationResult
MigrateAllTenants iterates approved tenants from `platform.tenants` and runs MigrateTenant against each. Bounded concurrency (5 in flight).
Graceful empty paths:
- If the platform DB doesn't exist (SQLSTATE 3D000) → empty result, warning logged. This fires when MIGRATOR_PLATFORM_ENABLED=false and Bootstrap has never been called.
- If `platform.tenants` doesn't exist (SQLSTATE 42P01) → empty result, warning logged. This fires before `feat/platform-db-init` adds the platform schema.
Per-tenant migration failures are surfaced as Err on individual results; this method itself returns no error.
func (*Migrator) MigrateMainDB ¶ added in v0.7.0
func (m *Migrator) MigrateMainDB(ctx context.Context, dbName string) error
MigrateMainDB runs the tenant migrations against the engine's primary DB. Used during the transition before multi-tenant routing lands; this preserves the existing single-DB dev/test/prod flow that previously relied on docker-entrypoint-initdb.d.
func (*Migrator) MigratePlatform ¶ added in v0.7.0
func (m *Migrator) MigratePlatform(ctx context.Context) error
MigratePlatform runs platform migrations against the platform DB. Assumes the DB exists. Empty migration set short-circuits to a no-op; ErrNoChange (already at head) is treated as success.
func (*Migrator) MigrateTenant ¶ added in v0.7.0
func (m *Migrator) MigrateTenant(ctx context.Context, tenantID string) (uint, error)
MigrateTenant runs tenant migrations against an existing tenant DB and returns the resulting version. ErrNoChange (already at head) is treated as success; the version reflects the post-migration state.
func (*Migrator) ProvisionTenant ¶ added in v0.7.0
func (m *Migrator) ProvisionTenant(ctx context.Context, tenantID string) error
ProvisionTenant creates the tenant DB if absent and applies all tenant migrations. Safe to call repeatedly: existing DB → skip create; existing schema → ErrNoChange swallowed.
tenantID must be a UUID string; the DB name is derived via tenant.DBName (which validates the UUID).
Discards the resulting schema version; callers that need it should use ProvisionTenantWithVersion instead (avoids a second migrate.Up() round-trip just to read the version table).
func (*Migrator) ProvisionTenantWithVersion ¶ added in v0.7.0
func (m *Migrator) ProvisionTenantWithVersion(ctx context.Context, tenantID string) (uint, error)
ProvisionTenantWithVersion is ProvisionTenant that also returns the resulting schema version, avoiding a second MigrateTenant call just to read the version. Used by the tenant_provisioner subscriber where the version is recorded on tenant.Approve.
type MigratorOption ¶ added in v0.7.0
type MigratorOption func(*Migrator)
MigratorOption configures a Migrator.
func WithLogger ¶ added in v0.7.0
func WithLogger(l Logger) MigratorOption
WithLogger overrides the default logger (log.Default()).
func WithPlatformDBName ¶ added in v0.7.0
func WithPlatformDBName(name string) MigratorOption
WithPlatformDBName overrides the default platform DB name. Default is "duragraph_platform".
type Option ¶ added in v0.7.0
type Option func(*PoolManager)
Option configures a PoolManager.
func WithEvictInterval ¶ added in v0.7.0
WithEvictInterval sets how often the eviction goroutine scans for idle pools. Default is 1 minute.
func WithIdleTimeout ¶ added in v0.7.0
WithIdleTimeout sets how long a tenant pool may sit idle before being evicted. Default is 10 minutes.
func WithMaxConns ¶ added in v0.7.0
WithMaxConns sets the per-tenant pgxpool MaxConns. Default is 2.
type Outbox ¶
type Outbox struct {
// contains filtered or unexported fields
}
Outbox implements the outbox pattern for reliable event publishing
func (*Outbox) GetUnpublished ¶
GetUnpublished retrieves unpublished messages from the outbox. Uses FOR UPDATE SKIP LOCKED so multiple relay instances don't process the same messages.
func (*Outbox) MarkAsFailed ¶
MarkAsFailed marks a message as failed and schedules a retry.
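The retry schedule itself is not documented here. One common choice, shown purely as an assumption, is capped exponential backoff computed from the attempt count (compare the Attempts and NextRetryAt fields on OutboxMessage). `nextRetryDelay`, `base`, and `ceiling` are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// nextRetryDelay returns base * 2^(attempts-1), capped at ceiling.
// This policy is an illustrative assumption, not the package's documented behavior.
func nextRetryDelay(attempts int, base, ceiling time.Duration) time.Duration {
	d := base
	for i := 1; i < attempts; i++ {
		d *= 2
		if d >= ceiling {
			return ceiling
		}
	}
	if d > ceiling {
		return ceiling
	}
	return d
}

func main() {
	now := time.Now()
	for attempts := 1; attempts <= 8; attempts++ {
		delay := nextRetryDelay(attempts, time.Second, time.Minute)
		// A relay would store now.Add(delay) as the next retry time.
		fmt.Println(attempts, delay, now.Add(delay).After(now))
	}
}
```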
type OutboxMessage ¶
type OutboxMessage struct {
ID int64
EventID string
AggregateType string
AggregateID string
EventType string
Payload map[string]interface{}
Metadata map[string]interface{}
CreatedAt time.Time
Published bool
PublishedAt *time.Time
Attempts int
LastError *string
NextRetryAt *time.Time
}
OutboxMessage represents a message in the outbox.
type PoolManager ¶ added in v0.7.0
type PoolManager struct {
// contains filtered or unexported fields
}
PoolManager owns one *pgxpool.Pool per tenant ID, lazily created on the first ForTenant call and reaped after sitting idle. All tenants share a baseConfig (host/port/credentials/SSL); only `Database` and `MaxConns` are set per tenant, on a copy of that config.
The manager is safe for concurrent use. Eviction runs in a single background goroutine, started by Start and stopped by Close (or parent ctx cancellation).
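A minimal sketch of the lazy-create and idle-evict cycle, using a fake pool so it runs without a database. `registry`, `entry`, `fakePool`, `forTenant`, and `evictIdle` are hypothetical names, and the clock is passed in explicitly so the behavior is deterministic; the real manager is assumed to drive eviction from a ticker in its background goroutine.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// fakePool stands in for *pgxpool.Pool so the sketch needs no database.
type fakePool struct {
	database string
	closed   bool
}

// entry pairs a pool with the time it was last handed out.
type entry struct {
	pool     *fakePool
	lastUsed time.Time
}

// registry is a hypothetical, simplified analogue of PoolManager.
type registry struct {
	mu          sync.Mutex
	pools       map[string]*entry
	idleTimeout time.Duration
}

// forTenant returns the cached pool or lazily creates one, and records the access time.
func (r *registry) forTenant(id string, now time.Time) *fakePool {
	r.mu.Lock()
	defer r.mu.Unlock()
	e, ok := r.pools[id]
	if !ok {
		e = &entry{pool: &fakePool{database: "tenant_" + id}}
		r.pools[id] = e
	}
	e.lastUsed = now
	return e.pool
}

// evictIdle closes and removes every pool idle for longer than idleTimeout
// and returns how many were evicted.
func (r *registry) evictIdle(now time.Time) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	evicted := 0
	for id, e := range r.pools {
		if now.Sub(e.lastUsed) > r.idleTimeout {
			e.pool.closed = true
			delete(r.pools, id)
			evicted++
		}
	}
	return evicted
}

func main() {
	r := &registry{pools: map[string]*entry{}, idleTimeout: 10 * time.Minute}
	start := time.Now()
	r.forTenant("a", start)
	r.forTenant("b", start.Add(8*time.Minute))
	fmt.Println("evicted:", r.evictIdle(start.Add(11*time.Minute)))
}
```

Holding one mutex across both lookup and eviction keeps the sketch simple; a production manager may prefer finer-grained locking so a slow pool close does not block lookups.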
func NewPoolManager ¶ added in v0.7.0
func NewPoolManager(baseConfig *pgxpool.Config, opts ...Option) *PoolManager
NewPoolManager constructs a PoolManager. The caller is responsible for parsing the base DSN via `pgxpool.ParseConfig`; this matches the pattern in db.go where the application pre-parses config and passes a ready-to-use *pgxpool.Config. The Database field on baseConfig is ignored (overwritten per tenant).
func (*PoolManager) Close ¶ added in v0.7.0
func (m *PoolManager) Close() error
Close stops eviction, closes every cached pool, and clears the map. Safe to call from any goroutine; subsequent ForTenant calls return an error. Always returns nil today; the signature returns error so future implementations can surface partial-close failures without a breaking change.
func (*PoolManager) ForTenant ¶ added in v0.7.0
ForTenant returns the pgxpool for the given tenant, creating it on first request. tenantID is a UUID string ("xxxxxxxx-xxxx-..."); it is parsed and re-rendered in canonical lowercase form before the tenant DB name is derived from it.
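The canonicalization step can be approximated with the standard library alone. This sketch assumes the `tenant_<32hex>` naming scheme described in the package overview; `canonicalUUID` and `dbName` are hypothetical helpers, not the real tenant.DBName, which may validate differently.

```go
package main

import (
	"encoding/hex"
	"errors"
	"fmt"
	"strings"
)

// canonicalUUID validates an 8-4-4-4-12 UUID string and returns it in lowercase.
func canonicalUUID(s string) (string, error) {
	if len(s) != 36 || s[8] != '-' || s[13] != '-' || s[18] != '-' || s[23] != '-' {
		return "", errors.New("invalid UUID layout")
	}
	raw, err := hex.DecodeString(strings.ReplaceAll(s, "-", ""))
	if err != nil || len(raw) != 16 {
		return "", errors.New("invalid UUID hex digits")
	}
	h := hex.EncodeToString(raw) // EncodeToString always emits lowercase
	return h[0:8] + "-" + h[8:12] + "-" + h[12:16] + "-" + h[16:20] + "-" + h[20:32], nil
}

// dbName derives a tenant_<32hex> database name from a UUID string.
func dbName(tenantID string) (string, error) {
	c, err := canonicalUUID(tenantID)
	if err != nil {
		return "", err
	}
	return "tenant_" + strings.ReplaceAll(c, "-", ""), nil
}

func main() {
	name, err := dbName("A1B2C3D4-E5F6-4A7B-8C9D-0E1F2A3B4C5D")
	fmt.Println(name, err)
	_, err = dbName("not-a-uuid")
	fmt.Println(err)
}
```

Normalizing before derivation means that two spellings of the same UUID that differ only in letter case map to the same pool and the same database.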
func (*PoolManager) Start ¶ added in v0.7.0
func (m *PoolManager) Start(ctx context.Context)
Start launches the idle-eviction goroutine. Calling Start more than once is a no-op. Eviction stops when ctx is cancelled or Close is called.
type RunRepository ¶
type RunRepository struct {
// contains filtered or unexported fields
}
RunRepository implements the run.Repository interface.
func NewRunRepository ¶
func NewRunRepository(pool *pgxpool.Pool, eventStore *EventStore) *RunRepository
NewRunRepository creates a new run repository.
func NewRunRepositoryWithPools ¶
func NewRunRepositoryWithPools(writePool, readPool *pgxpool.Pool, eventStore *EventStore) *RunRepository
NewRunRepositoryWithPools creates a run repository with separate read and write pools.
func (*RunRepository) Delete ¶
func (r *RunRepository) Delete(ctx context.Context, id string) error
Delete removes a run.
func (*RunRepository) FindActiveByThreadID ¶
func (r *RunRepository) FindActiveByThreadID(ctx context.Context, threadID string) ([]*run.Run, error)
FindActiveByThreadID retrieves active (non-terminal) runs for a thread.
func (*RunRepository) FindByAssistantID ¶
func (r *RunRepository) FindByAssistantID(ctx context.Context, assistantID string, limit, offset int) ([]*run.Run, error)
FindByAssistantID retrieves runs for a specific assistant.
func (*RunRepository) FindByIDConsistent ¶
FindByIDConsistent reads from the write pool, giving strong consistency for reads that follow a write.
func (*RunRepository) FindByStatus ¶
func (r *RunRepository) FindByStatus(ctx context.Context, status run.Status, limit, offset int) ([]*run.Run, error)
FindByStatus retrieves runs by status.
func (*RunRepository) FindByThreadID ¶
func (r *RunRepository) FindByThreadID(ctx context.Context, threadID string, limit, offset int) ([]*run.Run, error)
FindByThreadID retrieves runs for a specific thread.
func (*RunRepository) FindExpiredLeases ¶
FindExpiredLeases finds runs with expired worker leases. Uses FOR UPDATE SKIP LOCKED to prevent multiple instances from processing the same expired lease.
func (*RunRepository) LoadFromEvents ¶
LoadFromEvents rebuilds a run from the event store.
type StoreItem ¶
type StoreItem struct {
Namespace []string
Key string
Value map[string]interface{}
CreatedAt time.Time
UpdatedAt time.Time
}
StoreItem represents a namespaced key-value item.
type StoreRepository ¶
type StoreRepository struct {
// contains filtered or unexported fields
}
StoreRepository provides CRUD operations for the namespaced key-value store.
func NewStoreRepository ¶
func NewStoreRepository(pool *pgxpool.Pool) *StoreRepository
func NewStoreRepositoryWithPools ¶
func NewStoreRepositoryWithPools(writePool, readPool *pgxpool.Pool) *StoreRepository
func (*StoreRepository) Get ¶
func (r *StoreRepository) Get(ctx context.Context, namespace []string, key string, refreshTTL bool) (*StoreItem, error)
Get retrieves a single item by namespace and key. Returns nil if not found or expired.
func (*StoreRepository) ListNamespaces ¶
func (r *StoreRepository) ListNamespaces(ctx context.Context, prefix, suffix []string, maxDepth, limit, offset int) ([][]string, error)
ListNamespaces returns distinct namespaces matching optional prefix/suffix and depth constraints.
type TaskAssignmentRepository ¶
type TaskAssignmentRepository struct {
// contains filtered or unexported fields
}
TaskAssignmentRepository implements worker.TaskRepository using PostgreSQL.
func NewTaskAssignmentRepository ¶
func NewTaskAssignmentRepository(pool *pgxpool.Pool) *TaskAssignmentRepository
NewTaskAssignmentRepository creates a new task assignment repository.
func (*TaskAssignmentRepository) Claim ¶
func (r *TaskAssignmentRepository) Claim(ctx context.Context, workerID string, graphIDs []string, leaseDuration time.Duration, maxTasks int) ([]*worker.TaskAssignment, error)
Claim atomically claims pending tasks for a worker using FOR UPDATE SKIP LOCKED.
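A claim of this kind is commonly written as a single UPDATE whose subquery selects candidate rows with FOR UPDATE SKIP LOCKED. The statement below is a sketch only: the table name, column names, and status values are hypothetical, and the repository's actual SQL may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// claimSQL is an illustrative PostgreSQL statement over a hypothetical schema.
// Rows already locked by another worker's transaction are skipped rather than
// waited on, so concurrent claimers receive disjoint sets of tasks.
const claimSQL = `
UPDATE task_assignments
SET status = 'claimed',
    worker_id = $1,
    lease_expires_at = now() + make_interval(secs => $2)
WHERE id IN (
    SELECT id
    FROM task_assignments
    WHERE status = 'pending' AND graph_id = ANY($3)
    ORDER BY id
    LIMIT $4
    FOR UPDATE SKIP LOCKED
)
RETURNING id`

// hasSkipLocked reports whether a statement uses the SKIP LOCKED locking clause.
func hasSkipLocked(q string) bool {
	return strings.Contains(q, "FOR UPDATE SKIP LOCKED")
}

func main() {
	fmt.Println(hasSkipLocked(claimSQL))
	fmt.Println(claimSQL)
}
```

Without SKIP LOCKED, a second worker would block on the first worker's row locks and then re-evaluate rows that are no longer pending, which serializes claimers instead of letting them proceed in parallel.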
func (*TaskAssignmentRepository) Complete ¶
func (r *TaskAssignmentRepository) Complete(ctx context.Context, id int64) error
func (*TaskAssignmentRepository) Create ¶
func (r *TaskAssignmentRepository) Create(ctx context.Context, task *worker.TaskAssignment) error
func (*TaskAssignmentRepository) FindByRunID ¶
func (r *TaskAssignmentRepository) FindByRunID(ctx context.Context, runID string) (*worker.TaskAssignment, error)
func (*TaskAssignmentRepository) FindExpiredLeases ¶
func (r *TaskAssignmentRepository) FindExpiredLeases(ctx context.Context) ([]*worker.TaskAssignment, error)
func (*TaskAssignmentRepository) RetryOrFail ¶
func (r *TaskAssignmentRepository) RetryOrFail(ctx context.Context, id int64) error
RetryOrFail requeues a task if retries remain, otherwise marks it failed.
type TenantMigrationResult ¶ added in v0.7.0
TenantMigrationResult captures the outcome of MigrateAllTenants for a single tenant. Err is nil on success.
type TenantRepository ¶ added in v0.7.0
type TenantRepository struct {
// contains filtered or unexported fields
}
TenantRepository persists the Tenant aggregate against the platform DB (`platform.tenants`). All queries are schema-qualified per the platform schema convention (PR #151).
TenantRepository is a pure projection writer: it never interacts with the event store or the outbox. The audit-log mirror is planned for a later PR.
Optimistic concurrency token: updated_at (no version column on the table). Same approach as UserRepository — see that file for the full rationale.
func NewTenantRepository ¶ added in v0.7.0
func NewTenantRepository(pool *pgxpool.Pool) *TenantRepository
NewTenantRepository constructs a TenantRepository against the given platform DB pool. The pool must be connected to the platform DB (schema-qualified queries against `platform.tenants` fail with `42P01 undefined_table` otherwise).
func (*TenantRepository) GetByID ¶ added in v0.7.0
GetByID retrieves a tenant by ID. Returns errors.NotFound when no row matches.
func (*TenantRepository) GetByUserID ¶ added in v0.7.0
GetByUserID retrieves the tenant owned by the given user. The 1:1 relationship is enforced by tenants_user_id_unique at the schema level, so at most one row matches. Returns errors.NotFound when the user has no tenant yet.
func (*TenantRepository) ListByStatus ¶ added in v0.7.0
func (r *TenantRepository) ListByStatus(ctx context.Context, status tenant.Status, limit, offset int) ([]*tenant.Tenant, error)
ListByStatus retrieves tenants in a particular status with pagination. Ordered by created_at ascending — same fairness convention as UserRepository.ListByStatus. `id` is appended as a deterministic tiebreaker so LIMIT/OFFSET pagination is stable when two tenants share a created_at timestamp (microsecond collisions on batch inserts would otherwise let pages drop or duplicate rows).
func (*TenantRepository) Save ¶ added in v0.7.0
Save persists a Tenant aggregate's projection state. New aggregates (LoadedUpdatedAt zero) are inserted; loaded aggregates are updated with optimistic-concurrency check on updated_at.
The table-level CHECK constraints (tenants_db_name_derived_from_id, tenants_approved_requires_schema_version, tenants_approved_requires_provisioned_at, tenants_failure_reason_only_when_failed) propagate through unchanged. Save does not attempt to suppress them: they are the schema layer's last line of defense, and a violation surfaces a validation bug in the calling code.
A successful Save calls SetPersistedState (which refreshes loadedUpdatedAt from the RETURNING row) and then ClearEvents.
type ThreadRepository ¶
type ThreadRepository struct {
// contains filtered or unexported fields
}
ThreadRepository implements the workflow.ThreadRepository interface.
func NewThreadRepository ¶
func NewThreadRepository(pool *pgxpool.Pool, eventStore *EventStore) *ThreadRepository
NewThreadRepository creates a new thread repository.
func NewThreadRepositoryWithPools ¶
func NewThreadRepositoryWithPools(writePool, readPool *pgxpool.Pool, eventStore *EventStore) *ThreadRepository
NewThreadRepositoryWithPools creates a thread repository with separate read and write pools.
func (*ThreadRepository) Count ¶
func (r *ThreadRepository) Count(ctx context.Context, filters workflow.ThreadSearchFilters) (int, error)
Count returns the number of threads matching the given filters.
func (*ThreadRepository) Delete ¶
func (r *ThreadRepository) Delete(ctx context.Context, id string) error
Delete removes a thread.
func (*ThreadRepository) Search ¶
func (r *ThreadRepository) Search(ctx context.Context, filters workflow.ThreadSearchFilters) ([]*workflow.Thread, error)
Search retrieves threads matching the given filters.
type UserRepository ¶ added in v0.7.0
type UserRepository struct {
// contains filtered or unexported fields
}
UserRepository persists the User aggregate against the platform DB (`platform.users`). All queries are schema-qualified per the platform schema convention introduced in PR #151 — the migrator queries `platform.users` schema-qualified, so this repository must do the same.
This is a pure projection writer: the User aggregate's uncommitted events (Events() / ClearEvents()) are NOT persisted by this repository. The audit-log projection that mirrors those events into `platform.audit_log` is delivered via a NATS subscriber in a later PR (Wave 2). Save() therefore never writes to the event store or the outbox.
Optimistic concurrency: the platform.users table has no `version` column, so `updated_at` is the OCC token. ReconstructFromData captures the column value as the aggregate's LoadedUpdatedAt(); Save() compares against it on UPDATE and returns errors.ErrConcurrency on a stale token. The fresh-vs-loaded discrimination is `LoadedUpdatedAt().IsZero()`.
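The updated_at token check can be illustrated in memory. In this sketch `table` is a hypothetical stand-in for platform.users, `errConcurrency` stands in for errors.ErrConcurrency, and `save` models only the insert-versus-guarded-update decision, not the real SQL.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errConcurrency is a local stand-in for the package's errors.ErrConcurrency.
var errConcurrency = errors.New("stale updated_at token")

// user carries only what the sketch needs from the aggregate.
type user struct {
	id              string
	loadedUpdatedAt time.Time // zero for an aggregate that was never persisted
}

// table is an in-memory stand-in for platform.users: id -> updated_at.
type table map[string]time.Time

// save inserts when loadedUpdatedAt is zero. Otherwise it updates only if the
// stored updated_at still equals the token captured when the row was loaded.
func (t table) save(u *user, now time.Time) error {
	if !u.loadedUpdatedAt.IsZero() {
		current, ok := t[u.id]
		if !ok || !current.Equal(u.loadedUpdatedAt) {
			return errConcurrency
		}
	}
	t[u.id] = now
	u.loadedUpdatedAt = now // mirrors refreshing the token from RETURNING
	return nil
}

func main() {
	t := table{}
	t0 := time.Now()

	a := &user{id: "u1"}
	fmt.Println(t.save(a, t0)) // fresh aggregate: insert

	// A second reader loads the same row and captures the same token.
	b := &user{id: "u1", loadedUpdatedAt: a.loadedUpdatedAt}

	fmt.Println(t.save(a, t0.Add(time.Second)))   // a wins the race
	fmt.Println(t.save(b, t0.Add(2*time.Second))) // b holds a stale token
}
```

In SQL terms the guarded branch corresponds to an UPDATE whose WHERE clause includes the loaded updated_at value, with zero affected rows interpreted as a concurrency conflict.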
func NewUserRepository ¶ added in v0.7.0
func NewUserRepository(pool *pgxpool.Pool) *UserRepository
NewUserRepository constructs a UserRepository against the given platform DB pool. The pool must be connected to the platform DB (the singleton `duragraph_platform` in production, or a per-test platform DB in integration tests) — the schema-qualified queries fail with `42P01 undefined_table` if pointed at any other DB.
func (*UserRepository) CountAll ¶ added in v0.7.0
func (r *UserRepository) CountAll(ctx context.Context) (int, error)
CountAll returns the total number of users in the platform DB. Used by the OAuth callback to detect the bootstrap-first-user branch (count==0 ⇒ auto-elevate to admin per auth/oauth.yml).
func (*UserRepository) CountByStatus ¶ added in v0.7.0
CountByStatus returns the number of users matching the given status, or all users when status is nil. Used by the admin handler to populate AdminUserListResponse.total independently of pagination.
func (*UserRepository) GetByEmail ¶ added in v0.7.3
GetByEmail retrieves a user by email, case-insensitive. Backed by the idx_users_lower_email functional index added in migration 005, so the lookup is O(log n) even on a large platform.users table.
Used by the password-login flow (auth/password.yml § endpoints.login): the client submits {email, password}, we look up the user by email, then VerifyPassword against the stored bcrypt hash. The case-insensitive match is intentional: mail domains are case-insensitive per RFC 5321, mailbox providers in practice treat the local part the same way even though the RFC permits case-sensitive local parts, and users routinely re-type their address with different casing across signup vs. login. Pre-005 schema relied on byte-for-byte UNIQUE(email) for OAuth callbacks (where the provider always returns the same canonical address), so case-insensitive lookup is a password-flow addition, not a behavior change for OAuth.
Returns errors.NotFound when no row matches.
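The effect of indexing on the lowercased address can be sketched with a map keyed the same way. `emailIndex`, `put`, and `getByEmail` are hypothetical names; note also that Go's strings.ToLower and PostgreSQL's lower() can differ for some non-ASCII input, so this is an approximation.

```go
package main

import (
	"fmt"
	"strings"
)

// emailIndex maps a lowercased email to a user ID, loosely analogous to a
// functional index on lower(email).
type emailIndex map[string]string

// put stores the user under the lowercased form of the address.
func (ix emailIndex) put(email, userID string) {
	ix[strings.ToLower(email)] = userID
}

// getByEmail lowercases the probe the same way, so casing differences
// between signup and login do not matter.
func (ix emailIndex) getByEmail(email string) (string, bool) {
	id, ok := ix[strings.ToLower(email)]
	return id, ok
}

func main() {
	ix := emailIndex{}
	ix.put("Ada@Example.com", "u1")
	fmt.Println(ix.getByEmail("ada@example.COM"))
	fmt.Println(ix.getByEmail("bob@example.com"))
}
```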
func (*UserRepository) GetByID ¶ added in v0.7.0
GetByID retrieves a user by aggregate ID. Returns errors.NotFound when no row matches.
func (*UserRepository) GetByOAuth ¶ added in v0.7.0
func (r *UserRepository) GetByOAuth(ctx context.Context, provider, oauthID string) (*user.User, error)
GetByOAuth retrieves a user by the immutable (oauth_provider, oauth_id) pair. Returns errors.NotFound when no row matches.
func (*UserRepository) List ¶ added in v0.7.0
func (r *UserRepository) List(ctx context.Context, status *user.Status, limit, offset int) ([]*user.User, error)
List retrieves users with optional status filter and pagination. Mirrors ListByStatus's ORDER BY (created_at ASC, id ASC) so the admin UI sees the same fairness ordering — and the same stable pagination — whether it scopes by status or not. The `id` tiebreaker is required: without it, two rows that share a created_at timestamp can be reordered across LIMIT/OFFSET pages, leaving rows duplicated or skipped.
A nil status applies no filter. Branching at the SQL layer rather than building dynamic WHERE clauses keeps query plans cacheable on the PG side.
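Branching between two fixed statements might look like the following. The selected column list and the `status` column name are assumptions made for illustration; only the table name and the ORDER BY come from the description above.

```go
package main

import "fmt"

// Two static statements: the text sent to PostgreSQL never varies per call,
// only the bound arguments do.
const (
	listAllSQL = `SELECT id FROM platform.users
ORDER BY created_at ASC, id ASC LIMIT $1 OFFSET $2`

	listByStatusSQL = `SELECT id FROM platform.users
WHERE status = $1
ORDER BY created_at ASC, id ASC LIMIT $2 OFFSET $3`
)

// listQuery picks one of the two statements and builds its argument list.
func listQuery(status *string, limit, offset int) (string, []any) {
	if status == nil {
		return listAllSQL, []any{limit, offset}
	}
	return listByStatusSQL, []any{*status, limit, offset}
}

func main() {
	q, args := listQuery(nil, 20, 0)
	fmt.Println(q, args)

	pending := "pending"
	q, args = listQuery(&pending, 20, 40)
	fmt.Println(q, args)
}
```

Because each branch always sends identical SQL text, a prepared statement for it can be reused across calls, whereas a dynamically assembled WHERE clause produces a different statement per combination of filters.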
func (*UserRepository) ListByStatus ¶ added in v0.7.0
func (r *UserRepository) ListByStatus(ctx context.Context, status user.Status, limit, offset int) ([]*user.User, error)
ListByStatus retrieves users matching the given status with pagination. Ordered by created_at ascending so the admin UI sees pending users in the order they signed up (oldest first — fairness on the approval queue). `id` breaks ties: created_at is timestamptz to microsecond resolution, but a single INSERT batch can give two rows the exact same timestamp, and without a deterministic tiebreaker LIMIT/OFFSET pagination can drop or duplicate rows across pages.
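The role of the `id` tiebreaker can be demonstrated in memory. In this sketch `row` and `page` are hypothetical, and createdAt is modeled as an integer number of microseconds rather than a timestamptz.

```go
package main

import (
	"fmt"
	"sort"
)

// row is a hypothetical projection of a users row.
type row struct {
	id        string
	createdAt int64 // microseconds since epoch, standing in for timestamptz
}

// page orders by (createdAt ASC, id ASC) and returns the LIMIT/OFFSET window.
// With the id tiebreaker the order is total, so consecutive pages partition the rows.
func page(rows []row, limit, offset int) []row {
	sorted := append([]row(nil), rows...) // leave the caller's slice untouched
	sort.Slice(sorted, func(i, j int) bool {
		if sorted[i].createdAt != sorted[j].createdAt {
			return sorted[i].createdAt < sorted[j].createdAt
		}
		return sorted[i].id < sorted[j].id
	})
	if offset > len(sorted) {
		offset = len(sorted)
	}
	end := offset + limit
	if end > len(sorted) {
		end = len(sorted)
	}
	return sorted[offset:end]
}

func main() {
	// Three rows share a timestamp, as a single batch INSERT might produce.
	rows := []row{{"c", 100}, {"a", 100}, {"b", 100}, {"d", 50}}
	fmt.Println(page(rows, 2, 0))
	fmt.Println(page(rows, 2, 2))
}
```

If the comparison used createdAt alone, the relative order of the three tied rows would be unspecified, and two separate page queries could each place a different row at the page boundary.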
func (*UserRepository) Save ¶ added in v0.7.0
Save persists a User aggregate's projection state. New aggregates (LoadedUpdatedAt zero) are inserted; loaded aggregates are updated with optimistic-concurrency check on updated_at.
A successful Save calls SetPersistedState on the aggregate, which refreshes loadedUpdatedAt with the value PG actually wrote (via RETURNING) and bumps the in-memory version. It then calls ClearEvents, per the Repository interface contract: this layer does not persist the events today, but discarding them ensures they are not re-published by a subsequent Save once a later PR wires in the audit log subscriber.
type WorkerRepository ¶
type WorkerRepository struct {
// contains filtered or unexported fields
}
WorkerRepository implements worker.Repository using PostgreSQL.
func NewWorkerRepository ¶
func NewWorkerRepository(pool *pgxpool.Pool) *WorkerRepository
NewWorkerRepository creates a new PostgreSQL-backed worker repository.
func (*WorkerRepository) CleanupStale ¶
func (*WorkerRepository) Delete ¶
func (r *WorkerRepository) Delete(ctx context.Context, id string) error
func (*WorkerRepository) FindForGraph ¶
func (*WorkerRepository) FindGraphDefinition ¶
func (r *WorkerRepository) FindGraphDefinition(ctx context.Context, graphID string) (*worker.GraphDefinition, error)
func (*WorkerRepository) FindHealthy ¶
Source Files ¶
- assistant_repository.go
- checkpoint_repository.go
- cron_repository.go
- db.go
- embedded.go
- event_store.go
- graph_repository.go
- interrupt_repository.go
- migrator.go
- outbox.go
- pool_manager.go
- run_repository.go
- store_repository.go
- task_assignment_repository.go
- tenant_repository.go
- thread_repository.go
- user_repository.go
- worker_repository.go