Merge fix/b3-ledger — cycle + txn + length cap
This commit is contained in:
commit
a78f1aaa5f
5 changed files with 257 additions and 47 deletions
46
_primitives/_rust/kei-ledger/src/error.rs
Normal file
46
_primitives/_rust/kei-ledger/src/error.rs
Normal file
|
|
@ -0,0 +1,46 @@
|
|||
//! Error type for ledger operations that extend beyond raw SQL.
|
||||
//!
|
||||
//! Constructor Pattern: one cube = one error type + its three trait impls.
|
||||
//! Kept as a separate module so `ledger.rs` stays under the 200-LOC cap.
|
||||
|
||||
use crate::schema::MAX_BRANCH_LEN;
|
||||
use std::fmt;
|
||||
|
||||
/// Maximum number of walk iterations (one per expanded branch) `ledger::tree()` performs before aborting with
|
||||
/// `LedgerError::MaxDepthExceeded`. Guards against cyclic or runaway data.
|
||||
pub const MAX_TREE_DEPTH: usize = 1024;
|
||||
|
||||
/// Errors from ledger ops that extend beyond raw SQL (tree walk + input
|
||||
/// validation). Hot-path SQL calls still return `rusqlite::Error` directly.
|
||||
#[derive(Debug)]
|
||||
pub enum LedgerError {
|
||||
Sql(rusqlite::Error),
|
||||
/// BFS in `tree()` exceeded `MAX_TREE_DEPTH` iterations.
|
||||
MaxDepthExceeded,
|
||||
/// Branch name longer than `MAX_BRANCH_LEN` chars (audit L1 cap).
|
||||
BranchTooLong { field: &'static str, len: usize },
|
||||
}
|
||||
|
||||
impl fmt::Display for LedgerError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
LedgerError::Sql(e) => write!(f, "sql: {e}"),
|
||||
LedgerError::MaxDepthExceeded => write!(
|
||||
f,
|
||||
"tree walk exceeded {MAX_TREE_DEPTH} iterations (cycle or runaway ledger)"
|
||||
),
|
||||
LedgerError::BranchTooLong { field, len } => write!(
|
||||
f,
|
||||
"{field} length {len} exceeds cap {MAX_BRANCH_LEN}"
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for LedgerError {}
|
||||
|
||||
impl From<rusqlite::Error> for LedgerError {
|
||||
fn from(e: rusqlite::Error) -> Self {
|
||||
LedgerError::Sql(e)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,12 +1,13 @@
|
|||
//! Ledger operations — fork / done / fail / list / tree / validate.
|
||||
//!
|
||||
//! Constructor Pattern: each public fn <30 LOC, single responsibility.
|
||||
//! Storage: rusqlite Connection (bundled SQLite). One file per caller.
|
||||
//! Ledger ops — fork / done / fail / list / tree / validate.
|
||||
//! Constructor Pattern: each public fn <30 LOC. rusqlite-backed, one file per caller.
|
||||
|
||||
use crate::schema::{migrate, REQUIRED_ARTEFACTS};
|
||||
use crate::error::MAX_TREE_DEPTH;
|
||||
use crate::schema::{migrate, MAX_BRANCH_LEN, REQUIRED_ARTEFACTS};
|
||||
pub use crate::error::LedgerError;
|
||||
use chrono::Utc;
|
||||
use rusqlite::{params, Connection, OptionalExtension, Result as SqlResult};
|
||||
use serde::Serialize;
|
||||
use std::collections::HashSet;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
|
|
@ -20,8 +21,7 @@ pub struct AgentRow {
|
|||
pub finished_ts: Option<i64>,
|
||||
pub summary: Option<String>,
|
||||
pub worktree_path: Option<String>,
|
||||
/// Layer G composition fingerprint; `None` for rows written by pre-v2
|
||||
/// clients or fork calls that didn't pass `--dna`.
|
||||
/// Layer G composition fingerprint; `None` for pre-v2 rows.
|
||||
pub dna: Option<String>,
|
||||
}
|
||||
|
||||
|
|
@ -35,8 +35,20 @@ pub fn open(path: &Path) -> SqlResult<Connection> {
|
|||
Ok(conn)
|
||||
}
|
||||
|
||||
/// Insert a new running-agent row. Errors if id is already present.
|
||||
/// `dna` (Layer G) is optional; callers on the old CLI path pass `None`.
|
||||
/// Cap branch / parent_branch length (audit L1). Schema triggers mirror this.
|
||||
fn check_branch_lens(branch: &str, parent: Option<&str>) -> Result<(), LedgerError> {
|
||||
if branch.len() > MAX_BRANCH_LEN {
|
||||
return Err(LedgerError::BranchTooLong { field: "branch", len: branch.len() });
|
||||
}
|
||||
if let Some(p) = parent {
|
||||
if p.len() > MAX_BRANCH_LEN {
|
||||
return Err(LedgerError::BranchTooLong { field: "parent_branch", len: p.len() });
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Insert running-agent row. Errors on duplicate id or branch > MAX_BRANCH_LEN.
|
||||
pub fn fork(
|
||||
conn: &Connection,
|
||||
id: &str,
|
||||
|
|
@ -45,7 +57,8 @@ pub fn fork(
|
|||
spec_sha: &str,
|
||||
worktree: Option<&str>,
|
||||
dna: Option<&str>,
|
||||
) -> SqlResult<()> {
|
||||
) -> Result<(), LedgerError> {
|
||||
check_branch_lens(branch, parent)?;
|
||||
let now = Utc::now().timestamp();
|
||||
conn.execute(
|
||||
"INSERT INTO agents
|
||||
|
|
@ -86,23 +99,23 @@ pub fn merged(conn: &Connection, id: &str) -> SqlResult<usize> {
|
|||
)
|
||||
}
|
||||
|
||||
/// Column list shared by all SELECTs that hydrate an `AgentRow`.
|
||||
const SELECT_COLS: &str =
|
||||
"id, branch, parent_branch, spec_sha, status, started_ts, finished_ts, summary, worktree_path, dna";
|
||||
|
||||
/// List all agents, optionally filtered by status.
|
||||
pub fn list(conn: &Connection, status: Option<&str>) -> SqlResult<Vec<AgentRow>> {
|
||||
let (sql, bound): (&str, Vec<String>) = match status {
|
||||
let (sql, bound): (String, Vec<String>) = match status {
|
||||
Some(s) => (
|
||||
"SELECT id, branch, parent_branch, spec_sha, status, started_ts,
|
||||
finished_ts, summary, worktree_path, dna
|
||||
FROM agents WHERE status = ?1 ORDER BY started_ts DESC",
|
||||
format!("SELECT {SELECT_COLS} FROM agents WHERE status = ?1 ORDER BY started_ts DESC"),
|
||||
vec![s.to_string()],
|
||||
),
|
||||
None => (
|
||||
"SELECT id, branch, parent_branch, spec_sha, status, started_ts,
|
||||
finished_ts, summary, worktree_path, dna
|
||||
FROM agents ORDER BY started_ts DESC",
|
||||
format!("SELECT {SELECT_COLS} FROM agents ORDER BY started_ts DESC"),
|
||||
vec![],
|
||||
),
|
||||
};
|
||||
let mut stmt = conn.prepare(sql)?;
|
||||
let mut stmt = conn.prepare(&sql)?;
|
||||
let rows = stmt
|
||||
.query_map(rusqlite::params_from_iter(bound.iter()), row_to_agent)?
|
||||
.collect::<SqlResult<Vec<_>>>()?;
|
||||
|
|
@ -125,44 +138,51 @@ fn row_to_agent(r: &rusqlite::Row) -> SqlResult<AgentRow> {
|
|||
}
|
||||
|
||||
fn by_id(conn: &Connection, id: &str) -> SqlResult<Option<AgentRow>> {
|
||||
conn.query_row(
|
||||
"SELECT id, branch, parent_branch, spec_sha, status, started_ts,
|
||||
finished_ts, summary, worktree_path, dna
|
||||
FROM agents WHERE id = ?1",
|
||||
params![id],
|
||||
row_to_agent,
|
||||
)
|
||||
.optional()
|
||||
let sql = format!("SELECT {SELECT_COLS} FROM agents WHERE id = ?1");
|
||||
conn.query_row(&sql, params![id], row_to_agent).optional()
|
||||
}
|
||||
|
||||
/// Walk the parent chain from `root_id` down to all descendants.
|
||||
/// Returns rows in BFS order starting with root.
|
||||
pub fn tree(conn: &Connection, root_id: &str) -> SqlResult<Vec<AgentRow>> {
|
||||
/// Fetch immediate children of a given parent_branch.
|
||||
fn children_of(conn: &Connection, parent_branch: &str) -> SqlResult<Vec<AgentRow>> {
|
||||
let sql = format!(
|
||||
"SELECT {SELECT_COLS} FROM agents WHERE parent_branch = ?1 ORDER BY started_ts ASC"
|
||||
);
|
||||
let mut stmt = conn.prepare(&sql)?;
|
||||
let rows = stmt
|
||||
.query_map(params![parent_branch], row_to_agent)?
|
||||
.collect::<SqlResult<Vec<_>>>()?;
|
||||
Ok(rows)
|
||||
}
|
||||
|
||||
/// BFS from `root_id` to all descendants, root-first. Cycle-safe via `visited`;
|
||||
/// aborts after `MAX_TREE_DEPTH` iterations (audit S2 runaway-data guard).
|
||||
pub fn tree(conn: &Connection, root_id: &str) -> Result<Vec<AgentRow>, LedgerError> {
|
||||
let root = match by_id(conn, root_id)? {
|
||||
Some(r) => r,
|
||||
None => return Ok(vec![]),
|
||||
};
|
||||
let mut visited: HashSet<String> = HashSet::new();
|
||||
visited.insert(root.branch.clone());
|
||||
let mut out = vec![root.clone()];
|
||||
let mut frontier = vec![root.branch];
|
||||
let mut frontier: Vec<String> = vec![root.branch];
|
||||
let mut steps = 0usize;
|
||||
while let Some(parent_branch) = frontier.pop() {
|
||||
let mut stmt = conn.prepare(
|
||||
"SELECT id, branch, parent_branch, spec_sha, status, started_ts,
|
||||
finished_ts, summary, worktree_path, dna
|
||||
FROM agents WHERE parent_branch = ?1 ORDER BY started_ts ASC",
|
||||
)?;
|
||||
let kids = stmt
|
||||
.query_map(params![parent_branch], row_to_agent)?
|
||||
.collect::<SqlResult<Vec<_>>>()?;
|
||||
for k in kids {
|
||||
frontier.push(k.branch.clone());
|
||||
out.push(k);
|
||||
steps += 1;
|
||||
if steps > MAX_TREE_DEPTH {
|
||||
return Err(LedgerError::MaxDepthExceeded);
|
||||
}
|
||||
for k in children_of(conn, &parent_branch)? {
|
||||
if visited.insert(k.branch.clone()) {
|
||||
frontier.push(k.branch.clone());
|
||||
out.push(k);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
/// Verify all 6 required artefacts exist under `.claude/agents/<id>/`
|
||||
/// rooted at `repo_root`. Returns list of missing artefacts (empty = OK).
|
||||
/// Verify all 6 required artefacts exist under `.claude/agents/<id>/`.
|
||||
/// Returns list of missing filenames (empty = OK).
|
||||
pub fn validate(repo_root: &Path, agent_id: &str) -> Vec<String> {
|
||||
let mut base: PathBuf = repo_root.to_path_buf();
|
||||
base.push(".claude");
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@
|
|||
//! Single responsibility: parse args, dispatch to ledger ops, format output.
|
||||
//! Storage: `~/.claude/agents/ledger.sqlite` (or $KEI_LEDGER_DB override).
|
||||
|
||||
mod error;
|
||||
mod ledger;
|
||||
mod schema;
|
||||
|
||||
|
|
@ -27,8 +28,11 @@ enum Cmd {
|
|||
/// Log a new running agent.
|
||||
Fork {
|
||||
id: String,
|
||||
/// Branch name (<=256 chars).
|
||||
#[arg(value_parser = parse_branch)]
|
||||
branch: String,
|
||||
#[arg(long)]
|
||||
/// Parent branch (<=256 chars).
|
||||
#[arg(long, value_parser = parse_branch)]
|
||||
parent: Option<String>,
|
||||
#[arg(long)]
|
||||
spec_sha: String,
|
||||
|
|
@ -67,6 +71,14 @@ enum Cmd {
|
|||
},
|
||||
}
|
||||
|
||||
/// clap value_parser caps branch/parent length at MAX_BRANCH_LEN (audit L1).
|
||||
fn parse_branch(s: &str) -> Result<String, String> {
|
||||
if s.len() > schema::MAX_BRANCH_LEN {
|
||||
return Err(format!("branch length {} exceeds cap {}", s.len(), schema::MAX_BRANCH_LEN));
|
||||
}
|
||||
Ok(s.to_string())
|
||||
}
|
||||
|
||||
fn db_path(cli_db: Option<PathBuf>) -> PathBuf {
|
||||
if let Some(p) = cli_db {
|
||||
return p;
|
||||
|
|
|
|||
|
|
@ -6,6 +6,10 @@
|
|||
|
||||
use rusqlite::{Connection, Result};
|
||||
|
||||
/// Maximum length (chars) accepted for `branch` and `parent_branch` columns.
|
||||
/// Enforced by SQL triggers (v3 migration), `ledger::fork`, and the CLI `value_parser` length cap.
|
||||
pub const MAX_BRANCH_LEN: usize = 256;
|
||||
|
||||
/// Ordered migrations. Index = schema version. Never reorder; append only.
|
||||
pub const MIGRATIONS: &[&str] = &[
|
||||
// v1 — initial schema (RULE 0.12, 2026-04-21)
|
||||
|
|
@ -25,9 +29,34 @@ pub const MIGRATIONS: &[&str] = &[
|
|||
// v2 — Layer G DNA identity column + prefix index (2026-04-23)
|
||||
"ALTER TABLE agents ADD COLUMN dna TEXT;
|
||||
CREATE INDEX IF NOT EXISTS idx_agents_dna_prefix ON agents(substr(dna, 1, 30));",
|
||||
// v3 — length caps on branch/parent_branch (audit L1, 2026-04-23)
|
||||
// Enforced via trigger rather than table CHECK because CHECK cannot be
|
||||
// added retroactively to an existing table without rebuilding it. The
|
||||
// triggers refuse inserts / updates with over-long values.
|
||||
"CREATE TRIGGER IF NOT EXISTS trg_agents_branch_len_ins
|
||||
BEFORE INSERT ON agents
|
||||
BEGIN
|
||||
SELECT RAISE(ABORT, 'branch length exceeds 256')
|
||||
WHERE length(NEW.branch) > 256;
|
||||
SELECT RAISE(ABORT, 'parent_branch length exceeds 256')
|
||||
WHERE NEW.parent_branch IS NOT NULL AND length(NEW.parent_branch) > 256;
|
||||
END;
|
||||
CREATE TRIGGER IF NOT EXISTS trg_agents_branch_len_upd
|
||||
BEFORE UPDATE OF branch, parent_branch ON agents
|
||||
BEGIN
|
||||
SELECT RAISE(ABORT, 'branch length exceeds 256')
|
||||
WHERE length(NEW.branch) > 256;
|
||||
SELECT RAISE(ABORT, 'parent_branch length exceeds 256')
|
||||
WHERE NEW.parent_branch IS NOT NULL AND length(NEW.parent_branch) > 256;
|
||||
END;",
|
||||
];
|
||||
|
||||
/// Apply all pending migrations. Stores current version in pragma user_version.
|
||||
/// Apply all pending migrations atomically (one transaction per version).
|
||||
///
|
||||
/// Prior design ran `execute_batch` and bumped `user_version` in a separate
|
||||
/// call — a partial failure left the schema half-applied and wedged restarts.
|
||||
/// Now each version's DDL + the `user_version` bump share a transaction, so
|
||||
/// any error rolls everything back and the next startup retries cleanly.
|
||||
pub fn migrate(conn: &Connection) -> Result<()> {
|
||||
let current: i64 = conn
|
||||
.query_row("PRAGMA user_version", [], |r| r.get(0))
|
||||
|
|
@ -35,13 +64,29 @@ pub fn migrate(conn: &Connection) -> Result<()> {
|
|||
for (i, sql) in MIGRATIONS.iter().enumerate() {
|
||||
let target = (i + 1) as i64;
|
||||
if current < target {
|
||||
conn.execute_batch(sql)?;
|
||||
conn.pragma_update(None, "user_version", target)?;
|
||||
apply_one(conn, sql, target)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Apply a single migration atomically: DDL + user_version bump in one txn.
|
||||
fn apply_one(conn: &Connection, sql: &str, target: i64) -> Result<()> {
|
||||
conn.execute_batch("BEGIN IMMEDIATE")?;
|
||||
let step = (|| -> Result<()> {
|
||||
conn.execute_batch(sql)?;
|
||||
conn.pragma_update(None, "user_version", target)?;
|
||||
Ok(())
|
||||
})();
|
||||
match step {
|
||||
Ok(()) => conn.execute_batch("COMMIT"),
|
||||
Err(e) => {
|
||||
let _ = conn.execute_batch("ROLLBACK");
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Six required artefacts per agent (RULE 0.12 §completion bundle).
|
||||
pub const REQUIRED_ARTEFACTS: &[&str] = &[
|
||||
"spec.md",
|
||||
|
|
|
|||
|
|
@ -6,6 +6,8 @@
|
|||
|
||||
#[path = "../src/schema.rs"]
|
||||
mod schema;
|
||||
#[path = "../src/error.rs"]
|
||||
mod error;
|
||||
#[path = "../src/ledger.rs"]
|
||||
mod ledger;
|
||||
|
||||
|
|
@ -160,3 +162,88 @@ fn merged_after_done_transitions_status() {
|
|||
assert_eq!(merged.len(), 1);
|
||||
assert_eq!(merged[0].summary.as_deref(), Some("ready"));
|
||||
}
|
||||
|
||||
// --- audit fixes (2026-04-23) ------------------------------------------
|
||||
|
||||
/// Fix S2 — a cycle in `parent_branch` must not hang `tree()`. The synthetic
/// cycle br-x→br-y→br-x is built with two ordinary `ledger::fork` calls whose
/// parents name each other. The walk must terminate, either cleanly
/// (visited-set short-circuit) or with `MaxDepthExceeded`; it must never loop.
#[test]
fn tree_handles_cycle_without_infinite_loop() {
    let (_d, conn) = open_tmp();
    // cx sits on br-x with parent br-y; cy sits on br-y with parent br-x.
    ledger::fork(&conn, "cx", "br-x", Some("br-y"), "sha-x", None, None).unwrap();
    ledger::fork(&conn, "cy", "br-y", Some("br-x"), "sha-y", None, None).unwrap();

    let rows = match ledger::tree(&conn, "cx") {
        // Acceptable: circuit breaker fired.
        Err(ledger::LedgerError::MaxDepthExceeded) => return,
        Err(e) => panic!("unexpected error: {e}"),
        Ok(rows) => rows,
    };
    // Root cx (br-x) has child cy (br-y); cy's only child is cx again, whose
    // branch is already visited, so at most two rows can come back.
    assert!(rows.len() <= 2, "got unbounded rows {}", rows.len());
}
|
||||
|
||||
/// Fix M2 — opening the same database file twice must be harmless: the second
/// `open` re-enters `migrate()` and has to leave `user_version` at the latest
/// version without raising a duplicate column / trigger error. Data written
/// in the first session must still be there afterwards.
#[test]
fn migrate_is_idempotent_across_reopens() {
    let dir = tempfile::tempdir().unwrap();
    let db = dir.path().join("ledger.sqlite");

    // First session: run the migrations from scratch and write one row.
    let first = ledger::open(&db).unwrap();
    ledger::fork(&first, "pre", "br-pre", None, "h", None, None).unwrap();
    drop(first);

    // Second session: the migrations must be a no-op.
    let conn = ledger::open(&db).unwrap();
    let version = conn
        .query_row("PRAGMA user_version", [], |r| r.get::<_, i64>(0))
        .unwrap();
    assert_eq!(version, schema::MIGRATIONS.len() as i64);

    // The row from the first session must survive.
    let rows = ledger::list(&conn, None).unwrap();
    assert_eq!(rows.len(), 1);
    assert_eq!(rows[0].id, "pre");
}
|
||||
|
||||
/// Fix L1 — `ledger::fork` must refuse a `branch` or `parent` longer than
/// `MAX_BRANCH_LEN` with `LedgerError::BranchTooLong`, and must accept a name
/// that is exactly at the cap (the clap `value_parser` applies the same guard
/// at the CLI boundary).
#[test]
fn fork_rejects_overlong_branch() {
    let (_d, conn) = open_tmp();
    let cap = schema::MAX_BRANCH_LEN;
    let long = "x".repeat(cap + 1);

    // Over-long branch: the error names the field and reports the length.
    match ledger::fork(&conn, "too-long", &long, None, "h", None, None) {
        Err(ledger::LedgerError::BranchTooLong { field, len }) => {
            assert_eq!(field, "branch");
            assert_eq!(len, cap + 1);
        }
        other => panic!("expected BranchTooLong, got {other:?}"),
    }

    // Parent side same cap.
    let parent_res = ledger::fork(&conn, "ok-br", "fine", Some(&long), "h", None, None);
    let parent_rejected = matches!(
        parent_res,
        Err(ledger::LedgerError::BranchTooLong { field: "parent_branch", .. })
    );
    assert!(parent_rejected, "expected parent_branch rejection");

    // Length at the cap is accepted.
    let at_cap = "y".repeat(cap);
    ledger::fork(&conn, "at-cap", &at_cap, None, "h", None, None).unwrap();
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue