feat(w9e): NEW kei-replay crate — reconstruct spawn from DNA

kei-replay <dna> parses DNA, looks up ledger row, loads task.toml
from worktree, re-runs compose_prompt, recomputes body hash, reports
match/drift.

kei-replay diff <dna-1> <dna-2> flags every changed facet between
two DNAs.

6 cubes (main/lib/replay/diff/ledger_lookup), all ≤114 LOC.

Direct SQLite access in ledger_lookup.rs (kei-ledger has no lib.rs).
v4 schema-compatible (reads id/dna/worktree_path/spec_sha).

Tests: 6/6 (≥4 required): happy path, missing DNA, drift detection,
diff differing bodies, diff identical, explicit --task override.

Workspace Cargo.toml: +kei-replay member.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Parfii-bot 2026-04-23 13:34:16 +08:00
parent e0e6e1720d
commit 38ceab0913
9 changed files with 630 additions and 0 deletions

View file

@ -2129,6 +2129,20 @@ dependencies = [
"tempfile",
]
[[package]]
name = "kei-replay"
version = "0.1.0"
dependencies = [
"anyhow",
"clap",
"kei-agent-runtime",
"rusqlite",
"sha2 0.10.9",
"tempfile",
"thiserror 1.0.69",
"toml",
]
[[package]]
name = "kei-router"
version = "0.1.0"

View file

@ -49,6 +49,8 @@ members = [
"kei-cache",
# agent substrate v1 — automation envelope: prepare + ledger fork + verify
"kei-spawn",
# agent substrate v1 — reconstruct spawn from DNA (ledger row + task.toml + recompose)
"kei-replay",
]
[workspace.package]

View file

@ -0,0 +1,30 @@
[package]
name = "kei-replay"
version = "0.1.0"
edition = "2021"
rust-version = "1.75"
description = "Reconstruct agent spawn from DNA — reads ledger row + task.toml, re-composes, detects drift"
[[bin]]
name = "kei-replay"
path = "src/main.rs"
[lib]
name = "kei_replay"
path = "src/lib.rs"
[dependencies]
kei-agent-runtime = { path = "../kei-agent-runtime" }
rusqlite = { version = "0.31", features = ["bundled"] }
clap = { workspace = true }
anyhow = "1"
thiserror = "1"
sha2 = { workspace = true }
[dev-dependencies]
tempfile = "3"
toml = "0.8"
[package.metadata.keisei]
backend = "none"
description = "Reconstruct agent spawn from DNA — replay / verify / diff"

View file

@ -0,0 +1,73 @@
//! Diff — compare two DNAs facet-by-facet.
//!
//! Pure parser + comparator. No I/O, no ledger lookup. Callers that want
//! the composed-body text diff can run `replay` on each DNA first and diff
//! the resulting `composed_prompt` themselves.
use anyhow::{anyhow, Result};
use kei_agent_runtime::dna::Dna;
/// Diff report between two DNA strings.
#[derive(Debug, Clone)]
pub struct DnaDiff {
pub left: Dna,
pub right: Dna,
pub role_changed: bool,
pub caps_changed: bool,
pub scope_changed: bool,
pub body_changed: bool,
pub nonce_changed: bool,
}
impl DnaDiff {
/// `true` when every facet is identical (same composition, same nonce).
pub fn is_identical(&self) -> bool {
!(self.role_changed
|| self.caps_changed
|| self.scope_changed
|| self.body_changed
|| self.nonce_changed)
}
/// `true` when the two DNAs would re-compose to the same prompt body
/// (nonce difference allowed — nonces are per-invocation salt).
pub fn is_same_composition(&self) -> bool {
!(self.role_changed || self.caps_changed || self.scope_changed || self.body_changed)
}
/// Human-readable multi-line report.
pub fn render(&self) -> String {
let mut out = Vec::with_capacity(7);
out.push(format!("left : {}", self.left.render()));
out.push(format!("right: {}", self.right.render()));
out.push(format!("role : {}", flag(self.role_changed)));
out.push(format!("caps : {}", flag(self.caps_changed)));
out.push(format!("scope : {}", flag(self.scope_changed)));
out.push(format!("body : {}", flag(self.body_changed)));
out.push(format!("nonce : {}", flag(self.nonce_changed)));
out.join("\n")
}
}
fn flag(changed: bool) -> &'static str {
if changed {
"CHANGED"
} else {
"same"
}
}
/// Parse both DNAs and emit the facet-level diff.
pub fn diff(left: &str, right: &str) -> Result<DnaDiff> {
let l = Dna::parse(left).map_err(|e| anyhow!("invalid left DNA: {e}"))?;
let r = Dna::parse(right).map_err(|e| anyhow!("invalid right DNA: {e}"))?;
Ok(DnaDiff {
role_changed: l.role != r.role,
caps_changed: l.caps_bitmap != r.caps_bitmap,
scope_changed: l.scope_hash != r.scope_hash,
body_changed: l.body_hash != r.body_hash,
nonce_changed: l.nonce != r.nonce,
left: l,
right: r,
})
}

