migrations

package
v1.1.0
Warning

This package is not in the latest version of its module.

Published: Feb 10, 2026 License: AGPL-3.0 Imports: 11 Imported by: 0

README

DynamORM Migrations

This package provides a migration system for DynamoDB built on DynamORM. It supports schema changes, GSI management, and data migrations, each with rollback.

Features

  • Migration Tracking: Stores migration history in DynamoDB
  • Dependency Management: Ensures migrations run in the correct order
  • Rollback Support: Roll back migrations safely, with dependency checking
  • GSI Helpers: Utilities for managing Global Secondary Indexes
  • Validation: Pre-execution validation to catch issues early
  • Concurrent Safety: Lock mechanism prevents concurrent migrations
  • Dry Run: Preview changes without executing them

Usage

Creating a Migration

package migrations

import (
    "context"
    "github.com/pay-theory/dynamorm/pkg/core"
)

type AddUserEmailIndex struct {
    BaseMigration
}

func NewAddUserEmailIndex() *AddUserEmailIndex {
    return &AddUserEmailIndex{
        BaseMigration: NewBaseMigration(
            "20240115_add_user_email_index",
            20240115120000,
            "Add GSI for user email lookups",
            // Optional dependencies
        ),
    }
}

func (m *AddUserEmailIndex) Up(ctx context.Context, db core.DB) error {
    // Apply migration logic
    return nil
}

func (m *AddUserEmailIndex) Down(ctx context.Context, db core.DB) error {
    // Rollback logic
    return nil
}

Using GSI Helpers

func NewAddActivityIndex() Migration {
    return NewGSIMigration(
        "20240116_add_activity_index",
        20240116120000,
        "Add GSI for activity queries",
        "lesser-production", // table name
        GSIDefinition{
            Name:           "GSI7",
            HashKey:        "gsi7PK",
            HashKeyType:    "S",
            RangeKey:       "gsi7SK",
            RangeKeyType:   "S",
            ProjectionType: "ALL",
        },
    )
}

Registering Migrations

func init() {
    // Register migrations in init functions
    MustRegister(NewAddUserEmailIndex())
    MustRegister(NewAddActivityIndex())
}

Running Migrations

// Initialize migrator
db, err := dynamorm.GetClient(ctx)
if err != nil {
    return err
}
logger, err := zap.NewProduction()
if err != nil {
    return err
}
migrator := NewMigrator(db, GetRegistry(), logger)

// Run all pending migrations
if err := migrator.Run(ctx); err != nil {
    return err
}

// Run up to a specific migration (inclusive)
if err := migrator.Migrate(ctx, MigrateOptions{Target: "20240115_add_user_email_index"}); err != nil {
    return err
}

// Dry run to preview changes
if err := migrator.Migrate(ctx, MigrateOptions{DryRun: true}); err != nil {
    return err
}
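
Rolling Back Migrations

// Roll back the last migration
if err := migrator.RollbackLast(ctx); err != nil {
    return err
}

// Roll back every migration applied after the target (the target stays applied)
if err := migrator.RollbackTo(ctx, "20240115_add_user_email_index"); err != nil {
    return err
}

// Roll back the last three migrations
if err := migrator.RollbackSteps(ctx, 3); err != nil {
    return err
}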

Validation

// Create validator
validator := NewValidator(migrator, logger)

// Validate all pending migrations
result, err := validator.ValidateAll(ctx)
if err != nil {
    return err
}
if !result.Valid {
    fmt.Println(result.Format())
}

// Validate rollback plan
result, err = validator.ValidateRollback(ctx, RollbackOptions{Steps: 1})
if err != nil {
    return err
}
if !result.Valid {
    fmt.Println(result.Format())
}

Migration History

Migration history is stored in DynamoDB with the following structure:

  • PK: MIGRATION#HISTORY
  • SK: Migration ID
  • Attributes: version, description, applied_at, applied_by, checksum, status, error

Best Practices

  1. Naming Convention: Use format YYYYMMDD_description (e.g., 20240115_add_user_index)
  2. Version Numbers: Use format YYYYMMDDHHMMSS for ordering
  3. Dependencies: Depend only on migrations that must run before this one
  4. Idempotency: Ensure migrations can be safely re-run
  5. Testing: Test both Up and Down methods
  6. Small Changes: Keep migrations focused on single changes
  7. No Data Loss: Ensure Down methods don't cause data loss
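
The first two conventions are easy to check mechanically. The following is a minimal sketch of such a check. checkNaming is a hypothetical helper, not a function of this package, and the restriction to lowercase snake_case descriptions is an assumption made for the example.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
	"time"
)

// idPattern matches IDs such as "20240115_add_user_index": eight digits, an
// underscore, then a lowercase snake_case description (assumed style).
var idPattern = regexp.MustCompile(`^(\d{8})_[a-z0-9]+(_[a-z0-9]+)*$`)

// checkNaming reports whether id looks like YYYYMMDD_description and whether
// version parses as a YYYYMMDDHHMMSS timestamp.
func checkNaming(id string, version int64) error {
	m := idPattern.FindStringSubmatch(id)
	if m == nil {
		return fmt.Errorf("id %q does not match YYYYMMDD_description", id)
	}
	if _, err := time.Parse("20060102", m[1]); err != nil {
		return fmt.Errorf("id %q has an invalid date prefix: %w", id, err)
	}
	v := strconv.FormatInt(version, 10)
	if _, err := time.Parse("20060102150405", v); err != nil {
		return fmt.Errorf("version %d is not a valid YYYYMMDDHHMMSS value: %w", version, err)
	}
	return nil
}

func main() {
	fmt.Println(checkNaming("20240115_add_user_index", 20240115120000))
	fmt.Println(checkNaming("add_user_index", 20240115120000))
}
```

A check like this could run in a unit test over every registered migration, so that a malformed ID or version fails the build rather than a deployment.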

Lambda Integration

For Lambda functions that need to run migrations:

func init() {
    // Register models for cold start optimization
    dynamorm.InitializeModels(&MigrationHistory{}, &MigrationStatus{})
}

func handler(ctx context.Context, event any) error {
    // Get Lambda-optimized client
    db, err := dynamorm.GetLambdaClient(ctx)
    if err != nil {
        return err
    }

    logger, err := zap.NewProduction()
    if err != nil {
        return err
    }

    // Run all pending migrations
    migrator := NewMigrator(db, GetRegistry(), logger)
    if err := migrator.Run(ctx); err != nil {
        return err
    }

    // Continue with handler logic...
    return nil
}

Testing

Run unit tests:

go test ./pkg/storage/dynamorm/migrations

Run integration tests (requires DynamoDB Local):

go test -tags=integration ./pkg/storage/dynamorm/migrations

Documentation

Overview

Package migrations defines constants and status values for DynamORM database migration management.

Constants

const (
	StatusActive     = "ACTIVE"
	StatusCurrent    = "CURRENT"
	StatusApplied    = "applied"
	StatusRolledBack = "rolled_back"
	StatusPending    = "pending"
)

Migration status constants

const (
	EventInsert = "INSERT"
	EventModify = "MODIFY"
	EventRemove = "REMOVE"
)

Event type constants

const (
	MigrationStatusKey = "MIGRATION#STATUS"
)

Migration-related constants

Variables

This section is empty.

Functions

func MustRegister

func MustRegister(migration Migration)

MustRegister registers a migration to the global registry and panics on error

func Register

func Register(migration Migration) error

Register adds a migration to the global registry

Types

type BaseMigration

type BaseMigration struct {
	// contains filtered or unexported fields
}

BaseMigration provides common functionality for migrations

func NewBaseMigration

func NewBaseMigration(id string, version int64, description string, dependencies ...string) BaseMigration

NewBaseMigration creates a new base migration

func (BaseMigration) Dependencies

func (m BaseMigration) Dependencies() []string

Dependencies returns the migration dependencies

func (BaseMigration) Description

func (m BaseMigration) Description() string

Description returns the migration description

func (BaseMigration) ID

func (m BaseMigration) ID() string

ID returns the migration ID

func (BaseMigration) Version

func (m BaseMigration) Version() int64

Version returns the migration version

type ExampleAddUserLookupIndex

type ExampleAddUserLookupIndex struct {
	BaseMigration
}

ExampleAddUserLookupIndex demonstrates how to create custom migrations

func NewExampleAddUserLookupIndex

func NewExampleAddUserLookupIndex() *ExampleAddUserLookupIndex

NewExampleAddUserLookupIndex creates an example migration

func (*ExampleAddUserLookupIndex) Down

Down reverses the migration

func (*ExampleAddUserLookupIndex) Up

Up applies the migration

type ExampleDependentMigration

type ExampleDependentMigration struct {
	BaseMigration
}

ExampleDependentMigration demonstrates a migration with dependencies

func NewExampleDependentMigration

func NewExampleDependentMigration() *ExampleDependentMigration

NewExampleDependentMigration creates a new example dependent migration

func (*ExampleDependentMigration) Down

Down reverses the migration

func (*ExampleDependentMigration) Up

Up applies the migration

type GSIDefinition

type GSIDefinition struct {
	Name           string
	HashKey        string
	HashKeyType    string // "S", "N", "B"
	RangeKey       string
	RangeKeyType   string // "S", "N", "B"
	ProjectionType string // "ALL", "KEYS_ONLY", "INCLUDE"
	IncludeFields  []string
	ReadCapacity   int64
	WriteCapacity  int64
}

GSIDefinition defines a Global Secondary Index
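
The field comments above imply a few constraints that can be checked before a migration runs. The sketch below is one way to do that. validateGSI is a hypothetical helper and not part of this package, and the rule that IncludeFields is only meaningful for INCLUDE projections is taken from DynamoDB's projection semantics, not from this package's documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// GSIDefinition mirrors the documented struct so the sketch compiles on its own.
type GSIDefinition struct {
	Name           string
	HashKey        string
	HashKeyType    string
	RangeKey       string
	RangeKeyType   string
	ProjectionType string
	IncludeFields  []string
	ReadCapacity   int64
	WriteCapacity  int64
}

// validKeyType reports whether t is a DynamoDB key attribute type.
func validKeyType(t string) bool { return t == "S" || t == "N" || t == "B" }

// validateGSI checks a definition against the constraints noted in the field comments.
func validateGSI(g GSIDefinition) error {
	if g.Name == "" || g.HashKey == "" {
		return errors.New("name and hash key are required")
	}
	if !validKeyType(g.HashKeyType) {
		return fmt.Errorf("hash key type %q must be S, N, or B", g.HashKeyType)
	}
	if g.RangeKey != "" && !validKeyType(g.RangeKeyType) {
		return fmt.Errorf("range key type %q must be S, N, or B", g.RangeKeyType)
	}
	switch g.ProjectionType {
	case "ALL", "KEYS_ONLY":
		if len(g.IncludeFields) > 0 {
			return errors.New("IncludeFields only applies to INCLUDE projections")
		}
	case "INCLUDE":
		if len(g.IncludeFields) == 0 {
			return errors.New("INCLUDE projection needs at least one field")
		}
	default:
		return fmt.Errorf("unknown projection type %q", g.ProjectionType)
	}
	return nil
}

func main() {
	def := GSIDefinition{Name: "GSI7", HashKey: "gsi7PK", HashKeyType: "S",
		RangeKey: "gsi7SK", RangeKeyType: "S", ProjectionType: "ALL"}
	fmt.Println(validateGSI(def))
}
```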

type GSIHelper

type GSIHelper struct {
	// contains filtered or unexported fields
}

GSIHelper provides utilities for managing Global Secondary Indexes using DynamORM

func NewGSIHelper

func NewGSIHelper(db core.DB, tableName string, logger *zap.Logger) (*GSIHelper, error)

NewGSIHelper creates a new GSI helper using DynamORM

func (*GSIHelper) CreateGSI

func (h *GSIHelper) CreateGSI(ctx context.Context, gsi GSIDefinition) error

CreateGSI creates a new Global Secondary Index using DynamORM patterns

func (*GSIHelper) DeleteGSI

func (h *GSIHelper) DeleteGSI(ctx context.Context, gsiName string) error

DeleteGSI deletes a Global Secondary Index using DynamORM patterns

func (*GSIHelper) GetGSIStatus

func (h *GSIHelper) GetGSIStatus(ctx context.Context, gsiName string) (*GSIMigrationRecord, error)

GetGSIStatus retrieves the status of a GSI migration

func (*GSIHelper) ListGSIMigrations

func (h *GSIHelper) ListGSIMigrations(ctx context.Context) ([]*GSIMigrationRecord, error)

ListGSIMigrations lists all GSI migration records

type GSIMigration

type GSIMigration struct {
	BaseMigration
	TableName string
	GSI       GSIDefinition
}

GSIMigration is a helper base for GSI-related migrations

func NewGSIMigration

func NewGSIMigration(id string, version int64, description string, tableName string, gsi GSIDefinition, dependencies ...string) GSIMigration

NewGSIMigration creates a new GSI migration

func (GSIMigration) Down

func (m GSIMigration) Down(ctx context.Context, db core.DB) error

Down removes the GSI

func (GSIMigration) Up

func (m GSIMigration) Up(ctx context.Context, db core.DB) error

Up creates the GSI

type GSIMigrationRecord

type GSIMigrationRecord struct {
	PK             string    `theorydb:"pk"`
	SK             string    `theorydb:"sk"`
	TableName      string    `json:"table_name"`
	GSIName        string    `json:"gsi_name"`
	HashKey        string    `json:"hash_key"`
	HashKeyType    string    `json:"hash_key_type"`
	RangeKey       string    `json:"range_key,omitempty"`
	RangeKeyType   string    `json:"range_key_type,omitempty"`
	ProjectionType string    `json:"projection_type"`
	IncludeFields  []string  `json:"include_fields,omitempty"`
	ReadCapacity   int64     `json:"read_capacity"`
	WriteCapacity  int64     `json:"write_capacity"`
	Status         string    `json:"status"` // CREATED, DELETED, FAILED
	CreatedAt      time.Time `json:"created_at"`
	Error          string    `json:"error,omitempty"`
}

GSIMigrationRecord tracks GSI migration operations

type MigrateOptions

type MigrateOptions struct {
	// DryRun if true, will only log what would be done without executing
	DryRun bool

	// Target specific migration ID to migrate up to (inclusive)
	Target string

	// Force allows running migrations even if there are validation warnings
	Force bool
}

MigrateOptions contains options for running migrations

type Migration

type Migration interface {
	// ID returns a unique identifier for this migration (e.g., "20240101_add_user_index")
	ID() string

	// Version returns the version number for ordering (e.g., 20240101120000)
	Version() int64

	// Description returns a human-readable description of what this migration does
	Description() string

	// Up applies the migration
	Up(ctx context.Context, db core.DB) error

	// Down reverses the migration
	Down(ctx context.Context, db core.DB) error

	// Dependencies returns a list of migration IDs that must be applied before this one
	Dependencies() []string
}

Migration represents a database schema change

func NewExampleGSIMigration

func NewExampleGSIMigration() Migration

NewExampleGSIMigration creates an example GSI migration

type MigrationHistory

type MigrationHistory struct {
	PK          string    `theorydb:"pk"`
	SK          string    `theorydb:"sk"`
	ID          string    `theorydb:"id"`
	Version     int64     `theorydb:"version"`
	Description string    `theorydb:"description"`
	AppliedAt   time.Time `theorydb:"applied_at"`
	AppliedBy   string    `theorydb:"applied_by"`
	Checksum    string    `theorydb:"checksum"`
	Status      string    `theorydb:"status"` // "applied", "failed", "rolled_back"
	Error       string    `theorydb:"error,omitempty"`
}

MigrationHistory represents a record of an applied migration

func (*MigrationHistory) GetTableKeys

func (m *MigrationHistory) GetTableKeys() (string, string)

GetTableKeys returns the primary key values for DynamoDB

type MigrationStatus

type MigrationStatus struct {
	PK              string    `theorydb:"pk"`
	SK              string    `theorydb:"sk"`
	LastMigrationID string    `theorydb:"last_migration_id"`
	LastVersion     int64     `theorydb:"last_version"`
	UpdatedAt       time.Time `theorydb:"updated_at"`
	IsLocked        bool      `theorydb:"is_locked"`
	LockedBy        string    `theorydb:"locked_by,omitempty"`
	LockedAt        time.Time `theorydb:"locked_at,omitempty"`
}

MigrationStatus represents the current state of migrations

func (*MigrationStatus) GetTableKeys

func (m *MigrationStatus) GetTableKeys() (string, string)

GetTableKeys returns the primary key values for DynamoDB

type Migrator

type Migrator struct {
	// contains filtered or unexported fields
}

Migrator handles migration execution and tracking

func NewMigrator

func NewMigrator(db core.DB, registry *Registry, logger *zap.Logger) *Migrator

NewMigrator creates a new migrator instance

func (*Migrator) GetAppliedMigrations

func (m *Migrator) GetAppliedMigrations(_ context.Context) (map[string]bool, error)

GetAppliedMigrations returns a map of applied migration IDs

func (*Migrator) GetMigrationHistory

func (m *Migrator) GetMigrationHistory(_ context.Context) ([]*MigrationHistory, error)

GetMigrationHistory returns the migration history

func (*Migrator) GetMigrationStatus

func (m *Migrator) GetMigrationStatus(_ context.Context) (*MigrationStatus, error)

GetMigrationStatus returns the current migration status

func (*Migrator) GetPendingMigrations

func (m *Migrator) GetPendingMigrations(ctx context.Context) ([]Migration, error)

GetPendingMigrations returns a list of migrations that haven't been applied

func (*Migrator) GetRollbackPlan

func (m *Migrator) GetRollbackPlan(ctx context.Context, opts RollbackOptions) ([]*MigrationHistory, error)

GetRollbackPlan returns the migrations that would be rolled back

func (*Migrator) Migrate

func (m *Migrator) Migrate(ctx context.Context, opts MigrateOptions) error

Migrate runs all pending migrations up to the target (or all if target is empty)

func (*Migrator) MigrateDown

func (m *Migrator) MigrateDown(ctx context.Context, target string) error

MigrateDown rolls back migrations down to a target version

func (*Migrator) Rollback

func (m *Migrator) Rollback(ctx context.Context) error

Rollback rolls back the last migration

func (*Migrator) RollbackLast

func (m *Migrator) RollbackLast(ctx context.Context) error

RollbackLast rolls back the most recent migration

func (*Migrator) RollbackSteps

func (m *Migrator) RollbackSteps(ctx context.Context, steps int) error

RollbackSteps rolls back the specified number of migrations

func (*Migrator) RollbackTo

func (m *Migrator) RollbackTo(ctx context.Context, targetID string) error

RollbackTo rolls back all migrations after the target (target remains applied)

func (*Migrator) RollbackWithOptions

func (m *Migrator) RollbackWithOptions(ctx context.Context, opts RollbackOptions) error

RollbackWithOptions reverses applied migrations with specified options

func (*Migrator) Run

func (m *Migrator) Run(ctx context.Context) error

Run executes all pending migrations

func (*Migrator) UpdateMigrationStatus

func (m *Migrator) UpdateMigrationStatus(_ context.Context, status *MigrationStatus) error

UpdateMigrationStatus updates the overall migration status

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry manages available migrations

func GetRegistry

func GetRegistry() *Registry

GetRegistry returns the global registry

func NewRegistry

func NewRegistry() *Registry

NewRegistry creates a new migration registry

func (*Registry) All

func (r *Registry) All() []Migration

All returns all registered migrations sorted by version

func (*Registry) Get

func (r *Registry) Get(id string) (Migration, bool)

Get returns a migration by ID

func (*Registry) GetInOrder

func (r *Registry) GetInOrder(migrations []Migration) ([]Migration, error)

GetInOrder returns migrations in dependency order
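
Dependency ordering is typically a topological sort. The sketch below shows one way it could work, using a depth-first traversal that visits migrations in version order so that unrelated migrations keep their version ordering. The migration struct and orderByDependencies are hypothetical, and treating a dependency that is missing from the input as an error is an assumption; the package's GetInOrder may behave differently.

```go
package main

import (
	"fmt"
	"sort"
)

// migration is a hypothetical, trimmed stand-in for the Migration interface.
type migration struct {
	ID      string
	Version int64
	Deps    []string
}

// orderByDependencies returns ms so that each migration follows its dependencies.
func orderByDependencies(ms []migration) ([]migration, error) {
	byID := make(map[string]migration, len(ms))
	for _, m := range ms {
		byID[m.ID] = m
	}
	// Visit in version order so independent migrations stay sorted by version.
	sorted := append([]migration(nil), ms...)
	sort.SliceStable(sorted, func(i, j int) bool { return sorted[i].Version < sorted[j].Version })

	const (
		visiting = 1
		done     = 2
	)
	state := make(map[string]int, len(ms))
	out := make([]migration, 0, len(ms))

	var visit func(m migration) error
	visit = func(m migration) error {
		switch state[m.ID] {
		case done:
			return nil
		case visiting:
			return fmt.Errorf("dependency cycle through %q", m.ID)
		}
		state[m.ID] = visiting
		for _, dep := range m.Deps {
			d, ok := byID[dep]
			if !ok {
				return fmt.Errorf("%q depends on unknown migration %q", m.ID, dep)
			}
			if err := visit(d); err != nil {
				return err
			}
		}
		state[m.ID] = done
		out = append(out, m)
		return nil
	}
	for _, m := range sorted {
		if err := visit(m); err != nil {
			return nil, err
		}
	}
	return out, nil
}

func main() {
	ordered, err := orderByDependencies([]migration{
		{ID: "c", Version: 3},
		{ID: "a", Version: 1, Deps: []string{"b"}},
		{ID: "b", Version: 2},
	})
	fmt.Println(ordered, err)
}
```

In the example input, "a" has the lowest version but depends on "b", so "b" is emitted first.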

func (*Registry) GetPending

func (r *Registry) GetPending(appliedMigrations map[string]bool) []Migration

GetPending returns migrations that haven't been applied yet

func (*Registry) MustRegister

func (r *Registry) MustRegister(migration Migration)

MustRegister registers a migration and panics on error

func (*Registry) Register

func (r *Registry) Register(migration Migration) error

Register adds a migration to the registry

func (*Registry) ValidateDependencies

func (r *Registry) ValidateDependencies(migration Migration, appliedMigrations map[string]bool) error

ValidateDependencies checks if all dependencies are satisfied

type RollbackOptions

type RollbackOptions struct {
	// DryRun if true, will only log what would be done without executing
	DryRun bool

	// Target specific migration ID to rollback to (exclusive)
	Target string

	// Steps number of migrations to rollback (ignored if Target is set)
	Steps int

	// Force allows rollback even if there are dependency conflicts
	Force bool
}

RollbackOptions contains options for rolling back migrations

type ValidationError

type ValidationError struct {
	MigrationID string
	Type        string
	Message     string
}

ValidationError represents a validation error that prevents migration

type ValidationResult

type ValidationResult struct {
	Valid    bool
	Errors   []ValidationError
	Warnings []ValidationWarning
}

ValidationResult contains the results of migration validation

func (*ValidationResult) Format

func (r *ValidationResult) Format() string

Format formats validation results for display

type ValidationWarning

type ValidationWarning struct {
	MigrationID string
	Type        string
	Message     string
}

ValidationWarning represents a validation warning that doesn't prevent migration

type Validator

type Validator struct {
	// contains filtered or unexported fields
}

Validator validates migrations before execution

func NewValidator

func NewValidator(migrator *Migrator, logger *zap.Logger) *Validator

NewValidator creates a new migration validator

func (*Validator) ValidateAll

func (v *Validator) ValidateAll(ctx context.Context) (*ValidationResult, error)

ValidateAll validates all pending migrations

func (*Validator) ValidateMigration

func (v *Validator) ValidateMigration(ctx context.Context, migrationID string) (*ValidationResult, error)

ValidateMigration validates a specific migration

func (*Validator) ValidateRollback

func (v *Validator) ValidateRollback(ctx context.Context, opts RollbackOptions) (*ValidationResult, error)

ValidateRollback validates that a rollback can be safely performed
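
One check a rollback validator plausibly performs is that no migration left in place depends on one being removed. The sketch below illustrates that check only. rollbackConflicts is a hypothetical function that represents migrations by ID and dependencies as a map; it does not reflect the package's actual implementation.

```go
package main

import "fmt"

// rollbackConflicts lists applied migrations that would remain in place while
// depending on a migration in plan. applied should be ordered so the output is stable.
func rollbackConflicts(plan, applied []string, deps map[string][]string) []string {
	inPlan := make(map[string]bool, len(plan))
	for _, id := range plan {
		inPlan[id] = true
	}
	var conflicts []string
	for _, id := range applied {
		if inPlan[id] {
			continue // also being rolled back, so no conflict
		}
		for _, dep := range deps[id] {
			if inPlan[dep] {
				conflicts = append(conflicts, fmt.Sprintf("%s depends on %s", id, dep))
			}
		}
	}
	return conflicts
}

func main() {
	applied := []string{"a", "b", "c"}
	deps := map[string][]string{"b": {"a"}, "c": {"b"}}
	fmt.Println(rollbackConflicts([]string{"b"}, applied, deps))      // [c depends on b]
	fmt.Println(rollbackConflicts([]string{"c", "b"}, applied, deps)) // []
}
```

A non-empty result would correspond to the dependency conflicts that the Force field of RollbackOptions is described as overriding.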
