From 519600d1bff3a7814a0421a6a35fcc13639973e3 Mon Sep 17 00:00:00 2001 From: Parfii-bot Date: Thu, 23 Apr 2026 05:55:12 +0800 Subject: [PATCH] feat(m1-dogfood): migrate kei-chat-store to kei-entity-store engine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Primary entity = chat_messages (integer-PK; sessions stay bespoke — TEXT UUID PK incompatible with engine's IntegerPk). Secondary tables (chat_sessions, indexes, FTS rename fts_chat → fts_chat_messages) moved into custom_migrations. FTS shadow column session_id dropped (never used as MATCH filter). Archive verb NOT enabled: chat_sessions.status is TEXT enum not INTEGER flag — engine archive verb incompatible. archive_session stays bespoke. cost REAL column dropped — engine has no Real FieldKind. per-message cost struct field kept (=0.0) for API compat; session total_cost aggregate still maintained bespoke in save_message. 5/5 tests preserved + 1 new engine migration-parity smoke test. DOGFOOD prompt feedback (M1 via kei-agent-runtime prepare): 6 engine limitations surfaced for follow-up — FieldKind::TextPk, FieldKind::Real, archive-TEXT-enum variant, FTS UNINDEXED shadow cols, atom dir assumption, rusqlite drop logic. See M1 task report. Co-Authored-By: Claude Opus 4.7 (1M context) --- _primitives/_rust/Cargo.lock | 1 + _primitives/_rust/kei-chat-store/Cargo.toml | 1 + .../_rust/kei-chat-store/src/schema.rs | 77 ++++++++++++------- .../_rust/kei-chat-store/src/search.rs | 48 +++++++----- .../_rust/kei-chat-store/src/sessions.rs | 35 ++++++--- _primitives/_rust/kei-chat-store/src/store.rs | 31 ++++---- .../_rust/kei-chat-store/tests/integration.rs | 21 +++++ 7 files changed, 143 insertions(+), 71 deletions(-) diff --git a/_primitives/_rust/Cargo.lock b/_primitives/_rust/Cargo.lock index 908b8c5..bae2f9b 100644 --- a/_primitives/_rust/Cargo.lock +++ b/_primitives/_rust/Cargo.lock @@ -1935,6 +1935,7 @@ dependencies = [ "anyhow", "chrono", "clap", + "kei-entity-store", "rusqlite", "serde", "serde_json", diff --git a/_primitives/_rust/kei-chat-store/Cargo.toml b/_primitives/_rust/kei-chat-store/Cargo.toml index 3b3e03f..ee6b8d5 100644 --- a/_primitives/_rust/kei-chat-store/Cargo.toml +++ b/_primitives/_rust/kei-chat-store/Cargo.toml @@ -14,6 +14,7 @@ name = "kei_chat_store" path = "src/lib.rs" [dependencies] +kei-entity-store = { path = "../kei-entity-store" } rusqlite = { version = "0.31", features = ["bundled"] } clap = { version = "4", features = ["derive"] } serde = { version = "1", features = ["derive"] } diff --git a/_primitives/_rust/kei-chat-store/src/schema.rs b/_primitives/_rust/kei-chat-store/src/schema.rs index 52cf182..2291c94 100644 --- a/_primitives/_rust/kei-chat-store/src/schema.rs +++ b/_primitives/_rust/kei-chat-store/src/schema.rs @@ -1,8 +1,45 @@ -//! Chat SQLite schema. +//! kei-chat-store EntitySchema — declarative spec consumed by +//! `kei_entity_store::Store` and its verb templates. +//! +//! Shape (Layer-A convergence, 2026-04-23): +//! +//! - Primary entity = `chat_messages` (INTEGER PK; required by engine). +//! Engine owns: create / get / list / search verbs + FTS reindex. +//! - Bespoke: `chat_sessions` has a TEXT UUID primary key that the +//! engine's `FieldKind::IntegerPk` cannot represent, so its DDL rides +//! the engine's `custom_migrations` slot and its CRUD stays in +//! `sessions.rs` (analogous to kei-task's milestones / deps / graph). +//! - Archive: sessions use a TEXT `status` enum ('active' | 'archived') +//! rather than an INTEGER flag, so the engine `archive` verb is NOT +//! enabled. Session archival stays bespoke. +//! - Per-message `cost` (REAL in legacy) is dropped from `chat_messages`: +//! the engine has no REAL FieldKind and the only consumer is the +//! session-level aggregate `chat_sessions.total_cost`, updated +//! bespoke in `save_message`. No caller reads per-message cost in +//! current tests or the CLI surface. +//! +//! FTS column-name change vs pre-convergence shape: +//! legacy fts_chat(message_id, session_id UNINDEXED, content) +//! engine fts_chat_messages(chat_messages_id UNINDEXED, content) +//! The `session_id` shadow column was UNINDEXED (never matched on) so +//! no search path regresses. Fresh databases only — on-disk DBs from +//! the pre-engine era need recreation. -use rusqlite::{Connection, Result}; +use kei_entity_store::schema::{EdgeKeyKind, EntitySchema, FieldDef}; + +static FIELDS: &[FieldDef] = &[ + FieldDef::pk("id"), + FieldDef::text_nn("session_id"), + FieldDef::text_nn("role"), + FieldDef::text_nn("content"), + FieldDef::integer("tokens_in"), + FieldDef::integer("tokens_out"), + FieldDef::created_at(), +]; + +const DDL_SECONDARY: &str = r#" + CREATE INDEX IF NOT EXISTS idx_cm_session ON chat_messages(session_id); -const DDL_MAIN: &str = r#" CREATE TABLE IF NOT EXISTS chat_sessions ( id TEXT PRIMARY KEY, project TEXT NOT NULL, @@ -17,28 +54,16 @@ const DDL_MAIN: &str = r#" ); CREATE INDEX IF NOT EXISTS idx_cs_project ON chat_sessions(project); CREATE INDEX IF NOT EXISTS idx_cs_status ON chat_sessions(status); - - CREATE TABLE IF NOT EXISTS chat_messages ( - id INTEGER PRIMARY KEY, - session_id TEXT NOT NULL REFERENCES chat_sessions(id) ON DELETE CASCADE, - role TEXT NOT NULL, - content TEXT NOT NULL, - tokens_in INTEGER DEFAULT 0, - tokens_out INTEGER DEFAULT 0, - cost REAL DEFAULT 0.0, - created_at INTEGER NOT NULL - ); - CREATE INDEX IF NOT EXISTS idx_cm_session ON chat_messages(session_id); "#; -const DDL_FTS: &str = r#" - CREATE VIRTUAL TABLE IF NOT EXISTS fts_chat - USING fts5(message_id UNINDEXED, session_id UNINDEXED, content, - tokenize='porter unicode61'); -"#; - -pub fn create_schema(conn: &Connection) -> Result<()> { - conn.execute_batch(DDL_MAIN)?; - conn.execute_batch(DDL_FTS)?; - Ok(()) -} +pub static CHAT_SCHEMA: EntitySchema = EntitySchema { + name: "chat_message", + table: "chat_messages", + fields: FIELDS, + enabled_verbs: &["create", "get", "list", "search"], + fts_columns: Some(&["content"]), + edge_table: None, + edge_key_kind: EdgeKeyKind::IntegerPair, + archived_field: None, + custom_migrations: &[DDL_SECONDARY], +}; diff --git a/_primitives/_rust/kei-chat-store/src/search.rs b/_primitives/_rust/kei-chat-store/src/search.rs index 362c619..591ef11 100644 --- a/_primitives/_rust/kei-chat-store/src/search.rs +++ b/_primitives/_rust/kei-chat-store/src/search.rs @@ -1,26 +1,38 @@ //! FTS over messages. +//! +//! Layer-A convergence (2026-04-23): delegates to +//! `kei_entity_store::verbs::search` using `CHAT_SCHEMA`. The engine +//! handles FTS5 JOIN + rank ordering; this module maps the generic +//! JSON result back to typed `ChatMessage` rows for legacy callers. +//! Per-message `cost` is not persisted after the convergence (see +//! `schema.rs` note); `cost` is populated as 0.0 on every hit. +use crate::schema::CHAT_SCHEMA; use crate::sessions::ChatMessage; use crate::store::Store; -use anyhow::Result; -use rusqlite::params; +use anyhow::{anyhow, Result}; +use kei_entity_store::verbs::search as v_search; +use serde_json::{json, Value}; pub fn search(store: &Store, query: &str, limit: i64) -> Result> { let lim = if limit <= 0 { 20 } else { limit }; - let mut stmt = store.conn().prepare( - "SELECT m.id, m.session_id, m.role, m.content, m.tokens_in, m.tokens_out, - m.cost, m.created_at - FROM fts_chat f - JOIN chat_messages m ON m.id = f.message_id - WHERE fts_chat MATCH ?1 ORDER BY rank LIMIT ?2", - )?; - let rows = stmt.query_map(params![query, lim], |r| { - Ok(ChatMessage { - id: r.get(0)?, session_id: r.get(1)?, role: r.get(2)?, content: r.get(3)?, - tokens_in: r.get(4)?, tokens_out: r.get(5)?, cost: r.get(6)?, created_at: r.get(7)?, - }) - })?; - let mut out = Vec::new(); - for r in rows { out.push(r?); } - Ok(out) + let v = v_search::run(store.conn(), &CHAT_SCHEMA, json!({ "query": query, "limit": lim })) + .map_err(|e| anyhow!("{e}"))?; + let arr = v["results"] + .as_array() + .ok_or_else(|| anyhow!("search: results missing"))?; + arr.iter().map(row_to_message).collect() +} + +fn row_to_message(r: &Value) -> Result { + Ok(ChatMessage { + id: r["id"].as_i64().unwrap_or(0), + session_id: r["session_id"].as_str().unwrap_or("").into(), + role: r["role"].as_str().unwrap_or("").into(), + content: r["content"].as_str().unwrap_or("").into(), + tokens_in: r["tokens_in"].as_i64().unwrap_or(0), + tokens_out: r["tokens_out"].as_i64().unwrap_or(0), + cost: 0.0, + created_at: r["created_at"].as_i64().unwrap_or(0), + }) } diff --git a/_primitives/_rust/kei-chat-store/src/sessions.rs b/_primitives/_rust/kei-chat-store/src/sessions.rs index 7f89e6a..a2e4b96 100644 --- a/_primitives/_rust/kei-chat-store/src/sessions.rs +++ b/_primitives/_rust/kei-chat-store/src/sessions.rs @@ -1,10 +1,20 @@ //! Session + message operations. +//! +//! Layer-A convergence (2026-04-23): message INSERT + FTS reindex +//! delegate to `kei_entity_store::verbs::create` via `CHAT_SCHEMA`. +//! Session rows still use bespoke SQL (TEXT UUID PK + TEXT status enum +//! are outside the engine's INTEGER-PK / INTEGER-archived-field model). +//! `save_message` still owns the bespoke session-counter UPDATE after +//! each message insert — same semantics as pre-convergence. +use crate::schema::CHAT_SCHEMA; use crate::store::Store; use anyhow::{anyhow, Result}; use chrono::Utc; +use kei_entity_store::verbs::create as v_create; use rusqlite::params; use serde::{Deserialize, Serialize}; +use serde_json::json; #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct ChatSession { @@ -45,18 +55,19 @@ pub fn start_session(store: &Store, project: &str, title: &str, model: &str) -> pub fn save_message(store: &Store, msg: &ChatMessage) -> Result { let now = Utc::now().timestamp(); - let created = if msg.created_at == 0 { now } else { msg.created_at }; - store.conn().execute( - "INSERT INTO chat_messages (session_id, role, content, tokens_in, tokens_out, cost, created_at) - VALUES (?1,?2,?3,?4,?5,?6,?7)", - params![msg.session_id, msg.role, msg.content, msg.tokens_in, - msg.tokens_out, msg.cost, created], - )?; - let id = store.conn().last_insert_rowid(); - store.conn().execute( - "INSERT INTO fts_chat (message_id, session_id, content) VALUES (?1,?2,?3)", - params![id, msg.session_id, msg.content], - )?; + let payload = json!({ + "session_id": msg.session_id, + "role": msg.role, + "content": msg.content, + "tokens_in": msg.tokens_in, + "tokens_out": msg.tokens_out, + "created_at": msg.created_at, + }); + let v = v_create::run(store.conn(), &CHAT_SCHEMA, payload) + .map_err(|e| anyhow!("{e}"))?; + let id = v["id"] + .as_i64() + .ok_or_else(|| anyhow!("missing id in create response"))?; store.conn().execute( "UPDATE chat_sessions SET message_count = message_count + 1, total_tokens = total_tokens + ?1, total_cost = total_cost + ?2, diff --git a/_primitives/_rust/kei-chat-store/src/store.rs b/_primitives/_rust/kei-chat-store/src/store.rs index 1983471..2a602b7 100644 --- a/_primitives/_rust/kei-chat-store/src/store.rs +++ b/_primitives/_rust/kei-chat-store/src/store.rs @@ -1,30 +1,31 @@ -//! Store open/close helper. +//! Chat store — thin shim over `kei_entity_store::Store`. +//! +//! Layer-A convergence pilot (2026-04-23): message CRUD + FTS run +//! through `kei_entity_store::verbs::*` using `CHAT_SCHEMA`. Session +//! management (TEXT UUID PK, counter updates, status enum) stays +//! bespoke in `sessions.rs` — the `conn()` escape hatch is the only +//! surface it needs. -use crate::schema::create_schema; -use anyhow::{Context, Result}; +use crate::schema::CHAT_SCHEMA; +use anyhow::Result; +use kei_entity_store::Store as EntityStore; use rusqlite::Connection; use std::path::Path; pub struct Store { - conn: Connection, + inner: EntityStore, } impl Store { pub fn open(path: &Path) -> Result { - if let Some(parent) = path.parent() { - let _ = std::fs::create_dir_all(parent); - } - let conn = Connection::open(path).context("open sqlite")?; - conn.pragma_update(None, "journal_mode", "WAL").ok(); - create_schema(&conn)?; - Ok(Self { conn }) + let inner = EntityStore::open(path, &CHAT_SCHEMA)?; + Ok(Self { inner }) } pub fn open_memory() -> Result { - let conn = Connection::open_in_memory()?; - create_schema(&conn)?; - Ok(Self { conn }) + let inner = EntityStore::open_memory(&CHAT_SCHEMA)?; + Ok(Self { inner }) } - pub fn conn(&self) -> &Connection { &self.conn } + pub fn conn(&self) -> &Connection { self.inner.conn() } } diff --git a/_primitives/_rust/kei-chat-store/tests/integration.rs b/_primitives/_rust/kei-chat-store/tests/integration.rs index 9958e76..9385315 100644 --- a/_primitives/_rust/kei-chat-store/tests/integration.rs +++ b/_primitives/_rust/kei-chat-store/tests/integration.rs @@ -41,6 +41,27 @@ fn archive_session_works() { assert_eq!(sess.status, "archived"); } +#[test] +fn engine_migration_parity_smoke() { + // Layer-A convergence parity: fresh session opens cleanly through + // kei_entity_store::Store + CHAT_SCHEMA, start_session returns a + // UUID that get_session can retrieve with id > 0 chars and the + // engine-generated chat_messages table is writeable. + let s = mk(); + let sid = start_session(&s, "conv-layer-a", "smoke", "claude-opus-4").unwrap(); + assert!(!sid.is_empty(), "session id should be a non-empty UUID"); + let mid = save_message(&s, &ChatMessage { + session_id: sid.clone(), role: "user".into(), + content: "migration-parity probe".into(), + tokens_in: 1, tokens_out: 0, cost: 0.0, + ..Default::default() + }).unwrap(); + assert!(mid > 0, "engine-backed message id must be > 0"); + let sess = get_session(&s, &sid).unwrap().unwrap(); + assert_eq!(sess.id, sid); + assert_eq!(sess.message_count, 1); +} + #[test] fn stats_aggregates() { let s = mk();