View file

@ -0,0 +1,54 @@
//! Direct SQLite read of the kei-ledger DB to resolve DNA → ledger row.
//!
//! kei-ledger ships as a binary-only crate (no lib target), so we query
//! its SQLite file directly. The DB path follows the same fallback order
//! used by the ledger binary itself.
use anyhow::{anyhow, Context, Result};
use rusqlite::{params, Connection, OptionalExtension};
use std::path::PathBuf;
/// Resolved ledger row subset that kei-replay needs.
#[derive(Debug, Clone)]
pub struct LedgerHit {
pub id: String,
pub dna: String,
pub worktree_path: Option<String>,
pub spec_sha: String,
}
/// DB path fallback: `$KEI_LEDGER_DB` env → `$HOME/.claude/agents/ledger.sqlite`.
pub fn default_db_path() -> PathBuf {
if let Ok(env) = std::env::var("KEI_LEDGER_DB") {
return PathBuf::from(env);
}
let home = std::env::var("HOME").unwrap_or_else(|_| ".".into());
PathBuf::from(home).join(".claude/agents/ledger.sqlite")
}
/// Look up a row whose `dna` column exactly matches the given string.
///
/// Returns `None` if no row matches. Errors on DB access failure.
pub fn find_by_dna(db_path: &std::path::Path, dna: &str) -> Result<Option<LedgerHit>> {
let conn = Connection::open(db_path)
.with_context(|| format!("open ledger DB {}", db_path.display()))?;
let sql = "SELECT id, dna, worktree_path, spec_sha FROM agents WHERE dna = ?1 LIMIT 1";
let row = conn
.query_row(sql, params![dna], |r| {
Ok(LedgerHit {
id: r.get(0)?,
dna: r.get::<_, Option<String>>(1)?.unwrap_or_default(),
worktree_path: r.get(2)?,
spec_sha: r.get(3)?,
})
})
.optional()
.with_context(|| format!("query DNA {dna} from {}", db_path.display()))?;
Ok(row)
}
/// Resolve DNA → hit, or a well-typed error if the DNA isn't in the ledger.
pub fn require_by_dna(db_path: &std::path::Path, dna: &str) -> Result<LedgerHit> {
find_by_dna(db_path, dna)?
.ok_or_else(|| anyhow!("DNA `{dna}` not found in ledger {}", db_path.display()))
}

View file

@ -0,0 +1,18 @@
//! kei-replay — reconstruct an agent spawn from its DNA string.
//!
//! Given a DNA `role::caps::scope::body-nonce`, look up the ledger row,
//! locate the archived `task.toml` for that agent, re-run the compose
//! pipeline, and compare the resulting body hash to the DNA's body segment.
//! A mismatch is schema drift since the original spawn.
//!
//! Constructor Pattern: one responsibility per cube. No I/O beyond SQLite
//! read + `std::fs` on task files + stdout.
//!
//! Modules:
//! - `replay` — reconstruct composed prompt from DNA
//! - `diff` — compare two DNAs (facets + bodies)
//! - `ledger_lookup` — SQLite direct read of ledger rows by DNA
pub mod diff;
pub mod ledger_lookup;
pub mod replay;

