diff --git a/_primitives/_rust/kei-ledger/src/error.rs b/_primitives/_rust/kei-ledger/src/error.rs index 7eede42..c89aa95 100644 --- a/_primitives/_rust/kei-ledger/src/error.rs +++ b/_primitives/_rust/kei-ledger/src/error.rs @@ -11,7 +11,8 @@ use std::fmt; pub const MAX_TREE_DEPTH: usize = 1024; /// Errors from ledger ops that extend beyond raw SQL (tree walk + input -/// validation). Hot-path SQL calls still return `rusqlite::Error` directly. +/// validation + DNA uniqueness). Hot-path SQL calls still return +/// `rusqlite::Error` directly when no typed surface is required. #[derive(Debug)] pub enum LedgerError { Sql(rusqlite::Error), @@ -19,6 +20,20 @@ pub enum LedgerError { MaxDepthExceeded, /// Branch name longer than `MAX_BRANCH_LEN` chars (audit L1 cap). BranchTooLong { field: &'static str, len: usize }, + /// Attempted `fork` with a DNA that is already present in the ledger. + /// Caller decides whether to regenerate DNA with a fresh nonce — the + /// ledger never silently retries (v5, 2026-04-23). + DnaCollision { dna: String }, + /// v5 migration detected pre-existing duplicate DNAs in the agents + /// table. The UNIQUE index cannot be applied without data loss; the + /// operator must manually reconcile rows. `duplicates` lists each + /// offending DNA and its occurrence count. + /// + /// Resolution (back up the file first): run + /// `kei-ledger sql 'DELETE FROM agents WHERE rowid NOT IN + /// (SELECT MIN(rowid) FROM agents GROUP BY dna)'` + /// then re-open to retry migration. + DnaMigrationBlocked { duplicates: Vec<(String, usize)> }, } impl fmt::Display for LedgerError { @@ -33,6 +48,21 @@ impl fmt::Display for LedgerError { f, "{field} length {len} exceeds cap {MAX_BRANCH_LEN}" ), + LedgerError::DnaCollision { dna } => write!( + f, + "dna collision: {dna} already present — regenerate nonce and retry" + ), + LedgerError::DnaMigrationBlocked { duplicates } => { + write!( + f, + "v5 migration blocked: {} duplicate dna(s) in table; reconcile before reopen:", + duplicates.len() + )?; + for (dna, count) in duplicates { + write!(f, "\n {dna} ({count}x)")?; + } + Ok(()) + } } } } diff --git a/_primitives/_rust/kei-ledger/src/ledger.rs b/_primitives/_rust/kei-ledger/src/ledger.rs index 0937df9..b067b02 100644 --- a/_primitives/_rust/kei-ledger/src/ledger.rs +++ b/_primitives/_rust/kei-ledger/src/ledger.rs @@ -12,7 +12,11 @@ use std::collections::HashSet; use std::path::{Path, PathBuf}; /// Open or create the ledger file and run migrations. -pub fn open(path: &Path) -> SqlResult { +/// +/// Returns `LedgerError` rather than raw `rusqlite::Error` because v5 can +/// surface `DnaMigrationBlocked` (pre-existing duplicate DNAs). Legacy +/// rusqlite errors are wrapped in `LedgerError::Sql` via `From`. +pub fn open(path: &Path) -> Result { if let Some(parent) = path.parent() { let _ = std::fs::create_dir_all(parent); } @@ -35,6 +39,10 @@ fn check_branch_lens(branch: &str, parent: Option<&str>) -> Result<(), LedgerErr } /// Insert running-agent row. Errors on duplicate id or branch > MAX_BRANCH_LEN. +/// +/// Surfaces `LedgerError::DnaCollision { dna }` when a non-NULL DNA already +/// exists in the ledger (v5 UNIQUE index). Caller regenerates DNA with a +/// fresh nonce and retries — the ledger never silently dedupes. #[allow(clippy::too_many_arguments)] pub fn fork( conn: &Connection, @@ -49,14 +57,31 @@ pub fn fork( ) -> Result<(), LedgerError> { check_branch_lens(branch, parent)?; let now = Utc::now().timestamp(); - conn.execute( + let res = conn.execute( "INSERT INTO agents (id, branch, parent_branch, spec_sha, status, started_ts, worktree_path, dna, creator_id, fork_parent_id) VALUES (?1, ?2, ?3, ?4, 'running', ?5, ?6, ?7, ?8, ?9)", params![id, branch, parent, spec_sha, now, worktree, dna, creator_id, fork_parent_id], - )?; - Ok(()) + ); + match res { + Ok(_) => Ok(()), + Err(e) => Err(classify_insert_error(e, dna)), + } +} + +/// Map rusqlite unique-constraint failures to typed variants. `agents.dna` +/// violations become `DnaCollision`; everything else (id collision, trigger +/// abort, etc.) flows through as `LedgerError::Sql`. +fn classify_insert_error(e: rusqlite::Error, dna: Option<&str>) -> LedgerError { + let msg = e.to_string(); + if msg.contains("agents.dna") || msg.contains("idx_agents_dna_unique") { + LedgerError::DnaCollision { + dna: dna.unwrap_or_default().to_string(), + } + } else { + LedgerError::Sql(e) + } } /// Mark a running agent as done. No-op if already in terminal state. diff --git a/_primitives/_rust/kei-ledger/src/schema.rs b/_primitives/_rust/kei-ledger/src/schema.rs index 8ed001f..b33b717 100644 --- a/_primitives/_rust/kei-ledger/src/schema.rs +++ b/_primitives/_rust/kei-ledger/src/schema.rs @@ -4,6 +4,7 @@ //! Single source of truth for table shape. Any structural change MUST //! bump the migration list below; existing rows are preserved. +use crate::error::LedgerError; use rusqlite::{Connection, Result}; /// Maximum length (chars) accepted for `branch` and `parent_branch` columns. @@ -57,27 +58,77 @@ pub const MIGRATIONS: &[&str] = &[ ALTER TABLE agents ADD COLUMN fork_parent_id TEXT; CREATE INDEX IF NOT EXISTS idx_agents_creator ON agents(creator_id); CREATE INDEX IF NOT EXISTS idx_agents_fork_parent ON agents(fork_parent_id);", + // v5 — UNIQUE constraint on dna (2026-04-23). Prevents silent duplicate + // rows when the 32-bit DNA nonce collides (~65k-agent birthday bound per + // role+caps+scope+body). Multiple NULLs are still accepted — SQLite's + // default index semantics treat NULLs as distinct, matching the v2 + // "dna optional" contract. Pre-check in `migrate()` aborts with + // `LedgerError::DnaMigrationBlocked` if existing rows already conflict, + // so the operator reconciles manually instead of losing data. + "CREATE UNIQUE INDEX IF NOT EXISTS idx_agents_dna_unique ON agents(dna);", ]; +/// Schema version the v5 pre-check guards. Kept as a named constant so the +/// branch in `migrate()` stays self-documenting when future migrations land. +const V5_TARGET: i64 = 5; + /// Apply all pending migrations atomically (one transaction per version). /// /// Prior design ran `execute_batch` and bumped `user_version` in a separate /// call — a partial failure left the schema half-applied and wedged restarts. /// Now each version's DDL + the `user_version` bump share a transaction, so /// any error rolls everything back and the next startup retries cleanly. -pub fn migrate(conn: &Connection) -> Result<()> { +/// +/// The return type is `LedgerError` (not `rusqlite::Error`) because v5 +/// surfaces a typed `DnaMigrationBlocked` when pre-existing duplicates would +/// make the UNIQUE index creation fail — callers deserve a structured error, +/// not an opaque "UNIQUE constraint failed" from raw SQL. +pub fn migrate(conn: &Connection) -> std::result::Result<(), LedgerError> { let current: i64 = conn .query_row("PRAGMA user_version", [], |r| r.get(0)) .unwrap_or(0); for (i, sql) in MIGRATIONS.iter().enumerate() { let target = (i + 1) as i64; if current < target { - apply_one(conn, sql, target)?; + if target == V5_TARGET { + precheck_dna_uniqueness(conn)?; + } + apply_one(conn, sql, target).map_err(LedgerError::Sql)?; } } Ok(()) } +/// v5 pre-check — scan existing rows for duplicate non-NULL DNAs. If any +/// exist, abort with `DnaMigrationBlocked` listing each offender and its +/// count. NULL DNAs are ignored because SQLite's default UNIQUE semantics +/// treat multiple NULLs as distinct (legacy pre-v2 rows stay valid). +fn precheck_dna_uniqueness(conn: &Connection) -> std::result::Result<(), LedgerError> { + let mut stmt = conn + .prepare( + "SELECT dna, COUNT(*) AS c FROM agents + WHERE dna IS NOT NULL + GROUP BY dna HAVING c > 1 + ORDER BY c DESC, dna ASC", + ) + .map_err(LedgerError::Sql)?; + let rows = stmt + .query_map([], |r| { + let dna: String = r.get(0)?; + let count: i64 = r.get(1)?; + Ok((dna, count as usize)) + }) + .map_err(LedgerError::Sql)?; + let duplicates: Vec<(String, usize)> = rows + .collect::>>() + .map_err(LedgerError::Sql)?; + if duplicates.is_empty() { + Ok(()) + } else { + Err(LedgerError::DnaMigrationBlocked { duplicates }) + } +} + /// Apply a single migration atomically: DDL + user_version bump in one txn. fn apply_one(conn: &Connection, sql: &str, target: i64) -> Result<()> { conn.execute_batch("BEGIN IMMEDIATE")?; diff --git a/_primitives/_rust/kei-ledger/tests/integration.rs b/_primitives/_rust/kei-ledger/tests/integration.rs index ecdcb8e..b30d150 100644 --- a/_primitives/_rust/kei-ledger/tests/integration.rs +++ b/_primitives/_rust/kei-ledger/tests/integration.rs @@ -334,7 +334,7 @@ fn migration_v4_idempotent_across_multiple_reopens() { let v: i64 = conn .query_row("PRAGMA user_version", [], |r| r.get(0)) .unwrap(); - assert_eq!(v, schema::MIGRATIONS.len() as i64, "schema must land at v4"); + assert_eq!(v, schema::MIGRATIONS.len() as i64, "schema must land at latest"); } // Seed a row using v4 columns and verify it round-trips after another reopen. { @@ -349,3 +349,156 @@ fn migration_v4_idempotent_across_multiple_reopens() { assert_eq!(r.creator_id.as_deref(), Some("c")); assert_eq!(r.fork_parent_id.as_deref(), Some("fp")); } + +// --- v5 DNA UNIQUE constraint (2026-04-23) ------------------------------ + +/// v5-T1: migration v5 creates `idx_agents_dna_unique` on the agents table. +/// Detection via `sqlite_master` is the sqlite-canonical way to assert an +/// index exists independent of whether it has been used. +#[test] +fn v5_migration_adds_unique_index() { + let (_d, conn) = open_tmp(); + let found: Option = conn + .query_row( + "SELECT name FROM sqlite_master + WHERE type='index' AND name='idx_agents_dna_unique'", + [], + |r| r.get(0), + ) + .ok(); + assert_eq!(found.as_deref(), Some("idx_agents_dna_unique")); + // Schema version must have advanced to the migration list's length. + let v: i64 = conn + .query_row("PRAGMA user_version", [], |r| r.get(0)) + .unwrap(); + assert_eq!(v, schema::MIGRATIONS.len() as i64); +} + +/// v5-T2: a second `fork` with the same DNA is rejected with the typed +/// `DnaCollision` variant, not a raw SQL error. The payload preserves the +/// offending DNA so the caller can log / regenerate. +#[test] +fn duplicate_dna_rejected_at_fork() { + let (_d, conn) = open_tmp(); + let dna = "edit-local::NG-FW::ABCD::1234-xy01"; + ledger::fork(&conn, "a1", "agent/a1", None, "sh", None, Some(dna), None, None).unwrap(); + let res = ledger::fork(&conn, "a2", "agent/a2", None, "sh", None, Some(dna), None, None); + match res { + Err(ledger::LedgerError::DnaCollision { dna: got }) => { + assert_eq!(got, dna, "collision variant must carry the offending dna"); + } + other => panic!("expected DnaCollision, got {other:?}"), + } + // Non-dna uniqueness (primary-key id) must still flow through Sql — not + // mis-classified as a DNA collision. + let res2 = ledger::fork( + &conn, "a1", "agent/a1b", None, "sh", None, Some("other-dna"), None, None, + ); + assert!( + matches!(res2, Err(ledger::LedgerError::Sql(_))), + "duplicate id should stay a Sql error, got {res2:?}" + ); +} + +/// v5-T3: opening a freshly-built pre-v5 ledger that already contains +/// duplicate DNAs must surface `DnaMigrationBlocked` listing each offender. +/// Simulated by: (a) open with current schema to create table, (b) drop the +/// v5 UNIQUE index + rewind user_version to 4, (c) INSERT two conflicting +/// rows by hand, (d) reopen → pre-check fires before index recreation. +#[test] +fn v5_migration_detects_preexisting_duplicates() { + let dir = tempfile::tempdir().unwrap(); + let db = dir.path().join("ledger.sqlite"); + { + let conn = ledger::open(&db).unwrap(); + conn.execute("DROP INDEX IF EXISTS idx_agents_dna_unique", []).unwrap(); + conn.pragma_update(None, "user_version", 4_i64).unwrap(); + // Insert two rows with matching DNA directly — bypasses the v5 UNIQUE + // index since we just dropped it, reproducing the pre-v5 world where + // the 32-bit DNA nonce already collided for some operator. + let ts = 1_700_000_000_i64; + let dup = "duplicate-dna-abc"; + conn.execute( + "INSERT INTO agents + (id, branch, spec_sha, status, started_ts, dna) + VALUES (?1, ?2, ?3, 'running', ?4, ?5)", + rusqlite::params!["dup1", "br-dup1", "sha", ts, dup], + ) + .unwrap(); + conn.execute( + "INSERT INTO agents + (id, branch, spec_sha, status, started_ts, dna) + VALUES (?1, ?2, ?3, 'running', ?4, ?5)", + rusqlite::params!["dup2", "br-dup2", "sha", ts, dup], + ) + .unwrap(); + } + // Second open re-enters migrate(); pre-check must fire and block v5. + let res = ledger::open(&db); + match res { + Err(ledger::LedgerError::DnaMigrationBlocked { duplicates }) => { + assert_eq!(duplicates.len(), 1, "expected 1 dup group, got {duplicates:?}"); + assert_eq!(duplicates[0].0, "duplicate-dna-abc"); + assert_eq!(duplicates[0].1, 2); + } + other => panic!("expected DnaMigrationBlocked, got {other:?}"), + } + // Index must NOT have been created (migration aborted cleanly). + let conn_raw = rusqlite::Connection::open(&db).unwrap(); + let exists: Option = conn_raw + .query_row( + "SELECT name FROM sqlite_master + WHERE type='index' AND name='idx_agents_dna_unique'", + [], + |r| r.get(0), + ) + .ok(); + assert!(exists.is_none(), "v5 index must not have been applied"); + let v: i64 = conn_raw + .query_row("PRAGMA user_version", [], |r| r.get(0)) + .unwrap(); + assert_eq!(v, 4, "user_version must stay at v4 after blocked migration"); +} + +/// v5-T4: running migrations twice does not fail and leaves the index in +/// place. Complements the pre-existing `migrate_is_idempotent_across_reopens` +/// test with an assertion that specifically pins v5. +#[test] +fn migration_idempotent() { + let dir = tempfile::tempdir().unwrap(); + let db = dir.path().join("ledger.sqlite"); + // First open applies v1..v5. + { + let _conn = ledger::open(&db).unwrap(); + } + // Second open must be a no-op — no "index already exists" error. + let conn = ledger::open(&db).unwrap(); + let found: Option = conn + .query_row( + "SELECT name FROM sqlite_master + WHERE type='index' AND name='idx_agents_dna_unique'", + [], + |r| r.get(0), + ) + .ok(); + assert_eq!(found.as_deref(), Some("idx_agents_dna_unique")); + let v: i64 = conn + .query_row("PRAGMA user_version", [], |r| r.get(0)) + .unwrap(); + assert_eq!(v, schema::MIGRATIONS.len() as i64); + // Third open for good measure. + let _conn2 = ledger::open(&db).unwrap(); +} + +/// v5-T5: multiple NULL DNAs are still accepted (SQLite default UNIQUE +/// semantics). Matches the v2 "dna optional" contract for legacy callers. +#[test] +fn v5_allows_multiple_null_dnas() { + let (_d, conn) = open_tmp(); + ledger::fork(&conn, "n1", "br-n1", None, "sh", None, None, None, None).unwrap(); + ledger::fork(&conn, "n2", "br-n2", None, "sh", None, None, None, None).unwrap(); + ledger::fork(&conn, "n3", "br-n3", None, "sh", None, None, None, None).unwrap(); + let all = ledger::list(&conn, None).unwrap(); + assert_eq!(all.len(), 3); + assert!(all.iter().all(|r| r.dna.is_none())); +}