Wave A — Functional ingest fix (root cause of empty Sleep reports):
- Rewrote TraceLine struct to match real Claude Code trace JSONL:
type (was kind), timestamp ISO8601 (was epoch ts), message Object,
cwd / gitBranch / parentUuid / uuid / subtype / toolUseID / toolUseResult
- New src/extract.rs: extract_tool_uses + extract_tool_result walks
message.content[] for nested tool_use / tool_result blocks
- New src/classifier.rs: explicit table classifier (tool_error, user_correction,
retry_loop, permission_denied, tool_use:<name>, ...) replaces shallow heuristic
- New src/error.rs: KeiMemoryError enum (IO/Parse/Db) replaces semantic
mismatch where IO error was wrapped as rusqlite::InvalidParameterName
- New src/trace_line.rs: TraceLine + helpers (cube extraction)
- Schema migration v3: events.cwd column + 3 hot-query indices
(events.tool, events.file_path, events.ts) + UNIQUE on patterns
- New tests/ingest_real_trace.rs: synth-fixture asserts tool/file/cwd/class extraction
Wave B — Lib crate split:
- Cargo.toml: [lib] target added alongside existing [[bin]]
- src/lib.rs: pub re-export of all 18 modules
- src/main.rs: 11 mod declarations replaced by single use kei_memory::{…}
- tests/integration.rs: #[path] hack replaced by use kei_memory::{…}
Wave C — TF-IDF dedup + single-JOIN + filter_map fix:
- Schema migration v2: tokens.idf_dirty column + flag-based dedup
- index_document no longer triggers per-call recompute_idf rebuild
- top_similar uses single JOIN via vectors_for_overlapping_sessions helper
(was N round-trips, one session_vector per candidate)
- All filter_map(|r| r.ok()) row-error swallowing replaced with ? propagation
- New tests/tfidf_idf_dedup.rs: 4 tests covering dedup behaviour, IDF emptiness,
JOIN-pruning, empty-query safety
Wave D — Commands split + nits:
- New src/dump.rs (43 LOC) + src/stats.rs (33 LOC):
CLI renderers extracted from commands.rs (was inline SQL + format)
- src/commands.rs: thin wrappers, -42 LOC
- src/injection_guard.rs: inline tests removed (-26 LOC), file under 200 LOC threshold
- tests/injection_guard_unit.rs (new): 4 tests in proper integration crate
- src/patterns.rs: INSERT replaced with INSERT...ON CONFLICT...DO UPDATE
(idempotent re-ingest, uses Wave A's UNIQUE index)
- src/analyze.rs + src/coaccess.rs: filter_map row-error fixes
- src/coaccess.rs: misleading PK comment rewritten
Verify-before-commit (RULE 0.13 §"Verify-before-commit"):
- cargo check --all-targets: PASS (1 unrelated dead-code warning)
- cargo test: 42 passed, 0 failed across 9 test binaries
- STATUS-TRUTH markers aggregated at .claude/agents/_merge/kei-memory-2026-05-01/
Architect-spotted ARCH-MAJOR + ARCH-MINOR + ARCH-NIT findings addressed:
- ARCH-MAJOR Cargo.toml binary-only (Wave B)
- ARCH-MAJOR schema missing indices (Wave A v3)
- ARCH-MAJOR ingest_jsonl choke point (Wave A — extract.rs + classifier.rs)
- ARCH-MAJOR idf O(N·V) per-call rebuild (Wave C)
- ARCH-MINOR patterns no UPSERT (Wave D)
- ARCH-MINOR commands.rs houses dump+stats (Wave D)
- ARCH-MINOR classifier silent contract (Wave A)
- ARCH-MINOR IO error wrapped as rusqlite (Wave A)
- ARCH-MINOR injection_guard inline tests (Wave D)
- ARCH-MINOR tfidf top_similar N round-trips (Wave C)
- ARCH-NIT 3× filter_map(|r| r.ok()) sites (Wave C + D)
- ARCH-NIT coaccess misleading comment (Wave D)
=== STATUS-TRUTH MARKER ===
shipped: functional
stubs: 0
cargo-check: PASS
cargo-test: PASS (42 tests, 0 failures)
behaviour-verified: yes
follow-up-required:
- tests/ingest_guard_tests.rs + tests/guard_test_corpus.rs still on #[path] hack (Wave B follow-up note, ~5 LOC)
- dead_code warning Severity::Warn unused (pre-existing, not blocking)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
175 lines
6.4 KiB
Rust
175 lines
6.4 KiB
Rust
//! Ingest — read JSONL trace → insert events into DB.
|
|
//!
|
|
//! Constructor Pattern: one cube, single responsibility.
|
|
//! Trace-line shape lives in `trace_line.rs`; classification in
|
|
//! `classifier.rs`; tool_use/tool_result extraction in `extract.rs`.
|
|
//! This file owns the persistence + IO loop.
|
|
//!
|
|
//! Schema-mismatch fix: Wave A (2026-05-01). Pre-fix, ~50% of real
|
|
//! traces silently dropped via `Err(_) => continue` — root cause was
|
|
//! the old struct mapping `kind` to top-level `kind` field, which the
|
|
//! real format calls `type`, plus tool calls being nested objects.
|
|
|
|
pub use crate::trace_line::TraceLine;
|
|
|
|
use crate::classifier::classify;
|
|
use crate::coaccess::record_coaccess;
|
|
use crate::error::{KeiMemoryError, Result as KmResult};
|
|
use crate::extract::{extract_tool_result, extract_tool_uses, ToolUse};
|
|
use crate::injection_guard;
|
|
use chrono::Utc;
|
|
use rusqlite::{params, Connection, Result};
|
|
use std::fs::File;
|
|
use std::io::{BufRead, BufReader};
|
|
use std::path::Path;
|
|
|
|
/// Ensure the sessions row exists (idempotent). Returns started_ts.
|
|
pub fn ensure_session(conn: &Connection, session_id: &str) -> Result<i64> {
|
|
let now = Utc::now().timestamp();
|
|
conn.execute(
|
|
"INSERT OR IGNORE INTO sessions (id, started_ts) VALUES (?1, ?2)",
|
|
params![session_id, now],
|
|
)?;
|
|
let started: i64 = conn.query_row(
|
|
"SELECT started_ts FROM sessions WHERE id = ?1",
|
|
params![session_id],
|
|
|r| r.get(0),
|
|
)?;
|
|
Ok(started)
|
|
}
|
|
|
|
/// Read a JSONL transcript line by line and insert events.
|
|
///
|
|
/// Returns total event-row count inserted (one assistant line with N
|
|
/// tool_uses → N rows). Malformed JSON yields a stderr log line but
|
|
/// does not abort the file. Schema and IO errors propagate.
|
|
pub fn ingest_jsonl(conn: &Connection, session_id: &str, path: &Path) -> KmResult<usize> {
|
|
ensure_session(conn, session_id)?;
|
|
let file = File::open(path).map_err(KeiMemoryError::Io)?;
|
|
let mut inserted = 0usize;
|
|
for line in BufReader::new(file).lines().map_while(|l| l.ok()) {
|
|
if let Some(parsed) = parse_one_line(&line) {
|
|
inserted += process_line(conn, session_id, &parsed)?;
|
|
}
|
|
}
|
|
finalize_session(conn, session_id)?;
|
|
Ok(inserted)
|
|
}
|
|
|
|
/// Parse one JSONL line into a TraceLine, surfacing errors to stderr.
|
|
/// Returns None for blank / non-object / unparseable lines.
|
|
fn parse_one_line(line: &str) -> Option<TraceLine> {
|
|
let trimmed = line.trim();
|
|
if trimmed.is_empty() || !trimmed.starts_with('{') {
|
|
return None;
|
|
}
|
|
match serde_json::from_str::<TraceLine>(trimmed) {
|
|
Ok(p) => Some(p),
|
|
Err(e) => {
|
|
eprintln!("kei-memory: parse skip ({} chars): {e}", trimmed.len());
|
|
None
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Persist all event rows derivable from one parsed trace line.
|
|
///
|
|
/// Strategy (simpler model — no tool_use ↔ tool_result pairing):
|
|
/// * If message has nested `tool_use` blocks: emit one row per block
|
|
/// with `tool=name, file_path=input.file_path, is_error=false`.
|
|
/// * If message has a `tool_result` block: emit one row with
|
|
/// `is_error=<from JSON>` and the legacy `tool` if present.
|
|
/// * Otherwise: emit a single row driven by kind + legacy fields.
|
|
fn process_line(conn: &Connection, session_id: &str, e: &TraceLine) -> Result<usize> {
|
|
let tool_uses: Vec<ToolUse> = e.message.as_ref().map(extract_tool_uses).unwrap_or_default();
|
|
if !tool_uses.is_empty() {
|
|
for u in &tool_uses {
|
|
let fp = u.file_path.clone().or_else(|| e.file_path.clone());
|
|
insert_one(conn, session_id, e, Some(&u.name), fp.as_deref(), false)?;
|
|
}
|
|
return Ok(tool_uses.len());
|
|
}
|
|
let is_err = e
|
|
.message
|
|
.as_ref()
|
|
.and_then(extract_tool_result)
|
|
.map(|r| r.is_error)
|
|
.or(e.is_error)
|
|
.unwrap_or(false);
|
|
insert_one(conn, session_id, e, e.tool.as_deref(), e.file_path.as_deref(), is_err)?;
|
|
Ok(1)
|
|
}
|
|
|
|
/// Insert a single event row directly (legacy entrypoint kept for tests).
|
|
///
|
|
/// P2.1.b — guards `message_text()` via `injection_guard::scan` BEFORE
|
|
/// persistence. A Block-tier hit logs to stderr and skips the row
|
|
/// (returns Ok so the surrounding ingest loop continues). This is a
|
|
/// real memory-write path: the message later flows into the system
|
|
/// prompt verbatim.
|
|
pub fn insert_event(conn: &Connection, session_id: &str, e: &TraceLine) -> Result<()> {
|
|
insert_one(
|
|
conn,
|
|
session_id,
|
|
e,
|
|
e.tool.as_deref(),
|
|
e.file_path.as_deref(),
|
|
e.is_error.unwrap_or(false),
|
|
)
|
|
}
|
|
|
|
/// Single insert path used by `process_line` AND `insert_event`.
|
|
/// Applies guard, classifier, persists row, records co-access.
|
|
fn insert_one(
|
|
conn: &Connection,
|
|
session_id: &str,
|
|
e: &TraceLine,
|
|
tool: Option<&str>,
|
|
file_path: Option<&str>,
|
|
is_err: bool,
|
|
) -> Result<()> {
|
|
let msg_text = e.message_text();
|
|
if message_is_blocked(session_id, msg_text.as_deref()) {
|
|
return Ok(());
|
|
}
|
|
let ts = e.resolved_ts();
|
|
let kind = e.kind.as_deref().unwrap_or("other");
|
|
let class = e
|
|
.event_class
|
|
.clone()
|
|
.unwrap_or_else(|| classify(Some(kind), tool, msg_text.as_deref(), is_err));
|
|
conn.execute(
|
|
"INSERT INTO events (session_id, ts, kind, tool, file_path, is_error, event_class, message, cwd)
|
|
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
|
|
params![session_id, ts, kind, tool, file_path, is_err as i64, class, msg_text, e.cwd],
|
|
)?;
|
|
if let Some(fp) = file_path {
|
|
record_coaccess(conn, session_id, fp, ts)?;
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
/// Returns true (and logs) when `message` carries a Block-tier finding.
|
|
fn message_is_blocked(session_id: &str, message: Option<&str>) -> bool {
|
|
if let Some(msg) = message {
|
|
if let Err(finding) = injection_guard::scan(msg) {
|
|
eprintln!("kei-memory: insert_event rejected (session={session_id}): {finding}");
|
|
return true;
|
|
}
|
|
}
|
|
false
|
|
}
|
|
|
|
/// Update aggregate counters on the sessions row.
|
|
pub fn finalize_session(conn: &Connection, session_id: &str) -> Result<()> {
|
|
let now = Utc::now().timestamp();
|
|
conn.execute(
|
|
"UPDATE sessions SET
|
|
ended_ts = ?1,
|
|
tool_call_count = (SELECT COUNT(*) FROM events WHERE session_id = ?2),
|
|
error_count = (SELECT COALESCE(SUM(is_error),0) FROM events WHERE session_id = ?2)
|
|
WHERE id = ?2",
|
|
params![now, session_id],
|
|
)?;
|
|
Ok(())
|
|
}
|