You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
102 lines
2.7 KiB
Go
102 lines
2.7 KiB
Go
package migrate
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strings"
|
|
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
)
|
|
|
|
// Run applies pending SQL migrations from dir in sorted order.
|
|
// Already-applied migrations (tracked in schema_migrations) are skipped.
|
|
func Run(ctx context.Context, pool *pgxpool.Pool, dir string) error {
|
|
if _, err := pool.Exec(ctx, `CREATE TABLE IF NOT EXISTS schema_migrations (
|
|
filename TEXT PRIMARY KEY,
|
|
applied_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
|
)`); err != nil {
|
|
return fmt.Errorf("migrate: create tracking table: %w", err)
|
|
}
|
|
|
|
entries, err := os.ReadDir(dir)
|
|
if err != nil {
|
|
return fmt.Errorf("migrate: read dir %s: %w", dir, err)
|
|
}
|
|
|
|
var files []string
|
|
for _, e := range entries {
|
|
if !e.IsDir() && strings.HasSuffix(e.Name(), ".sql") {
|
|
files = append(files, e.Name())
|
|
}
|
|
}
|
|
sort.Strings(files)
|
|
|
|
rows, err := pool.Query(ctx, "SELECT filename FROM schema_migrations")
|
|
if err != nil {
|
|
return fmt.Errorf("migrate: query applied: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
applied := make(map[string]bool)
|
|
for rows.Next() {
|
|
var name string
|
|
if err := rows.Scan(&name); err != nil {
|
|
return fmt.Errorf("migrate: scan: %w", err)
|
|
}
|
|
applied[name] = true
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return fmt.Errorf("migrate: rows: %w", err)
|
|
}
|
|
|
|
// If this is the first time the migration runner sees an existing DB
|
|
// (tables created by docker-entrypoint-initdb.d), mark bootstrap migration as applied.
|
|
if !applied["000001_init.sql"] {
|
|
var tableExists bool
|
|
_ = pool.QueryRow(ctx, "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'heroes')").Scan(&tableExists)
|
|
if tableExists {
|
|
_, _ = pool.Exec(ctx, "INSERT INTO schema_migrations (filename) VALUES ('000001_init.sql') ON CONFLICT DO NOTHING")
|
|
applied["000001_init.sql"] = true
|
|
slog.Info("migrate: marked 000001_init.sql as applied (tables already exist)")
|
|
}
|
|
}
|
|
|
|
for _, f := range files {
|
|
if applied[f] {
|
|
continue
|
|
}
|
|
|
|
sql, err := os.ReadFile(filepath.Join(dir, f))
|
|
if err != nil {
|
|
return fmt.Errorf("migrate: read %s: %w", f, err)
|
|
}
|
|
|
|
tx, err := pool.Begin(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("migrate: begin tx for %s: %w", f, err)
|
|
}
|
|
|
|
if _, err := tx.Exec(ctx, string(sql)); err != nil {
|
|
tx.Rollback(ctx) //nolint:errcheck
|
|
return fmt.Errorf("migrate: exec %s: %w", f, err)
|
|
}
|
|
|
|
if _, err := tx.Exec(ctx, "INSERT INTO schema_migrations (filename) VALUES ($1)", f); err != nil {
|
|
tx.Rollback(ctx) //nolint:errcheck
|
|
return fmt.Errorf("migrate: record %s: %w", f, err)
|
|
}
|
|
|
|
if err := tx.Commit(ctx); err != nil {
|
|
return fmt.Errorf("migrate: commit %s: %w", f, err)
|
|
}
|
|
|
|
slog.Info("migrate: applied", "file", f)
|
|
}
|
|
|
|
return nil
|
|
}
|