feat(primitives): kei-memory Rust crate — offline session analyzer (Genesis-clean)

This commit is contained in:
Parfii-bot 2026-04-22 00:50:04 +08:00
parent 75bceaf3a6
commit 040e189b80
14 changed files with 1250 additions and 0 deletions

View file

@ -919,6 +919,19 @@ dependencies = [
"tempfile",
]
[[package]]
name = "kei-memory"
version = "0.1.0"
dependencies = [
"chrono",
"clap",
"regex",
"rusqlite",
"serde",
"serde_json",
"tempfile",
]
[[package]]
name = "kei-migrate"
version = "0.1.0"

View file

@ -9,6 +9,7 @@ members = [
"mock-render",
"visual-diff",
"tokens-sync",
"kei-memory",
]
[workspace.package]

View file

@ -0,0 +1,21 @@
[package]
name = "kei-memory"
version = "0.1.0"
edition = "2021"
rust-version = "1.75"
description = "Session retrospective + recurring pattern detector (offline-first, RULE 0.14)"
[[bin]]
name = "kei-memory"
path = "src/main.rs"
[dependencies]
rusqlite = { version = "0.31", features = ["bundled"] }
clap = { version = "4", features = ["derive"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
chrono = { version = "0.4", default-features = false, features = ["clock", "serde"] }
regex = "1"
[dev-dependencies]
tempfile = "3"

View file

@ -0,0 +1,122 @@
//! Session retrospective — duration, tool counts, files, errors, time-wasters.
//!
//! Constructor Pattern: one cube, one read-only responsibility.
//! Output is plain-text (stdout). Callers can `--summary` for a one-liner
//! suitable for appending to audit-backlog.md, or full report for review.
use rusqlite::{params, Connection, OptionalExtension, Result};
/// Minimal session-header info returned as tuple for downstream formatters.
pub struct SessionHeader {
pub id: String,
pub started_ts: i64,
pub ended_ts: Option<i64>,
pub tool_call_count: i64,
pub error_count: i64,
}
/// Load the `sessions` row for an id.
pub fn session_header(conn: &Connection, id: &str) -> Result<Option<SessionHeader>> {
conn.query_row(
"SELECT id, started_ts, ended_ts, tool_call_count, error_count
FROM sessions WHERE id = ?1",
params![id],
|r| {
Ok(SessionHeader {
id: r.get(0)?,
started_ts: r.get(1)?,
ended_ts: r.get(2)?,
tool_call_count: r.get(3)?,
error_count: r.get(4)?,
})
},
)
.optional()
}
/// Return the last `n` session ids (most recent first).
pub fn recent_session_ids(conn: &Connection, n: usize) -> Result<Vec<String>> {
let mut stmt = conn.prepare(
"SELECT id FROM sessions ORDER BY COALESCE(ended_ts, started_ts) DESC LIMIT ?1",
)?;
let rows = stmt
.query_map(params![n as i64], |r| r.get::<_, String>(0))?
.filter_map(|r| r.ok())
.collect();
Ok(rows)
}
/// Return (tool, count) pairs ordered by invocation count DESC.
pub fn top_tools(conn: &Connection, session_id: &str, limit: usize) -> Result<Vec<(String, i64)>> {
let mut stmt = conn.prepare(
"SELECT tool, COUNT(*) FROM events
WHERE session_id = ?1 AND tool IS NOT NULL
GROUP BY tool ORDER BY COUNT(*) DESC LIMIT ?2",
)?;
let rows = stmt
.query_map(params![session_id, limit as i64], |r| {
Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?))
})?
.collect::<Result<Vec<_>>>()?;
Ok(rows)
}
/// Return (file_path, count) for the most-touched files in a session.
pub fn top_files(conn: &Connection, session_id: &str, limit: usize) -> Result<Vec<(String, i64)>> {
let mut stmt = conn.prepare(
"SELECT file_path, COUNT(*) FROM events
WHERE session_id = ?1 AND file_path IS NOT NULL
GROUP BY file_path ORDER BY COUNT(*) DESC LIMIT ?2",
)?;
let rows = stmt
.query_map(params![session_id, limit as i64], |r| {
Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?))
})?
.collect::<Result<Vec<_>>>()?;
Ok(rows)
}
/// Render a full retrospective for one session to stdout.
pub fn render_report(conn: &Connection, session_id: &str, summary_only: bool) -> Result<String> {
let hdr = match session_header(conn, session_id)? {
Some(h) => h,
None => return Ok(format!("(no session with id {session_id})\n")),
};
let duration = hdr.ended_ts.unwrap_or(hdr.started_ts) - hdr.started_ts;
if summary_only {
return Ok(format!(
"session={} dur={}s tools={} errors={}\n",
hdr.id, duration, hdr.tool_call_count, hdr.error_count
));
}
let mut out = String::new();
out.push_str(&format!("=== SESSION {} ===\n", hdr.id));
out.push_str(&format!("Duration: {}s\n", duration));
out.push_str(&format!("Tool calls: {}\n", hdr.tool_call_count));
out.push_str(&format!("Errors: {}\n", hdr.error_count));
out.push_str("\nTop tools:\n");
for (t, c) in top_tools(conn, session_id, 5)? {
out.push_str(&format!(" {c:>4} {t}\n"));
}
out.push_str("\nTop files:\n");
for (f, c) in top_files(conn, session_id, 10)? {
out.push_str(&format!(" {c:>4} {f}\n"));
}
Ok(out)
}
/// Aggregate analyze across recent N sessions — concat render_report each.
pub fn render_recent(conn: &Connection, n: usize, summary_only: bool) -> Result<String> {
let ids = recent_session_ids(conn, n)?;
if ids.is_empty() {
return Ok("(no sessions ingested yet)\n".into());
}
let mut out = String::new();
for id in ids {
out.push_str(&render_report(conn, &id, summary_only)?);
if !summary_only {
out.push('\n');
}
}
Ok(out)
}