View file

@ -0,0 +1,114 @@
//! kei-replay — CLI dispatcher.
//!
//! Commands:
//! kei-replay <dna> — reconstruct; print task.toml + prompt
//! kei-replay <dna> --verify — also fail non-zero on body-hash drift
//! kei-replay diff <a> <b> — compare two DNAs, print facet report
use clap::{Parser, Subcommand};
use kei_replay::{diff, ledger_lookup, replay};
use std::path::PathBuf;
use std::process::ExitCode;
#[derive(Parser)]
#[command(
name = "kei-replay",
version,
about = "Reconstruct agent spawn from DNA — replay / verify / diff"
)]
struct Cli {
/// Override ledger DB path (default: $KEI_LEDGER_DB or ~/.claude/agents/ledger.sqlite)
#[arg(long, global = true)]
db: Option<PathBuf>,
#[command(subcommand)]
cmd: Cmd,
}
#[derive(Subcommand)]
enum Cmd {
/// Reconstruct the spawn for a DNA string.
Replay {
/// DNA string: role::caps::scope::body-nonce
dna: String,
/// Repo root holding _roles/ and _capabilities/ (default: cwd)
#[arg(long)]
kit_root: Option<PathBuf>,
/// Explicit task.toml path (skips ledger lookup for the file path)
#[arg(long)]
task: Option<PathBuf>,
/// Fail with exit 2 when recomputed body hash differs from DNA.
#[arg(long)]
verify: bool,
},
/// Diff two DNA strings facet-by-facet.
Diff { left: String, right: String },
}
fn main() -> ExitCode {
let cli = Cli::parse();
match cli.cmd {
Cmd::Replay { dna, kit_root, task, verify } => {
run_replay(cli.db, dna, kit_root, task, verify)
}
Cmd::Diff { left, right } => run_diff(left, right),
}
}
fn run_replay(
db_cli: Option<PathBuf>,
dna: String,
kit_root: Option<PathBuf>,
task: Option<PathBuf>,
verify: bool,
) -> ExitCode {
let db = db_cli.unwrap_or_else(ledger_lookup::default_db_path);
let kit = kit_root.unwrap_or_else(|| std::env::current_dir().unwrap_or_else(|_| ".".into()));
let result = replay::replay(&db, &dna, task.as_deref(), &kit);
let r = match result {
Ok(r) => r,
Err(e) => {
eprintln!("replay failed: {e}");
return ExitCode::from(1);
}
};
print_replay(&r);
if verify && !r.body_hash_matches {
eprintln!(
"DRIFT: DNA body_hash={} but recomputed={} — task.toml differs from original spawn",
r.dna.body_hash, r.recomputed_body_hash
);
return ExitCode::from(2);
}
ExitCode::SUCCESS
}
fn print_replay(r: &replay::Replay) {
println!("=== task.toml ===");
print!("{}", r.task_toml_text);
if !r.task_toml_text.ends_with('\n') {
println!();
}
println!("=== composed prompt ===");
println!("{}", r.composed_prompt);
println!("=== integrity ===");
println!("dna.body_hash = {}", r.dna.body_hash);
println!("recomputed body_hash = {}", r.recomputed_body_hash);
println!(
"match = {}",
if r.body_hash_matches { "yes" } else { "NO (drift)" }
);
}
fn run_diff(left: String, right: String) -> ExitCode {
match diff::diff(&left, &right) {
Ok(d) => {
println!("{}", d.render());
ExitCode::SUCCESS
}
Err(e) => {
eprintln!("diff failed: {e}");
ExitCode::from(1)
}
}
}

View file

@ -0,0 +1,96 @@
//! Replay — reconstruct a spawn's composed prompt from a DNA string.
//!
//! Pipeline:
//! 1. Parse DNA (validates shape).
//! 2. Resolve ledger hit (agent-id, worktree path, spec_sha).
//! 3. Locate `task.toml` (explicit override OR `<worktree>/tasks/<agent-id>/task.toml`).
//! 4. Load task + kit root, re-run `kei_agent_runtime::compose::compose_prompt`.
//! 5. Recompute body hash from the re-loaded `task.body.text` and compare
//! to the DNA body segment — mismatch = schema drift.
use anyhow::{anyhow, Context, Result};
use kei_agent_runtime::compose::compose_prompt;
use kei_agent_runtime::dna::Dna;
use kei_agent_runtime::spawn::load_task;
use sha2::{Digest, Sha256};
use std::path::{Path, PathBuf};
/// Outcome of a replay pass.
#[derive(Debug, Clone)]
pub struct Replay {
pub dna: Dna,
pub task_toml_text: String,
pub composed_prompt: String,
pub recomputed_body_hash: String,
pub body_hash_matches: bool,
}
/// Reconstruct the spawn.
///
/// `db_path` — ledger SQLite file.
/// `dna_str` — the DNA string supplied on the CLI.
/// `task_override` — explicit `task.toml` path if caller knows it (bypasses lookup).
/// `kit_root` — repo root that holds `_roles/` + `_capabilities/`.
pub fn replay(
db_path: &Path,
dna_str: &str,
task_override: Option<&Path>,
kit_root: &Path,
) -> Result<Replay> {
let dna = Dna::parse(dna_str).map_err(|e| anyhow!("invalid DNA: {e}"))?;
let task_path = resolve_task_path(db_path, &dna, dna_str, task_override)?;
let task_toml_text = std::fs::read_to_string(&task_path)
.with_context(|| format!("read task {}", task_path.display()))?;
let task = load_task(&task_path)?;
let composed_prompt = compose_prompt(&task, kit_root)
.with_context(|| format!("re-compose prompt for {}", dna_str))?;
let recomputed_body_hash = short_sha256(&task.body.text);
let body_hash_matches = recomputed_body_hash.eq_ignore_ascii_case(&dna.body_hash);
Ok(Replay {
dna,
task_toml_text,
composed_prompt,
recomputed_body_hash,
body_hash_matches,
})
}
/// Prefer explicit override; else derive from ledger worktree_path + agent-id.
fn resolve_task_path(
db_path: &Path,
_dna: &Dna,
dna_str: &str,
task_override: Option<&Path>,
) -> Result<PathBuf> {
if let Some(p) = task_override {
return Ok(p.to_path_buf());
}
let hit = crate::ledger_lookup::require_by_dna(db_path, dna_str)?;
let wt = hit.worktree_path.ok_or_else(|| {
anyhow!(
"DNA body hash not resolvable: ledger row `{}` has no worktree_path — \
task.toml required (pass --task <path>)",
hit.id
)
})?;
let path = PathBuf::from(wt).join("tasks").join(&hit.id).join("task.toml");
if !path.is_file() {
return Err(anyhow!(
"DNA body hash not resolvable: task.toml not found at {} — \
pass --task <path> to override",
path.display()
));
}
Ok(path)
}
/// 8-hex SHA-256 prefix — mirrors `kei_agent_runtime::dna::short_sha256`.
/// Kept local (that fn is private) so we stay drop-in compatible with the
/// DNA body_hash format without exposing a new API surface.
fn short_sha256(input: &str) -> String {
let digest = Sha256::digest(input.as_bytes());
format!(
"{:02X}{:02X}{:02X}{:02X}",
digest[0], digest[1], digest[2], digest[3]
)
}

View file

