Merge feat/wave13-dna-unique — DNA UNIQUE constraint
This commit is contained in:
commit
770402ac7a
4 changed files with 267 additions and 8 deletions
|
|
@ -11,7 +11,8 @@ use std::fmt;
|
|||
pub const MAX_TREE_DEPTH: usize = 1024;
|
||||
|
||||
/// Errors from ledger ops that extend beyond raw SQL (tree walk + input
|
||||
/// validation). Hot-path SQL calls still return `rusqlite::Error` directly.
|
||||
/// validation + DNA uniqueness). Hot-path SQL calls still return
|
||||
/// `rusqlite::Error` directly when no typed surface is required.
|
||||
#[derive(Debug)]
|
||||
pub enum LedgerError {
|
||||
Sql(rusqlite::Error),
|
||||
|
|
@ -19,6 +20,20 @@ pub enum LedgerError {
|
|||
MaxDepthExceeded,
|
||||
/// Branch name longer than `MAX_BRANCH_LEN` chars (audit L1 cap).
|
||||
BranchTooLong { field: &'static str, len: usize },
|
||||
/// Attempted `fork` with a DNA that is already present in the ledger.
|
||||
/// Caller decides whether to regenerate DNA with a fresh nonce — the
|
||||
/// ledger never silently retries (v5, 2026-04-23).
|
||||
DnaCollision { dna: String },
|
||||
/// v5 migration detected pre-existing duplicate DNAs in the agents
|
||||
/// table. The UNIQUE index cannot be applied without data loss; the
|
||||
/// operator must manually reconcile rows. `duplicates` lists each
|
||||
/// offending DNA and its occurrence count.
|
||||
///
|
||||
/// Resolution (back up the file first): run
|
||||
/// `kei-ledger sql 'DELETE FROM agents WHERE rowid NOT IN
|
||||
/// (SELECT MIN(rowid) FROM agents GROUP BY dna)'`
|
||||
/// then re-open to retry migration.
|
||||
DnaMigrationBlocked { duplicates: Vec<(String, usize)> },
|
||||
}
|
||||
|
||||
impl fmt::Display for LedgerError {
|
||||
|
|
@ -33,6 +48,21 @@ impl fmt::Display for LedgerError {
|
|||
f,
|
||||
"{field} length {len} exceeds cap {MAX_BRANCH_LEN}"
|
||||
),
|
||||
LedgerError::DnaCollision { dna } => write!(
|
||||
f,
|
||||
"dna collision: {dna} already present — regenerate nonce and retry"
|
||||
),
|
||||
LedgerError::DnaMigrationBlocked { duplicates } => {
|
||||
write!(
|
||||
f,
|
||||
"v5 migration blocked: {} duplicate dna(s) in table; reconcile before reopen:",
|
||||
duplicates.len()
|
||||
)?;
|
||||
for (dna, count) in duplicates {
|
||||
write!(f, "\n {dna} ({count}x)")?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,7 +12,11 @@ use std::collections::HashSet;
|
|||
use std::path::{Path, PathBuf};
|
||||
|
||||
/// Open or create the ledger file and run migrations.
|
||||
pub fn open(path: &Path) -> SqlResult<Connection> {
|
||||
///
|
||||
/// Returns `LedgerError` rather than raw `rusqlite::Error` because v5 can
|
||||
/// surface `DnaMigrationBlocked` (pre-existing duplicate DNAs). Legacy
|
||||
/// rusqlite errors are wrapped in `LedgerError::Sql` via `From`.
|
||||
pub fn open(path: &Path) -> Result<Connection, LedgerError> {
|
||||
if let Some(parent) = path.parent() {
|
||||
let _ = std::fs::create_dir_all(parent);
|
||||
}
|
||||
|
|
@ -35,6 +39,10 @@ fn check_branch_lens(branch: &str, parent: Option<&str>) -> Result<(), LedgerErr
|
|||
}
|
||||
|
||||
/// Insert running-agent row. Errors on duplicate id or branch > MAX_BRANCH_LEN.
|
||||
///
|
||||
/// Surfaces `LedgerError::DnaCollision { dna }` when a non-NULL DNA already
|
||||
/// exists in the ledger (v5 UNIQUE index). Caller regenerates DNA with a
|
||||
/// fresh nonce and retries — the ledger never silently dedupes.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn fork(
|
||||
conn: &Connection,
|
||||
|
|
@ -49,14 +57,31 @@ pub fn fork(
|
|||
) -> Result<(), LedgerError> {
|
||||
check_branch_lens(branch, parent)?;
|
||||
let now = Utc::now().timestamp();
|
||||
conn.execute(
|
||||
let res = conn.execute(
|
||||
"INSERT INTO agents
|
||||
(id, branch, parent_branch, spec_sha, status, started_ts,
|
||||
worktree_path, dna, creator_id, fork_parent_id)
|
||||
VALUES (?1, ?2, ?3, ?4, 'running', ?5, ?6, ?7, ?8, ?9)",
|
||||
params![id, branch, parent, spec_sha, now, worktree, dna, creator_id, fork_parent_id],
|
||||
)?;
|
||||
Ok(())
|
||||
);
|
||||
match res {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => Err(classify_insert_error(e, dna)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Map rusqlite unique-constraint failures to typed variants. `agents.dna`
|
||||
/// violations become `DnaCollision`; everything else (id collision, trigger
|
||||
/// abort, etc.) flows through as `LedgerError::Sql`.
|
||||
fn classify_insert_error(e: rusqlite::Error, dna: Option<&str>) -> LedgerError {
|
||||
let msg = e.to_string();
|
||||
if msg.contains("agents.dna") || msg.contains("idx_agents_dna_unique") {
|
||||
LedgerError::DnaCollision {
|
||||
dna: dna.unwrap_or_default().to_string(),
|
||||
}
|
||||
} else {
|
||||
LedgerError::Sql(e)
|
||||
}
|
||||
}
|
||||
|
||||
/// Mark a running agent as done. No-op if already in terminal state.
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
//! Single source of truth for table shape. Any structural change MUST
|
||||
//! bump the migration list below; existing rows are preserved.
|
||||
|
||||
use crate::error::LedgerError;
|
||||
use rusqlite::{Connection, Result};
|
||||
|
||||
/// Maximum length (chars) accepted for `branch` and `parent_branch` columns.
|
||||
|
|
@ -57,27 +58,77 @@ pub const MIGRATIONS: &[&str] = &[
|
|||
ALTER TABLE agents ADD COLUMN fork_parent_id TEXT;
|
||||
CREATE INDEX IF NOT EXISTS idx_agents_creator ON agents(creator_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_agents_fork_parent ON agents(fork_parent_id);",
|
||||
// v5 — UNIQUE constraint on dna (2026-04-23). Prevents silent duplicate
|
||||
// rows when the 32-bit DNA nonce collides (~65k-agent birthday bound per
|
||||
// role+caps+scope+body). Multiple NULLs are still accepted — SQLite's
|
||||
// default index semantics treat NULLs as distinct, matching the v2
|
||||
// "dna optional" contract. Pre-check in `migrate()` aborts with
|
||||
// `LedgerError::DnaMigrationBlocked` if existing rows already conflict,
|
||||
// so the operator reconciles manually instead of losing data.
|
||||
"CREATE UNIQUE INDEX IF NOT EXISTS idx_agents_dna_unique ON agents(dna);",
|
||||
];
|
||||
|
||||
/// Schema version the v5 pre-check guards. Kept as a named constant so the
|
||||
/// branch in `migrate()` stays self-documenting when future migrations land.
|
||||
const V5_TARGET: i64 = 5;
|
||||
|
||||
/// Apply all pending migrations atomically (one transaction per version).
|
||||
///
|
||||
/// Prior design ran `execute_batch` and bumped `user_version` in a separate
|
||||
/// call — a partial failure left the schema half-applied and wedged restarts.
|
||||
/// Now each version's DDL + the `user_version` bump share a transaction, so
|
||||
/// any error rolls everything back and the next startup retries cleanly.
|
||||
pub fn migrate(conn: &Connection) -> Result<()> {
|
||||
///
|
||||
/// The return type is `LedgerError` (not `rusqlite::Error`) because v5
|
||||
/// surfaces a typed `DnaMigrationBlocked` when pre-existing duplicates would
|
||||
/// make the UNIQUE index creation fail — callers deserve a structured error,
|
||||
/// not an opaque "UNIQUE constraint failed" from raw SQL.
|
||||
pub fn migrate(conn: &Connection) -> std::result::Result<(), LedgerError> {
|
||||
let current: i64 = conn
|
||||
.query_row("PRAGMA user_version", [], |r| r.get(0))
|
||||
.unwrap_or(0);
|
||||
for (i, sql) in MIGRATIONS.iter().enumerate() {
|
||||
let target = (i + 1) as i64;
|
||||
if current < target {
|
||||
apply_one(conn, sql, target)?;
|
||||
if target == V5_TARGET {
|
||||
precheck_dna_uniqueness(conn)?;
|
||||
}
|
||||
apply_one(conn, sql, target).map_err(LedgerError::Sql)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// v5 pre-check — scan existing rows for duplicate non-NULL DNAs. If any
|
||||
/// exist, abort with `DnaMigrationBlocked` listing each offender and its
|
||||
/// count. NULL DNAs are ignored because SQLite's default UNIQUE semantics
|
||||
/// treat multiple NULLs as distinct (legacy pre-v2 rows stay valid).
|
||||
fn precheck_dna_uniqueness(conn: &Connection) -> std::result::Result<(), LedgerError> {
|
||||
let mut stmt = conn
|
||||
.prepare(
|
||||
"SELECT dna, COUNT(*) AS c FROM agents
|
||||
WHERE dna IS NOT NULL
|
||||
GROUP BY dna HAVING c > 1
|
||||
ORDER BY c DESC, dna ASC",
|
||||
)
|
||||
.map_err(LedgerError::Sql)?;
|
||||
let rows = stmt
|
||||
.query_map([], |r| {
|
||||
let dna: String = r.get(0)?;
|
||||
let count: i64 = r.get(1)?;
|
||||
Ok((dna, count as usize))
|
||||
})
|
||||
.map_err(LedgerError::Sql)?;
|
||||
let duplicates: Vec<(String, usize)> = rows
|
||||
.collect::<Result<Vec<_>>>()
|
||||
.map_err(LedgerError::Sql)?;
|
||||
if duplicates.is_empty() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(LedgerError::DnaMigrationBlocked { duplicates })
|
||||
}
|
||||
}
|
||||
|
||||
/// Apply a single migration atomically: DDL + user_version bump in one txn.
|
||||
fn apply_one(conn: &Connection, sql: &str, target: i64) -> Result<()> {
|
||||
conn.execute_batch("BEGIN IMMEDIATE")?;
|
||||
|
|
|
|||
|
|
@ -334,7 +334,7 @@ fn migration_v4_idempotent_across_multiple_reopens() {
|
|||
let v: i64 = conn
|
||||
.query_row("PRAGMA user_version", [], |r| r.get(0))
|
||||
.unwrap();
|
||||
assert_eq!(v, schema::MIGRATIONS.len() as i64, "schema must land at v4");
|
||||
assert_eq!(v, schema::MIGRATIONS.len() as i64, "schema must land at latest");
|
||||
}
|
||||
// Seed a row using v4 columns and verify it round-trips after another reopen.
|
||||
{
|
||||
|
|
@ -349,3 +349,156 @@ fn migration_v4_idempotent_across_multiple_reopens() {
|
|||
assert_eq!(r.creator_id.as_deref(), Some("c"));
|
||||
assert_eq!(r.fork_parent_id.as_deref(), Some("fp"));
|
||||
}
|
||||
|
||||
// --- v5 DNA UNIQUE constraint (2026-04-23) ------------------------------
|
||||
|
||||
/// v5-T1: migration v5 creates `idx_agents_dna_unique` on the agents table.
|
||||
/// Detection via `sqlite_master` is the sqlite-canonical way to assert an
|
||||
/// index exists independent of whether it has been used.
|
||||
#[test]
|
||||
fn v5_migration_adds_unique_index() {
|
||||
let (_d, conn) = open_tmp();
|
||||
let found: Option<String> = conn
|
||||
.query_row(
|
||||
"SELECT name FROM sqlite_master
|
||||
WHERE type='index' AND name='idx_agents_dna_unique'",
|
||||
[],
|
||||
|r| r.get(0),
|
||||
)
|
||||
.ok();
|
||||
assert_eq!(found.as_deref(), Some("idx_agents_dna_unique"));
|
||||
// Schema version must have advanced to the migration list's length.
|
||||
let v: i64 = conn
|
||||
.query_row("PRAGMA user_version", [], |r| r.get(0))
|
||||
.unwrap();
|
||||
assert_eq!(v, schema::MIGRATIONS.len() as i64);
|
||||
}
|
||||
|
||||
/// v5-T2: a second `fork` with the same DNA is rejected with the typed
|
||||
/// `DnaCollision` variant, not a raw SQL error. The payload preserves the
|
||||
/// offending DNA so the caller can log / regenerate.
|
||||
#[test]
|
||||
fn duplicate_dna_rejected_at_fork() {
|
||||
let (_d, conn) = open_tmp();
|
||||
let dna = "edit-local::NG-FW::ABCD::1234-xy01";
|
||||
ledger::fork(&conn, "a1", "agent/a1", None, "sh", None, Some(dna), None, None).unwrap();
|
||||
let res = ledger::fork(&conn, "a2", "agent/a2", None, "sh", None, Some(dna), None, None);
|
||||
match res {
|
||||
Err(ledger::LedgerError::DnaCollision { dna: got }) => {
|
||||
assert_eq!(got, dna, "collision variant must carry the offending dna");
|
||||
}
|
||||
other => panic!("expected DnaCollision, got {other:?}"),
|
||||
}
|
||||
// Non-dna uniqueness (primary-key id) must still flow through Sql — not
|
||||
// mis-classified as a DNA collision.
|
||||
let res2 = ledger::fork(
|
||||
&conn, "a1", "agent/a1b", None, "sh", None, Some("other-dna"), None, None,
|
||||
);
|
||||
assert!(
|
||||
matches!(res2, Err(ledger::LedgerError::Sql(_))),
|
||||
"duplicate id should stay a Sql error, got {res2:?}"
|
||||
);
|
||||
}
|
||||
|
||||
/// v5-T3: opening a freshly-built pre-v5 ledger that already contains
|
||||
/// duplicate DNAs must surface `DnaMigrationBlocked` listing each offender.
|
||||
/// Simulated by: (a) open with current schema to create table, (b) drop the
|
||||
/// v5 UNIQUE index + rewind user_version to 4, (c) INSERT two conflicting
|
||||
/// rows by hand, (d) reopen → pre-check fires before index recreation.
|
||||
#[test]
|
||||
fn v5_migration_detects_preexisting_duplicates() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let db = dir.path().join("ledger.sqlite");
|
||||
{
|
||||
let conn = ledger::open(&db).unwrap();
|
||||
conn.execute("DROP INDEX IF EXISTS idx_agents_dna_unique", []).unwrap();
|
||||
conn.pragma_update(None, "user_version", 4_i64).unwrap();
|
||||
// Insert two rows with matching DNA directly — bypasses the v5 UNIQUE
|
||||
// index since we just dropped it, reproducing the pre-v5 world where
|
||||
// the 32-bit DNA nonce already collided for some operator.
|
||||
let ts = 1_700_000_000_i64;
|
||||
let dup = "duplicate-dna-abc";
|
||||
conn.execute(
|
||||
"INSERT INTO agents
|
||||
(id, branch, spec_sha, status, started_ts, dna)
|
||||
VALUES (?1, ?2, ?3, 'running', ?4, ?5)",
|
||||
rusqlite::params!["dup1", "br-dup1", "sha", ts, dup],
|
||||
)
|
||||
.unwrap();
|
||||
conn.execute(
|
||||
"INSERT INTO agents
|
||||
(id, branch, spec_sha, status, started_ts, dna)
|
||||
VALUES (?1, ?2, ?3, 'running', ?4, ?5)",
|
||||
rusqlite::params!["dup2", "br-dup2", "sha", ts, dup],
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
// Second open re-enters migrate(); pre-check must fire and block v5.
|
||||
let res = ledger::open(&db);
|
||||
match res {
|
||||
Err(ledger::LedgerError::DnaMigrationBlocked { duplicates }) => {
|
||||
assert_eq!(duplicates.len(), 1, "expected 1 dup group, got {duplicates:?}");
|
||||
assert_eq!(duplicates[0].0, "duplicate-dna-abc");
|
||||
assert_eq!(duplicates[0].1, 2);
|
||||
}
|
||||
other => panic!("expected DnaMigrationBlocked, got {other:?}"),
|
||||
}
|
||||
// Index must NOT have been created (migration aborted cleanly).
|
||||
let conn_raw = rusqlite::Connection::open(&db).unwrap();
|
||||
let exists: Option<String> = conn_raw
|
||||
.query_row(
|
||||
"SELECT name FROM sqlite_master
|
||||
WHERE type='index' AND name='idx_agents_dna_unique'",
|
||||
[],
|
||||
|r| r.get(0),
|
||||
)
|
||||
.ok();
|
||||
assert!(exists.is_none(), "v5 index must not have been applied");
|
||||
let v: i64 = conn_raw
|
||||
.query_row("PRAGMA user_version", [], |r| r.get(0))
|
||||
.unwrap();
|
||||
assert_eq!(v, 4, "user_version must stay at v4 after blocked migration");
|
||||
}
|
||||
|
||||
/// v5-T4: running migrations twice does not fail and leaves the index in
|
||||
/// place. Complements the pre-existing `migrate_is_idempotent_across_reopens`
|
||||
/// test with an assertion that specifically pins v5.
|
||||
#[test]
|
||||
fn migration_idempotent() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let db = dir.path().join("ledger.sqlite");
|
||||
// First open applies v1..v5.
|
||||
{
|
||||
let _conn = ledger::open(&db).unwrap();
|
||||
}
|
||||
// Second open must be a no-op — no "index already exists" error.
|
||||
let conn = ledger::open(&db).unwrap();
|
||||
let found: Option<String> = conn
|
||||
.query_row(
|
||||
"SELECT name FROM sqlite_master
|
||||
WHERE type='index' AND name='idx_agents_dna_unique'",
|
||||
[],
|
||||
|r| r.get(0),
|
||||
)
|
||||
.ok();
|
||||
assert_eq!(found.as_deref(), Some("idx_agents_dna_unique"));
|
||||
let v: i64 = conn
|
||||
.query_row("PRAGMA user_version", [], |r| r.get(0))
|
||||
.unwrap();
|
||||
assert_eq!(v, schema::MIGRATIONS.len() as i64);
|
||||
// Third open for good measure.
|
||||
let _conn2 = ledger::open(&db).unwrap();
|
||||
}
|
||||
|
||||
/// v5-T5: multiple NULL DNAs are still accepted (SQLite default UNIQUE
|
||||
/// semantics). Matches the v2 "dna optional" contract for legacy callers.
|
||||
#[test]
|
||||
fn v5_allows_multiple_null_dnas() {
|
||||
let (_d, conn) = open_tmp();
|
||||
ledger::fork(&conn, "n1", "br-n1", None, "sh", None, None, None, None).unwrap();
|
||||
ledger::fork(&conn, "n2", "br-n2", None, "sh", None, None, None, None).unwrap();
|
||||
ledger::fork(&conn, "n3", "br-n3", None, "sh", None, None, None, None).unwrap();
|
||||
let all = ledger::list(&conn, None).unwrap();
|
||||
assert_eq!(all.len(), 3);
|
||||
assert!(all.iter().all(|r| r.dna.is_none()));
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue