fix(ledger): DNA UNIQUE constraint + v5 migration (HIGH audit)

1 HIGH audit fix: agents.dna had no UNIQUE constraint → duplicate
DNAs accepted silently → kei-replay ambiguous.

- schema.rs: v5 migration adds CREATE UNIQUE INDEX idx_agents_dna_unique
  with pre-check. If existing DB has dna duplicates, migration surfaces
  typed LedgerError::DnaMigrationBlocked { duplicates: Vec<(String, usize)> }
  instead of silently dropping rows.

- error.rs: +DnaCollision { dna } (runtime insert violation at fork).
  Caller regenerates nonce and retries.
  +DnaMigrationBlocked (migration-time precheck failure) with
  resolution hint in doc-comment.

- ledger.rs: classify_insert_error distinguishes DNA-violation vs
  duplicate-id vs generic SQL error.

- NULL handling: multiple NULL dnas allowed (SQLite default UNIQUE
  semantics), preserves v2 "dna optional" contract.

+5 tests, 23/23 kei-ledger green.

Constructor Pattern: source files <=192 LOC, all functions <=25 LOC.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Parfii-bot 2026-04-23 15:45:43 +08:00
parent fc0f85a947
commit 3cc4279a54
4 changed files with 267 additions and 8 deletions

View file

@ -11,7 +11,8 @@ use std::fmt;
pub const MAX_TREE_DEPTH: usize = 1024;
/// Errors from ledger ops that extend beyond raw SQL (tree walk + input
/// validation). Hot-path SQL calls still return `rusqlite::Error` directly.
/// validation + DNA uniqueness). Hot-path SQL calls still return
/// `rusqlite::Error` directly when no typed surface is required.
#[derive(Debug)]
pub enum LedgerError {
Sql(rusqlite::Error),
@ -19,6 +20,20 @@ pub enum LedgerError {
MaxDepthExceeded,
/// Branch name longer than `MAX_BRANCH_LEN` chars (audit L1 cap).
BranchTooLong { field: &'static str, len: usize },
/// Attempted `fork` with a DNA that is already present in the ledger.
/// Caller decides whether to regenerate DNA with a fresh nonce — the
/// ledger never silently retries (v5, 2026-04-23).
DnaCollision { dna: String },
/// v5 migration detected pre-existing duplicate DNAs in the agents
/// table. The UNIQUE index cannot be applied without data loss; the
/// operator must manually reconcile rows. `duplicates` lists each
/// offending DNA and its occurrence count.
///
/// Resolution (back up the file first): run
/// `kei-ledger sql 'DELETE FROM agents WHERE rowid NOT IN
/// (SELECT MIN(rowid) FROM agents GROUP BY dna)'`
/// then re-open to retry migration.
DnaMigrationBlocked { duplicates: Vec<(String, usize)> },
}
impl fmt::Display for LedgerError {
@ -33,6 +48,21 @@ impl fmt::Display for LedgerError {
f,
"{field} length {len} exceeds cap {MAX_BRANCH_LEN}"
),
LedgerError::DnaCollision { dna } => write!(
f,
"dna collision: {dna} already present — regenerate nonce and retry"
),
LedgerError::DnaMigrationBlocked { duplicates } => {
write!(
f,
"v5 migration blocked: {} duplicate dna(s) in table; reconcile before reopen:",
duplicates.len()
)?;
for (dna, count) in duplicates {
write!(f, "\n {dna} ({count}x)")?;
}
Ok(())
}
}
}
}

View file

