Merge M1 — kei-chat-store migration (dogfood via prepare)

This commit is contained in:
Parfii-bot 2026-04-23 05:55:34 +08:00
commit 6edcba7c4b
7 changed files with 143 additions and 71 deletions

View file

@ -1935,6 +1935,7 @@ dependencies = [
"anyhow",
"chrono",
"clap",
"kei-entity-store",
"rusqlite",
"serde",
"serde_json",

View file

@ -14,6 +14,7 @@ name = "kei_chat_store"
path = "src/lib.rs"
[dependencies]
kei-entity-store = { path = "../kei-entity-store" }
rusqlite = { version = "0.31", features = ["bundled"] }
clap = { version = "4", features = ["derive"] }
serde = { version = "1", features = ["derive"] }

View file

@ -1,8 +1,45 @@
//! Chat SQLite schema.
//! kei-chat-store EntitySchema — declarative spec consumed by
//! `kei_entity_store::Store` and its verb templates.
//!
//! Shape (Layer-A convergence, 2026-04-23):
//!
//! - Primary entity = `chat_messages` (INTEGER PK; required by engine).
//! Engine owns: create / get / list / search verbs + FTS reindex.
//! - Bespoke: `chat_sessions` has a TEXT UUID primary key that the
//! engine's `FieldKind::IntegerPk` cannot represent, so its DDL rides
//! the engine's `custom_migrations` slot and its CRUD stays in
//! `sessions.rs` (analogous to kei-task's milestones / deps / graph).
//! - Archive: sessions use a TEXT `status` enum ('active' | 'archived')
//! rather than an INTEGER flag, so the engine `archive` verb is NOT
//! enabled. Session archival stays bespoke.
//! - Per-message `cost` (REAL in legacy) is dropped from `chat_messages`:
//! the engine has no REAL FieldKind and the only consumer is the
//! session-level aggregate `chat_sessions.total_cost`, updated
//! bespoke in `save_message`. No caller reads per-message cost in
//! current tests or the CLI surface.
//!
//! FTS column-name change vs pre-convergence shape:
//! legacy fts_chat(message_id, session_id UNINDEXED, content)
//! engine fts_chat_messages(chat_messages_id UNINDEXED, content)
//! The `session_id` shadow column was UNINDEXED (never matched on) so
//! no search path regresses. Fresh databases only — on-disk DBs from
//! the pre-engine era need recreation.
use rusqlite::{Connection, Result};
use kei_entity_store::schema::{EdgeKeyKind, EntitySchema, FieldDef};
static FIELDS: &[FieldDef] = &[
FieldDef::pk("id"),
FieldDef::text_nn("session_id"),
FieldDef::text_nn("role"),
FieldDef::text_nn("content"),
FieldDef::integer("tokens_in"),
FieldDef::integer("tokens_out"),
FieldDef::created_at(),
];
const DDL_SECONDARY: &str = r#"
CREATE INDEX IF NOT EXISTS idx_cm_session ON chat_messages(session_id);
const DDL_MAIN: &str = r#"
CREATE TABLE IF NOT EXISTS chat_sessions (
id TEXT PRIMARY KEY,
project TEXT NOT NULL,
@ -17,28 +54,16 @@ const DDL_MAIN: &str = r#"
);
CREATE INDEX IF NOT EXISTS idx_cs_project ON chat_sessions(project);
CREATE INDEX IF NOT EXISTS idx_cs_status ON chat_sessions(status);
CREATE TABLE IF NOT EXISTS chat_messages (
id INTEGER PRIMARY KEY,
session_id TEXT NOT NULL REFERENCES chat_sessions(id) ON DELETE CASCADE,
role TEXT NOT NULL,
content TEXT NOT NULL,
tokens_in INTEGER DEFAULT 0,
tokens_out INTEGER DEFAULT 0,
cost REAL DEFAULT 0.0,
created_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_cm_session ON chat_messages(session_id);
"#;
const DDL_FTS: &str = r#"
CREATE VIRTUAL TABLE IF NOT EXISTS fts_chat
USING fts5(message_id UNINDEXED, session_id UNINDEXED, content,
tokenize='porter unicode61');
"#;
pub fn create_schema(conn: &Connection) -> Result<()> {
conn.execute_batch(DDL_MAIN)?;
conn.execute_batch(DDL_FTS)?;
Ok(())
}
pub static CHAT_SCHEMA: EntitySchema = EntitySchema {
name: "chat_message",
table: "chat_messages",
fields: FIELDS,
enabled_verbs: &["create", "get", "list", "search"],
fts_columns: Some(&["content"]),
edge_table: None,
edge_key_kind: EdgeKeyKind::IntegerPair,
archived_field: None,
custom_migrations: &[DDL_SECONDARY],
};

View file

@ -1,26 +1,38 @@
//! FTS over messages.
//!
//! Layer-A convergence (2026-04-23): delegates to
//! `kei_entity_store::verbs::search` using `CHAT_SCHEMA`. The engine
//! handles FTS5 JOIN + rank ordering; this module maps the generic
//! JSON result back to typed `ChatMessage` rows for legacy callers.
//! Per-message `cost` is not persisted after the convergence (see
//! `schema.rs` note); `cost` is populated as 0.0 on every hit.
use crate::schema::CHAT_SCHEMA;
use crate::sessions::ChatMessage;
use crate::store::Store;
use anyhow::Result;
use rusqlite::params;
use anyhow::{anyhow, Result};
use kei_entity_store::verbs::search as v_search;
use serde_json::{json, Value};
pub fn search(store: &Store, query: &str, limit: i64) -> Result<Vec<ChatMessage>> {
let lim = if limit <= 0 { 20 } else { limit };
let mut stmt = store.conn().prepare(
"SELECT m.id, m.session_id, m.role, m.content, m.tokens_in, m.tokens_out,
m.cost, m.created_at
FROM fts_chat f
JOIN chat_messages m ON m.id = f.message_id
WHERE fts_chat MATCH ?1 ORDER BY rank LIMIT ?2",
)?;
let rows = stmt.query_map(params![query, lim], |r| {
Ok(ChatMessage {
id: r.get(0)?, session_id: r.get(1)?, role: r.get(2)?, content: r.get(3)?,
tokens_in: r.get(4)?, tokens_out: r.get(5)?, cost: r.get(6)?, created_at: r.get(7)?,
})
})?;
let mut out = Vec::new();
for r in rows { out.push(r?); }
Ok(out)
let v = v_search::run(store.conn(), &CHAT_SCHEMA, json!({ "query": query, "limit": lim }))
.map_err(|e| anyhow!("{e}"))?;
let arr = v["results"]
.as_array()
.ok_or_else(|| anyhow!("search: results missing"))?;
arr.iter().map(row_to_message).collect()
}
fn row_to_message(r: &Value) -> Result<ChatMessage> {
Ok(ChatMessage {
id: r["id"].as_i64().unwrap_or(0),
session_id: r["session_id"].as_str().unwrap_or("").into(),
role: r["role"].as_str().unwrap_or("").into(),
content: r["content"].as_str().unwrap_or("").into(),
tokens_in: r["tokens_in"].as_i64().unwrap_or(0),
tokens_out: r["tokens_out"].as_i64().unwrap_or(0),
cost: 0.0,
created_at: r["created_at"].as_i64().unwrap_or(0),
})
}

View file

@ -1,10 +1,20 @@
//! Session + message operations.
//!
//! Layer-A convergence (2026-04-23): message INSERT + FTS reindex
//! delegate to `kei_entity_store::verbs::create` via `CHAT_SCHEMA`.
//! Session rows still use bespoke SQL (TEXT UUID PK + TEXT status enum
//! are outside the engine's INTEGER-PK / INTEGER-archived-field model).
//! `save_message` still owns the bespoke session-counter UPDATE after
//! each message insert — same semantics as pre-convergence.
use crate::schema::CHAT_SCHEMA;
use crate::store::Store;
use anyhow::{anyhow, Result};
use chrono::Utc;
use kei_entity_store::verbs::create as v_create;
use rusqlite::params;
use serde::{Deserialize, Serialize};
use serde_json::json;
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ChatSession {
@ -45,18 +55,19 @@ pub fn start_session(store: &Store, project: &str, title: &str, model: &str) ->
pub fn save_message(store: &Store, msg: &ChatMessage) -> Result<i64> {
let now = Utc::now().timestamp();
let created = if msg.created_at == 0 { now } else { msg.created_at };
store.conn().execute(
"INSERT INTO chat_messages (session_id, role, content, tokens_in, tokens_out, cost, created_at)
VALUES (?1,?2,?3,?4,?5,?6,?7)",
params![msg.session_id, msg.role, msg.content, msg.tokens_in,
msg.tokens_out, msg.cost, created],
)?;
let id = store.conn().last_insert_rowid();
store.conn().execute(
"INSERT INTO fts_chat (message_id, session_id, content) VALUES (?1,?2,?3)",
params![id, msg.session_id, msg.content],
)?;
let payload = json!({
"session_id": msg.session_id,
"role": msg.role,
"content": msg.content,
"tokens_in": msg.tokens_in,
"tokens_out": msg.tokens_out,
"created_at": msg.created_at,
});
let v = v_create::run(store.conn(), &CHAT_SCHEMA, payload)
.map_err(|e| anyhow!("{e}"))?;
let id = v["id"]
.as_i64()
.ok_or_else(|| anyhow!("missing id in create response"))?;
store.conn().execute(
"UPDATE chat_sessions SET message_count = message_count + 1,
total_tokens = total_tokens + ?1, total_cost = total_cost + ?2,

View file

@ -1,30 +1,31 @@
//! Store open/close helper.
//! Chat store — thin shim over `kei_entity_store::Store`.
//!
//! Layer-A convergence pilot (2026-04-23): message CRUD + FTS run
//! through `kei_entity_store::verbs::*` using `CHAT_SCHEMA`. Session
//! management (TEXT UUID PK, counter updates, status enum) stays
//! bespoke in `sessions.rs` — the `conn()` escape hatch is the only
//! surface it needs.
use crate::schema::create_schema;
use anyhow::{Context, Result};
use crate::schema::CHAT_SCHEMA;
use anyhow::Result;
use kei_entity_store::Store as EntityStore;
use rusqlite::Connection;
use std::path::Path;
pub struct Store {
conn: Connection,
inner: EntityStore,
}
impl Store {
pub fn open(path: &Path) -> Result<Self> {
if let Some(parent) = path.parent() {
let _ = std::fs::create_dir_all(parent);
}
let conn = Connection::open(path).context("open sqlite")?;
conn.pragma_update(None, "journal_mode", "WAL").ok();
create_schema(&conn)?;
Ok(Self { conn })
let inner = EntityStore::open(path, &CHAT_SCHEMA)?;
Ok(Self { inner })
}
pub fn open_memory() -> Result<Self> {
let conn = Connection::open_in_memory()?;
create_schema(&conn)?;
Ok(Self { conn })
let inner = EntityStore::open_memory(&CHAT_SCHEMA)?;
Ok(Self { inner })
}
pub fn conn(&self) -> &Connection { &self.conn }
pub fn conn(&self) -> &Connection { self.inner.conn() }
}

View file

@ -41,6 +41,27 @@ fn archive_session_works() {
assert_eq!(sess.status, "archived");
}
#[test]
fn engine_migration_parity_smoke() {
// Layer-A convergence parity: fresh session opens cleanly through
// kei_entity_store::Store + CHAT_SCHEMA, start_session returns a
// UUID that get_session can retrieve with id > 0 chars and the
// engine-generated chat_messages table is writeable.
let s = mk();
let sid = start_session(&s, "conv-layer-a", "smoke", "claude-opus-4").unwrap();
assert!(!sid.is_empty(), "session id should be a non-empty UUID");
let mid = save_message(&s, &ChatMessage {
session_id: sid.clone(), role: "user".into(),
content: "migration-parity probe".into(),
tokens_in: 1, tokens_out: 0, cost: 0.0,
..Default::default()
}).unwrap();
assert!(mid > 0, "engine-backed message id must be > 0");
let sess = get_session(&s, &sid).unwrap().unwrap();
assert_eq!(sess.id, sid);
assert_eq!(sess.message_count, 1);
}
#[test]
fn stats_aggregates() {
let s = mk();