View file

@ -0,0 +1,61 @@
//! Backlog — silent-first audit item CRUD.
//!
//! Constructor Pattern: one cube, one CLI subcommand's worth of logic.
use chrono::Utc;
use rusqlite::{params, Connection};
use std::process::ExitCode;
fn err(msg: &str) -> ExitCode {
eprintln!("kei-memory: {msg}");
ExitCode::from(1)
}
pub fn cmd_backlog(
conn: &Connection,
add: Option<String>,
list: bool,
clear: bool,
) -> ExitCode {
let now = Utc::now().timestamp();
if let Some(item) = add {
if let Err(e) = conn.execute(
"INSERT INTO backlog (ts, item) VALUES (?1, ?2)",
params![now, item],
) {
return err(&format!("backlog add failed: {e}"));
}
println!("added");
return ExitCode::SUCCESS;
}
if clear {
let _ = conn.execute("UPDATE backlog SET processed = 1", []);
println!("cleared");
return ExitCode::SUCCESS;
}
if list {
return list_open(conn);
}
err("backlog: pass --add=<item> | --list | --clear")
}
fn list_open(conn: &Connection) -> ExitCode {
let mut stmt = match conn
.prepare("SELECT ts, item FROM backlog WHERE processed = 0 ORDER BY ts ASC")
{
Ok(s) => s,
Err(e) => return err(&format!("backlog list prep: {e}")),
};
let rows = match stmt.query_map([], |r| {
Ok((r.get::<_, i64>(0)?, r.get::<_, String>(1)?))
}) {
Ok(r) => r,
Err(e) => return err(&format!("backlog list query: {e}")),
};
for row in rows {
if let Ok((ts, it)) = row {
println!("{ts}\t{it}");
}
}
ExitCode::SUCCESS
}

View file

@ -0,0 +1,79 @@
//! Co-access tracking — files touched within a 5-minute window.
//!
//! Constructor Pattern: one cube, single responsibility.
//! Derived from KeiMD/src/ml.rs (2026-04-22 verified Genesis-clean).
//! Key difference: session-id isn't part of the coaccess PK — we aggregate
//! across sessions so cross-session recurrences surface in `patterns`.
use rusqlite::{params, Connection, Result};
const WINDOW_SECS: i64 = 300;
/// Insert (or increment) pair entries for the new file vs any other file
/// touched in the same session within the last 5 minutes. Pair ordering
/// is canonicalised lexically so (A,B) and (B,A) collapse to one row.
pub fn record_coaccess(
conn: &Connection,
session_id: &str,
file_path: &str,
ts: i64,
) -> Result<()> {
let recent = recent_files_in_window(conn, session_id, file_path, ts)?;
for other in recent {
let (a, b) = canonical_pair(file_path, &other);
conn.execute(
"INSERT INTO coaccess (file_a, file_b, count) VALUES (?1, ?2, 1)
ON CONFLICT(file_a, file_b) DO UPDATE SET count = count + 1",
params![a, b],
)?;
}
Ok(())
}
fn canonical_pair<'a>(x: &'a str, y: &'a str) -> (&'a str, &'a str) {
if x < y {
(x, y)
} else {
(y, x)
}
}
fn recent_files_in_window(
conn: &Connection,
session_id: &str,
exclude: &str,
ts: i64,
) -> Result<Vec<String>> {
let mut stmt = conn.prepare(
"SELECT DISTINCT file_path FROM events
WHERE session_id = ?1
AND file_path IS NOT NULL
AND file_path != ?2
AND ts >= ?3
ORDER BY ts DESC LIMIT 10",
)?;
let rows = stmt
.query_map(params![session_id, exclude, ts - WINDOW_SECS], |r| {
r.get::<_, String>(0)
})?
.filter_map(|r| r.ok())
.collect();
Ok(rows)
}
/// Return (file_a, file_b, count) triples ordered by co-access count DESC.
/// Not yet exposed on the CLI — used by integration tests and reserved
/// for the upcoming `kei-memory coaccess --top` subcommand.
#[allow(dead_code)]
pub fn top_pairs(conn: &Connection, limit: usize) -> Result<Vec<(String, String, i64)>> {
let mut stmt = conn.prepare(
"SELECT file_a, file_b, count FROM coaccess
ORDER BY count DESC LIMIT ?1",
)?;
let rows = stmt
.query_map(params![limit as i64], |r| {
Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?, r.get::<_, i64>(2)?))
})?
.collect::<Result<Vec<_>>>()?;
Ok(rows)
}

View file

