migration

package
v0.1.41
Published: Mar 21, 2026 License: LGPL-2.1 Imports: 21 Imported by: 0

README

Migration Package

Orchestrates migrations: MigrationManager, domain type Migration, DuckDB (pkg/db), and queues (pkg/queue).

Higher-level API / HTTP integration (routes, background tasks, runtime cache) lives in the Sylos application that imports this module; this package is the engine surface.


Documentation in this repository

  • docs/algorithms.md, docs/item_statuses.md – Supplemental reference.
  • pkg/db/README.md, pkg/queue/README.md – Persistence and queue behavior.

Note: Some older docs referred to docs/ENGINE_ARCHITECTURE_OVERVIEW.md and similar files. Those paths are not present in this repo; they may live in another Sylos repository or have been retired. Prefer the package READMEs above.


Config and entry points

Config (migration.go)

Used by LetsMigrate / StartMigration:

  • Database – DatabaseConfig (Path, RemoveExisting, RequireOpen).
    • Non-empty Path: legacy single DuckDB file (all migrations in that DB’s migrations table, if used).
    • Empty Path: per-migration DB files; the host passes MigrationDir / migrationDir into CreateMigration / GetMigration (see manager.go).
  • Source, Destination – Service with types.FSAdapter and types.Folder. Adapters must be non-nil; the engine does not construct or close them.
  • SeedRoots, WorkerCount, MaxRetries, CoordinatorLead, LogAddress, LogLevel, SkipListener, StartupDelay, ProgressTick, Verification, ShutdownContext.

LetsMigrate(cfg)

  1. Builds MigrationManager from cfg.Database.
  2. Calls CreateMigration with metadata derived from config.
  3. If SeedRoots is set, seeds root tasks into the migration DB.
  4. Calls StartTraversal(cfg), which runs traversal only; copy is not run in this path (see the domain methods for copy).
  5. Runs VerifyMigration unless suspended by shutdown.

Blocks until completion, error, or shutdown. Handles SIGINT/SIGTERM itself when ShutdownContext is not set.

StartMigration(cfg)

Runs LetsMigrate in a goroutine. MigrationController exposes only:

  • Shutdown() – cancel shutdown context
  • Done() <-chan struct{}
  • Wait() (Result, error)

There is no GetDB() / Result() / Error() on the controller in the current API.


MigrationManager (manager.go)

  • NewMigrationManager(DatabaseConfig) – Opens legacy DB when Path is set; otherwise returns a manager that opens {dir}/{id}.db per migration.
  • CreateMigration, GetMigration, ListMigrations, GetMigrationDetails, DeleteMigration, Close.

GetMigrationDetails / ListMigrations merge Live and Phase from an in-memory Migration when this process has loaded it, so Live reflects whether a run is actually active; otherwise Phase comes from the DB row.


Domain Migration (domain.go, phase.go, …)

Phases (string constants)

Examples: roots-set, filters-set, traversal-in-progress, awaiting-traversal-review, copy-in-progress, awaiting-copy-review (phase.go).

Common methods

  • AddRoots, StartTraversal(cfg), StartCopy(cfg) – require live FS adapters in cfg; UpdateConfig persists root_config_json. StartCopy may be called again while phase is copy-in-progress to resume after a crash; RunCopyPhase rescans pending depths and may enable a one-round dst existence precheck when events show both successful and pending copy work (copy.go).
  • RunRetrySweep(cfg, opts), PrepareRetrySweep() – For async HTTP: call PrepareRetrySweep() synchronously before returning 202, then run RunRetrySweep with the same cfg shape as traversal (adapters + roots) in a background task.
  • RunCopyRetry(cfg, opts), PrepareCopyRetry() – Same pattern for copy retry when exposed asynchronously.
  • UpdateConfig(cfg) – persists root_config_json only (serializable fields); callers still pass cfg with adapters for each run.
  • Review helpers: query nodes, path review, exclude, mark retry, etc.
  • Stop() – returns a stop result carrying the current phase and a runtime snapshot.

Status inspection

InspectMigrationStatus(database *db.DB) (status.go):

  • Totals from node tables.
  • Pending / failed from status events (GetTraversalStatusCountsFromEvents), not from stats tables alone, so counts stay correct if per-depth stats lag.

Retry sweep (summary)

Engine retry sweep re-processes pending/failed traversal work (pkg/queue retry mode). DST cleanup on SRC folder completion is described in pkg/queue/README.md.

Automated scenario: pkg/tests/traversal/retry_sweep/ (see pkg/tests/README.md).


SetupDatabase (database.go)

database, wasFresh, err := migration.SetupDatabase(migration.DatabaseConfig{
    Path:           "migration.duckdb",
    RemoveExisting: false,
})

Opens DuckDB with default seal-buffer options and ensures node + status-event indexes. Caller closes the DB when appropriate.


Verification

VerifyMigration(database, VerifyOptions) – reads node/event-derived state and produces VerificationReport.


Best practices

  1. Own adapters in the host process; keep them open for the duration of a run.
  2. Async retry sweep / copy retry: Prepare* then background Run* (see above).
  3. Per-migration DBs: pass consistent migrationDir when loading details/listing so the manager can open the right file.

Examples

  • main.go (repo root) – CLI-style LetsMigrate.
  • pkg/tests/traversal/*, pkg/tests/copy/* – Spectra-backed runners.

Documentation

Constants

const (
	PhaseCreated         string = "roots-set"                 // Initial state; after user sets roots. TODO: when filter creation module lands, transition to filters-set before traversal.
	PhaseFiltersSet      string = "filters-set"               // Ready for traversal
	PhaseTraversing      string = "traversal-in-progress"     // Traversal running
	PhaseTraversalReview string = "awaiting-traversal-review" // Traversal done, user can review; can retry traversal or start copy
	PhaseCopying         string = "copy-in-progress"          // Copy phase running
	PhaseCopyReview      string = "awaiting-copy-review"      // Copy done, user can review; can retry copy
)

Phase is the migration lifecycle state, stored as the canonical status string (lowercase-with-hyphens).

const (
	DeltaTraversalPending      = "traversalPending"
	DeltaTraversalPendingRetry = "traversalPendingRetry"
	DeltaTraversalFailed       = "traversalFailed"
	DeltaCopyPending           = "copyPending"
	DeltaCopyFailed            = "copyFailed"
	DeltaCopySuccessful        = "copySuccessful"
	DeltaExcluded              = "excluded"
	DeltaFolders               = "folders"
	DeltaFiles                 = "files"
	DeltaSizeSrc               = "sizeSrc"
	DeltaSizeDst               = "sizeDst"
)

Canonical delta keys for PathReviewActionResult.Deltas. Only keys that changed (non-zero) are included.
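
To illustrate the "only non-zero keys" rule, here is a minimal sketch of how a caller might build and apply such a delta map. The helper names nonZeroDeltas and applyDeltas are hypothetical, and the constants are redeclared locally (a subset of those above) so the example compiles on its own.

```go
package main

import "fmt"

// Subset of the delta keys listed above, redeclared so the sketch is standalone.
const (
	DeltaTraversalFailed = "traversalFailed"
	DeltaExcluded        = "excluded"
	DeltaFiles           = "files"
)

// nonZeroDeltas (hypothetical helper) drops entries whose change is zero.
func nonZeroDeltas(changes map[string]int64) map[string]int64 {
	out := make(map[string]int64, len(changes))
	for key, delta := range changes {
		if delta != 0 {
			out[key] = delta
		}
	}
	return out
}

// applyDeltas (hypothetical helper) adds each delta to the matching counter,
// which is what a UI would do with PathReviewActionResult.Deltas.
func applyDeltas(counters, deltas map[string]int64) {
	for key, delta := range deltas {
		counters[key] += delta
	}
}

func main() {
	deltas := nonZeroDeltas(map[string]int64{
		DeltaExcluded:        3,
		DeltaTraversalFailed: -1,
		DeltaFiles:           0, // unchanged, so it is omitted
	})
	counters := map[string]int64{DeltaExcluded: 10, DeltaTraversalFailed: 4, DeltaFiles: 7}
	applyDeltas(counters, deltas)
	fmt.Println(len(deltas), counters[DeltaExcluded], counters[DeltaTraversalFailed], counters[DeltaFiles])
}
```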

const (
	FSCredentialRoleSource      = "source"
	FSCredentialRoleDestination = "destination"
)

FS credential roles (match roots.SetRootRequest.Role).

Variables

This section is empty.

Functions

func HandleShutdownSignals

func HandleShutdownSignals(cancel context.CancelFunc)

HandleShutdownSignals sets up signal handlers for SIGINT (Ctrl+C) and SIGTERM. When either signal is received, it cancels the provided context. If shutdown doesn't complete within 10 seconds, it hard-kills the process. This function should be called in a goroutine at the start of the migration. On Windows, SIGINT is supported but SIGTERM may not be available.

func MigrationDBPath added in v0.1.41

func MigrationDBPath(migrationDir, migrationID string) string

MigrationDBPath returns the per-migration DB path when the API passes the folder for that migration. migrationDir is the absolute path to the migration's folder (e.g. data/migration-123); the DB file is migrationDir/{migrationID}.db.

func ParsePhase

func ParsePhase(v string) (string, error)

ParsePhase parses a phase string from the DB. Accepts new lowercase-with-hyphens values and legacy PascalCase values.
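
A rough sketch of dual-format parsing follows. The legacy spellings are not listed on this page, so the sketch assumes (and this is only an assumption) that a legacy value is the PascalCase form of the same words, e.g. CopyInProgress for copy-in-progress. parsePhase and kebab are hypothetical local helpers.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// Canonical phase strings, copied from the constants above.
var knownPhases = map[string]bool{
	"roots-set":                 true,
	"filters-set":               true,
	"traversal-in-progress":     true,
	"awaiting-traversal-review": true,
	"copy-in-progress":          true,
	"awaiting-copy-review":      true,
}

// kebab converts PascalCase to lowercase-with-hyphens.
func kebab(s string) string {
	var b strings.Builder
	for i, r := range s {
		if unicode.IsUpper(r) {
			if i > 0 {
				b.WriteByte('-')
			}
			r = unicode.ToLower(r)
		}
		b.WriteRune(r)
	}
	return b.String()
}

// parsePhase accepts a canonical value as-is, or an (assumed) PascalCase
// legacy value that normalizes to a canonical one.
func parsePhase(v string) (string, error) {
	if knownPhases[v] {
		return v, nil
	}
	if k := kebab(v); knownPhases[k] {
		return k, nil
	}
	return "", fmt.Errorf("unknown phase %q", v)
}

func main() {
	for _, v := range []string{"copy-in-progress", "CopyInProgress", "bogus"} {
		phase, err := parsePhase(v)
		fmt.Printf("%q -> %q, err=%v\n", v, phase, err)
	}
}
```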

func RunCopyPhase

func RunCopyPhase(cfg CopyPhaseConfig) (queue.QueueStats, error)

RunCopyPhase executes the copy phase (two-pass: folders then files).

func RunCopyRetryPhase added in v0.1.41

func RunCopyRetryPhase(cfg CopyPhaseConfig) (queue.QueueStats, error)

RunCopyRetryPhase runs the copy phase in retry mode: only copy_status = failed items are pulled. Uses the same two-pass BFS and max-depth guarded completion as traversal retry.

func SetupDatabase

func SetupDatabase(cfg DatabaseConfig) (*db.DB, bool, error)

SetupDatabase opens a DuckDB database at cfg.Path. Returns the DB and whether it was fresh (true if new or removed). The caller is responsible for closing the database when done.

Types

type Config

type Config struct {
	// Database config (path, etc.). MigrationManager opens and owns this connection lifecycle.
	Database DatabaseConfig

	Source      Service
	Destination Service

	SeedRoots       bool
	WorkerCount     int
	MaxRetries      int
	CoordinatorLead int

	LogAddress   string
	LogLevel     string
	SkipListener bool
	StartupDelay time.Duration
	ProgressTick time.Duration

	Verification VerifyOptions

	// ShutdownContext is an optional context for force shutdown control.
	// If not provided, LetsMigrate will create one internally.
	// Set this when using StartMigration for programmatic shutdown control.
	ShutdownContext context.Context
}

Config aggregates all of the knobs required to run the migration engine once.

func (*Config) SetRootFolders

func (c *Config) SetRootFolders(src, dst types.Folder) error

SetRootFolders assigns the source and destination root folders that will seed the migration queues. It normalizes required defaults (location path, type, display name) and validates identifiers.

type CopyPhaseConfig

type CopyPhaseConfig struct {
	DuckDB          *db.DB
	SrcAdapter      types.FSAdapter
	DstAdapter      types.FSAdapter
	WorkerCount     int
	MaxRetries      int
	LogAddress      string
	LogLevel        string
	SkipListener    bool
	StartupDelay    time.Duration
	ProgressTick    time.Duration
	ShutdownContext context.Context
}

CopyPhaseConfig configures the copy phase execution.

type CopyPhaseOptions added in v0.1.41

type CopyPhaseOptions struct {
	WorkerCount  int
	MaxRetries   int
	LogAddress   string
	LogLevel     string
	SkipListener bool
}

CopyPhaseOptions are optional overrides for copy phase / copy retry runs. Zero values use last run config.

type CopyStatusCounts added in v0.1.41

type CopyStatusCounts struct {
	Pending    int
	Successful int
	Failed     int
	Skipped    int
}

CopyStatusCounts holds copy status counts for the API (e.g. copyStatusCounts response).

type CreateMigrationConfig

type CreateMigrationConfig struct {
	Name            string
	ServiceMetadata any
	RootConfig      any
	// MigrationDir is the absolute path to the folder for this migration (e.g. data/{id}). If set, the DB is created there immediately; MigrationID is optional (engine generates if empty).
	MigrationDir string
	// MigrationID is optional; when set with MigrationDir, this ID is used (API pre-generated ID and created the folder).
	MigrationID string
}

CreateMigrationConfig defines metadata persisted for a migration record. When MigrationDir is set, the engine creates the DB at MigrationDir/{id}.db immediately. When MigrationDir is empty, the engine returns a migration with a generated ID and no DB yet; the API creates the folder, then passes that path to GetMigration so the engine can create/open the DB when first needed.

type DatabaseConfig

type DatabaseConfig struct {
	// Path is the DuckDB file path for legacy single-DB mode (all migrations in one file). When empty, the manager uses per-migration DBs; the API passes each migration's folder path to CreateMigration (MigrationDir) and GetMigration (migrationDir).
	Path string
	// RemoveExisting deletes the database file if it already exists before creating a new database.
	RemoveExisting bool
	// RequireOpen determines whether the DB instance must already be open (true) or can be auto-opened (false).
	// When true (API mode): DB instance must be provided and already open, error if nil/closed.
	// When false (standalone mode): Can auto-open DB if instance is nil or not open.
	RequireOpen bool
}

DatabaseConfig defines how the migration engine should prepare its backing store.

type DiffItem

type DiffItem struct {
	Path               string
	Name               string
	Depth              int
	Type               string
	SrcNodeID          string
	DstNodeID          string
	SrcTraversalStatus string
	DstTraversalStatus string
	CopyStatus         string
	Excluded           bool
	MissingOnSource    bool
	MissingOnDest      bool
	Size               int64
}

DiffItem is a path review row comparing source and destination state.

type DiffsStats

type DiffsStats struct {
	Total           int
	Folders         int
	Files           int
	MissingOnSource int
	MissingOnDest   int
	Excluded        int
}

type FSCredentialBinding added in v0.1.41

type FSCredentialBinding struct {
	Role             string
	ConnectionID     string
	CredsConfRelPath string
	ServiceID        string
	RootFolderJSON   string
}

FSCredentialBinding is one row from fs_credential_binding.

type ListChildrenDiffsRequest

type ListChildrenDiffsRequest struct {
	Path          string
	Limit         int
	Offset        int
	SortBy        string
	SortDirection string
	FoldersOnly   bool
	Status        string
}

type ListChildrenDiffsResult

type ListChildrenDiffsResult struct {
	Items  []DiffItem
	Total  int
	Limit  int
	Offset int
}

type LogEntry

type LogEntry struct {
	Timestamp time.Time
	Level     string
	Message   string
}

LogEntry is a user-facing runtime log projection.

type LogsProjection

type LogsProjection struct {
	Entries []LogEntry
	ByLevel map[string][]LogEntry
}

type Migration

type Migration struct {
	ID   string
	Name string
	DB   *db.DB // this migration's DB (per-migration or shared in legacy mode)
	// contains filtered or unexported fields
}

Migration is the first-class domain object for a single migration lifecycle.

func (*Migration) AddRoots

func (m *Migration) AddRoots(srcRoot, dstRoot types.Folder) (RootSeedSummary, error)

AddRoots seeds source and destination root tasks into the migration DB. This is the explicit root insert step before starting traversal.

func (*Migration) BulkExclude

func (m *Migration) BulkExclude(filter NodeQueryFilter, excluded bool) (PathReviewActionResult, error)

BulkExclude applies exclusion over a query slice.

func (*Migration) BulkExcludeWithPropagation

func (m *Migration) BulkExcludeWithPropagation(filter NodeQueryFilter, excluded bool) (PathReviewActionResult, error)

func (*Migration) EnsureEnvelopeMasterKey added in v0.1.41

func (m *Migration) EnsureEnvelopeMasterKey() ([]byte, error)

EnsureEnvelopeMasterKey returns the 32-byte Sylos-FS envelope master key, generating and persisting it if absent.

func (*Migration) GetChildrenDiffsStats

func (m *Migration) GetChildrenDiffsStats(path string, foldersOnly bool) (DiffsStats, error)

func (*Migration) GetEnvelopeMasterKey added in v0.1.41

func (m *Migration) GetEnvelopeMasterKey() ([]byte, error)

GetEnvelopeMasterKey returns the persisted envelope master key.

func (*Migration) GetFSCredentialBinding added in v0.1.41

func (m *Migration) GetFSCredentialBinding(role string) (*FSCredentialBinding, error)

GetFSCredentialBinding returns a binding by role ("source" or "destination"), or an error if missing.

func (*Migration) GetLogs

func (m *Migration) GetLogs(limit int, groupByLevel bool) (LogsProjection, error)

func (*Migration) GetPathReviewStats added in v0.1.41

func (m *Migration) GetPathReviewStats() PathReviewStats

GetPathReviewStats returns phase-aware review stats for API passthrough. Reads the canonical stats table; falls back to live recomputation only when the table is empty.

func (*Migration) GetQueueMetrics

func (m *Migration) GetQueueMetrics() (QueueMetricsSnapshot, error)

func (*Migration) GetRecentLogs

func (m *Migration) GetRecentLogs(limit int) ([]LogEntry, error)

func (*Migration) GetRuntimeStatus

func (m *Migration) GetRuntimeStatus() RuntimeState

func (*Migration) GetSearchStats added in v0.1.41

func (m *Migration) GetSearchStats(req SearchRequest) (DiffsStats, error)

GetSearchStats returns aggregate counts for the same filter as SearchPathReviewItems (query, path, status, foldersOnly).

func (*Migration) GetTraversalSummary

func (m *Migration) GetTraversalSummary() (TraversalSummary, error)

func (*Migration) IsLive added in v0.1.41

func (m *Migration) IsLive() bool

IsLive returns true when a run is active (traversal, copy, or retry). Distinct from lifecycle phase; use for "in progress / paused" indicator.

func (*Migration) ListChildrenDiffs

func (m *Migration) ListChildrenDiffs(req ListChildrenDiffsRequest) (ListChildrenDiffsResult, error)

func (*Migration) ListFSCredentialBindings added in v0.1.41

func (m *Migration) ListFSCredentialBindings() ([]FSCredentialBinding, error)

ListFSCredentialBindings returns all persisted FS credential bindings for this migration DB.

func (*Migration) MarkNodeForRetryCopy

func (m *Migration) MarkNodeForRetryCopy(nodeID string) (PathReviewActionResult, error)

func (*Migration) MarkNodeForRetryDiscovery

func (m *Migration) MarkNodeForRetryDiscovery(nodeID string) (PathReviewActionResult, error)

func (*Migration) Phase

func (m *Migration) Phase() string

func (*Migration) PrepareCopyRetry added in v0.1.41

func (m *Migration) PrepareCopyRetry() error

PrepareCopyRetry transitions to copy-in-progress and persists phase immediately. Call synchronously before returning 202 and starting RunCopyRetry in a background task, same pattern as PrepareRetrySweep.

func (*Migration) PrepareRetrySweep added in v0.1.41

func (m *Migration) PrepareRetrySweep() error

PrepareRetrySweep transitions to traversal-in-progress and persists phase immediately. Call this synchronously in the HTTP handler before returning 202 and starting RunRetrySweep in a background task, so clients that poll GET migration see traversal-in-progress before the sweep goroutine runs.

func (*Migration) QueryNodes

func (m *Migration) QueryNodes(filter NodeQueryFilter) ([]db.NodeState, error)

QueryNodes provides review-phase node search/filter without exposing SQL to API.

func (*Migration) RetryAllFailed

func (m *Migration) RetryAllFailed() (PathReviewActionResult, error)

func (*Migration) RunCopyRetry added in v0.1.41

func (m *Migration) RunCopyRetry(cfg Config, opts CopyPhaseOptions) (queue.QueueStats, error)

RunCopyRetry runs the copy phase in retry mode (only copy_status = failed). Requires awaiting-copy-review. On success transitions back to awaiting-copy-review.

func (*Migration) RunRetrySweep

func (m *Migration) RunRetrySweep(cfg Config, opts RetrySweepOptions) (RuntimeStats, error)

func (*Migration) SearchPathReviewItems

func (m *Migration) SearchPathReviewItems(req SearchRequest) (SearchResult, error)

func (*Migration) SetNodeExcluded

func (m *Migration) SetNodeExcluded(queueType, nodeID string, excluded bool) (PathReviewActionResult, error)

SetNodeExcluded mutates review exclusions in engine-owned store.

func (*Migration) SetNodeExcludedWithPropagation

func (m *Migration) SetNodeExcludedWithPropagation(queueType, nodeID string, excluded bool) (PathReviewActionResult, error)

func (*Migration) StartCopy

func (m *Migration) StartCopy(cfg Config) (queue.QueueStats, error)

StartCopy transitions review->copying and runs copy phase. cfg must include live source/destination adapters (same as StartTraversal).

func (*Migration) StartTraversal

func (m *Migration) StartTraversal(cfg Config) (RuntimeStats, error)

StartTraversal begins traversal lifecycle and transitions to awaiting-traversal-review on success. Requires filters-set.

func (*Migration) Stop

func (m *Migration) Stop() (StopResult, error)

func (*Migration) UnmarkNodeForRetryCopy

func (m *Migration) UnmarkNodeForRetryCopy(nodeID string) (PathReviewActionResult, error)

func (*Migration) UnmarkNodeForRetryDiscovery

func (m *Migration) UnmarkNodeForRetryDiscovery(nodeID string) (PathReviewActionResult, error)

func (*Migration) UpdateConfig added in v0.1.41

func (m *Migration) UpdateConfig(cfg Config) error

UpdateConfig writes a JSON snapshot of cfg (roots, worker knobs, verification; not FS adapters) to migrations.root_config_json.

func (*Migration) UpsertFSCredentialBinding added in v0.1.41

func (m *Migration) UpsertFSCredentialBinding(binding FSCredentialBinding) error

UpsertFSCredentialBinding persists one side's FS credential binding (connection id, optional creds path, service id, serialized root folder).

type MigrationConfig

type MigrationConfig struct {
	DB              *db.DB // DuckDB instance (required; manager-owned)
	DBPath          string // Reserved for compatibility; ignored when DB is set
	SrcAdapter      types.FSAdapter
	DstAdapter      types.FSAdapter
	SrcRoot         types.Folder
	DstRoot         types.Folder
	SrcServiceName  string
	WorkerCount     int
	MaxRetries      int
	CoordinatorLead int
	LogAddress      string
	LogLevel        string
	SkipListener    bool
	StartupDelay    time.Duration
	ProgressTick    time.Duration
	ResumeStatus    *MigrationStatus
	ShutdownContext context.Context
}

MigrationConfig is the configuration passed to RunMigration.

type MigrationController

type MigrationController struct {
	// contains filtered or unexported fields
}

MigrationController provides programmatic control over a running migration. It allows you to trigger force shutdown and check migration status.

func StartMigration

func StartMigration(cfg Config) *MigrationController

StartMigration starts a migration asynchronously and returns a MigrationController that allows programmatic shutdown. Use this when you need to control the migration lifecycle or run migrations in the background.

Example:

controller := migration.StartMigration(cfg)
defer controller.Shutdown()

// Later, trigger shutdown programmatically:
controller.Shutdown()

// Wait for completion:
result, err := controller.Wait()

func (*MigrationController) Done

func (mc *MigrationController) Done() <-chan struct{}

Done returns a channel that is closed when the migration completes or is shut down.

func (*MigrationController) Shutdown

func (mc *MigrationController) Shutdown()

Shutdown triggers a force shutdown of the migration. This is safe to call multiple times or after the migration has completed.

func (*MigrationController) Wait

func (mc *MigrationController) Wait() (Result, error)

Wait blocks until the migration completes or is shut down, then returns the result and error.

type MigrationDetails

type MigrationDetails struct {
	ID                  string
	Name                string
	Phase               string // From DB; merged with in-memory phase when this process has the migration loaded (see Live).
	CreatedAt           time.Time
	UpdatedAt           time.Time
	ServiceMetadataJSON string
	RootConfigJSON      string
	Live                bool // true when a run is active (traversal, copy, or retry); merged from in-memory Migration when loaded
}

MigrationDetails is the full migration record from the DB, for API detail views. The engine owns all DB access; use this type via GetMigrationDetails so the API never touches the DB.

type MigrationManager

type MigrationManager struct {
	// contains filtered or unexported fields
}

MigrationManager owns migration lifecycle authority and persistence access. Either Path is set (legacy single DB for all migrations) or the API passes the migration folder path per migration (CreateMigration with MigrationDir, or GetMigration(id, migrationDir)).

func NewMigrationManager

func NewMigrationManager(cfg DatabaseConfig) (*MigrationManager, error)

NewMigrationManager opens the migration DB (legacy single-DB when Path is set) or creates a manager that uses per-migration paths (when Path is empty and the API will pass MigrationDir / migrationDir per call).

func (*MigrationManager) Close

func (m *MigrationManager) Close() error

Close releases manager-owned resources (single DB in legacy mode, or all open per-migration DBs).

func (*MigrationManager) CreateMigration

func (m *MigrationManager) CreateMigration(cfg CreateMigrationConfig) (*Migration, error)

CreateMigration registers a migration and returns a domain object. The API can either:

  1. Pass MigrationDir (path to the migration's folder): the engine creates the DB there and returns the migration; MigrationID is optional (the engine generates one if empty).
  2. Omit MigrationDir: the engine generates an ID and returns a migration with no DB yet; the API creates the folder (e.g. data/{id}), then calls GetMigration(id, migrationDir) so the engine creates the DB when first needed.

func (*MigrationManager) DeleteMigration

func (m *MigrationManager) DeleteMigration(id string, migrationDir string) error

DeleteMigration removes the migration from memory and, when a DB exists, deletes its record. When using per-migration DBs, pass migrationDir so the engine can open the DB, delete the row, and close it; otherwise only in-memory state is removed.

func (*MigrationManager) GetMigration

func (m *MigrationManager) GetMigration(id string, migrationDir string) (*Migration, error)

GetMigration loads or returns a cached migration. When using per-migration DBs, pass the absolute path to that migration's folder (e.g. data/{id}); the engine creates or opens the DB at migrationDir/{id}.db. When migrationDir is empty, it uses the legacy single DB (Path must have been set when creating the manager).

func (*MigrationManager) GetMigrationDetails

func (m *MigrationManager) GetMigrationDetails(id string, migrationDir string) (*MigrationDetails, error)

GetMigrationDetails returns the full migration record. When using per-migration DBs, pass migrationDir (path to that migration's folder) if the migration is not already loaded in memory.

func (*MigrationManager) ListMigrations

func (m *MigrationManager) ListMigrations(dataDir string) ([]MigrationSummary, error)

ListMigrations returns migration summaries. When using a single DB (Path set), dataDir is ignored. When using per-migration DBs, pass dataDir to scan for migration folders (e.g. data); each subdir dataDir/{id} is expected to contain {id}.db. When dataDir is empty and no single DB, returns only in-memory migrations (current process).

type MigrationStatus

type MigrationStatus struct {
	SrcTotal int
	DstTotal int

	SrcPending int
	DstPending int

	SrcFailed int
	DstFailed int

	MinPendingDepthSrc *int
	MinPendingDepthDst *int
}

MigrationStatus summarizes the current state of a migration in the database.

func InspectMigrationStatus

func InspectMigrationStatus(database *db.DB) (MigrationStatus, error)

InspectMigrationStatus inspects the DuckDB node tables and status_events (current state) and returns a MigrationStatus. Pending/failed counts are derived from status_events (arg_max per id), not from src_stats/dst_stats, so counts are correct even when stats tables were not populated (e.g. DB from API or other tooling).
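
The "arg_max per id" idea, taking each node's most recent event as its current status and then counting, can be shown in plain Go. The statusEvent shape, including the Seq ordering column, is a hypothetical simplification; the actual status_events schema is not shown on this page.

```go
package main

import "fmt"

// statusEvent is a hypothetical, simplified row of status_events.
type statusEvent struct {
	NodeID string
	Seq    int64 // assumed ordering column; higher means more recent
	Status string
}

// countCurrent keeps the latest event per node and tallies the statuses,
// which is the in-memory equivalent of arg_max(status, seq) grouped by id.
func countCurrent(events []statusEvent) map[string]int {
	latest := make(map[string]statusEvent)
	for _, e := range events {
		if cur, ok := latest[e.NodeID]; !ok || e.Seq > cur.Seq {
			latest[e.NodeID] = e
		}
	}
	counts := make(map[string]int)
	for _, e := range latest {
		counts[e.Status]++
	}
	return counts
}

func main() {
	counts := countCurrent([]statusEvent{
		{NodeID: "a", Seq: 1, Status: "pending"},
		{NodeID: "a", Seq: 2, Status: "successful"},
		{NodeID: "b", Seq: 1, Status: "pending"},
		{NodeID: "b", Seq: 3, Status: "failed"},
		{NodeID: "c", Seq: 1, Status: "pending"},
	})
	fmt.Println(counts["pending"], counts["failed"], counts["successful"])
}
```

Deriving counts from events this way does not depend on any aggregate table having been written, which is the property the paragraph above relies on.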

func (MigrationStatus) HasFailures

func (s MigrationStatus) HasFailures() bool

HasFailures returns true if any src or dst nodes failed traversal.

func (MigrationStatus) HasPending

func (s MigrationStatus) HasPending() bool

HasPending returns true if any src or dst nodes are still pending.

func (MigrationStatus) IsComplete

func (s MigrationStatus) IsComplete() bool

IsComplete returns true when there are nodes and no pending or failed work.

func (MigrationStatus) IsEmpty

func (s MigrationStatus) IsEmpty() bool

IsEmpty returns true if no nodes have been discovered yet.
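
Taken together, the four predicates could look like the sketch below. migrationStatus is a local copy of the struct's count fields, and the method bodies are inferred from the one-line descriptions above (for instance, treating "no nodes discovered" as both totals being zero is an assumption), so treat this as a reading aid rather than the package source.

```go
package main

import "fmt"

// migrationStatus copies the count fields of MigrationStatus for this sketch.
type migrationStatus struct {
	SrcTotal, DstTotal     int
	SrcPending, DstPending int
	SrcFailed, DstFailed   int
}

// IsEmpty: no nodes discovered on either side (assumed interpretation).
func (s migrationStatus) IsEmpty() bool { return s.SrcTotal == 0 && s.DstTotal == 0 }

// HasPending: any src or dst node is still pending.
func (s migrationStatus) HasPending() bool { return s.SrcPending > 0 || s.DstPending > 0 }

// HasFailures: any src or dst node failed traversal.
func (s migrationStatus) HasFailures() bool { return s.SrcFailed > 0 || s.DstFailed > 0 }

// IsComplete: there are nodes and no pending or failed work.
func (s migrationStatus) IsComplete() bool {
	return !s.IsEmpty() && !s.HasPending() && !s.HasFailures()
}

func main() {
	fresh := migrationStatus{}
	resumable := migrationStatus{SrcTotal: 10, DstTotal: 8, SrcPending: 2, DstFailed: 1}
	finished := migrationStatus{SrcTotal: 10, DstTotal: 10}
	fmt.Println(fresh.IsEmpty(), resumable.HasPending(), resumable.HasFailures(), finished.IsComplete())
}
```

A host might use these to choose between a fresh run (IsEmpty), a retry sweep (HasPending or HasFailures), and moving on to review (IsComplete).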

type MigrationSummary

type MigrationSummary struct {
	ID        string
	Name      string
	Phase     string // From DB; merged with in-memory phase when this process has the migration loaded (see Live).
	CreatedAt time.Time
	UpdatedAt time.Time
	Live      bool // true when a run is active (traversal, copy, or retry); merged from in-memory Migration when loaded
}

MigrationSummary is a compact projection returned by ListMigrations.

type NodeQueryFilter

type NodeQueryFilter struct {
	Queue       string
	Depth       *int
	Status      string
	Excluded    *bool
	PathLike    string
	Limit       int
	Offset      int
	OrderByPath bool
}

NodeQueryFilter controls review-phase node query behavior.

type PathReviewActionResult added in v0.1.41

type PathReviewActionResult struct {
	AffectedCount int64
	Deltas        map[string]int64
}

PathReviewActionResult is the result of a path review mutation. Deltas holds per-status/category changes (only non-zero keys). UI applies them to the matching counter; phase determines which counters are shown.

type PathReviewSearchCondition added in v0.1.41

type PathReviewSearchCondition struct {
	Field    string `json:"field"`
	Operator string `json:"operator,omitempty"`
	Value    any    `json:"value"`
}

PathReviewSearchCondition mirrors API/UI search filters (field names lowercase in JSON).

type PathReviewStats added in v0.1.41

type PathReviewStats struct {
	PendingCount        int
	FailedCount         int
	ExcludedCount       int
	PendingRetriesCount int
	FoldersCount        int
	FilesCount          int
	FoldersRatio        float64
	FilesRatio          float64
	TotalFileSize       struct {
		Src int64
		Dst int64
	}
}

PathReviewStats is the UI/API-facing review stats shape. pendingCount, failedCount, and pendingRetriesCount are phase-aware.

type QueueMetricsSnapshot

type QueueMetricsSnapshot struct {
	Queues map[string]map[string]any
}

type Result

type Result struct {
	RootsSeeded  bool
	RootSummary  RootSeedSummary
	Runtime      RuntimeStats
	Verification VerificationReport
}

Result captures the outcome of a migration run.

func LetsMigrate

func LetsMigrate(cfg Config) (Result, error)

LetsMigrate executes setup, traversal, and verification using the supplied configuration. This is the synchronous version: it blocks until the migration completes or is shut down. For programmatic shutdown control, use StartMigration instead.

type RetrySweepOptions

type RetrySweepOptions struct {
	WorkerCount   int
	MaxRetries    int
	LogAddress    string
	LogLevel      string
	SkipListener  bool
	MaxKnownDepth int
}

RetrySweepOptions are manager-level knobs for retry sweep runs.

type ReviewStatsRaw added in v0.1.41

type ReviewStatsRaw struct {
	TraversalPending      int64
	TraversalPendingRetry int64
	TraversalFailed       int64
	CopyPending           int64
	CopyFailed            int64
	Excluded              int64
	Folders               int64
	Files                 int64
	SizeSrc               int64
	SizeDst               int64
}

ReviewStatsRaw holds the canonical persisted counters from the universal stats table (key -> count). It is used for cache and delta updates; PathReviewStats is derived from it plus the phase.

func ReviewStatsRawFromSnapshot added in v0.1.41

func ReviewStatsRawFromSnapshot(s db.ReviewStatsSnapshot) ReviewStatsRaw

ReviewStatsRawFromSnapshot converts the DB snapshot to the migration-layer raw stats (e.g. for seeding the in-memory cache).

func (ReviewStatsRaw) ToPathReviewStats added in v0.1.41

func (r ReviewStatsRaw) ToPathReviewStats(phase string) PathReviewStats

ToPathReviewStats projects raw stats into the API shape using phase.

type RootSeedSummary

type RootSeedSummary struct {
	SrcRoots int
	DstRoots int
}

RootSeedSummary captures verification details after root task seeding.

func SeedRootTasks

func SeedRootTasks(srcRoot types.Folder, dstRoot types.Folder, database *db.DB) (RootSeedSummary, error)

SeedRootTasks inserts the supplied source and destination root folders into the database. The folders should already contain root-relative metadata (LocationPath="/", DepthLevel=0).

type RuntimeState

type RuntimeState struct {
	NodesDiscovered int64
	TasksPending    int64
	TasksCompleted  int64
	BytesCopied     int64
	Errors          int64
}

RuntimeState is the in-memory live status projection for a migration.

type RuntimeStats

type RuntimeStats struct {
	Duration time.Duration
	Src      queue.QueueStats
	Dst      queue.QueueStats
}

RuntimeStats captures execution statistics at the end of a migration run.

func RunMigration

func RunMigration(cfg MigrationConfig) (RuntimeStats, error)

RunMigration executes the migration traversal using the provided configuration.

func RunRetrySweep

func RunRetrySweep(cfg SweepConfig) (RuntimeStats, error)

RunRetrySweep runs a retry sweep to re-process failed or pending tasks from a previous traversal. This allows re-traversing paths that previously failed (e.g., due to permissions) and discovering new content in those paths.

The sweep checks all known levels up to MaxKnownDepth (or auto-detects the depth from the DB when MaxKnownDepth is -1), then uses the normal traversal logic for any deeper levels discovered during the retry.

Example:

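config := migration.SweepConfig{
    DuckDB:        dbInstance, // open *db.DB from the previous traversal
    SrcAdapter:    srcAdapter,
    DstAdapter:    dstAdapter,
    WorkerCount:   10,
    MaxRetries:    3,
    MaxKnownDepth: 5, // or -1 to auto-detect from the DB
}
stats, err := migration.RunRetrySweep(config)
if err != nil {
    return err
}
fmt.Printf("retry sweep finished in %s\n", stats.Duration)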
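The -1 convention for MaxKnownDepth can be pictured as a small resolution step. This is only a sketch: `resolveMaxDepth` and the `detect` callback are hypothetical names standing in for whatever lookup the package performs against the DB.

```go
package main

import "fmt"

// resolveMaxDepth returns the deepest level a sweep should treat as "known".
// configured mirrors SweepConfig.MaxKnownDepth; detect is a hypothetical
// callback standing in for the DB lookup and is only called when configured
// is -1.
func resolveMaxDepth(configured int, detect func() (int, error)) (int, error) {
	if configured != -1 {
		return configured, nil
	}
	return detect()
}

func main() {
	// Stand-in for a query such as "highest depth recorded in the DB".
	detect := func() (int, error) { return 7, nil }

	for _, configured := range []int{5, -1} {
		depth, err := resolveMaxDepth(configured, detect)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("configured=%d -> sweep known levels 0..%d\n", configured, depth)
	}
}
```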

type SearchRequest

type SearchRequest struct {
	Query         string
	Path          string // empty = global search over all review paths
	Limit         int
	Offset        int
	SortBy        string
	SortDirection string
	FoldersOnly   bool
	Status        string // legacy: OR across src/dst traversal + copy_status

	// Structured search (preferred). When non-empty, Status is ignored.
	Conditions       []PathReviewSearchCondition `json:"conditions,omitempty"`
	StatusSearchType string                      `json:"statusSearchType,omitempty"` // traversal, copy, both
}
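The field comments suggest a precedence rule: when Conditions is non-empty, the legacy Status string is ignored. A minimal sketch of that rule follows, using local stand-in types; the `condition` fields and the `effectiveFilter` helper are hypothetical and only illustrate the ordering.

```go
package main

import "fmt"

// condition is a hypothetical stand-in for PathReviewSearchCondition,
// whose real fields are not shown here.
type condition struct {
	Field string
	Value string
}

// searchRequest keeps only the two fields relevant to the precedence rule.
type searchRequest struct {
	Status     string
	Conditions []condition
}

// effectiveFilter reports which filter a request would use under the
// assumed rule: structured conditions win over the legacy status string.
func effectiveFilter(r searchRequest) string {
	if len(r.Conditions) > 0 {
		return "conditions"
	}
	if r.Status != "" {
		return "status"
	}
	return "none"
}

func main() {
	both := searchRequest{
		Status:     "failed",
		Conditions: []condition{{Field: "copy_status", Value: "pending"}},
	}
	fmt.Println(effectiveFilter(both))                            // structured search wins
	fmt.Println(effectiveFilter(searchRequest{Status: "failed"})) // legacy fallback
	fmt.Println(effectiveFilter(searchRequest{}))                 // no status filter
}
```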

type SearchResult

type SearchResult struct {
	Items  []DiffItem
	Total  int
	Limit  int
	Offset int
}

type Service

type Service struct {
	Name    string
	Adapter types.FSAdapter
	Root    types.Folder
}

Service defines a single filesystem service participating in a migration.

type StopResult

type StopResult struct {
	MigrationID   string
	Phase         string
	RuntimeStatus RuntimeState
	Stopped       bool
}

StopResult reports stop/suspend state after a stop request.

type SweepConfig

type SweepConfig struct {
	DuckDB          *db.DB
	SrcAdapter      types.FSAdapter
	DstAdapter      types.FSAdapter
	WorkerCount     int
	MaxRetries      int
	LogAddress      string
	LogLevel        string
	SkipListener    bool
	StartupDelay    time.Duration
	ProgressTick    time.Duration
	ShutdownContext context.Context
	// For retry sweeps only
	MaxKnownDepth          int    // Maximum known depth from previous traversal (-1 to auto-detect)
	SkipAutoETLBeforeRetry bool   // If true, skip automatic ETL from DuckDB to DuckDB before retry sweep
	DuckDBPath             string // Optional: Path to DuckDB file (auto-derived from DuckDB path if empty and ETL is enabled)
}

SweepConfig is the configuration for running retry sweeps.

type TraversalSummary

type TraversalSummary struct {
	SrcTotal    int
	DstTotal    int
	SrcPending  int
	DstPending  int
	SrcFailed   int
	DstFailed   int
	SrcExcluded int
	DstExcluded int
	// CopyStatusCounts are SRC-only counts by copy_status (pending, successful, failed, skipped) from status_events.
	CopyStatusCounts CopyStatusCounts
	// Merged review totals (one row per path in merged view; use for API foldersCount, filesCount, excludedCount).
	FoldersCount     int
	FilesCount       int
	ExcludedCount    int
	TotalFileSizeSrc int64
	TotalFileSizeDst int64
	FoldersRatio     float64 // FoldersCount / total, rounded to 2 decimals
	FilesRatio       float64 // FilesCount / total, rounded to 2 decimals
}

TraversalSummary is the review projection returned by GetTraversalSummary.
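The ratio fields are documented as a count divided by a total, rounded to two decimals. One way to compute such a value is sketched below. Treating the total as FoldersCount + FilesCount is an assumption, and `ratio` is a hypothetical helper rather than the package's own code.

```go
package main

import (
	"fmt"
	"math"
)

// ratio returns count/total rounded to 2 decimals, or 0 when total is 0
// (the zero-total behavior is an assumption made to avoid dividing by zero).
func ratio(count, total int) float64 {
	if total == 0 {
		return 0
	}
	return math.Round(float64(count)/float64(total)*100) / 100
}

func main() {
	folders, files := 1, 2
	total := folders + files // assumed definition of "total"
	fmt.Println("FoldersRatio:", ratio(folders, total))
	fmt.Println("FilesRatio:", ratio(files, total))
}
```

Because each ratio is rounded independently, the two values are not guaranteed to sum to exactly 1.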

type VerificationReport

type VerificationReport struct {
	SrcTotal      int
	DstTotal      int
	SrcPending    int
	DstPending    int
	SrcFailed     int
	DstFailed     int
	DstNotOnSrc   int
	SrcSuccessful int // Count of successful SRC nodes
	DstSuccessful int // Count of successful DST nodes
}

VerificationReport captures aggregate statistics from the verification pass.

func VerifyMigration

func VerifyMigration(database *db.DB, opts VerifyOptions) (VerificationReport, error)

VerifyMigration inspects the DuckDB node tables and stats for pending, failed, or missing nodes and returns a report.

func (VerificationReport) Success

func (r VerificationReport) Success(opts VerifyOptions) bool

Success returns true when the report satisfies the supplied VerifyOptions. Migration is not considered successful unless at least one node was actually moved/traversed.

type VerifyOptions

type VerifyOptions struct {
	AllowPending  bool
	AllowNotOnSrc bool
}

VerifyOptions define the expectations for post-migration validation.
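To make the interaction between a report and its options concrete, here is a minimal sketch of a check in the spirit of Success. It uses local stand-in types with the documented field names. The exact rules are assumptions: failures always fail, pending and DstNotOnSrc counts fail unless allowed, and "at least one node was moved/traversed" is approximated by a non-zero successful count.

```go
package main

import "fmt"

// verificationReport is a local stand-in mirroring the documented fields
// this sketch needs.
type verificationReport struct {
	SrcPending, DstPending       int
	SrcFailed, DstFailed         int
	DstNotOnSrc                  int
	SrcSuccessful, DstSuccessful int
}

// verifyOptions mirrors the documented VerifyOptions fields.
type verifyOptions struct {
	AllowPending  bool
	AllowNotOnSrc bool
}

// success is a hypothetical approximation of VerificationReport.Success.
func success(r verificationReport, o verifyOptions) bool {
	if r.SrcFailed > 0 || r.DstFailed > 0 {
		return false // assumed: failures are never tolerated
	}
	if !o.AllowPending && (r.SrcPending > 0 || r.DstPending > 0) {
		return false
	}
	if !o.AllowNotOnSrc && r.DstNotOnSrc > 0 {
		return false
	}
	// An empty run does not count as a successful migration.
	return r.SrcSuccessful+r.DstSuccessful > 0
}

func main() {
	report := verificationReport{SrcSuccessful: 40, DstSuccessful: 40, DstNotOnSrc: 2}
	fmt.Println(success(report, verifyOptions{}))                    // strict options
	fmt.Println(success(report, verifyOptions{AllowNotOnSrc: true})) // extra DST nodes tolerated
}
```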
