You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

102 lines
2.7 KiB
Go

package migrate
import (
"context"
"fmt"
"log/slog"
"os"
"path/filepath"
"sort"
"strings"
"github.com/jackc/pgx/v5/pgxpool"
)
// Run applies pending SQL migrations from dir in sorted order.
// Already-applied migrations (tracked in schema_migrations) are skipped.
func Run(ctx context.Context, pool *pgxpool.Pool, dir string) error {
if _, err := pool.Exec(ctx, `CREATE TABLE IF NOT EXISTS schema_migrations (
filename TEXT PRIMARY KEY,
applied_at TIMESTAMPTZ NOT NULL DEFAULT now()
)`); err != nil {
return fmt.Errorf("migrate: create tracking table: %w", err)
}
entries, err := os.ReadDir(dir)
if err != nil {
return fmt.Errorf("migrate: read dir %s: %w", dir, err)
}
var files []string
for _, e := range entries {
if !e.IsDir() && strings.HasSuffix(e.Name(), ".sql") {
files = append(files, e.Name())
}
}
sort.Strings(files)
rows, err := pool.Query(ctx, "SELECT filename FROM schema_migrations")
if err != nil {
return fmt.Errorf("migrate: query applied: %w", err)
}
defer rows.Close()
applied := make(map[string]bool)
for rows.Next() {
var name string
if err := rows.Scan(&name); err != nil {
return fmt.Errorf("migrate: scan: %w", err)
}
applied[name] = true
}
if err := rows.Err(); err != nil {
return fmt.Errorf("migrate: rows: %w", err)
}
// If this is the first time the migration runner sees an existing DB
// (tables created by docker-entrypoint-initdb.d), mark bootstrap migration as applied.
if !applied["000001_init.sql"] {
var tableExists bool
_ = pool.QueryRow(ctx, "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'heroes')").Scan(&tableExists)
if tableExists {
_, _ = pool.Exec(ctx, "INSERT INTO schema_migrations (filename) VALUES ('000001_init.sql') ON CONFLICT DO NOTHING")
applied["000001_init.sql"] = true
slog.Info("migrate: marked 000001_init.sql as applied (tables already exist)")
}
}
for _, f := range files {
if applied[f] {
continue
}
sql, err := os.ReadFile(filepath.Join(dir, f))
if err != nil {
return fmt.Errorf("migrate: read %s: %w", f, err)
}
tx, err := pool.Begin(ctx)
if err != nil {
return fmt.Errorf("migrate: begin tx for %s: %w", f, err)
}
if _, err := tx.Exec(ctx, string(sql)); err != nil {
tx.Rollback(ctx) //nolint:errcheck
return fmt.Errorf("migrate: exec %s: %w", f, err)
}
if _, err := tx.Exec(ctx, "INSERT INTO schema_migrations (filename) VALUES ($1)", f); err != nil {
tx.Rollback(ctx) //nolint:errcheck
return fmt.Errorf("migrate: record %s: %w", f, err)
}
if err := tx.Commit(ctx); err != nil {
return fmt.Errorf("migrate: commit %s: %w", f, err)
}
slog.Info("migrate: applied", "file", f)
}
return nil
}