@ -0,0 +1,162 @@
//! Command handlers — one function per CLI subcommand.
//!
//! Constructor Pattern: each handler <30 LOC, single responsibility.
//! Pulled out of main.rs to keep the dispatcher under the 200 LOC limit.
use crate::{analyze, ingest, patterns, tfidf};
use rusqlite::Connection;
use std::path::PathBuf;
use std::process::ExitCode;
fn err(msg: &str) -> ExitCode {
eprintln!("kei-memory: {msg}");
ExitCode::from(1)
}
pub fn cmd_ingest(
conn: &Connection,
session_id: &str,
transcript: &PathBuf,
prompt: Option<String>,
) -> ExitCode {
match ingest::ingest_jsonl(conn, session_id, transcript) {
Ok(n) => {
if let Some(p) = prompt {
let _ = tfidf::index_document(conn, session_id, &p);
}
let _ = patterns::detect_in_session(conn, session_id);
println!("ingested {n} events into session {session_id}");
ExitCode::SUCCESS
}
Err(e) => err(&format!("ingest failed: {e}")),
}
}
pub fn cmd_analyze(
conn: &Connection,
session: Option<String>,
last: usize,
summary: bool,
) -> ExitCode {
let out = match session {
Some(id) => analyze::render_report(conn, &id, summary),
None => analyze::render_recent(conn, last, summary),
};
match out {
Ok(s) => {
print!("{s}");
ExitCode::SUCCESS
}
Err(e) => err(&format!("analyze failed: {e}")),
}
}
pub fn cmd_patterns(
conn: &Connection,
cross_session: bool,
session: Option<String>,
) -> ExitCode {
let rows = if cross_session {
patterns::detect_cross_session(conn)
} else if let Some(id) = session {
patterns::detect_in_session(conn, &id)
} else {
patterns::list_all(conn, 50)
};
match rows {
Ok(list) => {
if list.is_empty() {
println!("(no patterns)");
}
for p in list {
println!(
"{:>4} {} session={}",
p.count,
p.event_class,
p.session_id.as_deref().unwrap_or("-")
);
}
ExitCode::SUCCESS
}
Err(e) => err(&format!("patterns failed: {e}")),
}
}
pub fn cmd_similar(conn: &Connection, prompt: &str, limit: usize) -> ExitCode {
match tfidf::top_similar(conn, prompt, limit) {
Ok(rows) => {
if rows.is_empty() {
println!("(no matches)");
}
for (sid, score) in rows {
println!("{:.4} {}", score, sid);
}
ExitCode::SUCCESS
}
Err(e) => err(&format!("similar failed: {e}")),
}
}
pub fn cmd_dump(conn: &Connection, session_id: &str) -> ExitCode {
match dump_events(conn, session_id) {
Ok(()) => ExitCode::SUCCESS,
Err(e) => err(&format!("dump failed: {e}")),
}
}
fn dump_events(conn: &Connection, session_id: &str) -> rusqlite::Result<()> {
let mut stmt = conn.prepare(
"SELECT ts, kind, tool, file_path, is_error, message
FROM events WHERE session_id = ?1 ORDER BY ts ASC",
)?;
println!("# session {session_id}\n");
let rows = stmt.query_map(rusqlite::params![session_id], |r| {
Ok((
r.get::<_, i64>(0)?,
r.get::<_, String>(1)?,
r.get::<_, Option<String>>(2)?,
r.get::<_, Option<String>>(3)?,
r.get::<_, i64>(4)?,
r.get::<_, Option<String>>(5)?,
))
})?;
for row in rows {
let (ts, kind, tool, file, is_err, msg) = row?;
println!(
"- `t={ts}` **{kind}** {} {} err={} {}",
tool.unwrap_or_default(),
file.unwrap_or_default(),
is_err,
msg.unwrap_or_default()
);
}
Ok(())
}
pub fn cmd_stats(conn: &Connection) -> ExitCode {
match print_stats(conn) {
Ok(()) => ExitCode::SUCCESS,
Err(e) => err(&format!("stats failed: {e}")),
}
}
fn print_stats(conn: &Connection) -> rusqlite::Result<()> {
let n_sess: i64 = conn.query_row("SELECT COUNT(*) FROM sessions", [], |r| r.get(0))?;
let n_evt: i64 = conn.query_row("SELECT COUNT(*) FROM events", [], |r| r.get(0))?;
let n_pat: i64 = conn.query_row("SELECT COUNT(*) FROM patterns", [], |r| r.get(0))?;
println!("sessions: {n_sess}\nevents: {n_evt}\npatterns: {n_pat}");
let mut stmt = conn.prepare(
"SELECT tool, COUNT(*) FROM events WHERE tool IS NOT NULL
GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 10",
)?;
println!("\nTop tools:");
let rows = stmt.query_map([], |r| {
Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?))
})?;
for row in rows {
let (t, c) = row?;
println!(" {c:>4} {t}");
}
Ok(())
}

View file

