feat(primitives): kei-migrate Rust universal migration runner
Single binary, three backends (Postgres/SQLite/MySQL) autodetected from DATABASE_URL scheme. Sequential .sql migrations tracked in _kei_migrations with SHA-256 checksums. Commands: kei-migrate up — apply pending kei-migrate down [n] — revert last N (requires .down.sql) kei-migrate status — list applied vs pending kei-migrate create <name> — scaffold up+down pair with UTC ts Constructor Pattern: 10 source files, all <90 LOC, functions <30 LOC. Deps: sqlx 0.8 (any+postgres+sqlite+mysql, rustls), clap 4, chrono, sha2, anyhow, tokio. Tests: 9/9 passing (cargo test, SQLite backend). Clippy clean: cargo clippy --all-targets -- -D warnings. Safety features: - checksum drift detection on applied migrations - IRREVERSIBLE marker blocks down-revert - duplicate version detection at scan time - each migration in its own transaction Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
f884891862
commit
df857923d4
14 changed files with 3089 additions and 0 deletions
1
_primitives/_rust/kei-migrate/.gitignore
vendored
Normal file
1
_primitives/_rust/kei-migrate/.gitignore
vendored
Normal file
|
|
@ -0,0 +1 @@
|
|||
/target
|
||||
2344
_primitives/_rust/kei-migrate/Cargo.lock
generated
Normal file
2344
_primitives/_rust/kei-migrate/Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load diff
28
_primitives/_rust/kei-migrate/Cargo.toml
Normal file
28
_primitives/_rust/kei-migrate/Cargo.toml
Normal file
|
|
@ -0,0 +1,28 @@
|
|||
[package]
|
||||
name = "kei-migrate"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
description = "Universal SQL migration runner — Postgres/SQLite/MySQL autodetect from DATABASE_URL"
|
||||
license = "MIT"
|
||||
|
||||
[[bin]]
|
||||
name = "kei-migrate"
|
||||
path = "src/main.rs"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1"
|
||||
chrono = { version = "0.4", default-features = false, features = ["clock"] }
|
||||
clap = { version = "4", features = ["derive", "env"] }
|
||||
sha2 = "0.10"
|
||||
sqlx = { version = "0.8", default-features = false, features = [
|
||||
"runtime-tokio",
|
||||
"tls-rustls",
|
||||
"any",
|
||||
"postgres",
|
||||
"sqlite",
|
||||
"mysql",
|
||||
] }
|
||||
tokio = { version = "1", features = ["rt-multi-thread", "macros", "fs"] }
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = "3"
|
||||
44
_primitives/_rust/kei-migrate/src/cli.rs
Normal file
44
_primitives/_rust/kei-migrate/src/cli.rs
Normal file
|
|
@ -0,0 +1,44 @@
|
|||
//! CLI surface — clap argument parsing for `kei-migrate`.
|
||||
|
||||
use clap::{Parser, Subcommand};
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(
|
||||
name = "kei-migrate",
|
||||
about = "Universal SQL migration runner (Postgres / SQLite / MySQL)",
|
||||
version
|
||||
)]
|
||||
pub struct Cli {
|
||||
/// Database URL. Overrides $DATABASE_URL.
|
||||
/// Formats:
|
||||
/// postgres://user:pass@host:port/db
|
||||
/// sqlite:///absolute/path.db or sqlite::memory:
|
||||
/// mysql://user:pass@host:port/db
|
||||
#[arg(long, env = "DATABASE_URL")]
|
||||
pub database_url: String,
|
||||
|
||||
/// Migrations directory (default: ./migrations)
|
||||
#[arg(long, default_value = "migrations")]
|
||||
pub dir: String,
|
||||
|
||||
#[command(subcommand)]
|
||||
pub command: Command,
|
||||
}
|
||||
|
||||
#[derive(Subcommand, Debug)]
|
||||
pub enum Command {
|
||||
/// Apply all pending migrations.
|
||||
Up,
|
||||
/// Revert the last N migrations (requires <ts>_<name>.down.sql).
|
||||
Down {
|
||||
#[arg(default_value_t = 1)]
|
||||
n: u32,
|
||||
},
|
||||
/// List applied vs pending migrations.
|
||||
Status,
|
||||
/// Create a new timestamped migration scaffold: <ts>_<name>.sql (+ .down.sql).
|
||||
Create {
|
||||
/// Short migration name, e.g. "add_users_email_index".
|
||||
name: String,
|
||||
},
|
||||
}
|
||||
44
_primitives/_rust/kei-migrate/src/cmd_create.rs
Normal file
44
_primitives/_rust/kei-migrate/src/cmd_create.rs
Normal file
|
|
@ -0,0 +1,44 @@
|
|||
//! `kei-migrate create <name>` — scaffold a new timestamped migration pair.
|
||||
|
||||
use anyhow::{bail, Context, Result};
|
||||
use chrono::Utc;
|
||||
use std::fs;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
const UP_TEMPLATE: &str = "-- up migration\n-- Write forward-direction SQL below.\n\n";
|
||||
const DOWN_TEMPLATE: &str =
|
||||
"-- down migration\n-- Write reverse SQL below, or add `-- IRREVERSIBLE` to block reversion.\n\n";
|
||||
|
||||
/// Create `<dir>/<utc-timestamp>_<sanitized-name>.sql` + `.down.sql`. Returns paths written.
|
||||
pub fn run(dir: &Path, name: &str) -> Result<(PathBuf, PathBuf)> {
|
||||
validate_name(name)?;
|
||||
fs::create_dir_all(dir).with_context(|| format!("mkdir -p {}", dir.display()))?;
|
||||
let ts = Utc::now().format("%Y%m%d%H%M%S").to_string();
|
||||
let sanitized = sanitize(name);
|
||||
let up = dir.join(format!("{}_{}.sql", ts, sanitized));
|
||||
let down = dir.join(format!("{}_{}.down.sql", ts, sanitized));
|
||||
if up.exists() || down.exists() {
|
||||
bail!("collision: {} or {} already exists", up.display(), down.display());
|
||||
}
|
||||
fs::write(&up, UP_TEMPLATE)?;
|
||||
fs::write(&down, DOWN_TEMPLATE)?;
|
||||
println!("[create] {}", up.display());
|
||||
println!("[create] {}", down.display());
|
||||
Ok((up, down))
|
||||
}
|
||||
|
||||
fn validate_name(name: &str) -> Result<()> {
|
||||
if name.is_empty() {
|
||||
bail!("migration name must not be empty");
|
||||
}
|
||||
if name.len() > 80 {
|
||||
bail!("migration name too long ({} chars, max 80)", name.len());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn sanitize(name: &str) -> String {
|
||||
name.chars()
|
||||
.map(|c| if c.is_ascii_alphanumeric() { c.to_ascii_lowercase() } else { '_' })
|
||||
.collect()
|
||||
}
|
||||
58
_primitives/_rust/kei-migrate/src/cmd_down.rs
Normal file
58
_primitives/_rust/kei-migrate/src/cmd_down.rs
Normal file
|
|
@ -0,0 +1,58 @@
|
|||
//! `kei-migrate down [n]` — revert the last N applied migrations.
|
||||
//!
|
||||
//! Requires a sibling `<version>_<name>.down.sql` for each target. Missing
|
||||
//! down-file = hard error — we don't guess reversals.
|
||||
|
||||
use crate::discover::Migration;
|
||||
use crate::tracker;
|
||||
use anyhow::{bail, Context, Result};
|
||||
use sqlx::AnyPool;
|
||||
use std::collections::HashMap;
|
||||
|
||||
/// Revert the last `n` applied migrations in reverse order.
|
||||
pub async fn run(pool: &AnyPool, migrations: &[Migration], n: u32) -> Result<u32> {
|
||||
let mut applied: Vec<i64> = tracker::applied_versions(pool).await?;
|
||||
applied.sort_unstable();
|
||||
applied.reverse(); // newest first
|
||||
let by_version: HashMap<i64, &Migration> =
|
||||
migrations.iter().map(|m| (m.version, m)).collect();
|
||||
let mut reverted = 0u32;
|
||||
for v in applied.into_iter().take(n as usize) {
|
||||
let m = by_version.get(&v).with_context(|| {
|
||||
format!("applied version {} has no matching file on disk", v)
|
||||
})?;
|
||||
revert_one(pool, m).await?;
|
||||
reverted += 1;
|
||||
println!("[down] {} {} — reverted", m.version, m.name);
|
||||
}
|
||||
Ok(reverted)
|
||||
}
|
||||
|
||||
async fn revert_one(pool: &AnyPool, m: &Migration) -> Result<()> {
|
||||
let down_path = m.down_path.as_ref().ok_or_else(|| {
|
||||
anyhow::anyhow!(
|
||||
"no down-sql for migration {} ({}) — create {}_{}.down.sql",
|
||||
m.version,
|
||||
m.name,
|
||||
m.version,
|
||||
m.name
|
||||
)
|
||||
})?;
|
||||
let sql = std::fs::read_to_string(down_path)
|
||||
.with_context(|| format!("read {}", down_path.display()))?;
|
||||
if sql.contains("-- IRREVERSIBLE") {
|
||||
bail!(
|
||||
"migration {} ({}) is marked IRREVERSIBLE — refusing to run down-sql",
|
||||
m.version,
|
||||
m.name
|
||||
);
|
||||
}
|
||||
let mut tx = pool.begin().await?;
|
||||
sqlx::raw_sql(&sql)
|
||||
.execute(&mut *tx)
|
||||
.await
|
||||
.with_context(|| format!("revert migration {} ({})", m.version, m.name))?;
|
||||
tx.commit().await?;
|
||||
tracker::record_down(pool, m.version).await?;
|
||||
Ok(())
|
||||
}
|
||||
29
_primitives/_rust/kei-migrate/src/cmd_status.rs
Normal file
29
_primitives/_rust/kei-migrate/src/cmd_status.rs
Normal file
|
|
@ -0,0 +1,29 @@
|
|||
//! `kei-migrate status` — list applied + pending migrations.
|
||||
|
||||
use crate::discover::Migration;
|
||||
use crate::tracker;
|
||||
use anyhow::Result;
|
||||
use sqlx::AnyPool;
|
||||
use std::collections::HashSet;
|
||||
|
||||
/// Print a human-readable table. Returns (applied_count, pending_count).
|
||||
pub async fn run(pool: &AnyPool, migrations: &[Migration]) -> Result<(u32, u32)> {
|
||||
let applied: HashSet<i64> = tracker::applied_versions(pool).await?.into_iter().collect();
|
||||
let mut a = 0u32;
|
||||
let mut p = 0u32;
|
||||
println!("{:>14} {:<8} name", "version", "status");
|
||||
println!("{:>14} {:<8} ----", "-------", "------");
|
||||
for m in migrations {
|
||||
let status = if applied.contains(&m.version) {
|
||||
a += 1;
|
||||
"APPLIED"
|
||||
} else {
|
||||
p += 1;
|
||||
"PENDING"
|
||||
};
|
||||
println!("{:>14} {:<8} {}", m.version, status, m.name);
|
||||
}
|
||||
println!();
|
||||
println!("{} applied, {} pending", a, p);
|
||||
Ok((a, p))
|
||||
}
|
||||
42
_primitives/_rust/kei-migrate/src/cmd_up.rs
Normal file
42
_primitives/_rust/kei-migrate/src/cmd_up.rs
Normal file
|
|
@ -0,0 +1,42 @@
|
|||
//! `kei-migrate up` — apply all pending migrations in version-ASC order.
|
||||
|
||||
use crate::discover::Migration;
|
||||
use crate::tracker;
|
||||
use anyhow::{Context, Result};
|
||||
use chrono::Utc;
|
||||
use sqlx::AnyPool;
|
||||
use std::collections::HashSet;
|
||||
|
||||
/// Apply every migration whose version is not in the applied set.
|
||||
/// Each migration runs in its own transaction; failure aborts and leaves
|
||||
/// prior applied migrations committed.
|
||||
pub async fn run(pool: &AnyPool, migrations: &[Migration]) -> Result<u32> {
|
||||
let applied: HashSet<i64> = tracker::applied_versions(pool).await?.into_iter().collect();
|
||||
let on_disk: Vec<(i64, &str, &str)> = migrations
|
||||
.iter()
|
||||
.map(|m| (m.version, m.name.as_str(), m.checksum.as_str()))
|
||||
.collect();
|
||||
tracker::verify_checksums(pool, on_disk).await?;
|
||||
let mut count = 0u32;
|
||||
for m in migrations {
|
||||
if applied.contains(&m.version) {
|
||||
continue;
|
||||
}
|
||||
apply_one(pool, m).await?;
|
||||
count += 1;
|
||||
println!("[up] {} {} — applied", m.version, m.name);
|
||||
}
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
async fn apply_one(pool: &AnyPool, m: &Migration) -> Result<()> {
|
||||
let mut tx = pool.begin().await?;
|
||||
sqlx::raw_sql(&m.up_sql)
|
||||
.execute(&mut *tx)
|
||||
.await
|
||||
.with_context(|| format!("apply migration {} ({})", m.version, m.name))?;
|
||||
tx.commit().await?;
|
||||
let now = Utc::now().to_rfc3339();
|
||||
tracker::record_up(pool, m.version, &m.name, &m.checksum, &now).await?;
|
||||
Ok(())
|
||||
}
|
||||
68
_primitives/_rust/kei-migrate/src/db.rs
Normal file
68
_primitives/_rust/kei-migrate/src/db.rs
Normal file
|
|
@ -0,0 +1,68 @@
|
|||
//! Database backend detection + pool construction.
|
||||
//!
|
||||
//! Uses `sqlx::Any` so one binary covers Postgres / SQLite / MySQL.
|
||||
//! Detection is purely on URL scheme — no live probe needed.
|
||||
|
||||
use anyhow::{bail, Result};
|
||||
use sqlx::any::{install_default_drivers, AnyPoolOptions};
|
||||
use sqlx::AnyPool;
|
||||
|
||||
/// Backend inferred from the URL scheme. Determines dialect quirks.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum Backend {
|
||||
Postgres,
|
||||
Sqlite,
|
||||
Mysql,
|
||||
}
|
||||
|
||||
impl Backend {
|
||||
/// Backend-specific CREATE TABLE for `_kei_migrations`.
|
||||
pub fn create_tracker_sql(self) -> &'static str {
|
||||
match self {
|
||||
Backend::Postgres | Backend::Mysql => {
|
||||
"CREATE TABLE IF NOT EXISTS _kei_migrations (
|
||||
version BIGINT PRIMARY KEY,
|
||||
name VARCHAR(255) NOT NULL,
|
||||
checksum CHAR(64) NOT NULL,
|
||||
applied_at VARCHAR(32) NOT NULL
|
||||
)"
|
||||
}
|
||||
Backend::Sqlite => {
|
||||
"CREATE TABLE IF NOT EXISTS _kei_migrations (
|
||||
version INTEGER PRIMARY KEY,
|
||||
name TEXT NOT NULL,
|
||||
checksum TEXT NOT NULL,
|
||||
applied_at TEXT NOT NULL
|
||||
)"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse a database URL into a [`Backend`]. Never touches the network.
|
||||
pub fn detect_backend(url: &str) -> Result<Backend> {
|
||||
let lower = url.to_ascii_lowercase();
|
||||
if lower.starts_with("postgres://") || lower.starts_with("postgresql://") {
|
||||
Ok(Backend::Postgres)
|
||||
} else if lower.starts_with("sqlite:") {
|
||||
Ok(Backend::Sqlite)
|
||||
} else if lower.starts_with("mysql://") || lower.starts_with("mariadb://") {
|
||||
Ok(Backend::Mysql)
|
||||
} else {
|
||||
bail!(
|
||||
"unsupported or unrecognised DATABASE_URL scheme: {}. \
|
||||
Expected postgres://, sqlite:, or mysql://",
|
||||
url
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// Build a sqlx `AnyPool` for the given URL (max 4 conns — migration runner is not a server).
|
||||
pub async fn connect(url: &str) -> Result<AnyPool> {
|
||||
install_default_drivers();
|
||||
let pool = AnyPoolOptions::new()
|
||||
.max_connections(4)
|
||||
.connect(url)
|
||||
.await?;
|
||||
Ok(pool)
|
||||
}
|
||||
87
_primitives/_rust/kei-migrate/src/discover.rs
Normal file
87
_primitives/_rust/kei-migrate/src/discover.rs
Normal file
|
|
@ -0,0 +1,87 @@
|
|||
//! Filesystem migration discovery.
|
||||
//!
|
||||
//! Convention: `migrations/<version>_<name>.sql` (up) and optional
|
||||
//! `migrations/<version>_<name>.down.sql` (down). Version is a monotonic
|
||||
//! integer, typically a UTC timestamp like `20260421120000`.
|
||||
|
||||
use anyhow::{bail, Context, Result};
|
||||
use sha2::{Digest, Sha256};
|
||||
use std::fs;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
/// One discovered migration (up-side). `down_path` is `Some` iff the sibling file exists.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Migration {
|
||||
pub version: i64,
|
||||
pub name: String,
|
||||
pub up_path: PathBuf,
|
||||
pub down_path: Option<PathBuf>,
|
||||
pub up_sql: String,
|
||||
pub checksum: String,
|
||||
}
|
||||
|
||||
/// Read every `<version>_<name>.sql` file (ignoring `.down.sql`), sort by version ASC.
|
||||
pub fn scan(dir: &Path) -> Result<Vec<Migration>> {
|
||||
if !dir.exists() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
let mut out = Vec::new();
|
||||
for entry in fs::read_dir(dir).with_context(|| format!("read_dir {}", dir.display()))? {
|
||||
let entry = entry?;
|
||||
let path = entry.path();
|
||||
if path.extension().and_then(|s| s.to_str()) != Some("sql") {
|
||||
continue;
|
||||
}
|
||||
let fname = path.file_name().and_then(|s| s.to_str()).unwrap_or("");
|
||||
if fname.ends_with(".down.sql") {
|
||||
continue;
|
||||
}
|
||||
let m = parse_migration(&path)?;
|
||||
out.push(m);
|
||||
}
|
||||
out.sort_by_key(|m| m.version);
|
||||
check_unique(&out)?;
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
fn parse_migration(path: &Path) -> Result<Migration> {
|
||||
let stem = path
|
||||
.file_stem()
|
||||
.and_then(|s| s.to_str())
|
||||
.context("non-utf8 filename")?;
|
||||
let (ver_str, name) = stem
|
||||
.split_once('_')
|
||||
.with_context(|| format!("filename not <version>_<name>.sql: {}", stem))?;
|
||||
let version: i64 = ver_str
|
||||
.parse()
|
||||
.with_context(|| format!("version must be integer, got {}", ver_str))?;
|
||||
let up_sql = fs::read_to_string(path)
|
||||
.with_context(|| format!("read {}", path.display()))?;
|
||||
let mut hasher = Sha256::new();
|
||||
hasher.update(up_sql.as_bytes());
|
||||
let checksum = format!("{:x}", hasher.finalize());
|
||||
let down_path = path.with_file_name(format!("{}_{}.down.sql", version, name));
|
||||
let down = if down_path.exists() { Some(down_path) } else { None };
|
||||
Ok(Migration {
|
||||
version,
|
||||
name: name.to_string(),
|
||||
up_path: path.to_path_buf(),
|
||||
down_path: down,
|
||||
up_sql,
|
||||
checksum,
|
||||
})
|
||||
}
|
||||
|
||||
fn check_unique(migs: &[Migration]) -> Result<()> {
|
||||
for w in migs.windows(2) {
|
||||
if w[0].version == w[1].version {
|
||||
bail!(
|
||||
"duplicate migration version {} ({} and {})",
|
||||
w[0].version,
|
||||
w[0].up_path.display(),
|
||||
w[1].up_path.display()
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
54
_primitives/_rust/kei-migrate/src/lib.rs
Normal file
54
_primitives/_rust/kei-migrate/src/lib.rs
Normal file
|
|
@ -0,0 +1,54 @@
|
|||
//! kei-migrate — universal SQL migration runner.
|
||||
//!
|
||||
//! Single binary, three backends (Postgres / SQLite / MySQL) autodetected
|
||||
//! from `DATABASE_URL`. Sequential `.sql` files in `migrations/`, tracked in
|
||||
//! `_kei_migrations` with SHA-256 checksums.
|
||||
//!
|
||||
//! Library surface exists so integration tests can drive the primitive
|
||||
//! without `process::Command` gymnastics.
|
||||
|
||||
pub mod cli;
|
||||
pub mod cmd_create;
|
||||
pub mod cmd_down;
|
||||
pub mod cmd_status;
|
||||
pub mod cmd_up;
|
||||
pub mod db;
|
||||
pub mod discover;
|
||||
pub mod tracker;
|
||||
|
||||
use anyhow::Result;
|
||||
use std::path::Path;
|
||||
|
||||
/// End-to-end `up` entry: connect, ensure tracker, scan dir, apply pending.
|
||||
/// Returns number of migrations applied.
|
||||
pub async fn do_up(database_url: &str, dir: &Path) -> Result<u32> {
|
||||
let backend = db::detect_backend(database_url)?;
|
||||
let pool = db::connect(database_url).await?;
|
||||
tracker::ensure_table(&pool, backend).await?;
|
||||
let migs = discover::scan(dir)?;
|
||||
let n = cmd_up::run(&pool, &migs).await?;
|
||||
pool.close().await;
|
||||
Ok(n)
|
||||
}
|
||||
|
||||
/// End-to-end `down` entry: revert last N applied.
|
||||
pub async fn do_down(database_url: &str, dir: &Path, n: u32) -> Result<u32> {
|
||||
let backend = db::detect_backend(database_url)?;
|
||||
let pool = db::connect(database_url).await?;
|
||||
tracker::ensure_table(&pool, backend).await?;
|
||||
let migs = discover::scan(dir)?;
|
||||
let reverted = cmd_down::run(&pool, &migs, n).await?;
|
||||
pool.close().await;
|
||||
Ok(reverted)
|
||||
}
|
||||
|
||||
/// End-to-end `status` entry: returns (applied, pending) counts.
|
||||
pub async fn do_status(database_url: &str, dir: &Path) -> Result<(u32, u32)> {
|
||||
let backend = db::detect_backend(database_url)?;
|
||||
let pool = db::connect(database_url).await?;
|
||||
tracker::ensure_table(&pool, backend).await?;
|
||||
let migs = discover::scan(dir)?;
|
||||
let r = cmd_status::run(&pool, &migs).await?;
|
||||
pool.close().await;
|
||||
Ok(r)
|
||||
}
|
||||
30
_primitives/_rust/kei-migrate/src/main.rs
Normal file
30
_primitives/_rust/kei-migrate/src/main.rs
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
//! kei-migrate CLI entry. Dispatches to the library surface in `lib.rs`.
|
||||
|
||||
use anyhow::Result;
|
||||
use clap::Parser;
|
||||
use kei_migrate::cli::{Cli, Command};
|
||||
use kei_migrate::{cmd_create, do_down, do_status, do_up};
|
||||
use std::path::Path;
|
||||
|
||||
#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn main() -> Result<()> {
|
||||
let cli = Cli::parse();
|
||||
let dir = Path::new(&cli.dir);
|
||||
match cli.command {
|
||||
Command::Up => {
|
||||
let n = do_up(&cli.database_url, dir).await?;
|
||||
println!("up: {} migration(s) applied", n);
|
||||
}
|
||||
Command::Down { n } => {
|
||||
let r = do_down(&cli.database_url, dir, n).await?;
|
||||
println!("down: {} migration(s) reverted", r);
|
||||
}
|
||||
Command::Status => {
|
||||
do_status(&cli.database_url, dir).await?;
|
||||
}
|
||||
Command::Create { name } => {
|
||||
cmd_create::run(dir, &name)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
86
_primitives/_rust/kei-migrate/src/tracker.rs
Normal file
86
_primitives/_rust/kei-migrate/src/tracker.rs
Normal file
|
|
@ -0,0 +1,86 @@
|
|||
//! `_kei_migrations` tracking table operations.
|
||||
//!
|
||||
//! Row shape: (version, name, checksum, applied_at). Checksum guards against
|
||||
//! silent edits of an applied file — mismatch = hard fail, requires human ack.
|
||||
|
||||
use crate::db::Backend;
|
||||
use anyhow::{bail, Result};
|
||||
use sqlx::{AnyPool, Row};
|
||||
|
||||
/// Create tracker table if missing. Idempotent.
|
||||
pub async fn ensure_table(pool: &AnyPool, backend: Backend) -> Result<()> {
|
||||
sqlx::query(backend.create_tracker_sql()).execute(pool).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Versions of all applied migrations, ASC.
|
||||
pub async fn applied_versions(pool: &AnyPool) -> Result<Vec<i64>> {
|
||||
let rows = sqlx::query("SELECT version FROM _kei_migrations ORDER BY version ASC")
|
||||
.fetch_all(pool)
|
||||
.await?;
|
||||
let mut out = Vec::with_capacity(rows.len());
|
||||
for r in rows {
|
||||
out.push(r.try_get::<i64, _>(0)?);
|
||||
}
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
/// Checksum of a specific applied version, or `None` if not applied.
|
||||
pub async fn applied_checksum(pool: &AnyPool, version: i64) -> Result<Option<String>> {
|
||||
let row = sqlx::query("SELECT checksum FROM _kei_migrations WHERE version = $1")
|
||||
.bind(version)
|
||||
.fetch_optional(pool)
|
||||
.await?;
|
||||
Ok(row.map(|r| r.try_get::<String, _>(0)).transpose()?)
|
||||
}
|
||||
|
||||
/// Insert a tracker row after a successful up-migration.
|
||||
pub async fn record_up(
|
||||
pool: &AnyPool,
|
||||
version: i64,
|
||||
name: &str,
|
||||
checksum: &str,
|
||||
applied_at: &str,
|
||||
) -> Result<()> {
|
||||
sqlx::query(
|
||||
"INSERT INTO _kei_migrations (version, name, checksum, applied_at) VALUES ($1, $2, $3, $4)",
|
||||
)
|
||||
.bind(version)
|
||||
.bind(name)
|
||||
.bind(checksum)
|
||||
.bind(applied_at)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Delete a tracker row after a successful down-migration.
|
||||
pub async fn record_down(pool: &AnyPool, version: i64) -> Result<()> {
|
||||
sqlx::query("DELETE FROM _kei_migrations WHERE version = $1")
|
||||
.bind(version)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Abort if any applied migration's recorded checksum doesn't match the on-disk file.
|
||||
pub async fn verify_checksums<'a, I>(pool: &AnyPool, on_disk: I) -> Result<()>
|
||||
where
|
||||
I: IntoIterator<Item = (i64, &'a str, &'a str)>, // (version, name, checksum)
|
||||
{
|
||||
for (version, name, disk_sum) in on_disk {
|
||||
if let Some(db_sum) = applied_checksum(pool, version).await? {
|
||||
if db_sum != disk_sum {
|
||||
bail!(
|
||||
"checksum drift on applied migration {} ({}): db={}, disk={}. \
|
||||
Refusing to proceed — someone edited an already-applied file.",
|
||||
version,
|
||||
name,
|
||||
db_sum,
|
||||
disk_sum
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
174
_primitives/_rust/kei-migrate/tests/integration.rs
Normal file
174
_primitives/_rust/kei-migrate/tests/integration.rs
Normal file
|
|
@ -0,0 +1,174 @@
|
|||
//! Integration tests for kei-migrate against a SQLite file (safe, no deps).
|
||||
//!
|
||||
//! SQLite is chosen as the test backend because it has no server dependency
|
||||
//! and the sqlx-Any path through it exercises the same code path as Postgres
|
||||
//! / MySQL for everything except dialect-specific DDL (which we abstract in
|
||||
//! `db::Backend::create_tracker_sql`).
|
||||
|
||||
use kei_migrate::{cmd_create, db, discover, do_down, do_status, do_up, tracker};
|
||||
use std::fs;
|
||||
use std::path::PathBuf;
|
||||
use tempfile::TempDir;
|
||||
|
||||
struct Env {
|
||||
_tmp: TempDir,
|
||||
url: String,
|
||||
dir: PathBuf,
|
||||
}
|
||||
|
||||
fn setup() -> Env {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let db_path = tmp.path().join("test.db");
|
||||
let url = format!("sqlite://{}?mode=rwc", db_path.display());
|
||||
let dir = tmp.path().join("migrations");
|
||||
fs::create_dir_all(&dir).unwrap();
|
||||
Env { _tmp: tmp, url, dir }
|
||||
}
|
||||
|
||||
fn write_migration(dir: &std::path::Path, version: i64, name: &str, up: &str, down: Option<&str>) {
|
||||
fs::write(dir.join(format!("{}_{}.sql", version, name)), up).unwrap();
|
||||
if let Some(d) = down {
|
||||
fs::write(dir.join(format!("{}_{}.down.sql", version, name)), d).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn detects_backend_from_url_scheme() {
|
||||
assert_eq!(
|
||||
db::detect_backend("postgres://u:p@h/d").unwrap(),
|
||||
db::Backend::Postgres
|
||||
);
|
||||
assert_eq!(
|
||||
db::detect_backend("sqlite:///tmp/x.db").unwrap(),
|
||||
db::Backend::Sqlite
|
||||
);
|
||||
assert_eq!(
|
||||
db::detect_backend("mysql://u:p@h/d").unwrap(),
|
||||
db::Backend::Mysql
|
||||
);
|
||||
assert!(db::detect_backend("mongodb://h").is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn scan_empty_dir_is_empty() {
|
||||
let env = setup();
|
||||
let migs = discover::scan(&env.dir).unwrap();
|
||||
assert!(migs.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn scan_sorts_by_version_and_skips_down_files() {
|
||||
let env = setup();
|
||||
write_migration(&env.dir, 2, "second", "SELECT 1;", Some("SELECT 2;"));
|
||||
write_migration(&env.dir, 1, "first", "SELECT 3;", None);
|
||||
let migs = discover::scan(&env.dir).unwrap();
|
||||
assert_eq!(migs.len(), 2);
|
||||
assert_eq!(migs[0].version, 1);
|
||||
assert_eq!(migs[1].version, 2);
|
||||
assert!(migs[0].down_path.is_none());
|
||||
assert!(migs[1].down_path.is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn scan_rejects_duplicate_versions() {
|
||||
let env = setup();
|
||||
write_migration(&env.dir, 1, "a", "SELECT 1;", None);
|
||||
// same version, different name
|
||||
fs::write(env.dir.join("1_b.sql"), "SELECT 2;").unwrap();
|
||||
let err = discover::scan(&env.dir).unwrap_err();
|
||||
assert!(err.to_string().contains("duplicate migration version"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn up_applies_pending_and_tracks_them() {
|
||||
let env = setup();
|
||||
write_migration(
|
||||
&env.dir,
|
||||
1,
|
||||
"create_t",
|
||||
"CREATE TABLE t (id INTEGER PRIMARY KEY, v TEXT);",
|
||||
Some("DROP TABLE t;"),
|
||||
);
|
||||
write_migration(
|
||||
&env.dir,
|
||||
2,
|
||||
"insert_row",
|
||||
"INSERT INTO t (id, v) VALUES (1, 'a');",
|
||||
Some("DELETE FROM t WHERE id = 1;"),
|
||||
);
|
||||
let n = do_up(&env.url, &env.dir).await.unwrap();
|
||||
assert_eq!(n, 2);
|
||||
// re-running is a no-op
|
||||
let n2 = do_up(&env.url, &env.dir).await.unwrap();
|
||||
assert_eq!(n2, 0);
|
||||
// status: 2 applied, 0 pending
|
||||
let (a, p) = do_status(&env.url, &env.dir).await.unwrap();
|
||||
assert_eq!(a, 2);
|
||||
assert_eq!(p, 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn down_reverts_last_n() {
|
||||
let env = setup();
|
||||
write_migration(
|
||||
&env.dir,
|
||||
1,
|
||||
"create_t",
|
||||
"CREATE TABLE t (id INTEGER PRIMARY KEY);",
|
||||
Some("DROP TABLE t;"),
|
||||
);
|
||||
write_migration(
|
||||
&env.dir,
|
||||
2,
|
||||
"add_col",
|
||||
"ALTER TABLE t ADD COLUMN v TEXT;",
|
||||
Some("-- down unsupported on sqlite but we just drop the table for this test\nDROP TABLE t; CREATE TABLE t (id INTEGER PRIMARY KEY);"),
|
||||
);
|
||||
do_up(&env.url, &env.dir).await.unwrap();
|
||||
let reverted = do_down(&env.url, &env.dir, 1).await.unwrap();
|
||||
assert_eq!(reverted, 1);
|
||||
let (a, p) = do_status(&env.url, &env.dir).await.unwrap();
|
||||
assert_eq!(a, 1);
|
||||
assert_eq!(p, 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn down_refuses_irreversible_marker() {
|
||||
let env = setup();
|
||||
write_migration(
|
||||
&env.dir,
|
||||
1,
|
||||
"make_t",
|
||||
"CREATE TABLE t (id INTEGER PRIMARY KEY);",
|
||||
Some("-- IRREVERSIBLE\n-- don't auto-revert"),
|
||||
);
|
||||
do_up(&env.url, &env.dir).await.unwrap();
|
||||
let err = do_down(&env.url, &env.dir, 1).await.unwrap_err();
|
||||
assert!(err.to_string().contains("IRREVERSIBLE"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn up_detects_checksum_drift() {
|
||||
let env = setup();
|
||||
let up_path = env.dir.join("1_seed.sql");
|
||||
fs::write(&up_path, "CREATE TABLE t (id INTEGER PRIMARY KEY);").unwrap();
|
||||
do_up(&env.url, &env.dir).await.unwrap();
|
||||
// someone edits the already-applied file
|
||||
fs::write(&up_path, "CREATE TABLE t (id INTEGER PRIMARY KEY, extra TEXT);").unwrap();
|
||||
let err = do_up(&env.url, &env.dir).await.unwrap_err();
|
||||
assert!(err.to_string().contains("checksum drift"));
|
||||
// tracker module is exercised end-to-end here
|
||||
let _ = tracker::applied_versions; // keep tracker import used
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn create_scaffolds_up_and_down_files() {
|
||||
let env = setup();
|
||||
let (up, down) = cmd_create::run(&env.dir, "add_users_index").unwrap();
|
||||
assert!(up.exists());
|
||||
assert!(down.exists());
|
||||
let up_txt = fs::read_to_string(&up).unwrap();
|
||||
let down_txt = fs::read_to_string(&down).unwrap();
|
||||
assert!(up_txt.contains("up migration"));
|
||||
assert!(down_txt.contains("down migration"));
|
||||
}
|
||||
Loading…
Reference in a new issue