package migrate import ( "context" "fmt" "log/slog" "os" "path/filepath" "sort" "strings" "github.com/jackc/pgx/v5/pgxpool" ) // Run applies pending SQL migrations from dir in sorted order. // Already-applied migrations (tracked in schema_migrations) are skipped. func Run(ctx context.Context, pool *pgxpool.Pool, dir string) error { if _, err := pool.Exec(ctx, `CREATE TABLE IF NOT EXISTS schema_migrations ( filename TEXT PRIMARY KEY, applied_at TIMESTAMPTZ NOT NULL DEFAULT now() )`); err != nil { return fmt.Errorf("migrate: create tracking table: %w", err) } entries, err := os.ReadDir(dir) if err != nil { return fmt.Errorf("migrate: read dir %s: %w", dir, err) } var files []string for _, e := range entries { if !e.IsDir() && strings.HasSuffix(e.Name(), ".sql") { files = append(files, e.Name()) } } sort.Strings(files) rows, err := pool.Query(ctx, "SELECT filename FROM schema_migrations") if err != nil { return fmt.Errorf("migrate: query applied: %w", err) } defer rows.Close() applied := make(map[string]bool) for rows.Next() { var name string if err := rows.Scan(&name); err != nil { return fmt.Errorf("migrate: scan: %w", err) } applied[name] = true } if err := rows.Err(); err != nil { return fmt.Errorf("migrate: rows: %w", err) } // If this is the first time the migration runner sees an existing DB // (tables created by docker-entrypoint-initdb.d), mark bootstrap migration as applied. if !applied["000001_init.sql"] { var tableExists bool _ = pool.QueryRow(ctx, "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'heroes')").Scan(&tableExists) if tableExists { _, _ = pool.Exec(ctx, "INSERT INTO schema_migrations (filename) VALUES ('000001_init.sql') ON CONFLICT DO NOTHING") applied["000001_init.sql"] = true slog.Info("migrate: marked 000001_init.sql as applied (tables already exist)") } } for _, f := range files { if applied[f] { continue } sql, err := os.ReadFile(filepath.Join(dir, f)) if err != nil { return fmt.Errorf("migrate: read %s: %w", f, err) } tx, err := pool.Begin(ctx) if err != nil { return fmt.Errorf("migrate: begin tx for %s: %w", f, err) } if _, err := tx.Exec(ctx, string(sql)); err != nil { tx.Rollback(ctx) //nolint:errcheck return fmt.Errorf("migrate: exec %s: %w", f, err) } if _, err := tx.Exec(ctx, "INSERT INTO schema_migrations (filename) VALUES ($1)", f); err != nil { tx.Rollback(ctx) //nolint:errcheck return fmt.Errorf("migrate: record %s: %w", f, err) } if err := tx.Commit(ctx); err != nil { return fmt.Errorf("migrate: commit %s: %w", f, err) } slog.Info("migrate: applied", "file", f) } return nil }