@ -0,0 +1,135 @@
//! Ingest — read JSONL trace → insert events into DB.
//!
//! Constructor Pattern: one cube, single responsibility.
//! Trace line shape (subset we care about):
//! {"ts": 1700000000, "kind": "tool_use", "tool": "Bash",
//! "file_path": "...", "is_error": false, "message": "..."}
//! Unknown/empty lines are skipped silently.
use crate::coaccess::record_coaccess;
use chrono::Utc;
use rusqlite::{params, Connection, Result};
use serde::Deserialize;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;
#[derive(Debug, Deserialize, Default)]
pub struct TraceLine {
#[serde(default)]
pub ts: Option<i64>,
#[serde(default)]
pub kind: Option<String>,
#[serde(default)]
pub tool: Option<String>,
#[serde(default)]
pub file_path: Option<String>,
#[serde(default)]
pub is_error: Option<bool>,
#[serde(default)]
pub event_class: Option<String>,
#[serde(default)]
pub message: Option<String>,
}
/// Ensure the sessions row exists (idempotent). Returns started_ts.
pub fn ensure_session(conn: &Connection, session_id: &str) -> Result<i64> {
let now = Utc::now().timestamp();
conn.execute(
"INSERT OR IGNORE INTO sessions (id, started_ts) VALUES (?1, ?2)",
params![session_id, now],
)?;
let started: i64 = conn.query_row(
"SELECT started_ts FROM sessions WHERE id = ?1",
params![session_id],
|r| r.get(0),
)?;
Ok(started)
}
/// Read a JSONL transcript line by line and insert one row per event.
/// Returns the number of events actually inserted (malformed lines skipped).
pub fn ingest_jsonl(conn: &Connection, session_id: &str, path: &Path) -> Result<usize> {
ensure_session(conn, session_id)?;
let file = File::open(path)
.map_err(|e| rusqlite::Error::InvalidParameterName(format!("open {}: {e}", path.display())))?;
let reader = BufReader::new(file);
let mut inserted = 0usize;
for line in reader.lines().map_while(|l| l.ok()) {
let trimmed = line.trim();
if trimmed.is_empty() || !trimmed.starts_with('{') {
continue;
}
let parsed: TraceLine = match serde_json::from_str(trimmed) {
Ok(p) => p,
Err(_) => continue,
};
insert_event(conn, session_id, &parsed)?;
inserted += 1;
}
finalize_session(conn, session_id)?;
Ok(inserted)
}
/// Insert a single event row. Updates co-access if file_path present.
pub fn insert_event(conn: &Connection, session_id: &str, e: &TraceLine) -> Result<()> {
let ts = e.ts.unwrap_or_else(|| Utc::now().timestamp());
let kind = e.kind.clone().unwrap_or_else(|| "other".to_string());
let is_err = e.is_error.unwrap_or(false) as i64;
let class = e
.event_class
.clone()
.unwrap_or_else(|| classify_default(&kind, e.tool.as_deref(), e.message.as_deref()));
conn.execute(
"INSERT INTO events (session_id, ts, kind, tool, file_path, is_error, event_class, message)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
params![
session_id,
ts,
kind,
e.tool,
e.file_path,
is_err,
class,
e.message
],
)?;
if let Some(fp) = &e.file_path {
record_coaccess(conn, session_id, fp, ts)?;
}
Ok(())
}
/// Cheap heuristic classifier used when trace does not provide one.
fn classify_default(kind: &str, tool: Option<&str>, message: Option<&str>) -> String {
if let Some(m) = message {
let lm = m.to_lowercase();
if lm.contains("permission denied") || lm.contains("denied") {
return "permission_denied".to_string();
}
if lm.contains("worktree") && lm.contains("error") {
return "worktree_error".to_string();
}
if lm.contains("cargo") && lm.contains("workspace") {
return "cargo_workspace".to_string();
}
}
match (kind, tool) {
("tool_use", Some(t)) => format!("tool_use:{t}"),
_ => kind.to_string(),
}
}
/// Update aggregate counters on the sessions row.
pub fn finalize_session(conn: &Connection, session_id: &str) -> Result<()> {
let now = Utc::now().timestamp();
conn.execute(
"UPDATE sessions SET
ended_ts = ?1,
tool_call_count = (SELECT COUNT(*) FROM events WHERE session_id = ?2),
error_count = (SELECT COALESCE(SUM(is_error),0) FROM events WHERE session_id = ?2)
WHERE id = ?2",
params![now, session_id],
)?;
Ok(())
}

View file

@ -0,0 +1,125 @@
//! kei-memory — offline session analyzer + recurring-pattern detector.
//!
//! Constructor Pattern: main.rs only dispatches; work lives in cubes.
//! Storage: `~/.claude/memory/kei-memory.sqlite` (or $KEI_MEMORY_DB).
//! RULE 0.14 — session self-audit, silent-first until 10 sessions ingested.
mod analyze;
mod backlog;
mod coaccess;
mod commands;
mod ingest;
mod patterns;
mod schema;
mod similarity;
mod tfidf;
use clap::{Parser, Subcommand};
use rusqlite::Connection;
use std::path::PathBuf;
use std::process::ExitCode;
#[derive(Parser)]
#[command(name = "kei-memory", version, about = "Offline session retrospective (RULE 0.14)")]
struct Cli {
/// Override DB path (default: $KEI_MEMORY_DB or ~/.claude/memory/kei-memory.sqlite)
#[arg(long)]
db: Option<PathBuf>,
#[command(subcommand)]
cmd: Cmd,
}
#[derive(Subcommand)]
enum Cmd {
/// Read a JSONL transcript and insert session + events.
Ingest {
#[arg(long)]
session_id: String,
#[arg(long)]
transcript: PathBuf,
#[arg(long)]
prompt: Option<String>,
},
/// Print a retrospective for a session or the last N sessions.
Analyze {
#[arg(long)]
session: Option<String>,
#[arg(long, default_value_t = 1)]
last: usize,
#[arg(long)]
summary: bool,
},
/// List recurring event-class patterns.
Patterns {
#[arg(long)]
cross_session: bool,
#[arg(long)]
session: Option<String>,
},
/// Top-k past sessions by TF-IDF cosine similarity to the query text.
Similar {
prompt: String,
#[arg(long, default_value_t = 5)]
limit: usize,
},
/// Dump a session's events as markdown to stdout.
Dump { session_id: String },
/// N sessions, N events, top tools.
Stats,
/// Manage the silent-first audit backlog items.
Backlog {
#[arg(long)]
add: Option<String>,
#[arg(long)]
list: bool,
#[arg(long)]
clear: bool,
},
}
fn db_path(cli_db: Option<PathBuf>) -> PathBuf {
if let Some(p) = cli_db {
return p;
}
if let Ok(e) = std::env::var("KEI_MEMORY_DB") {
return PathBuf::from(e);
}
let home = std::env::var("HOME").unwrap_or_else(|_| ".".into());
PathBuf::from(home).join(".claude/memory/kei-memory.sqlite")
}
fn open_db(path: &PathBuf) -> rusqlite::Result<Connection> {
if let Some(parent) = path.parent() {
let _ = std::fs::create_dir_all(parent);
}
let conn = Connection::open(path)?;
schema::migrate(&conn)?;
Ok(conn)
}
fn main() -> ExitCode {
let cli = Cli::parse();
let path = db_path(cli.db);
let conn = match open_db(&path) {
Ok(c) => c,
Err(e) => {
eprintln!("kei-memory: open {}: {e}", path.display());
return ExitCode::from(1);
}
};
match cli.cmd {
Cmd::Ingest { session_id, transcript, prompt } => {
commands::cmd_ingest(&conn, &session_id, &transcript, prompt)
}
Cmd::Analyze { session, last, summary } => {
commands::cmd_analyze(&conn, session, last, summary)
}
Cmd::Patterns { cross_session, session } => {
commands::cmd_patterns(&conn, cross_session, session)
}
Cmd::Similar { prompt, limit } => commands::cmd_similar(&conn, &prompt, limit),
Cmd::Dump { session_id } => commands::cmd_dump(&conn, &session_id),
Cmd::Stats => commands::cmd_stats(&conn),
Cmd::Backlog { add, list, clear } => backlog::cmd_backlog(&conn, add, list, clear),
}
}

