diff --git a/_primitives/_rust/kei-chat-store/src/schema.rs b/_primitives/_rust/kei-chat-store/src/schema.rs index 56d58d7..24822cd 100644 --- a/_primitives/_rust/kei-chat-store/src/schema.rs +++ b/_primitives/_rust/kei-chat-store/src/schema.rs @@ -1,37 +1,27 @@ -//! kei-chat-store EntitySchema — declarative spec consumed by +//! kei-chat-store EntitySchemas — declarative specs consumed by //! `kei_entity_store::Store` and its verb templates. //! -//! Shape (Layer-A convergence, 2026-04-23; cost-column re-migration -//! 2026-04-23 wave 8): +//! Shape (multi-schema convergence, 2026-04-23): //! -//! - Primary entity = `chat_messages` (INTEGER PK; required by engine). -//! Engine owns: create / get / list / search verbs + FTS reindex. -//! - Bespoke: `chat_sessions` has a TEXT UUID primary key. The engine -//! gained `FieldKind::TextPk` in wave 8 but Store::open currently -//! takes a SINGLE EntitySchema, so a second managed schema for -//! sessions would require engine multi-schema support. Until that -//! lands, the session DDL rides `custom_migrations` and its CRUD -//! stays in `sessions.rs` (analogous to kei-task's milestones / deps -//! / graph). **Known open:** promote chat_sessions to a second -//! EntitySchema (`TextPk` + `TextArchiveEnum`) once engine gains -//! multi-schema support. -//! - Archive: sessions use a TEXT `status` enum ('active' | 'archived'). -//! Session archival stays bespoke (same reason as above). -//! - Per-message `cost` (REAL) is restored via -//! `FieldKind::RealDefault(0.0)` — wave-8 addition. Engine-managed -//! INSERT/SELECT, no bespoke SQL needed. Previously dropped when -//! the engine had no REAL kind; re-instated 2026-04-23. +//! - `MESSAGES_SCHEMA`: primary entity `chat_messages` (INTEGER PK; +//! engine-owned create/get/list/search + FTS reindex). +//! - `SESSIONS_SCHEMA`: second entity `chat_sessions` (TEXT UUID PK + +//! `TextArchiveEnum` status column, engine-owned create/get/archive). +//! Previously rode `custom_migrations`; now a first-class schema +//! since `Store::open` accepts a slice of schemas. +//! - `ALL_SCHEMAS`: the `&[&EntitySchema]` slice the `Store` wrapper +//! hands to the engine on open. //! -//! FTS column-name change vs pre-convergence shape: -//! legacy fts_chat(message_id, session_id UNINDEXED, content) -//! engine fts_chat_messages(chat_messages_id UNINDEXED, content) -//! The `session_id` shadow column was UNINDEXED (never matched on) so -//! no search path regresses. Fresh databases only — on-disk DBs from -//! the pre-engine era need recreation. +//! The session aggregates (`message_count`, `total_tokens`, `total_cost`) +//! are still updated via bespoke SQL in `sessions.rs` because the +//! engine has no `increment-on-related-insert` verb. That bespoke path +//! shrank from "whole row lifecycle" to "UPDATE counters only". use kei_entity_store::schema::{EdgeKeyKind, EntitySchema, FieldDef}; -static FIELDS: &[FieldDef] = &[ +// ---- chat_messages --------------------------------------------------- + +static MESSAGE_FIELDS: &[FieldDef] = &[ FieldDef::pk("id"), FieldDef::text_nn("session_id"), FieldDef::text_nn("role"), @@ -42,33 +32,58 @@ static FIELDS: &[FieldDef] = &[ FieldDef::created_at(), ]; -const DDL_SECONDARY: &str = r#" - CREATE INDEX IF NOT EXISTS idx_cm_session ON chat_messages(session_id); +/// Keep the idx_cm_session index around — generic schema has no +/// `indexed` flag for one-off single-column indexes on non-PK fields. +const MESSAGES_INDEX_DDL: &str = + "CREATE INDEX IF NOT EXISTS idx_cm_session ON chat_messages(session_id);"; - CREATE TABLE IF NOT EXISTS chat_sessions ( - id TEXT PRIMARY KEY, - project TEXT NOT NULL, - title TEXT DEFAULT '', - model TEXT DEFAULT '', - status TEXT DEFAULT 'active', - message_count INTEGER DEFAULT 0, - total_tokens INTEGER DEFAULT 0, - total_cost REAL DEFAULT 0.0, - created_at INTEGER NOT NULL, - updated_at INTEGER NOT NULL - ); - CREATE INDEX IF NOT EXISTS idx_cs_project ON chat_sessions(project); - CREATE INDEX IF NOT EXISTS idx_cs_status ON chat_sessions(status); -"#; - -pub static CHAT_SCHEMA: EntitySchema = EntitySchema { +pub static MESSAGES_SCHEMA: EntitySchema = EntitySchema { name: "chat_message", table: "chat_messages", - fields: FIELDS, + fields: MESSAGE_FIELDS, enabled_verbs: &["create", "get", "list", "search"], fts_columns: Some(&["content"]), edge_table: None, edge_key_kind: EdgeKeyKind::IntegerPair, archived_field: None, - custom_migrations: &[DDL_SECONDARY], + custom_migrations: &[MESSAGES_INDEX_DDL], }; + +// ---- chat_sessions --------------------------------------------------- + +static SESSION_FIELDS: &[FieldDef] = &[ + FieldDef::text_pk("id"), + FieldDef::text_nn("project"), + FieldDef::text_default("title", ""), + FieldDef::text_default("model", ""), + FieldDef::text_archive_enum("status", "active", "archived"), + FieldDef::integer("status_at"), + FieldDef::integer("message_count"), + FieldDef::integer("total_tokens"), + FieldDef::real_default("total_cost", 0.0), + FieldDef::created_at(), + FieldDef::updated_at(), +]; + +/// Legacy indexes on chat_sessions (project, status). `indexed` flag on +/// FieldDef only covers single-column indexes with a deterministic +/// `idx__` name — matches what we need here. +const SESSIONS_INDEX_DDL: &str = "\ + CREATE INDEX IF NOT EXISTS idx_cs_project ON chat_sessions(project);\n\ + CREATE INDEX IF NOT EXISTS idx_cs_status ON chat_sessions(status);"; + +pub static SESSIONS_SCHEMA: EntitySchema = EntitySchema { + name: "chat_session", + table: "chat_sessions", + fields: SESSION_FIELDS, + enabled_verbs: &["create", "get", "archive", "update"], + fts_columns: None, + edge_table: None, + edge_key_kind: EdgeKeyKind::IntegerPair, + archived_field: Some("status"), + custom_migrations: &[SESSIONS_INDEX_DDL], +}; + +// ---- aggregate slice for Store::open ------------------------------- + +pub static ALL_SCHEMAS: &[&EntitySchema] = &[&MESSAGES_SCHEMA, &SESSIONS_SCHEMA]; diff --git a/_primitives/_rust/kei-chat-store/src/search.rs b/_primitives/_rust/kei-chat-store/src/search.rs index 198b7e8..d21fe18 100644 --- a/_primitives/_rust/kei-chat-store/src/search.rs +++ b/_primitives/_rust/kei-chat-store/src/search.rs @@ -1,13 +1,13 @@ //! FTS over messages. //! //! Layer-A convergence (2026-04-23): delegates to -//! `kei_entity_store::verbs::search` using `CHAT_SCHEMA`. The engine +//! `kei_entity_store::verbs::search` using `MESSAGES_SCHEMA`. The engine //! handles FTS5 JOIN + rank ordering; this module maps the generic //! JSON result back to typed `ChatMessage` rows for legacy callers. //! Per-message `cost` is persisted (engine `RealDefault` field); //! `row_to_message` reads it back as f64. -use crate::schema::CHAT_SCHEMA; +use crate::schema::MESSAGES_SCHEMA; use crate::sessions::ChatMessage; use crate::store::Store; use anyhow::{anyhow, Result}; @@ -16,7 +16,7 @@ use serde_json::{json, Value}; pub fn search(store: &Store, query: &str, limit: i64) -> Result> { let lim = if limit <= 0 { 20 } else { limit }; - let v = v_search::run(store.conn(), &CHAT_SCHEMA, json!({ "query": query, "limit": lim })) + let v = v_search::run(store.conn(), &MESSAGES_SCHEMA, json!({ "query": query, "limit": lim })) .map_err(|e| anyhow!("{e}"))?; let arr = v["results"] .as_array() diff --git a/_primitives/_rust/kei-chat-store/src/sessions.rs b/_primitives/_rust/kei-chat-store/src/sessions.rs index 39cd56e..84033c5 100644 --- a/_primitives/_rust/kei-chat-store/src/sessions.rs +++ b/_primitives/_rust/kei-chat-store/src/sessions.rs @@ -1,20 +1,23 @@ //! Session + message operations. //! -//! Layer-A convergence (2026-04-23): message INSERT + FTS reindex -//! delegate to `kei_entity_store::verbs::create` via `CHAT_SCHEMA`. -//! Session rows still use bespoke SQL (TEXT UUID PK + TEXT status enum -//! are outside the engine's INTEGER-PK / INTEGER-archived-field model). -//! `save_message` still owns the bespoke session-counter UPDATE after -//! each message insert — same semantics as pre-convergence. +//! Multi-schema convergence (2026-04-23): BOTH sessions and messages +//! now flow through `kei_entity_store::verbs::*`. `start_session` uses +//! `create` against `SESSIONS_SCHEMA` (TextPk + TextArchiveEnum); +//! `archive_session` uses `archive`; `get_session` uses `get`; +//! `save_message` uses `create` against `MESSAGES_SCHEMA`. +//! +//! Only the per-message aggregate update on `chat_sessions` +//! (message_count / total_tokens / total_cost) stays bespoke — the +//! engine has no "update-on-related-insert" verb. -use crate::schema::CHAT_SCHEMA; +use crate::schema::{MESSAGES_SCHEMA, SESSIONS_SCHEMA}; use crate::store::Store; use anyhow::{anyhow, Result}; use chrono::Utc; -use kei_entity_store::verbs::create as v_create; +use kei_entity_store::verbs::{archive as v_archive, create as v_create, get as v_get}; use rusqlite::params; use serde::{Deserialize, Serialize}; -use serde_json::json; +use serde_json::{json, Value}; #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct ChatSession { @@ -44,12 +47,12 @@ pub struct ChatMessage { pub fn start_session(store: &Store, project: &str, title: &str, model: &str) -> Result { let id = uuid::Uuid::new_v4().to_string(); - let now = Utc::now().timestamp(); - store.conn().execute( - "INSERT INTO chat_sessions (id, project, title, model, status, created_at, updated_at) - VALUES (?1,?2,?3,?4,'active',?5,?5)", - params![id, project, title, model, now], - )?; + v_create::run( + store.conn(), + &SESSIONS_SCHEMA, + json!({ "id": id, "project": project, "title": title, "model": model }), + ) + .map_err(|e| anyhow!("{e}"))?; Ok(id) } @@ -64,43 +67,68 @@ pub fn save_message(store: &Store, msg: &ChatMessage) -> Result { "cost": msg.cost, "created_at": msg.created_at, }); - let v = v_create::run(store.conn(), &CHAT_SCHEMA, payload) + let v = v_create::run(store.conn(), &MESSAGES_SCHEMA, payload) .map_err(|e| anyhow!("{e}"))?; let id = v["id"] .as_i64() .ok_or_else(|| anyhow!("missing id in create response"))?; - store.conn().execute( - "UPDATE chat_sessions SET message_count = message_count + 1, - total_tokens = total_tokens + ?1, total_cost = total_cost + ?2, - updated_at = ?3 WHERE id = ?4", - params![msg.tokens_in + msg.tokens_out, msg.cost, now, msg.session_id], - )?; + bump_session_totals(store, &msg.session_id, msg.tokens_in + msg.tokens_out, msg.cost, now)?; Ok(id) } -pub fn archive_session(store: &Store, session_id: &str) -> Result<()> { - let n = store.conn().execute( - "UPDATE chat_sessions SET status='archived', updated_at=?1 WHERE id=?2", - params![Utc::now().timestamp(), session_id], +/// Bespoke aggregate update — engine has no "increment-on-related-insert" +/// verb. Keeps the per-session counters in sync with what was just +/// inserted into chat_messages. +fn bump_session_totals( + store: &Store, + session_id: &str, + tokens_delta: i64, + cost_delta: f64, + now: i64, +) -> Result<()> { + store.conn().execute( + "UPDATE chat_sessions + SET message_count = message_count + 1, + total_tokens = total_tokens + ?1, + total_cost = total_cost + ?2, + updated_at = ?3 + WHERE id = ?4", + params![tokens_delta, cost_delta, now, session_id], )?; - if n == 0 { - return Err(anyhow!("session {session_id} not found")); - } + Ok(()) +} + +pub fn archive_session(store: &Store, session_id: &str) -> Result<()> { + v_archive::run(store.conn(), &SESSIONS_SCHEMA, json!({ "id": session_id })) + .map_err(|e| anyhow!("{e}"))?; Ok(()) } pub fn get_session(store: &Store, id: &str) -> Result> { - let mut stmt = store.conn().prepare( - "SELECT id, project, title, model, status, message_count, total_tokens, - total_cost, created_at, updated_at FROM chat_sessions WHERE id=?1", - )?; - let mut rows = stmt.query(params![id])?; - if let Some(r) = rows.next()? { - return Ok(Some(ChatSession { - id: r.get(0)?, project: r.get(1)?, title: r.get(2)?, model: r.get(3)?, - status: r.get(4)?, message_count: r.get(5)?, total_tokens: r.get(6)?, - total_cost: r.get(7)?, created_at: r.get(8)?, updated_at: r.get(9)?, - })); + match v_get::run(store.conn(), &SESSIONS_SCHEMA, json!({ "id": id })) { + Ok(v) => Ok(Some(session_from_json(v)?)), + Err(e) if e.exit_code() == 2 => Ok(None), + Err(e) => Err(anyhow!("{e}")), } - Ok(None) +} + +fn session_from_json(v: Value) -> Result { + let obj = v + .as_object() + .ok_or_else(|| anyhow!("expected object in get response"))?; + let s = |k: &str| obj.get(k).and_then(|x| x.as_str()).unwrap_or("").to_string(); + let i = |k: &str| obj.get(k).and_then(|x| x.as_i64()).unwrap_or(0); + let f = |k: &str| obj.get(k).and_then(|x| x.as_f64()).unwrap_or(0.0); + Ok(ChatSession { + id: s("id"), + project: s("project"), + title: s("title"), + model: s("model"), + status: s("status"), + message_count: i("message_count"), + total_tokens: i("total_tokens"), + total_cost: f("total_cost"), + created_at: i("created_at"), + updated_at: i("updated_at"), + }) } diff --git a/_primitives/_rust/kei-chat-store/src/store.rs b/_primitives/_rust/kei-chat-store/src/store.rs index 2a602b7..cf142e8 100644 --- a/_primitives/_rust/kei-chat-store/src/store.rs +++ b/_primitives/_rust/kei-chat-store/src/store.rs @@ -1,12 +1,17 @@ //! Chat store — thin shim over `kei_entity_store::Store`. //! -//! Layer-A convergence pilot (2026-04-23): message CRUD + FTS run -//! through `kei_entity_store::verbs::*` using `CHAT_SCHEMA`. Session -//! management (TEXT UUID PK, counter updates, status enum) stays -//! bespoke in `sessions.rs` — the `conn()` escape hatch is the only -//! surface it needs. +//! Multi-schema convergence (2026-04-23): BOTH `chat_messages` and +//! `chat_sessions` are now engine-owned. `Store::open` hands the engine +//! `ALL_SCHEMAS` so migrations for both tables run in a single +//! atomic transaction. +//! +//! Verbs dispatch per-schema: callers that act on messages pass +//! `MESSAGES_SCHEMA`, callers that act on sessions pass +//! `SESSIONS_SCHEMA`. The only bespoke SQL left is the per-message +//! session-counter UPDATE (`sessions.rs::bump_session_totals`) — the +//! engine has no "aggregate-on-related-insert" verb. -use crate::schema::CHAT_SCHEMA; +use crate::schema::ALL_SCHEMAS; use anyhow::Result; use kei_entity_store::Store as EntityStore; use rusqlite::Connection; @@ -18,12 +23,12 @@ pub struct Store { impl Store { pub fn open(path: &Path) -> Result { - let inner = EntityStore::open(path, &CHAT_SCHEMA)?; + let inner = EntityStore::open(path, ALL_SCHEMAS)?; Ok(Self { inner }) } pub fn open_memory() -> Result { - let inner = EntityStore::open_memory(&CHAT_SCHEMA)?; + let inner = EntityStore::open_memory(ALL_SCHEMAS)?; Ok(Self { inner }) } diff --git a/_primitives/_rust/kei-content-store/src/store.rs b/_primitives/_rust/kei-content-store/src/store.rs index 6aed140..ffec6ee 100644 --- a/_primitives/_rust/kei-content-store/src/store.rs +++ b/_primitives/_rust/kei-content-store/src/store.rs @@ -18,12 +18,12 @@ pub struct Store { impl Store { pub fn open(path: &Path) -> Result { - let inner = EntityStore::open(path, &CONTENT_SCHEMA)?; + let inner = EntityStore::open(path, &[&CONTENT_SCHEMA])?; Ok(Self { inner }) } pub fn open_memory() -> Result { - let inner = EntityStore::open_memory(&CONTENT_SCHEMA)?; + let inner = EntityStore::open_memory(&[&CONTENT_SCHEMA])?; Ok(Self { inner }) } diff --git a/_primitives/_rust/kei-crossdomain/src/store.rs b/_primitives/_rust/kei-crossdomain/src/store.rs index 251e552..b466d58 100644 --- a/_primitives/_rust/kei-crossdomain/src/store.rs +++ b/_primitives/_rust/kei-crossdomain/src/store.rs @@ -23,12 +23,12 @@ pub struct Store { impl Store { pub fn open(path: &Path) -> Result { - let inner = EntityStore::open(path, &CROSSDOMAIN_SCHEMA)?; + let inner = EntityStore::open(path, &[&CROSSDOMAIN_SCHEMA])?; Ok(Self { inner }) } pub fn open_memory() -> Result { - let inner = EntityStore::open_memory(&CROSSDOMAIN_SCHEMA)?; + let inner = EntityStore::open_memory(&[&CROSSDOMAIN_SCHEMA])?; Ok(Self { inner }) } diff --git a/_primitives/_rust/kei-entity-store/src/engine.rs b/_primitives/_rust/kei-entity-store/src/engine.rs index cd155df..fea42ca 100644 --- a/_primitives/_rust/kei-entity-store/src/engine.rs +++ b/_primitives/_rust/kei-entity-store/src/engine.rs @@ -1,10 +1,15 @@ //! Store — thin wrapper over `rusqlite::Connection` that runs the -//! schema's migration DDL on open. +//! schemas' migration DDL on open. //! //! The engine does NOT take ownership of verb dispatch. Sibling crates //! call verb modules directly (e.g. `verbs::create::run(&mut conn, //! &SCHEMA, input)`). This keeps the engine a passive provider of //! connection + schema-aware DDL. +//! +//! As of the multi-schema breaking change (2026-04-23), `Store::open` +//! accepts a SLICE of `&EntitySchema`. Every schema's DDL runs inside +//! a SINGLE transaction — if schema[i] migration fails, schema[0..i] +//! rolls back too. Verbs remain per-schema-dispatched by the caller. use crate::ddl; use crate::error::VerbError; @@ -24,22 +29,22 @@ pub struct Store { } impl Store { - /// Open (creates parent dirs, enables WAL, runs migrations for this - /// schema). - pub fn open(path: &Path, schema: &EntitySchema) -> Result { + /// Open (creates parent dirs, enables WAL, runs migrations for all + /// schemas in a single transaction). + pub fn open(path: &Path, schemas: &[&EntitySchema]) -> Result { if let Some(parent) = path.parent() { let _ = std::fs::create_dir_all(parent); } let conn = Connection::open(path).context("open sqlite")?; conn.pragma_update(None, "journal_mode", "WAL").ok(); - run_migrations(&conn, schema)?; + run_migrations(&conn, schemas)?; Ok(Self { conn }) } /// In-memory store — unit-test constructor. Same migrations, no FS. - pub fn open_memory(schema: &EntitySchema) -> Result { + pub fn open_memory(schemas: &[&EntitySchema]) -> Result { let conn = Connection::open_in_memory()?; - run_migrations(&conn, schema)?; + run_migrations(&conn, schemas)?; Ok(Self { conn }) } @@ -52,24 +57,39 @@ impl Store { pub fn into_conn(self) -> Connection { self.conn } } -/// Run: create primary table, indexes, FTS virtual table, edge table, -/// then any custom DDL. Idempotent (all statements use IF NOT EXISTS). -/// -/// Also stamps `PRAGMA user_version` on fresh databases so future -/// schema bumps can detect the target migration set exactly once. -pub fn run_migrations(conn: &Connection, schema: &EntitySchema) -> Result<(), VerbError> { - conn.execute_batch(&ddl::primary_table(schema))?; - conn.execute_batch(&ddl::indexes(schema))?; +/// Run all schemas' migrations atomically. For each schema: primary +/// table, indexes, FTS virtual table, edge table, custom DDL. Finally +/// stamp `user_version`. The transaction rolls back entirely if any +/// schema fails — callers never see a half-migrated DB. +pub fn run_migrations( + conn: &Connection, + schemas: &[&EntitySchema], +) -> Result<(), VerbError> { + let tx = conn.unchecked_transaction()?; + for schema in schemas { + apply_schema(&tx, schema)?; + } + apply_user_version(&tx)?; + tx.commit()?; + Ok(()) +} + +/// Apply one schema's DDL set inside an already-open transaction. +fn apply_schema( + tx: &rusqlite::Transaction<'_>, + schema: &EntitySchema, +) -> Result<(), VerbError> { + tx.execute_batch(&ddl::primary_table(schema))?; + tx.execute_batch(&ddl::indexes(schema))?; if let Some(cols) = schema.fts_columns { - conn.execute_batch(&ddl::fts_table(schema.table, cols))?; + tx.execute_batch(&ddl::fts_table(schema.table, cols))?; } if let Some(edge) = schema.edge_table { - conn.execute_batch(&ddl::edge_table_for(edge, schema.edge_key_kind))?; + tx.execute_batch(&ddl::edge_table_for(edge, schema.edge_key_kind))?; } for stmt in schema.custom_migrations { - conn.execute_batch(stmt)?; + tx.execute_batch(stmt)?; } - apply_user_version(conn)?; Ok(()) } @@ -77,12 +97,12 @@ pub fn run_migrations(conn: &Connection, schema: &EntitySchema) -> Result<(), Ve /// default to 0). If already stamped at `CURRENT_USER_VERSION` this is /// a no-op; if stamped at an older version a future bump will gate /// version-indexed DDL here. -fn apply_user_version(conn: &Connection) -> Result<(), VerbError> { - let current: u32 = conn +fn apply_user_version(tx: &rusqlite::Transaction<'_>) -> Result<(), VerbError> { + let current: u32 = tx .pragma_query_value(None, "user_version", |r| r.get(0)) .unwrap_or(0); if current < CURRENT_USER_VERSION { - conn.pragma_update(None, "user_version", CURRENT_USER_VERSION)?; + tx.pragma_update(None, "user_version", CURRENT_USER_VERSION)?; } Ok(()) } diff --git a/_primitives/_rust/kei-entity-store/tests/archive_smoke.rs b/_primitives/_rust/kei-entity-store/tests/archive_smoke.rs index 2492e91..f6953c1 100644 --- a/_primitives/_rust/kei-entity-store/tests/archive_smoke.rs +++ b/_primitives/_rust/kei-entity-store/tests/archive_smoke.rs @@ -49,7 +49,7 @@ static WITHOUT_FIELD: EntitySchema = EntitySchema { #[test] fn archive_sets_flag_and_stamps_timestamp() { - let s = Store::open_memory(&ARCHIVABLE).unwrap(); + let s = Store::open_memory(&[&ARCHIVABLE]).unwrap(); let v = create::run(s.conn(), &ARCHIVABLE, json!({ "title": "hi" })).unwrap(); let id = v["id"].as_i64().unwrap(); @@ -66,7 +66,7 @@ fn archive_sets_flag_and_stamps_timestamp() { #[test] fn archive_preserves_id_and_other_fields() { - let s = Store::open_memory(&ARCHIVABLE).unwrap(); + let s = Store::open_memory(&[&ARCHIVABLE]).unwrap(); let v = create::run(s.conn(), &ARCHIVABLE, json!({ "title": "keep" })).unwrap(); let id = v["id"].as_i64().unwrap(); archive::run(s.conn(), &ARCHIVABLE, json!({ "id": id })).unwrap(); @@ -77,7 +77,7 @@ fn archive_preserves_id_and_other_fields() { #[test] fn archive_errors_when_archived_field_missing() { - let s = Store::open_memory(&WITHOUT_FIELD).unwrap(); + let s = Store::open_memory(&[&WITHOUT_FIELD]).unwrap(); let v = create::run(s.conn(), &WITHOUT_FIELD, json!({ "title": "x" })).unwrap(); let id = v["id"].as_i64().unwrap(); let err = archive::run(s.conn(), &WITHOUT_FIELD, json!({ "id": id })).unwrap_err(); @@ -86,14 +86,14 @@ fn archive_errors_when_archived_field_missing() { #[test] fn archive_not_found_errors() { - let s = Store::open_memory(&ARCHIVABLE).unwrap(); + let s = Store::open_memory(&[&ARCHIVABLE]).unwrap(); let err = archive::run(s.conn(), &ARCHIVABLE, json!({ "id": 9999 })).unwrap_err(); assert_eq!(err.exit_code(), 2); } #[test] fn archive_rejects_missing_id() { - let s = Store::open_memory(&ARCHIVABLE).unwrap(); + let s = Store::open_memory(&[&ARCHIVABLE]).unwrap(); let err = archive::run(s.conn(), &ARCHIVABLE, json!({})).unwrap_err(); assert_eq!(err.exit_code(), 2); } diff --git a/_primitives/_rust/kei-entity-store/tests/bug_fixes_smoke.rs b/_primitives/_rust/kei-entity-store/tests/bug_fixes_smoke.rs index a348aea..f0321cc 100644 --- a/_primitives/_rust/kei-entity-store/tests/bug_fixes_smoke.rs +++ b/_primitives/_rust/kei-entity-store/tests/bug_fixes_smoke.rs @@ -31,7 +31,7 @@ static SCHEMA: EntitySchema = EntitySchema { custom_migrations: &[], }; -fn mk() -> Store { Store::open_memory(&SCHEMA).unwrap() } +fn mk() -> Store { Store::open_memory(&[&SCHEMA]).unwrap() } // ---------- C1 — silent type coercion ---------- @@ -179,10 +179,10 @@ fn m2_user_version_applied_once_idempotent() { let dir = tempfile::tempdir().unwrap(); let path = dir.path().join("store.db"); { - let _s = Store::open(&path, &SCHEMA).unwrap(); + let _s = Store::open(&path, &[&SCHEMA]).unwrap(); } { - let _s = Store::open(&path, &SCHEMA).unwrap(); + let _s = Store::open(&path, &[&SCHEMA]).unwrap(); let conn = Connection::open(&path).unwrap(); let v: u32 = conn .pragma_query_value(None, "user_version", |r| r.get(0)) diff --git a/_primitives/_rust/kei-entity-store/tests/multi_schema_smoke.rs b/_primitives/_rust/kei-entity-store/tests/multi_schema_smoke.rs new file mode 100644 index 0000000..a64df5c --- /dev/null +++ b/_primitives/_rust/kei-entity-store/tests/multi_schema_smoke.rs @@ -0,0 +1,148 @@ +//! Multi-schema smoke tests — verify that `Store::open` accepts a +//! slice of `&EntitySchema`, runs every schema's migrations inside a +//! single transaction, and that verbs dispatched per-schema work +//! independently against the same underlying connection. +//! +//! Added 2026-04-23 with the multi-schema breaking change. Parity +//! target: unblock kei-chat-store from its single-schema constraint +//! (two entity types — integer-PK messages + text-PK sessions). + +use kei_entity_store::schema::{EdgeKeyKind, EntitySchema, FieldDef}; +use kei_entity_store::verbs::{create, get}; +use kei_entity_store::Store; +use serde_json::json; + +static MSG_FIELDS: &[FieldDef] = &[ + FieldDef::pk("id"), + FieldDef::text_nn("session_id"), + FieldDef::text_nn("content"), + FieldDef::created_at(), +]; + +static MSGS: EntitySchema = EntitySchema { + name: "msg", + table: "msgs", + fields: MSG_FIELDS, + enabled_verbs: &["create", "get"], + fts_columns: None, + edge_table: None, + edge_key_kind: EdgeKeyKind::IntegerPair, + archived_field: None, + custom_migrations: &[], +}; + +static SESS_FIELDS: &[FieldDef] = &[ + FieldDef::text_pk("id"), + FieldDef::text_nn("project"), + FieldDef::text_archive_enum("status", "active", "archived"), + FieldDef::integer("status_at"), + FieldDef::created_at(), +]; + +static SESSIONS: EntitySchema = EntitySchema { + name: "sess", + table: "sessions", + fields: SESS_FIELDS, + enabled_verbs: &["create", "get", "archive"], + fts_columns: None, + edge_table: None, + edge_key_kind: EdgeKeyKind::IntegerPair, + archived_field: Some("status"), + custom_migrations: &[], +}; + +// ---------- 1. Both schemas' tables are created on open. ---------- + +#[test] +fn two_schema_store_creates_both_tables() { + let s = Store::open_memory(&[&MSGS, &SESSIONS]).unwrap(); + // Both tables must exist; sqlite_master lookup is the cheapest + // structural check available without leaning on a verb. + let found: Vec = s + .conn() + .prepare("SELECT name FROM sqlite_master WHERE type='table' AND name IN ('msgs','sessions') ORDER BY name") + .unwrap() + .query_map([], |r| r.get::<_, String>(0)) + .unwrap() + .collect::>() + .unwrap(); + assert_eq!(found, vec!["msgs".to_string(), "sessions".to_string()]); +} + +// ---------- 2. Verbs dispatch per-schema on the same connection. ---------- + +#[test] +fn verbs_dispatch_per_schema_across_both_tables() { + let s = Store::open_memory(&[&MSGS, &SESSIONS]).unwrap(); + + // INSERT via integer-PK schema. + let m = create::run( + s.conn(), + &MSGS, + json!({ "session_id": "abc", "content": "hi" }), + ) + .unwrap(); + let mid = m["id"].as_i64().unwrap(); + assert!(mid >= 1); + + // INSERT via text-PK schema — same connection, different schema. + let uuid = "550e8400-e29b-41d4-a716-446655440000"; + create::run( + s.conn(), + &SESSIONS, + json!({ "id": uuid, "project": "demo" }), + ) + .unwrap(); + + // GET via each schema returns only its table's row. + let m_row = get::run(s.conn(), &MSGS, json!({ "id": mid })).unwrap(); + assert_eq!(m_row["content"], "hi"); + + let s_row = get::run(s.conn(), &SESSIONS, json!({ "id": uuid })).unwrap(); + assert_eq!(s_row["project"], "demo"); + assert_eq!(s_row["status"], "active"); +} + +// ---------- 3. Migrations are transactional across schemas. ---------- + +/// Schema that deliberately breaks on migration (custom_migrations +/// references a column that doesn't exist). Used to prove that a +/// failing schema[1] rolls back schema[0]'s table too — confirming +/// atomic cross-schema migration. +static BROKEN: EntitySchema = EntitySchema { + name: "broken", + table: "broken_tbl", + fields: MSG_FIELDS, + enabled_verbs: &["create"], + fts_columns: None, + edge_table: None, + edge_key_kind: EdgeKeyKind::IntegerPair, + archived_field: None, + // Intentionally invalid SQL — references `nonexistent_tbl`. + custom_migrations: &["CREATE INDEX bad_idx ON nonexistent_tbl(id);"], +}; + +#[test] +fn failing_migration_rolls_back_prior_schemas() { + // Attempt to open with [good, broken]. The broken schema's + // custom_migrations will error; the whole open must fail and + // schema[0]'s `msgs` table must NOT exist afterwards (rollback). + let path = tempfile::tempdir().unwrap(); + let db = path.path().join("atomic.db"); + let err = Store::open(&db, &[&MSGS, &BROKEN]); + assert!(err.is_err(), "expected migration failure on BROKEN schema"); + + // Re-open raw connection and verify neither table leaked through. + let raw = rusqlite::Connection::open(&db).unwrap(); + let count: i64 = raw + .query_row( + "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name IN ('msgs','broken_tbl')", + [], + |r| r.get(0), + ) + .unwrap(); + assert_eq!( + count, 0, + "failed multi-schema migration must roll back schema[0]'s tables too" + ); +} diff --git a/_primitives/_rust/kei-entity-store/tests/real_text_pk_smoke.rs b/_primitives/_rust/kei-entity-store/tests/real_text_pk_smoke.rs index afc1a61..11cc16b 100644 --- a/_primitives/_rust/kei-entity-store/tests/real_text_pk_smoke.rs +++ b/_primitives/_rust/kei-entity-store/tests/real_text_pk_smoke.rs @@ -36,7 +36,7 @@ static SESSION_SCHEMA: EntitySchema = EntitySchema { #[test] fn text_pk_create_with_string_id_and_get_by_string_id() { - let s = Store::open_memory(&SESSION_SCHEMA).unwrap(); + let s = Store::open_memory(&[&SESSION_SCHEMA]).unwrap(); let uuid = "550e8400-e29b-41d4-a716-446655440000"; let out = create::run( s.conn(), @@ -54,7 +54,7 @@ fn text_pk_create_with_string_id_and_get_by_string_id() { #[test] fn text_pk_update_and_delete_by_string_id() { - let s = Store::open_memory(&SESSION_SCHEMA).unwrap(); + let s = Store::open_memory(&[&SESSION_SCHEMA]).unwrap(); let uuid = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"; create::run( s.conn(), @@ -78,7 +78,7 @@ fn text_pk_update_and_delete_by_string_id() { #[test] fn text_pk_create_rejects_missing_id() { - let s = Store::open_memory(&SESSION_SCHEMA).unwrap(); + let s = Store::open_memory(&[&SESSION_SCHEMA]).unwrap(); let err = create::run(s.conn(), &SESSION_SCHEMA, json!({ "title": "x" })).unwrap_err(); assert_eq!(err.exit_code(), 2); } @@ -107,7 +107,7 @@ static COST_SCHEMA: EntitySchema = EntitySchema { #[test] fn real_column_round_trips_f64_unchanged() { - let s = Store::open_memory(&COST_SCHEMA).unwrap(); + let s = Store::open_memory(&[&COST_SCHEMA]).unwrap(); let v = create::run( s.conn(), &COST_SCHEMA, @@ -121,7 +121,7 @@ fn real_column_round_trips_f64_unchanged() { #[test] fn real_default_applies_when_missing() { - let s = Store::open_memory(&COST_SCHEMA).unwrap(); + let s = Store::open_memory(&[&COST_SCHEMA]).unwrap(); let v = create::run( s.conn(), &COST_SCHEMA, @@ -157,7 +157,7 @@ static CHAT_SCHEMA: EntitySchema = EntitySchema { #[test] fn archive_textenum_writes_archived_sentinel_string() { - let s = Store::open_memory(&CHAT_SCHEMA).unwrap(); + let s = Store::open_memory(&[&CHAT_SCHEMA]).unwrap(); let uuid = "aaaa-bbbb-cccc"; create::run( s.conn(), @@ -209,7 +209,7 @@ static META_EDGE_SCHEMA: EntitySchema = EntitySchema { #[test] fn text_pair_metadata_link_stores_weight_and_timestamp() { - let s = Store::open_memory(&META_EDGE_SCHEMA).unwrap(); + let s = Store::open_memory(&[&META_EDGE_SCHEMA]).unwrap(); link::run( s.conn(), &META_EDGE_SCHEMA, @@ -234,7 +234,7 @@ fn text_pair_metadata_rank_respects_weight() { // Graph: a → b (weight 10), a → c (weight 1). b and c both sink. // Weighted PageRank should push `b` above `c`; unweighted would // split flow 50/50 (→ tie or identical rank). - let s = Store::open_memory(&META_EDGE_SCHEMA).unwrap(); + let s = Store::open_memory(&[&META_EDGE_SCHEMA]).unwrap(); link::run( s.conn(), &META_EDGE_SCHEMA, diff --git a/_primitives/_rust/kei-entity-store/tests/text_pair_smoke.rs b/_primitives/_rust/kei-entity-store/tests/text_pair_smoke.rs index 54851e8..2e0c9f1 100644 --- a/_primitives/_rust/kei-entity-store/tests/text_pair_smoke.rs +++ b/_primitives/_rust/kei-entity-store/tests/text_pair_smoke.rs @@ -41,7 +41,7 @@ static INTEGER_SCHEMA: EntitySchema = EntitySchema { #[test] fn text_pair_link_and_lookup() { - let s = Store::open_memory(&TEXT_SCHEMA).unwrap(); + let s = Store::open_memory(&[&TEXT_SCHEMA]).unwrap(); link::run( s.conn(), &TEXT_SCHEMA, @@ -61,7 +61,7 @@ fn text_pair_link_and_lookup() { #[test] fn text_pair_link_idempotent() { - let s = Store::open_memory(&TEXT_SCHEMA).unwrap(); + let s = Store::open_memory(&[&TEXT_SCHEMA]).unwrap(); for _ in 0..3 { link::run( s.conn(), @@ -79,7 +79,7 @@ fn text_pair_link_idempotent() { #[test] fn text_pair_rank_returns_string_ids() { - let s = Store::open_memory(&TEXT_SCHEMA).unwrap(); + let s = Store::open_memory(&[&TEXT_SCHEMA]).unwrap(); let pairs = [("a.md", "b.md"), ("a.md", "c.md"), ("b.md", "c.md")]; for (from, to) in pairs { link::run(s.conn(), &TEXT_SCHEMA, json!({ "from": from, "to": to })).unwrap(); @@ -94,7 +94,7 @@ fn text_pair_rank_returns_string_ids() { #[test] fn text_pair_rejects_integer_input() { - let s = Store::open_memory(&TEXT_SCHEMA).unwrap(); + let s = Store::open_memory(&[&TEXT_SCHEMA]).unwrap(); let err = link::run(s.conn(), &TEXT_SCHEMA, json!({ "from": 1, "to": 2 })) .unwrap_err(); assert_eq!(err.exit_code(), 2); @@ -126,7 +126,7 @@ static META_EXTRAS_SCHEMA: EntitySchema = EntitySchema { #[test] fn text_pair_with_extras_roundtrip() { - let s = Store::open_memory(&META_EXTRAS_SCHEMA).unwrap(); + let s = Store::open_memory(&[&META_EXTRAS_SCHEMA]).unwrap(); link::run( s.conn(), &META_EXTRAS_SCHEMA, @@ -156,7 +156,7 @@ fn text_pair_with_extras_roundtrip() { #[test] fn text_pair_with_custom_col_names_rank_uses_from_to_cols() { - let s = Store::open_memory(&META_EXTRAS_SCHEMA).unwrap(); + let s = Store::open_memory(&[&META_EXTRAS_SCHEMA]).unwrap(); for (f, t) in [("a://x", "b://y"), ("a://x", "c://z"), ("b://y", "c://z")] { link::run( s.conn(), @@ -175,7 +175,7 @@ fn text_pair_with_custom_col_names_rank_uses_from_to_cols() { #[test] fn integer_pair_still_works_after_refactor() { // Regression guard — kei-task uses IntegerPair implicitly. - let s = Store::open_memory(&INTEGER_SCHEMA).unwrap(); + let s = Store::open_memory(&[&INTEGER_SCHEMA]).unwrap(); link::run(s.conn(), &INTEGER_SCHEMA, json!({ "from": 1, "to": 2 })).unwrap(); link::run(s.conn(), &INTEGER_SCHEMA, json!({ "from": 1, "to": 3 })).unwrap(); link::run(s.conn(), &INTEGER_SCHEMA, json!({ "from": 2, "to": 3 })).unwrap(); diff --git a/_primitives/_rust/kei-entity-store/tests/verb_smoke.rs b/_primitives/_rust/kei-entity-store/tests/verb_smoke.rs index 91471fc..42975b1 100644 --- a/_primitives/_rust/kei-entity-store/tests/verb_smoke.rs +++ b/_primitives/_rust/kei-entity-store/tests/verb_smoke.rs @@ -28,7 +28,7 @@ static SCHEMA: EntitySchema = EntitySchema { custom_migrations: &[], }; -fn mk() -> Store { Store::open_memory(&SCHEMA).unwrap() } +fn mk() -> Store { Store::open_memory(&[&SCHEMA]).unwrap() } fn create_one(s: &Store, title: &str) -> i64 { let v = create::run(s.conn(), &SCHEMA, json!({ "title": title })).unwrap(); @@ -155,7 +155,7 @@ fn disabled_verb_errors() { archived_field: None, custom_migrations: &[], }; - let s = Store::open_memory(&DISABLED).unwrap(); + let s = Store::open_memory(&[&DISABLED]).unwrap(); let err = create::run(s.conn(), &DISABLED, json!({ "title": "x" })).unwrap_err(); assert_eq!(err.exit_code(), 2); } diff --git a/_primitives/_rust/kei-sage/src/schema.rs b/_primitives/_rust/kei-sage/src/schema.rs index 0da4bce..093f75c 100644 --- a/_primitives/_rust/kei-sage/src/schema.rs +++ b/_primitives/_rust/kei-sage/src/schema.rs @@ -102,7 +102,7 @@ pub static SAGE_SCHEMA: EntitySchema = EntitySchema { /// `SAGE_SCHEMA`. Preserved as a named entry point so downstream /// callers and tests can still spell out the migration explicitly. pub fn create_schema(conn: &Connection) -> Result<()> { - kei_entity_store::engine::run_migrations(conn, &SAGE_SCHEMA) + kei_entity_store::engine::run_migrations(conn, &[&SAGE_SCHEMA]) .map_err(|e| match e { kei_entity_store::VerbError::Sqlite(sq) => sq, other => rusqlite::Error::ToSqlConversionFailure(Box::new( diff --git a/_primitives/_rust/kei-sage/src/store.rs b/_primitives/_rust/kei-sage/src/store.rs index 6897227..558f4e6 100644 --- a/_primitives/_rust/kei-sage/src/store.rs +++ b/_primitives/_rust/kei-sage/src/store.rs @@ -22,12 +22,12 @@ pub struct Store { impl Store { pub fn open(path: &Path) -> Result { - let engine = EngineStore::open(path, &SAGE_SCHEMA).context("engine store open")?; + let engine = EngineStore::open(path, &[&SAGE_SCHEMA]).context("engine store open")?; Ok(Self { engine }) } pub fn open_memory() -> Result { - let engine = EngineStore::open_memory(&SAGE_SCHEMA).context("engine store open_memory")?; + let engine = EngineStore::open_memory(&[&SAGE_SCHEMA]).context("engine store open_memory")?; Ok(Self { engine }) } diff --git a/_primitives/_rust/kei-social-store/src/store.rs b/_primitives/_rust/kei-social-store/src/store.rs index 54940d3..7b8a553 100644 --- a/_primitives/_rust/kei-social-store/src/store.rs +++ b/_primitives/_rust/kei-social-store/src/store.rs @@ -18,12 +18,12 @@ pub struct Store { impl Store { pub fn open(path: &Path) -> Result { - let inner = EntityStore::open(path, &SOCIAL_SCHEMA)?; + let inner = EntityStore::open(path, &[&SOCIAL_SCHEMA])?; Ok(Self { inner }) } pub fn open_memory() -> Result { - let inner = EntityStore::open_memory(&SOCIAL_SCHEMA)?; + let inner = EntityStore::open_memory(&[&SOCIAL_SCHEMA])?; Ok(Self { inner }) } diff --git a/_primitives/_rust/kei-task/src/store.rs b/_primitives/_rust/kei-task/src/store.rs index 6154a35..e74f322 100644 --- a/_primitives/_rust/kei-task/src/store.rs +++ b/_primitives/_rust/kei-task/src/store.rs @@ -21,12 +21,12 @@ pub struct Store { impl Store { pub fn open(path: &Path) -> Result { - let inner = EntityStore::open(path, &TASK_SCHEMA)?; + let inner = EntityStore::open(path, &[&TASK_SCHEMA])?; Ok(Self { inner }) } pub fn open_memory() -> Result { - let inner = EntityStore::open_memory(&TASK_SCHEMA)?; + let inner = EntityStore::open_memory(&[&TASK_SCHEMA])?; Ok(Self { inner }) }