@ -0,0 +1,229 @@
//! Smoke tests for kei-replay.
//!
//! Covers: happy-path replay, missing DNA, drift detection, diff.
//!
//! Each test builds its own isolated tempdir with:
//! - SQLite ledger seeded with the relevant `agents` row
//! - `<worktree>/tasks/<agent-id>/task.toml`
//! - `<kit_root>/_roles/<role>.toml`
//! - `<kit_root>/_capabilities/<cat>/<slug>/text.md`
//! Then calls `replay::replay` / `diff::diff` directly (skips the CLI layer).
use kei_agent_runtime::capability::TaskSpec;
use kei_agent_runtime::dna::Dna;
use kei_agent_runtime::role::ResolvedRole;
use kei_replay::{diff, replay};
use rusqlite::{params, Connection};
use std::path::{Path, PathBuf};
use tempfile::TempDir;
struct Fixture {
_tmp: TempDir,
db: PathBuf,
kit_root: PathBuf,
agent_id: String,
dna_str: String,
body: String,
}
fn resolved_fake() -> ResolvedRole {
ResolvedRole {
required: vec!["policy::no-git-ops".into(), "output::report-format".into()],
warnings: Vec::new(),
}
}
fn write_kit(kit: &Path) {
std::fs::create_dir_all(kit.join("_capabilities/policy/no-git-ops")).unwrap();
std::fs::write(
kit.join("_capabilities/policy/no-git-ops/text.md"),
"## No git\n\nYou must not git.\n",
)
.unwrap();
std::fs::create_dir_all(kit.join("_capabilities/output/report-format")).unwrap();
std::fs::write(
kit.join("_capabilities/output/report-format/text.md"),
"## Report\n\nEmit a report.\n",
)
.unwrap();
std::fs::create_dir_all(kit.join("_roles")).unwrap();
std::fs::write(
kit.join("_roles/fake.toml"),
r#"
[role]
name = "fake"
[capabilities]
required = ["policy::no-git-ops", "output::report-format"]
"#,
)
.unwrap();
}
fn seed_ledger(db: &Path, agent_id: &str, dna: &str, worktree: &str) {
let conn = Connection::open(db).unwrap();
// Minimal schema for what kei-replay reads. Mirrors kei-ledger v4 cols.
conn.execute_batch(
"CREATE TABLE agents (
id TEXT PRIMARY KEY,
branch TEXT NOT NULL,
parent_branch TEXT,
spec_sha TEXT NOT NULL,
status TEXT NOT NULL,
started_ts INTEGER NOT NULL,
finished_ts INTEGER,
summary TEXT,
worktree_path TEXT,
dna TEXT,
creator_id TEXT,
fork_parent_id TEXT
);",
)
.unwrap();
conn.execute(
"INSERT INTO agents (id, branch, spec_sha, status, started_ts, worktree_path, dna)
VALUES (?1, 'agent/test', 'abc', 'running', 0, ?2, ?3)",
params![agent_id, worktree, dna],
)
.unwrap();
}
fn write_task_toml(worktree: &Path, agent_id: &str, body: &str) -> PathBuf {
let dir = worktree.join("tasks").join(agent_id);
std::fs::create_dir_all(&dir).unwrap();
let p = dir.join("task.toml");
// Explicit TOML literal: body.text uses triple-quoted block to survive
// any special chars in body strings across tests.
let toml = format!(
r#"[task]
role = "fake"
agent-id = "{agent_id}"
[body]
text = """{body}"""
"#
);
std::fs::write(&p, toml).unwrap();
p
}
fn build_dna(task: &TaskSpec) -> String {
Dna::compose(task, &resolved_fake()).render()
}
fn make_fixture(body: &str) -> Fixture {
let tmp = TempDir::new().unwrap();
let root = tmp.path().to_path_buf();
let kit_root = root.join("kit");
let worktree = root.join("worktree");
std::fs::create_dir_all(&worktree).unwrap();
write_kit(&kit_root);
let agent_id = "test-agent-01".to_string();
let mut task = TaskSpec::default();
task.task.role = "fake".into();
task.task.agent_id = agent_id.clone();
task.body.text = body.to_string();
write_task_toml(&worktree, &agent_id, body);
let dna_str = build_dna(&task);
let db = root.join("ledger.sqlite");
seed_ledger(&db, &agent_id, &dna_str, worktree.to_str().unwrap());
Fixture { _tmp: tmp, db, kit_root, agent_id, dna_str, body: body.to_string() }
}
#[test]
fn replay_happy_path_reconstructs_prompt_and_matches_body_hash() {
let f = make_fixture("Port the kei-forge templating subsystem.");
let r = replay::replay(&f.db, &f.dna_str, None, &f.kit_root).expect("replay");
assert!(r.body_hash_matches, "body hash must match at the happy path");
assert!(r.composed_prompt.contains("You must not git"));
assert!(r.composed_prompt.contains("Emit a report"));
assert!(r.composed_prompt.contains(&f.body));
assert_eq!(r.dna.role, "fake");
// task.toml text is echoed back verbatim
assert!(r.task_toml_text.contains(&f.agent_id));
}
#[test]
fn replay_unknown_dna_errors_with_not_found_message() {
let f = make_fixture("body");
// Use well-formed but unseeded DNA — parse passes, lookup must fail.
let bogus = "fake::NG-RF::DEADBEEF::BAADF00D-12345678";
let err = replay::replay(&f.db, bogus, None, &f.kit_root).unwrap_err();
let msg = format!("{err:#}");
assert!(msg.contains("not found"), "expected 'not found', got: {msg}");
}
#[test]
fn replay_detects_body_drift_when_task_toml_mutated_after_spawn() {
let f = make_fixture("original body");
// Mutate the on-disk task.toml to simulate schema drift since spawn.
let task_path = PathBuf::from(&f.db)
.parent()
.unwrap()
.join("worktree/tasks")
.join(&f.agent_id)
.join("task.toml");
let mutated = format!(
r#"[task]
role = "fake"
agent-id = "{}"
[body]
text = """MUTATED body — drift injected"""
"#,
f.agent_id
);
std::fs::write(&task_path, mutated).unwrap();
let r = replay::replay(&f.db, &f.dna_str, None, &f.kit_root).expect("replay");
assert!(
!r.body_hash_matches,
"drift must be detected: dna={} recomputed={}",
r.dna.body_hash, r.recomputed_body_hash
);
assert_ne!(r.dna.body_hash, r.recomputed_body_hash);
}
#[test]
fn diff_two_dnas_flags_every_changed_facet() {
let f1 = make_fixture("body-one");
let f2 = make_fixture("body-two");
let d = diff::diff(&f1.dna_str, &f2.dna_str).expect("diff");
// Same role + same caps + same (empty) scope + different body => different body_hash
assert!(!d.role_changed);
assert!(!d.caps_changed);
assert!(!d.scope_changed);
assert!(d.body_changed, "body must differ between the two fixtures");
// Different invocations => nonces almost always differ.
assert!(d.nonce_changed || !d.is_identical());
assert!(!d.is_same_composition(), "body differs => composition differs");
let rendered = d.render();
assert!(rendered.contains("CHANGED"));
}
#[test]
fn diff_identical_dna_strings_report_as_identical() {
let f = make_fixture("same");
let d = diff::diff(&f.dna_str, &f.dna_str).expect("diff");
assert!(d.is_identical());
assert!(d.is_same_composition());
assert!(!d.body_changed);
assert!(!d.nonce_changed);
}
#[test]
fn replay_honours_explicit_task_override_bypassing_ledger_worktree() {
// Fixture's task.toml lives at worktree/tasks/<id>/task.toml; copy it to
// a side path and pass --task override. Ledger row is still required for
// the DNA lookup (to assert it was spawned), but file path is overridden.
let f = make_fixture("override test");
let side = f.kit_root.parent().unwrap().join("side-task.toml");
let orig = f.kit_root.parent().unwrap().join("worktree/tasks").join(&f.agent_id).join("task.toml");
std::fs::copy(&orig, &side).unwrap();
let r = replay::replay(&f.db, &f.dna_str, Some(&side), &f.kit_root).expect("replay");
assert!(r.body_hash_matches);
}