View file

@ -0,0 +1,95 @@
//! Pattern detector — recurring event-classes.
//!
//! Constructor Pattern: one cube, one read/write responsibility.
//! A "pattern" is an event_class that occurred ≥2 times in ONE session
//! (in-session recurrence) or ≥2 times across DIFFERENT sessions
//! (cross-session recurrence). Results are persisted into `patterns` and
//! also returned to the caller for display.
use rusqlite::{params, Connection, Result};
#[derive(Debug)]
#[allow(dead_code)]
pub struct PatternHit {
pub event_class: String,
pub session_id: Option<String>,
pub count: i64,
}
/// Detect in-session recurrences for `session_id`. Persists rows.
pub fn detect_in_session(conn: &Connection, session_id: &str) -> Result<Vec<PatternHit>> {
let mut stmt = conn.prepare(
"SELECT event_class, COUNT(*), MIN(ts), MAX(ts)
FROM events
WHERE session_id = ?1 AND event_class IS NOT NULL
GROUP BY event_class HAVING COUNT(*) >= 2
ORDER BY COUNT(*) DESC",
)?;
let rows = stmt
.query_map(params![session_id], |r| {
Ok((
r.get::<_, String>(0)?,
r.get::<_, i64>(1)?,
r.get::<_, i64>(2)?,
r.get::<_, i64>(3)?,
))
})?
.collect::<Result<Vec<_>>>()?;
let mut out = Vec::new();
for (class, count, first, last) in rows {
conn.execute(
"INSERT INTO patterns (event_class, session_id, count, first_seen_ts, last_seen_ts)
VALUES (?1, ?2, ?3, ?4, ?5)",
params![class, session_id, count, first, last],
)?;
out.push(PatternHit {
event_class: class,
session_id: Some(session_id.to_string()),
count,
});
}
Ok(out)
}
/// Detect cross-session recurrences. Does NOT persist (history aggregate).
pub fn detect_cross_session(conn: &Connection) -> Result<Vec<PatternHit>> {
let mut stmt = conn.prepare(
"SELECT event_class, COUNT(DISTINCT session_id)
FROM events
WHERE event_class IS NOT NULL
GROUP BY event_class HAVING COUNT(DISTINCT session_id) >= 2
ORDER BY COUNT(DISTINCT session_id) DESC",
)?;
let rows = stmt
.query_map([], |r| {
Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?))
})?
.collect::<Result<Vec<_>>>()?;
Ok(rows
.into_iter()
.map(|(class, count)| PatternHit {
event_class: class,
session_id: None,
count,
})
.collect())
}
/// List all patterns in the persistent table (newest first).
#[allow(dead_code)]
pub fn list_all(conn: &Connection, limit: usize) -> Result<Vec<PatternHit>> {
let mut stmt = conn.prepare(
"SELECT event_class, session_id, count FROM patterns
ORDER BY last_seen_ts DESC LIMIT ?1",
)?;
let rows = stmt
.query_map(params![limit as i64], |r| {
Ok(PatternHit {
event_class: r.get::<_, String>(0)?,
session_id: Some(r.get::<_, String>(1)?),
count: r.get::<_, i64>(2)?,
})
})?
.collect::<Result<Vec<_>>>()?;
Ok(rows)
}

View file