@ -12,7 +12,11 @@ use std::collections::HashSet;
use std::path::{Path, PathBuf};
/// Open or create the ledger file and run migrations.
pub fn open(path: &Path) -> SqlResult<Connection> {
///
/// Returns `LedgerError` rather than raw `rusqlite::Error` because v5 can
/// surface `DnaMigrationBlocked` (pre-existing duplicate DNAs). Legacy
/// rusqlite errors are wrapped in `LedgerError::Sql` via `From`.
pub fn open(path: &Path) -> Result<Connection, LedgerError> {
if let Some(parent) = path.parent() {
let _ = std::fs::create_dir_all(parent);
}
@ -35,6 +39,10 @@ fn check_branch_lens(branch: &str, parent: Option<&str>) -> Result<(), LedgerErr
}
/// Insert running-agent row. Errors on duplicate id or branch > MAX_BRANCH_LEN.
///
/// Surfaces `LedgerError::DnaCollision { dna }` when a non-NULL DNA already
/// exists in the ledger (v5 UNIQUE index). Caller regenerates DNA with a
/// fresh nonce and retries — the ledger never silently dedupes.
#[allow(clippy::too_many_arguments)]
pub fn fork(
conn: &Connection,
@ -49,14 +57,31 @@ pub fn fork(
) -> Result<(), LedgerError> {
check_branch_lens(branch, parent)?;
let now = Utc::now().timestamp();
conn.execute(
let res = conn.execute(
"INSERT INTO agents
(id, branch, parent_branch, spec_sha, status, started_ts,
worktree_path, dna, creator_id, fork_parent_id)
VALUES (?1, ?2, ?3, ?4, 'running', ?5, ?6, ?7, ?8, ?9)",
params![id, branch, parent, spec_sha, now, worktree, dna, creator_id, fork_parent_id],
)?;
Ok(())
);
match res {
Ok(_) => Ok(()),
Err(e) => Err(classify_insert_error(e, dna)),
}
}
/// Map rusqlite unique-constraint failures to typed variants. `agents.dna`
/// violations become `DnaCollision`; everything else (id collision, trigger
/// abort, etc.) flows through as `LedgerError::Sql`.
fn classify_insert_error(e: rusqlite::Error, dna: Option<&str>) -> LedgerError {
let msg = e.to_string();
if msg.contains("agents.dna") || msg.contains("idx_agents_dna_unique") {
LedgerError::DnaCollision {
dna: dna.unwrap_or_default().to_string(),
}
} else {
LedgerError::Sql(e)
}
}
/// Mark a running agent as done. No-op if already in terminal state.

View file

@ -4,6 +4,7 @@
//! Single source of truth for table shape. Any structural change MUST
//! bump the migration list below; existing rows are preserved.
use crate::error::LedgerError;
use rusqlite::{Connection, Result};
/// Maximum length (chars) accepted for `branch` and `parent_branch` columns.
@ -57,27 +58,77 @@ pub const MIGRATIONS: &[&str] = &[
ALTER TABLE agents ADD COLUMN fork_parent_id TEXT;
CREATE INDEX IF NOT EXISTS idx_agents_creator ON agents(creator_id);
CREATE INDEX IF NOT EXISTS idx_agents_fork_parent ON agents(fork_parent_id);",
// v5 — UNIQUE constraint on dna (2026-04-23). Prevents silent duplicate
// rows when the 32-bit DNA nonce collides (~65k-agent birthday bound per
// role+caps+scope+body). Multiple NULLs are still accepted — SQLite's
// default index semantics treat NULLs as distinct, matching the v2
// "dna optional" contract. Pre-check in `migrate()` aborts with
// `LedgerError::DnaMigrationBlocked` if existing rows already conflict,
// so the operator reconciles manually instead of losing data.
"CREATE UNIQUE INDEX IF NOT EXISTS idx_agents_dna_unique ON agents(dna);",
];
/// Schema version the v5 pre-check guards. Kept as a named constant so the
/// branch in `migrate()` stays self-documenting when future migrations land.
const V5_TARGET: i64 = 5;
/// Apply all pending migrations atomically (one transaction per version).
///
/// Prior design ran `execute_batch` and bumped `user_version` in a separate
/// call — a partial failure left the schema half-applied and wedged restarts.
/// Now each version's DDL + the `user_version` bump share a transaction, so
/// any error rolls everything back and the next startup retries cleanly.
pub fn migrate(conn: &Connection) -> Result<()> {
///
/// The return type is `LedgerError` (not `rusqlite::Error`) because v5
/// surfaces a typed `DnaMigrationBlocked` when pre-existing duplicates would
/// make the UNIQUE index creation fail — callers deserve a structured error,
/// not an opaque "UNIQUE constraint failed" from raw SQL.
pub fn migrate(conn: &Connection) -> std::result::Result<(), LedgerError> {
let current: i64 = conn
.query_row("PRAGMA user_version", [], |r| r.get(0))
.unwrap_or(0);
for (i, sql) in MIGRATIONS.iter().enumerate() {
let target = (i + 1) as i64;
if current < target {
apply_one(conn, sql, target)?;
if target == V5_TARGET {
precheck_dna_uniqueness(conn)?;
}
apply_one(conn, sql, target).map_err(LedgerError::Sql)?;
}
}
Ok(())
}
/// v5 pre-check — scan existing rows for duplicate non-NULL DNAs. If any
/// exist, abort with `DnaMigrationBlocked` listing each offender and its
/// count. NULL DNAs are ignored because SQLite's default UNIQUE semantics
/// treat multiple NULLs as distinct (legacy pre-v2 rows stay valid).
fn precheck_dna_uniqueness(conn: &Connection) -> std::result::Result<(), LedgerError> {
let mut stmt = conn
.prepare(
"SELECT dna, COUNT(*) AS c FROM agents
WHERE dna IS NOT NULL
GROUP BY dna HAVING c > 1
ORDER BY c DESC, dna ASC",
)
.map_err(LedgerError::Sql)?;
let rows = stmt
.query_map([], |r| {
let dna: String = r.get(0)?;
let count: i64 = r.get(1)?;
Ok((dna, count as usize))
})
.map_err(LedgerError::Sql)?;
let duplicates: Vec<(String, usize)> = rows
.collect::<Result<Vec<_>>>()
.map_err(LedgerError::Sql)?;
if duplicates.is_empty() {
Ok(())
} else {
Err(LedgerError::DnaMigrationBlocked { duplicates })
}
}
/// Apply a single migration atomically: DDL + user_version bump in one txn.
fn apply_one(conn: &Connection, sql: &str, target: i64) -> Result<()> {
conn.execute_batch("BEGIN IMMEDIATE")?;

View file

@ -334,7 +334,7 @@ fn migration_v4_idempotent_across_multiple_reopens() {
let v: i64 = conn
.query_row("PRAGMA user_version", [], |r| r.get(0))
.unwrap();
assert_eq!(v, schema::MIGRATIONS.len() as i64, "schema must land at v4");
assert_eq!(v, schema::MIGRATIONS.len() as i64, "schema must land at latest");
}
// Seed a row using v4 columns and verify it round-trips after another reopen.
{
@ -349,3 +349,156 @@ fn migration_v4_idempotent_across_multiple_reopens() {
assert_eq!(r.creator_id.as_deref(), Some("c"));
assert_eq!(r.fork_parent_id.as_deref(), Some("fp"));
}
// --- v5 DNA UNIQUE constraint (2026-04-23) ------------------------------
/// v5-T1: migration v5 creates `idx_agents_dna_unique` on the agents table.
/// Detection via `sqlite_master` is the sqlite-canonical way to assert an
/// index exists independent of whether it has been used.
#[test]
fn v5_migration_adds_unique_index() {
let (_d, conn) = open_tmp();
let found: Option<String> = conn
.query_row(
"SELECT name FROM sqlite_master
WHERE type='index' AND name='idx_agents_dna_unique'",
[],
|r| r.get(0),
)
.ok();
assert_eq!(found.as_deref(), Some("idx_agents_dna_unique"));
// Schema version must have advanced to the migration list's length.
let v: i64 = conn
.query_row("PRAGMA user_version", [], |r| r.get(0))
.unwrap();
assert_eq!(v, schema::MIGRATIONS.len() as i64);
}
/// v5-T2: a second `fork` with the same DNA is rejected with the typed
/// `DnaCollision` variant, not a raw SQL error. The payload preserves the
/// offending DNA so the caller can log / regenerate.
#[test]
fn duplicate_dna_rejected_at_fork() {
let (_d, conn) = open_tmp();
let dna = "edit-local::NG-FW::ABCD::1234-xy01";
ledger::fork(&conn, "a1", "agent/a1", None, "sh", None, Some(dna), None, None).unwrap();
let res = ledger::fork(&conn, "a2", "agent/a2", None, "sh", None, Some(dna), None, None);
match res {
Err(ledger::LedgerError::DnaCollision { dna: got }) => {
assert_eq!(got, dna, "collision variant must carry the offending dna");
}
other => panic!("expected DnaCollision, got {other:?}"),
}
// Non-dna uniqueness (primary-key id) must still flow through Sql — not
// mis-classified as a DNA collision.
let res2 = ledger::fork(
&conn, "a1", "agent/a1b", None, "sh", None, Some("other-dna"), None, None,
);
assert!(
matches!(res2, Err(ledger::LedgerError::Sql(_))),
"duplicate id should stay a Sql error, got {res2:?}"
);
}
/// v5-T3: opening a freshly-built pre-v5 ledger that already contains
/// duplicate DNAs must surface `DnaMigrationBlocked` listing each offender.
/// Simulated by: (a) open with current schema to create table, (b) drop the
/// v5 UNIQUE index + rewind user_version to 4, (c) INSERT two conflicting
/// rows by hand, (d) reopen → pre-check fires before index recreation.
#[test]
fn v5_migration_detects_preexisting_duplicates() {
let dir = tempfile::tempdir().unwrap();
let db = dir.path().join("ledger.sqlite");
{
let conn = ledger::open(&db).unwrap();
conn.execute("DROP INDEX IF EXISTS idx_agents_dna_unique", []).unwrap();
conn.pragma_update(None, "user_version", 4_i64).unwrap();
// Insert two rows with matching DNA directly — bypasses the v5 UNIQUE
// index since we just dropped it, reproducing the pre-v5 world where
// the 32-bit DNA nonce already collided for some operator.
let ts = 1_700_000_000_i64;
let dup = "duplicate-dna-abc";
conn.execute(
"INSERT INTO agents
(id, branch, spec_sha, status, started_ts, dna)
VALUES (?1, ?2, ?3, 'running', ?4, ?5)",
rusqlite::params!["dup1", "br-dup1", "sha", ts, dup],
)
.unwrap();
conn.execute(
"INSERT INTO agents
(id, branch, spec_sha, status, started_ts, dna)
VALUES (?1, ?2, ?3, 'running', ?4, ?5)",
rusqlite::params!["dup2", "br-dup2", "sha", ts, dup],
)
.unwrap();
}
// Second open re-enters migrate(); pre-check must fire and block v5.
let res = ledger::open(&db);
match res {
Err(ledger::LedgerError::DnaMigrationBlocked { duplicates }) => {
assert_eq!(duplicates.len(), 1, "expected 1 dup group, got {duplicates:?}");
assert_eq!(duplicates[0].0, "duplicate-dna-abc");
assert_eq!(duplicates[0].1, 2);
}
other => panic!("expected DnaMigrationBlocked, got {other:?}"),
}
// Index must NOT have been created (migration aborted cleanly).
let conn_raw = rusqlite::Connection::open(&db).unwrap();
let exists: Option<String> = conn_raw
.query_row(
"SELECT name FROM sqlite_master
WHERE type='index' AND name='idx_agents_dna_unique'",
[],
|r| r.get(0),
)
.ok();
assert!(exists.is_none(), "v5 index must not have been applied");
let v: i64 = conn_raw
.query_row("PRAGMA user_version", [], |r| r.get(0))
.unwrap();
assert_eq!(v, 4, "user_version must stay at v4 after blocked migration");
}
/// v5-T4: running migrations twice does not fail and leaves the index in
/// place. Complements the pre-existing `migrate_is_idempotent_across_reopens`
/// test with an assertion that specifically pins v5.
#[test]
fn migration_idempotent() {
let dir = tempfile::tempdir().unwrap();
let db = dir.path().join("ledger.sqlite");
// First open applies v1..v5.
{
let _conn = ledger::open(&db).unwrap();
}
// Second open must be a no-op — no "index already exists" error.
let conn = ledger::open(&db).unwrap();
let found: Option<String> = conn
.query_row(
"SELECT name FROM sqlite_master
WHERE type='index' AND name='idx_agents_dna_unique'",
[],
|r| r.get(0),
)
.ok();
assert_eq!(found.as_deref(), Some("idx_agents_dna_unique"));
let v: i64 = conn
.query_row("PRAGMA user_version", [], |r| r.get(0))
.unwrap();
assert_eq!(v, schema::MIGRATIONS.len() as i64);
// Third open for good measure.
let _conn2 = ledger::open(&db).unwrap();
}
/// v5-T5: multiple NULL DNAs are still accepted (SQLite default UNIQUE
/// semantics). Matches the v2 "dna optional" contract for legacy callers.
#[test]
fn v5_allows_multiple_null_dnas() {
let (_d, conn) = open_tmp();
ledger::fork(&conn, "n1", "br-n1", None, "sh", None, None, None, None).unwrap();
ledger::fork(&conn, "n2", "br-n2", None, "sh", None, None, None, None).unwrap();
ledger::fork(&conn, "n3", "br-n3", None, "sh", None, None, None, None).unwrap();
let all = ledger::list(&conn, None).unwrap();
assert_eq!(all.len(), 3);
assert!(all.iter().all(|r| r.dna.is_none()));
}