@ -0,0 +1,80 @@
//! SQL schema for the kei-memory offline analyzer.
//!
//! Constructor Pattern: schema + migration runner, no business logic.
//! DB default path: `~/.claude/memory/kei-memory.sqlite`.
//! Any structural change MUST append a new migration; never edit history.
use rusqlite::{Connection, Result};
/// Ordered migrations. Index = schema version. Never reorder.
pub const MIGRATIONS: &[&str] = &[
// v1 — initial schema (RULE 0.14, 2026-04-22)
"CREATE TABLE IF NOT EXISTS sessions (
id TEXT PRIMARY KEY,
started_ts INTEGER NOT NULL,
ended_ts INTEGER,
tool_call_count INTEGER NOT NULL DEFAULT 0,
error_count INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
ts INTEGER NOT NULL,
kind TEXT NOT NULL,
tool TEXT,
file_path TEXT,
is_error INTEGER NOT NULL DEFAULT 0,
event_class TEXT,
message TEXT,
FOREIGN KEY(session_id) REFERENCES sessions(id)
);
CREATE INDEX IF NOT EXISTS idx_events_session ON events(session_id);
CREATE INDEX IF NOT EXISTS idx_events_class ON events(event_class);
CREATE TABLE IF NOT EXISTS coaccess (
file_a TEXT NOT NULL,
file_b TEXT NOT NULL,
count INTEGER NOT NULL DEFAULT 1,
PRIMARY KEY(file_a, file_b)
);
CREATE TABLE IF NOT EXISTS tokens (
session_id TEXT NOT NULL,
token TEXT NOT NULL,
tf INTEGER NOT NULL,
PRIMARY KEY(session_id, token)
);
CREATE TABLE IF NOT EXISTS idf (
token TEXT PRIMARY KEY,
df INTEGER NOT NULL,
idf REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS patterns (
id INTEGER PRIMARY KEY AUTOINCREMENT,
event_class TEXT NOT NULL,
session_id TEXT NOT NULL,
count INTEGER NOT NULL,
first_seen_ts INTEGER NOT NULL,
last_seen_ts INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_patterns_class ON patterns(event_class);
CREATE TABLE IF NOT EXISTS backlog (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts INTEGER NOT NULL,
item TEXT NOT NULL,
processed INTEGER NOT NULL DEFAULT 0
);",
];
/// Apply all pending migrations. Stores version in `PRAGMA user_version`.
pub fn migrate(conn: &Connection) -> Result<()> {
let current: i64 = conn
.query_row("PRAGMA user_version", [], |r| r.get(0))
.unwrap_or(0);
for (i, sql) in MIGRATIONS.iter().enumerate() {
let target = (i + 1) as i64;
if current < target {
conn.execute_batch(sql)?;
conn.pragma_update(None, "user_version", target)?;
}
}
Ok(())
}

View file

@ -0,0 +1,28 @@
//! Cosine similarity over sparse term-weight maps.
//!
//! Constructor Pattern: one cube, one pure-math responsibility.
//! Classical numerator = Σ a·b over shared keys;
//! classical denominator = ‖a‖₂ · ‖b‖₂. No normalize-to-Frobenius, no rank
//! projection — just textbook cosine on HashMap<String, f64>.
use std::collections::HashMap;
/// Cosine similarity between two sparse vectors keyed by token.
pub fn cosine_tfidf(a: &HashMap<String, f64>, b: &HashMap<String, f64>) -> f64 {
if a.is_empty() || b.is_empty() {
return 0.0;
}
let mut dot = 0.0f64;
for (k, va) in a {
if let Some(vb) = b.get(k) {
dot += va * vb;
}
}
let norm_a: f64 = a.values().map(|v| v * v).sum::<f64>().sqrt();
let norm_b: f64 = b.values().map(|v| v * v).sum::<f64>().sqrt();
if norm_a == 0.0 || norm_b == 0.0 {
0.0
} else {
dot / (norm_a * norm_b)
}
}

View file

@ -0,0 +1,143 @@
//! TF-IDF over session documents — fresh reimplementation.
//!
//! Constructor Pattern: one cube, one responsibility.
//! Genesis-clean: no normalize(S), no Frobenius, no rank-r decomposition.
//! Pure classical text-retrieval: tokens, term-frequency, inverse-doc-freq,
//! cosine similarity between (session_id, prompt) document vectors.
//!
//! Document identity = session_id. Corpus = all ingested sessions.
use crate::similarity::cosine_tfidf;
use regex::Regex;
use rusqlite::{params, Connection, Result};
use std::collections::HashMap;
/// Tokenise free text into lowercase alphanumeric word stems (≥3 chars).
pub fn tokenise(text: &str) -> Vec<String> {
let re = Regex::new(r"[A-Za-z][A-Za-z0-9_]{2,}").unwrap();
re.find_iter(text)
.map(|m| m.as_str().to_lowercase())
.collect()
}
/// Compute term-frequencies for a single document.
pub fn tf(tokens: &[String]) -> HashMap<String, i64> {
let mut h = HashMap::<String, i64>::new();
for t in tokens {
*h.entry(t.clone()).or_insert(0) += 1;
}
h
}
/// Record a document's tokens under `session_id`. Overwrites prior entry
/// for the same session (idempotent ingest).
pub fn index_document(conn: &Connection, session_id: &str, text: &str) -> Result<()> {
conn.execute("DELETE FROM tokens WHERE session_id = ?1", params![session_id])?;
let toks = tokenise(text);
let counts = tf(&toks);
for (tok, c) in &counts {
conn.execute(
"INSERT INTO tokens (session_id, token, tf) VALUES (?1, ?2, ?3)",
params![session_id, tok, c],
)?;
}
recompute_idf(conn)?;
Ok(())
}
/// Recompute the full IDF table. Called after each document ingest — cheap
/// for N < 10k sessions, and keeps the table in sync without an update trigger.
pub fn recompute_idf(conn: &Connection) -> Result<()> {
let n: i64 = conn
.query_row(
"SELECT COUNT(DISTINCT session_id) FROM tokens",
[],
|r| r.get(0),
)
.unwrap_or(0);
if n == 0 {
conn.execute("DELETE FROM idf", [])?;
return Ok(());
}
conn.execute("DELETE FROM idf", [])?;
let mut stmt = conn.prepare(
"SELECT token, COUNT(DISTINCT session_id) FROM tokens GROUP BY token",
)?;
let rows: Vec<(String, i64)> = stmt
.query_map([], |r| Ok((r.get(0)?, r.get(1)?)))?
.collect::<Result<Vec<_>>>()?;
for (tok, df) in rows {
let idf = ((n as f64 + 1.0) / (df as f64 + 1.0)).ln() + 1.0;
conn.execute(
"INSERT INTO idf (token, df, idf) VALUES (?1, ?2, ?3)",
params![tok, df, idf],
)?;
}
Ok(())
}
/// Fetch a session's (token → tf·idf) sparse vector.
pub fn session_vector(conn: &Connection, session_id: &str) -> Result<HashMap<String, f64>> {
let mut stmt = conn.prepare(
"SELECT t.token, t.tf, COALESCE(i.idf, 1.0)
FROM tokens t
LEFT JOIN idf i ON i.token = t.token
WHERE t.session_id = ?1",
)?;
let rows = stmt.query_map(params![session_id], |r| {
Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)? as f64, r.get::<_, f64>(2)?))
})?;
let mut v = HashMap::<String, f64>::new();
for row in rows {
let (tok, tf_v, idf_v) = row?;
v.insert(tok, tf_v * idf_v);
}
Ok(v)
}
/// Compute a TF·IDF vector for ad-hoc query text, using existing corpus IDF.
pub fn query_vector(conn: &Connection, text: &str) -> Result<HashMap<String, f64>> {
let toks = tokenise(text);
let counts = tf(&toks);
let mut v = HashMap::<String, f64>::new();
for (tok, c) in counts {
let idf: f64 = conn
.query_row(
"SELECT idf FROM idf WHERE token = ?1",
params![tok],
|r| r.get(0),
)
.unwrap_or(1.0);
v.insert(tok, c as f64 * idf);
}
Ok(v)
}
/// Return the top-k sessions by cosine similarity against `query`.
pub fn top_similar(
conn: &Connection,
query: &str,
limit: usize,
) -> Result<Vec<(String, f64)>> {
let q = query_vector(conn, query)?;
if q.is_empty() {
return Ok(vec![]);
}
let mut stmt = conn.prepare("SELECT DISTINCT session_id FROM tokens")?;
let sessions: Vec<String> = stmt
.query_map([], |r| r.get::<_, String>(0))?
.filter_map(|r| r.ok())
.collect();
let mut scored: Vec<(String, f64)> = sessions
.into_iter()
.map(|sid| {
let v = session_vector(conn, &sid).unwrap_or_default();
let s = cosine_tfidf(&q, &v);
(sid, s)
})
.filter(|(_, s)| *s > 0.0)
.collect();
scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
scored.truncate(limit);
Ok(scored)
}

View file

@ -0,0 +1,185 @@
//! Integration tests for kei-memory.
//!
//! Constructor Pattern: each test = one scenario, one assertion target.
//! Uses tempfile for per-test isolated sqlite file. Loads source modules
//! via `#[path]` so we don't need to expose a library crate surface.
#[path = "../src/schema.rs"]
mod schema;
#[path = "../src/similarity.rs"]
mod similarity;
#[path = "../src/coaccess.rs"]
mod coaccess;
#[path = "../src/tfidf.rs"]
mod tfidf;
#[path = "../src/ingest.rs"]
mod ingest;
#[path = "../src/analyze.rs"]
mod analyze;
#[path = "../src/patterns.rs"]
mod patterns;
use rusqlite::Connection;
use std::fs;
use std::io::Write;
use std::path::PathBuf;
use tempfile::TempDir;
fn open_tmp() -> (TempDir, Connection) {
let dir = tempfile::tempdir().unwrap();
let db_path = dir.path().join("kei-memory.sqlite");
let conn = Connection::open(&db_path).unwrap();
schema::migrate(&conn).unwrap();
(dir, conn)
}
fn write_jsonl(dir: &TempDir, name: &str, lines: &[&str]) -> PathBuf {
let p = dir.path().join(name);
let mut f = fs::File::create(&p).unwrap();
for l in lines {
writeln!(f, "{l}").unwrap();
}
p
}
#[test]
fn ingest_then_analyze_roundtrip() {
let (d, conn) = open_tmp();
let trace = write_jsonl(&d, "s1.jsonl", &[
r#"{"ts":1700000000,"kind":"tool_use","tool":"Bash","message":"ok"}"#,
r#"{"ts":1700000010,"kind":"tool_use","tool":"Edit","file_path":"/a.rs"}"#,
r#"{"ts":1700000020,"kind":"tool_use","tool":"Bash","is_error":true,"message":"permission denied"}"#,
]);
let n = ingest::ingest_jsonl(&conn, "s1", &trace).unwrap();
assert_eq!(n, 3);
let hdr = analyze::session_header(&conn, "s1").unwrap().unwrap();
assert_eq!(hdr.tool_call_count, 3);
assert_eq!(hdr.error_count, 1);
let report = analyze::render_report(&conn, "s1", false).unwrap();
assert!(report.contains("Tool calls: 3"));
assert!(report.contains("Errors: 1"));
}
#[test]
fn coaccess_counts_pair_within_window() {
let (d, conn) = open_tmp();
let trace = write_jsonl(&d, "s2.jsonl", &[
r#"{"ts":1700000000,"kind":"tool_use","tool":"Edit","file_path":"/a.rs"}"#,
r#"{"ts":1700000060,"kind":"tool_use","tool":"Edit","file_path":"/b.rs"}"#,
r#"{"ts":1700000120,"kind":"tool_use","tool":"Edit","file_path":"/a.rs"}"#,
]);
ingest::ingest_jsonl(&conn, "s2", &trace).unwrap();
let pairs = coaccess::top_pairs(&conn, 10).unwrap();
assert!(!pairs.is_empty());
let hit = pairs.iter().find(|(a, b, _)| {
(a == "/a.rs" && b == "/b.rs") || (a == "/b.rs" && b == "/a.rs")
});
assert!(hit.is_some(), "expected pair (/a.rs,/b.rs), got {pairs:?}");
assert!(hit.unwrap().2 >= 1);
}
#[test]
fn tfidf_similarity_between_known_docs() {
let (_d, conn) = open_tmp();
tfidf::index_document(&conn, "sA", "rust cargo workspace conflict build error").unwrap();
tfidf::index_document(&conn, "sB", "rust cargo workspace conflict ci").unwrap();
tfidf::index_document(&conn, "sC", "swift xcode simulator audio").unwrap();
let top = tfidf::top_similar(&conn, "rust cargo workspace", 3).unwrap();
assert!(!top.is_empty());
let best = &top[0].0;
assert!(best == "sA" || best == "sB", "expected sA or sB first, got {best}");
let worst = top.iter().find(|(id, _)| id == "sC");
if let Some((_, s)) = worst {
assert!(*s <= top[0].1, "unrelated doc should not outrank target");
}
}
#[test]
fn pattern_detection_finds_recurring_class() {
let (d, conn) = open_tmp();
let trace = write_jsonl(&d, "s3.jsonl", &[
r#"{"ts":1700000000,"kind":"tool_use","tool":"Bash","event_class":"worktree_denied","is_error":true}"#,
r#"{"ts":1700000010,"kind":"tool_use","tool":"Bash","event_class":"worktree_denied","is_error":true}"#,
r#"{"ts":1700000020,"kind":"tool_use","tool":"Bash","event_class":"worktree_denied","is_error":true}"#,
r#"{"ts":1700000030,"kind":"tool_use","tool":"Read","event_class":"read_ok"}"#,
]);
ingest::ingest_jsonl(&conn, "s3", &trace).unwrap();
let hits = patterns::detect_in_session(&conn, "s3").unwrap();
let wd = hits.iter().find(|h| h.event_class == "worktree_denied");
assert!(wd.is_some(), "expected worktree_denied pattern");
assert_eq!(wd.unwrap().count, 3);
}
#[test]
fn stats_counts_sessions_and_events() {
let (d, conn) = open_tmp();
let t1 = write_jsonl(&d, "a.jsonl", &[
r#"{"ts":1,"kind":"tool_use","tool":"Bash"}"#,
r#"{"ts":2,"kind":"tool_use","tool":"Edit"}"#,
]);
let t2 = write_jsonl(&d, "b.jsonl", &[
r#"{"ts":3,"kind":"tool_use","tool":"Grep"}"#,
]);
ingest::ingest_jsonl(&conn, "a", &t1).unwrap();
ingest::ingest_jsonl(&conn, "b", &t2).unwrap();
let n_sess: i64 = conn.query_row("SELECT COUNT(*) FROM sessions", [], |r| r.get(0)).unwrap();
let n_evt: i64 = conn.query_row("SELECT COUNT(*) FROM events", [], |r| r.get(0)).unwrap();
assert_eq!(n_sess, 2);
assert_eq!(n_evt, 3);
}
#[test]
fn backlog_crud_add_list_clear() {
let (_d, conn) = open_tmp();
let now = 1700000000i64;
conn.execute(
"INSERT INTO backlog (ts, item) VALUES (?1, ?2)",
rusqlite::params![now, "item-one"],
).unwrap();
conn.execute(
"INSERT INTO backlog (ts, item) VALUES (?1, ?2)",
rusqlite::params![now + 1, "item-two"],
).unwrap();
let open_ct: i64 = conn.query_row(
"SELECT COUNT(*) FROM backlog WHERE processed = 0", [], |r| r.get(0),
).unwrap();
assert_eq!(open_ct, 2);
conn.execute("UPDATE backlog SET processed = 1", []).unwrap();
let after: i64 = conn.query_row(
"SELECT COUNT(*) FROM backlog WHERE processed = 0", [], |r| r.get(0),
).unwrap();
assert_eq!(after, 0);
}
#[test]
fn cross_session_pattern_needs_two_sessions() {
let (d, conn) = open_tmp();
let a = write_jsonl(&d, "a.jsonl", &[
r#"{"ts":1,"kind":"tool_use","event_class":"foo"}"#,
]);
let b = write_jsonl(&d, "b.jsonl", &[
r#"{"ts":2,"kind":"tool_use","event_class":"foo"}"#,
]);
ingest::ingest_jsonl(&conn, "a", &a).unwrap();
ingest::ingest_jsonl(&conn, "b", &b).unwrap();
let cross = patterns::detect_cross_session(&conn).unwrap();
let foo = cross.iter().find(|p| p.event_class == "foo");
assert!(foo.is_some());
assert_eq!(foo.unwrap().count, 2);
}
#[test]
fn cosine_similarity_sanity() {
let mut a = std::collections::HashMap::new();
a.insert("rust".to_string(), 1.0);
a.insert("cargo".to_string(), 1.0);
let mut b = std::collections::HashMap::new();
b.insert("rust".to_string(), 1.0);
b.insert("cargo".to_string(), 1.0);
let s_ident = similarity::cosine_tfidf(&a, &b);
assert!((s_ident - 1.0).abs() < 1e-9);
let mut c = std::collections::HashMap::new();
c.insert("swift".to_string(), 1.0);
let s_ortho = similarity::cosine_tfidf(&a, &c);
assert!(s_ortho.abs() < 1e-9);
}