Merge A — Store::open multi-schema; kei-chat-store sessions engine-owned

This commit is contained in:
Parfii-bot 2026-04-23 14:27:15 +08:00
commit 64888d5748
17 changed files with 374 additions and 158 deletions

View file

@ -1,37 +1,27 @@
//! kei-chat-store EntitySchema — declarative spec consumed by
//! kei-chat-store EntitySchemas — declarative specs consumed by
//! `kei_entity_store::Store` and its verb templates.
//!
//! Shape (Layer-A convergence, 2026-04-23; cost-column re-migration
//! 2026-04-23 wave 8):
//! Shape (multi-schema convergence, 2026-04-23):
//!
//! - Primary entity = `chat_messages` (INTEGER PK; required by engine).
//! Engine owns: create / get / list / search verbs + FTS reindex.
//! - Bespoke: `chat_sessions` has a TEXT UUID primary key. The engine
//! gained `FieldKind::TextPk` in wave 8 but Store::open currently
//! takes a SINGLE EntitySchema, so a second managed schema for
//! sessions would require engine multi-schema support. Until that
//! lands, the session DDL rides `custom_migrations` and its CRUD
//! stays in `sessions.rs` (analogous to kei-task's milestones / deps
//! / graph). **Known open:** promote chat_sessions to a second
//! EntitySchema (`TextPk` + `TextArchiveEnum`) once engine gains
//! multi-schema support.
//! - Archive: sessions use a TEXT `status` enum ('active' | 'archived').
//! Session archival stays bespoke (same reason as above).
//! - Per-message `cost` (REAL) is restored via
//! `FieldKind::RealDefault(0.0)` — wave-8 addition. Engine-managed
//! INSERT/SELECT, no bespoke SQL needed. Previously dropped when
//! the engine had no REAL kind; re-instated 2026-04-23.
//! - `MESSAGES_SCHEMA`: primary entity `chat_messages` (INTEGER PK;
//! engine-owned create/get/list/search + FTS reindex).
//! - `SESSIONS_SCHEMA`: second entity `chat_sessions` (TEXT UUID PK +
//! `TextArchiveEnum` status column, engine-owned create/get/archive).
//! Previously rode `custom_migrations`; now a first-class schema
//! since `Store::open` accepts a slice of schemas.
//! - `ALL_SCHEMAS`: the `&[&EntitySchema]` slice the `Store` wrapper
//! hands to the engine on open.
//!
//! FTS column-name change vs pre-convergence shape:
//! legacy fts_chat(message_id, session_id UNINDEXED, content)
//! engine fts_chat_messages(chat_messages_id UNINDEXED, content)
//! The `session_id` shadow column was UNINDEXED (never matched on) so
//! no search path regresses. Fresh databases only — on-disk DBs from
//! the pre-engine era need recreation.
//! The session aggregates (`message_count`, `total_tokens`, `total_cost`)
//! are still updated via bespoke SQL in `sessions.rs` because the
//! engine has no `increment-on-related-insert` verb. That bespoke path
//! shrank from "whole row lifecycle" to "UPDATE counters only".
use kei_entity_store::schema::{EdgeKeyKind, EntitySchema, FieldDef};
static FIELDS: &[FieldDef] = &[
// ---- chat_messages ---------------------------------------------------
static MESSAGE_FIELDS: &[FieldDef] = &[
FieldDef::pk("id"),
FieldDef::text_nn("session_id"),
FieldDef::text_nn("role"),
@ -42,33 +32,58 @@ static FIELDS: &[FieldDef] = &[
FieldDef::created_at(),
];
const DDL_SECONDARY: &str = r#"
CREATE INDEX IF NOT EXISTS idx_cm_session ON chat_messages(session_id);
/// Keep the idx_cm_session index around — generic schema has no
/// `indexed` flag for one-off single-column indexes on non-PK fields.
const MESSAGES_INDEX_DDL: &str =
"CREATE INDEX IF NOT EXISTS idx_cm_session ON chat_messages(session_id);";
CREATE TABLE IF NOT EXISTS chat_sessions (
id TEXT PRIMARY KEY,
project TEXT NOT NULL,
title TEXT DEFAULT '',
model TEXT DEFAULT '',
status TEXT DEFAULT 'active',
message_count INTEGER DEFAULT 0,
total_tokens INTEGER DEFAULT 0,
total_cost REAL DEFAULT 0.0,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_cs_project ON chat_sessions(project);
CREATE INDEX IF NOT EXISTS idx_cs_status ON chat_sessions(status);
"#;
pub static CHAT_SCHEMA: EntitySchema = EntitySchema {
pub static MESSAGES_SCHEMA: EntitySchema = EntitySchema {
name: "chat_message",
table: "chat_messages",
fields: FIELDS,
fields: MESSAGE_FIELDS,
enabled_verbs: &["create", "get", "list", "search"],
fts_columns: Some(&["content"]),
edge_table: None,
edge_key_kind: EdgeKeyKind::IntegerPair,
archived_field: None,
custom_migrations: &[DDL_SECONDARY],
custom_migrations: &[MESSAGES_INDEX_DDL],
};
// ---- chat_sessions ---------------------------------------------------
static SESSION_FIELDS: &[FieldDef] = &[
FieldDef::text_pk("id"),
FieldDef::text_nn("project"),
FieldDef::text_default("title", ""),
FieldDef::text_default("model", ""),
FieldDef::text_archive_enum("status", "active", "archived"),
FieldDef::integer("status_at"),
FieldDef::integer("message_count"),
FieldDef::integer("total_tokens"),
FieldDef::real_default("total_cost", 0.0),
FieldDef::created_at(),
FieldDef::updated_at(),
];
/// Legacy indexes on chat_sessions (project, status). `indexed` flag on
/// FieldDef only covers single-column indexes with a deterministic
/// `idx_<table>_<col>` name — matches what we need here.
const SESSIONS_INDEX_DDL: &str = "\
CREATE INDEX IF NOT EXISTS idx_cs_project ON chat_sessions(project);\n\
CREATE INDEX IF NOT EXISTS idx_cs_status ON chat_sessions(status);";
pub static SESSIONS_SCHEMA: EntitySchema = EntitySchema {
name: "chat_session",
table: "chat_sessions",
fields: SESSION_FIELDS,
enabled_verbs: &["create", "get", "archive", "update"],
fts_columns: None,
edge_table: None,
edge_key_kind: EdgeKeyKind::IntegerPair,
archived_field: Some("status"),
custom_migrations: &[SESSIONS_INDEX_DDL],
};
// ---- aggregate slice for Store::open -------------------------------
pub static ALL_SCHEMAS: &[&EntitySchema] = &[&MESSAGES_SCHEMA, &SESSIONS_SCHEMA];

View file

@ -1,13 +1,13 @@
//! FTS over messages.
//!
//! Layer-A convergence (2026-04-23): delegates to
//! `kei_entity_store::verbs::search` using `CHAT_SCHEMA`. The engine
//! `kei_entity_store::verbs::search` using `MESSAGES_SCHEMA`. The engine
//! handles FTS5 JOIN + rank ordering; this module maps the generic
//! JSON result back to typed `ChatMessage` rows for legacy callers.
//! Per-message `cost` is persisted (engine `RealDefault` field);
//! `row_to_message` reads it back as f64.
use crate::schema::CHAT_SCHEMA;
use crate::schema::MESSAGES_SCHEMA;
use crate::sessions::ChatMessage;
use crate::store::Store;
use anyhow::{anyhow, Result};
@ -16,7 +16,7 @@ use serde_json::{json, Value};
pub fn search(store: &Store, query: &str, limit: i64) -> Result<Vec<ChatMessage>> {
let lim = if limit <= 0 { 20 } else { limit };
let v = v_search::run(store.conn(), &CHAT_SCHEMA, json!({ "query": query, "limit": lim }))
let v = v_search::run(store.conn(), &MESSAGES_SCHEMA, json!({ "query": query, "limit": lim }))
.map_err(|e| anyhow!("{e}"))?;
let arr = v["results"]
.as_array()

View file

@ -1,20 +1,23 @@
//! Session + message operations.
//!
//! Layer-A convergence (2026-04-23): message INSERT + FTS reindex
//! delegate to `kei_entity_store::verbs::create` via `CHAT_SCHEMA`.
//! Session rows still use bespoke SQL (TEXT UUID PK + TEXT status enum
//! are outside the engine's INTEGER-PK / INTEGER-archived-field model).
//! `save_message` still owns the bespoke session-counter UPDATE after
//! each message insert — same semantics as pre-convergence.
//! Multi-schema convergence (2026-04-23): BOTH sessions and messages
//! now flow through `kei_entity_store::verbs::*`. `start_session` uses
//! `create` against `SESSIONS_SCHEMA` (TextPk + TextArchiveEnum);
//! `archive_session` uses `archive`; `get_session` uses `get`;
//! `save_message` uses `create` against `MESSAGES_SCHEMA`.
//!
//! Only the per-message aggregate update on `chat_sessions`
//! (message_count / total_tokens / total_cost) stays bespoke — the
//! engine has no "update-on-related-insert" verb.
use crate::schema::CHAT_SCHEMA;
use crate::schema::{MESSAGES_SCHEMA, SESSIONS_SCHEMA};
use crate::store::Store;
use anyhow::{anyhow, Result};
use chrono::Utc;
use kei_entity_store::verbs::create as v_create;
use kei_entity_store::verbs::{archive as v_archive, create as v_create, get as v_get};
use rusqlite::params;
use serde::{Deserialize, Serialize};
use serde_json::json;
use serde_json::{json, Value};
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ChatSession {
@ -44,12 +47,12 @@ pub struct ChatMessage {
pub fn start_session(store: &Store, project: &str, title: &str, model: &str) -> Result<String> {
let id = uuid::Uuid::new_v4().to_string();
let now = Utc::now().timestamp();
store.conn().execute(
"INSERT INTO chat_sessions (id, project, title, model, status, created_at, updated_at)
VALUES (?1,?2,?3,?4,'active',?5,?5)",
params![id, project, title, model, now],
)?;
v_create::run(
store.conn(),
&SESSIONS_SCHEMA,
json!({ "id": id, "project": project, "title": title, "model": model }),
)
.map_err(|e| anyhow!("{e}"))?;
Ok(id)
}
@ -64,43 +67,68 @@ pub fn save_message(store: &Store, msg: &ChatMessage) -> Result<i64> {
"cost": msg.cost,
"created_at": msg.created_at,
});
let v = v_create::run(store.conn(), &CHAT_SCHEMA, payload)
let v = v_create::run(store.conn(), &MESSAGES_SCHEMA, payload)
.map_err(|e| anyhow!("{e}"))?;
let id = v["id"]
.as_i64()
.ok_or_else(|| anyhow!("missing id in create response"))?;
store.conn().execute(
"UPDATE chat_sessions SET message_count = message_count + 1,
total_tokens = total_tokens + ?1, total_cost = total_cost + ?2,
updated_at = ?3 WHERE id = ?4",
params![msg.tokens_in + msg.tokens_out, msg.cost, now, msg.session_id],
)?;
bump_session_totals(store, &msg.session_id, msg.tokens_in + msg.tokens_out, msg.cost, now)?;
Ok(id)
}
pub fn archive_session(store: &Store, session_id: &str) -> Result<()> {
let n = store.conn().execute(
"UPDATE chat_sessions SET status='archived', updated_at=?1 WHERE id=?2",
params![Utc::now().timestamp(), session_id],
/// Bespoke aggregate update — engine has no "increment-on-related-insert"
/// verb. Keeps the per-session counters in sync with what was just
/// inserted into chat_messages.
fn bump_session_totals(
store: &Store,
session_id: &str,
tokens_delta: i64,
cost_delta: f64,
now: i64,
) -> Result<()> {
store.conn().execute(
"UPDATE chat_sessions
SET message_count = message_count + 1,
total_tokens = total_tokens + ?1,
total_cost = total_cost + ?2,
updated_at = ?3
WHERE id = ?4",
params![tokens_delta, cost_delta, now, session_id],
)?;
if n == 0 {
return Err(anyhow!("session {session_id} not found"));
}
Ok(())
}
pub fn archive_session(store: &Store, session_id: &str) -> Result<()> {
v_archive::run(store.conn(), &SESSIONS_SCHEMA, json!({ "id": session_id }))
.map_err(|e| anyhow!("{e}"))?;
Ok(())
}
pub fn get_session(store: &Store, id: &str) -> Result<Option<ChatSession>> {
let mut stmt = store.conn().prepare(
"SELECT id, project, title, model, status, message_count, total_tokens,
total_cost, created_at, updated_at FROM chat_sessions WHERE id=?1",
)?;
let mut rows = stmt.query(params![id])?;
if let Some(r) = rows.next()? {
return Ok(Some(ChatSession {
id: r.get(0)?, project: r.get(1)?, title: r.get(2)?, model: r.get(3)?,
status: r.get(4)?, message_count: r.get(5)?, total_tokens: r.get(6)?,
total_cost: r.get(7)?, created_at: r.get(8)?, updated_at: r.get(9)?,
}));
match v_get::run(store.conn(), &SESSIONS_SCHEMA, json!({ "id": id })) {
Ok(v) => Ok(Some(session_from_json(v)?)),
Err(e) if e.exit_code() == 2 => Ok(None),
Err(e) => Err(anyhow!("{e}")),
}
Ok(None)
}
fn session_from_json(v: Value) -> Result<ChatSession> {
let obj = v
.as_object()
.ok_or_else(|| anyhow!("expected object in get response"))?;
let s = |k: &str| obj.get(k).and_then(|x| x.as_str()).unwrap_or("").to_string();
let i = |k: &str| obj.get(k).and_then(|x| x.as_i64()).unwrap_or(0);
let f = |k: &str| obj.get(k).and_then(|x| x.as_f64()).unwrap_or(0.0);
Ok(ChatSession {
id: s("id"),
project: s("project"),
title: s("title"),
model: s("model"),
status: s("status"),
message_count: i("message_count"),
total_tokens: i("total_tokens"),
total_cost: f("total_cost"),
created_at: i("created_at"),
updated_at: i("updated_at"),
})
}

View file

@ -1,12 +1,17 @@
//! Chat store — thin shim over `kei_entity_store::Store`.
//!
//! Layer-A convergence pilot (2026-04-23): message CRUD + FTS run
//! through `kei_entity_store::verbs::*` using `CHAT_SCHEMA`. Session
//! management (TEXT UUID PK, counter updates, status enum) stays
//! bespoke in `sessions.rs` — the `conn()` escape hatch is the only
//! surface it needs.
//! Multi-schema convergence (2026-04-23): BOTH `chat_messages` and
//! `chat_sessions` are now engine-owned. `Store::open` hands the engine
//! `ALL_SCHEMAS` so migrations for both tables run in a single
//! atomic transaction.
//!
//! Verbs dispatch per-schema: callers that act on messages pass
//! `MESSAGES_SCHEMA`, callers that act on sessions pass
//! `SESSIONS_SCHEMA`. The only bespoke SQL left is the per-message
//! session-counter UPDATE (`sessions.rs::bump_session_totals`) — the
//! engine has no "aggregate-on-related-insert" verb.
use crate::schema::CHAT_SCHEMA;
use crate::schema::ALL_SCHEMAS;
use anyhow::Result;
use kei_entity_store::Store as EntityStore;
use rusqlite::Connection;
@ -18,12 +23,12 @@ pub struct Store {
impl Store {
pub fn open(path: &Path) -> Result<Self> {
let inner = EntityStore::open(path, &CHAT_SCHEMA)?;
let inner = EntityStore::open(path, ALL_SCHEMAS)?;
Ok(Self { inner })
}
pub fn open_memory() -> Result<Self> {
let inner = EntityStore::open_memory(&CHAT_SCHEMA)?;
let inner = EntityStore::open_memory(ALL_SCHEMAS)?;
Ok(Self { inner })
}

View file

@ -18,12 +18,12 @@ pub struct Store {
impl Store {
pub fn open(path: &Path) -> Result<Self> {
let inner = EntityStore::open(path, &CONTENT_SCHEMA)?;
let inner = EntityStore::open(path, &[&CONTENT_SCHEMA])?;
Ok(Self { inner })
}
pub fn open_memory() -> Result<Self> {
let inner = EntityStore::open_memory(&CONTENT_SCHEMA)?;
let inner = EntityStore::open_memory(&[&CONTENT_SCHEMA])?;
Ok(Self { inner })
}

View file

@ -23,12 +23,12 @@ pub struct Store {
impl Store {
pub fn open(path: &Path) -> Result<Self> {
let inner = EntityStore::open(path, &CROSSDOMAIN_SCHEMA)?;
let inner = EntityStore::open(path, &[&CROSSDOMAIN_SCHEMA])?;
Ok(Self { inner })
}
pub fn open_memory() -> Result<Self> {
let inner = EntityStore::open_memory(&CROSSDOMAIN_SCHEMA)?;
let inner = EntityStore::open_memory(&[&CROSSDOMAIN_SCHEMA])?;
Ok(Self { inner })
}

View file

@ -1,10 +1,15 @@
//! Store — thin wrapper over `rusqlite::Connection` that runs the
//! schema's migration DDL on open.
//! schemas' migration DDL on open.
//!
//! The engine does NOT take ownership of verb dispatch. Sibling crates
//! call verb modules directly (e.g. `verbs::create::run(&mut conn,
//! &SCHEMA, input)`). This keeps the engine a passive provider of
//! connection + schema-aware DDL.
//!
//! As of the multi-schema breaking change (2026-04-23), `Store::open`
//! accepts a SLICE of `&EntitySchema`. Every schema's DDL runs inside
//! a SINGLE transaction — if schema[i] migration fails, schema[0..i]
//! rolls back too. Verbs remain per-schema-dispatched by the caller.
use crate::ddl;
use crate::error::VerbError;
@ -24,22 +29,22 @@ pub struct Store {
}
impl Store {
/// Open (creates parent dirs, enables WAL, runs migrations for this
/// schema).
pub fn open(path: &Path, schema: &EntitySchema) -> Result<Self> {
/// Open (creates parent dirs, enables WAL, runs migrations for all
/// schemas in a single transaction).
pub fn open(path: &Path, schemas: &[&EntitySchema]) -> Result<Self> {
if let Some(parent) = path.parent() {
let _ = std::fs::create_dir_all(parent);
}
let conn = Connection::open(path).context("open sqlite")?;
conn.pragma_update(None, "journal_mode", "WAL").ok();
run_migrations(&conn, schema)?;
run_migrations(&conn, schemas)?;
Ok(Self { conn })
}
/// In-memory store — unit-test constructor. Same migrations, no FS.
pub fn open_memory(schema: &EntitySchema) -> Result<Self> {
pub fn open_memory(schemas: &[&EntitySchema]) -> Result<Self> {
let conn = Connection::open_in_memory()?;
run_migrations(&conn, schema)?;
run_migrations(&conn, schemas)?;
Ok(Self { conn })
}
@ -52,24 +57,39 @@ impl Store {
pub fn into_conn(self) -> Connection { self.conn }
}
/// Run: create primary table, indexes, FTS virtual table, edge table,
/// then any custom DDL. Idempotent (all statements use IF NOT EXISTS).
///
/// Also stamps `PRAGMA user_version` on fresh databases so future
/// schema bumps can detect the target migration set exactly once.
pub fn run_migrations(conn: &Connection, schema: &EntitySchema) -> Result<(), VerbError> {
conn.execute_batch(&ddl::primary_table(schema))?;
conn.execute_batch(&ddl::indexes(schema))?;
/// Run all schemas' migrations atomically. For each schema: primary
/// table, indexes, FTS virtual table, edge table, custom DDL. Finally
/// stamp `user_version`. The transaction rolls back entirely if any
/// schema fails — callers never see a half-migrated DB.
pub fn run_migrations(
conn: &Connection,
schemas: &[&EntitySchema],
) -> Result<(), VerbError> {
let tx = conn.unchecked_transaction()?;
for schema in schemas {
apply_schema(&tx, schema)?;
}
apply_user_version(&tx)?;
tx.commit()?;
Ok(())
}
/// Apply one schema's DDL set inside an already-open transaction.
fn apply_schema(
tx: &rusqlite::Transaction<'_>,
schema: &EntitySchema,
) -> Result<(), VerbError> {
tx.execute_batch(&ddl::primary_table(schema))?;
tx.execute_batch(&ddl::indexes(schema))?;
if let Some(cols) = schema.fts_columns {
conn.execute_batch(&ddl::fts_table(schema.table, cols))?;
tx.execute_batch(&ddl::fts_table(schema.table, cols))?;
}
if let Some(edge) = schema.edge_table {
conn.execute_batch(&ddl::edge_table_for(edge, schema.edge_key_kind))?;
tx.execute_batch(&ddl::edge_table_for(edge, schema.edge_key_kind))?;
}
for stmt in schema.custom_migrations {
conn.execute_batch(stmt)?;
tx.execute_batch(stmt)?;
}
apply_user_version(conn)?;
Ok(())
}
@ -77,12 +97,12 @@ pub fn run_migrations(conn: &Connection, schema: &EntitySchema) -> Result<(), Ve
/// default to 0). If already stamped at `CURRENT_USER_VERSION` this is
/// a no-op; if stamped at an older version a future bump will gate
/// version-indexed DDL here.
fn apply_user_version(conn: &Connection) -> Result<(), VerbError> {
let current: u32 = conn
fn apply_user_version(tx: &rusqlite::Transaction<'_>) -> Result<(), VerbError> {
let current: u32 = tx
.pragma_query_value(None, "user_version", |r| r.get(0))
.unwrap_or(0);
if current < CURRENT_USER_VERSION {
conn.pragma_update(None, "user_version", CURRENT_USER_VERSION)?;
tx.pragma_update(None, "user_version", CURRENT_USER_VERSION)?;
}
Ok(())
}

View file

@ -49,7 +49,7 @@ static WITHOUT_FIELD: EntitySchema = EntitySchema {
#[test]
fn archive_sets_flag_and_stamps_timestamp() {
let s = Store::open_memory(&ARCHIVABLE).unwrap();
let s = Store::open_memory(&[&ARCHIVABLE]).unwrap();
let v = create::run(s.conn(), &ARCHIVABLE, json!({ "title": "hi" })).unwrap();
let id = v["id"].as_i64().unwrap();
@ -66,7 +66,7 @@ fn archive_sets_flag_and_stamps_timestamp() {
#[test]
fn archive_preserves_id_and_other_fields() {
let s = Store::open_memory(&ARCHIVABLE).unwrap();
let s = Store::open_memory(&[&ARCHIVABLE]).unwrap();
let v = create::run(s.conn(), &ARCHIVABLE, json!({ "title": "keep" })).unwrap();
let id = v["id"].as_i64().unwrap();
archive::run(s.conn(), &ARCHIVABLE, json!({ "id": id })).unwrap();
@ -77,7 +77,7 @@ fn archive_preserves_id_and_other_fields() {
#[test]
fn archive_errors_when_archived_field_missing() {
let s = Store::open_memory(&WITHOUT_FIELD).unwrap();
let s = Store::open_memory(&[&WITHOUT_FIELD]).unwrap();
let v = create::run(s.conn(), &WITHOUT_FIELD, json!({ "title": "x" })).unwrap();
let id = v["id"].as_i64().unwrap();
let err = archive::run(s.conn(), &WITHOUT_FIELD, json!({ "id": id })).unwrap_err();
@ -86,14 +86,14 @@ fn archive_errors_when_archived_field_missing() {
#[test]
fn archive_not_found_errors() {
let s = Store::open_memory(&ARCHIVABLE).unwrap();
let s = Store::open_memory(&[&ARCHIVABLE]).unwrap();
let err = archive::run(s.conn(), &ARCHIVABLE, json!({ "id": 9999 })).unwrap_err();
assert_eq!(err.exit_code(), 2);
}
#[test]
fn archive_rejects_missing_id() {
let s = Store::open_memory(&ARCHIVABLE).unwrap();
let s = Store::open_memory(&[&ARCHIVABLE]).unwrap();
let err = archive::run(s.conn(), &ARCHIVABLE, json!({})).unwrap_err();
assert_eq!(err.exit_code(), 2);
}

View file

@ -31,7 +31,7 @@ static SCHEMA: EntitySchema = EntitySchema {
custom_migrations: &[],
};
fn mk() -> Store { Store::open_memory(&SCHEMA).unwrap() }
fn mk() -> Store { Store::open_memory(&[&SCHEMA]).unwrap() }
// ---------- C1 — silent type coercion ----------
@ -179,10 +179,10 @@ fn m2_user_version_applied_once_idempotent() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("store.db");
{
let _s = Store::open(&path, &SCHEMA).unwrap();
let _s = Store::open(&path, &[&SCHEMA]).unwrap();
}
{
let _s = Store::open(&path, &SCHEMA).unwrap();
let _s = Store::open(&path, &[&SCHEMA]).unwrap();
let conn = Connection::open(&path).unwrap();
let v: u32 = conn
.pragma_query_value(None, "user_version", |r| r.get(0))

View file

@ -0,0 +1,148 @@
//! Multi-schema smoke tests — verify that `Store::open` accepts a
//! slice of `&EntitySchema`, runs every schema's migrations inside a
//! single transaction, and that verbs dispatched per-schema work
//! independently against the same underlying connection.
//!
//! Added 2026-04-23 with the multi-schema breaking change. Parity
//! target: unblock kei-chat-store from its single-schema constraint
//! (two entity types — integer-PK messages + text-PK sessions).
use kei_entity_store::schema::{EdgeKeyKind, EntitySchema, FieldDef};
use kei_entity_store::verbs::{create, get};
use kei_entity_store::Store;
use serde_json::json;
static MSG_FIELDS: &[FieldDef] = &[
FieldDef::pk("id"),
FieldDef::text_nn("session_id"),
FieldDef::text_nn("content"),
FieldDef::created_at(),
];
static MSGS: EntitySchema = EntitySchema {
name: "msg",
table: "msgs",
fields: MSG_FIELDS,
enabled_verbs: &["create", "get"],
fts_columns: None,
edge_table: None,
edge_key_kind: EdgeKeyKind::IntegerPair,
archived_field: None,
custom_migrations: &[],
};
static SESS_FIELDS: &[FieldDef] = &[
FieldDef::text_pk("id"),
FieldDef::text_nn("project"),
FieldDef::text_archive_enum("status", "active", "archived"),
FieldDef::integer("status_at"),
FieldDef::created_at(),
];
static SESSIONS: EntitySchema = EntitySchema {
name: "sess",
table: "sessions",
fields: SESS_FIELDS,
enabled_verbs: &["create", "get", "archive"],
fts_columns: None,
edge_table: None,
edge_key_kind: EdgeKeyKind::IntegerPair,
archived_field: Some("status"),
custom_migrations: &[],
};
// ---------- 1. Both schemas' tables are created on open. ----------
#[test]
fn two_schema_store_creates_both_tables() {
let s = Store::open_memory(&[&MSGS, &SESSIONS]).unwrap();
// Both tables must exist; sqlite_master lookup is the cheapest
// structural check available without leaning on a verb.
let found: Vec<String> = s
.conn()
.prepare("SELECT name FROM sqlite_master WHERE type='table' AND name IN ('msgs','sessions') ORDER BY name")
.unwrap()
.query_map([], |r| r.get::<_, String>(0))
.unwrap()
.collect::<Result<_, _>>()
.unwrap();
assert_eq!(found, vec!["msgs".to_string(), "sessions".to_string()]);
}
// ---------- 2. Verbs dispatch per-schema on the same connection. ----------
#[test]
fn verbs_dispatch_per_schema_across_both_tables() {
let s = Store::open_memory(&[&MSGS, &SESSIONS]).unwrap();
// INSERT via integer-PK schema.
let m = create::run(
s.conn(),
&MSGS,
json!({ "session_id": "abc", "content": "hi" }),
)
.unwrap();
let mid = m["id"].as_i64().unwrap();
assert!(mid >= 1);
// INSERT via text-PK schema — same connection, different schema.
let uuid = "550e8400-e29b-41d4-a716-446655440000";
create::run(
s.conn(),
&SESSIONS,
json!({ "id": uuid, "project": "demo" }),
)
.unwrap();
// GET via each schema returns only its table's row.
let m_row = get::run(s.conn(), &MSGS, json!({ "id": mid })).unwrap();
assert_eq!(m_row["content"], "hi");
let s_row = get::run(s.conn(), &SESSIONS, json!({ "id": uuid })).unwrap();
assert_eq!(s_row["project"], "demo");
assert_eq!(s_row["status"], "active");
}
// ---------- 3. Migrations are transactional across schemas. ----------
/// Schema that deliberately breaks on migration (custom_migrations
/// references a column that doesn't exist). Used to prove that a
/// failing schema[1] rolls back schema[0]'s table too — confirming
/// atomic cross-schema migration.
static BROKEN: EntitySchema = EntitySchema {
name: "broken",
table: "broken_tbl",
fields: MSG_FIELDS,
enabled_verbs: &["create"],
fts_columns: None,
edge_table: None,
edge_key_kind: EdgeKeyKind::IntegerPair,
archived_field: None,
// Intentionally invalid SQL — references `nonexistent_tbl`.
custom_migrations: &["CREATE INDEX bad_idx ON nonexistent_tbl(id);"],
};
#[test]
fn failing_migration_rolls_back_prior_schemas() {
// Attempt to open with [good, broken]. The broken schema's
// custom_migrations will error; the whole open must fail and
// schema[0]'s `msgs` table must NOT exist afterwards (rollback).
let path = tempfile::tempdir().unwrap();
let db = path.path().join("atomic.db");
let err = Store::open(&db, &[&MSGS, &BROKEN]);
assert!(err.is_err(), "expected migration failure on BROKEN schema");
// Re-open raw connection and verify neither table leaked through.
let raw = rusqlite::Connection::open(&db).unwrap();
let count: i64 = raw
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name IN ('msgs','broken_tbl')",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(
count, 0,
"failed multi-schema migration must roll back schema[0]'s tables too"
);
}

View file

@ -36,7 +36,7 @@ static SESSION_SCHEMA: EntitySchema = EntitySchema {
#[test]
fn text_pk_create_with_string_id_and_get_by_string_id() {
let s = Store::open_memory(&SESSION_SCHEMA).unwrap();
let s = Store::open_memory(&[&SESSION_SCHEMA]).unwrap();
let uuid = "550e8400-e29b-41d4-a716-446655440000";
let out = create::run(
s.conn(),
@ -54,7 +54,7 @@ fn text_pk_create_with_string_id_and_get_by_string_id() {
#[test]
fn text_pk_update_and_delete_by_string_id() {
let s = Store::open_memory(&SESSION_SCHEMA).unwrap();
let s = Store::open_memory(&[&SESSION_SCHEMA]).unwrap();
let uuid = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee";
create::run(
s.conn(),
@ -78,7 +78,7 @@ fn text_pk_update_and_delete_by_string_id() {
#[test]
fn text_pk_create_rejects_missing_id() {
let s = Store::open_memory(&SESSION_SCHEMA).unwrap();
let s = Store::open_memory(&[&SESSION_SCHEMA]).unwrap();
let err = create::run(s.conn(), &SESSION_SCHEMA, json!({ "title": "x" })).unwrap_err();
assert_eq!(err.exit_code(), 2);
}
@ -107,7 +107,7 @@ static COST_SCHEMA: EntitySchema = EntitySchema {
#[test]
fn real_column_round_trips_f64_unchanged() {
let s = Store::open_memory(&COST_SCHEMA).unwrap();
let s = Store::open_memory(&[&COST_SCHEMA]).unwrap();
let v = create::run(
s.conn(),
&COST_SCHEMA,
@ -121,7 +121,7 @@ fn real_column_round_trips_f64_unchanged() {
#[test]
fn real_default_applies_when_missing() {
let s = Store::open_memory(&COST_SCHEMA).unwrap();
let s = Store::open_memory(&[&COST_SCHEMA]).unwrap();
let v = create::run(
s.conn(),
&COST_SCHEMA,
@ -157,7 +157,7 @@ static CHAT_SCHEMA: EntitySchema = EntitySchema {
#[test]
fn archive_textenum_writes_archived_sentinel_string() {
let s = Store::open_memory(&CHAT_SCHEMA).unwrap();
let s = Store::open_memory(&[&CHAT_SCHEMA]).unwrap();
let uuid = "aaaa-bbbb-cccc";
create::run(
s.conn(),
@ -209,7 +209,7 @@ static META_EDGE_SCHEMA: EntitySchema = EntitySchema {
#[test]
fn text_pair_metadata_link_stores_weight_and_timestamp() {
let s = Store::open_memory(&META_EDGE_SCHEMA).unwrap();
let s = Store::open_memory(&[&META_EDGE_SCHEMA]).unwrap();
link::run(
s.conn(),
&META_EDGE_SCHEMA,
@ -234,7 +234,7 @@ fn text_pair_metadata_rank_respects_weight() {
// Graph: a → b (weight 10), a → c (weight 1). b and c both sink.
// Weighted PageRank should push `b` above `c`; unweighted would
// split flow 50/50 (→ tie or identical rank).
let s = Store::open_memory(&META_EDGE_SCHEMA).unwrap();
let s = Store::open_memory(&[&META_EDGE_SCHEMA]).unwrap();
link::run(
s.conn(),
&META_EDGE_SCHEMA,

View file

@ -41,7 +41,7 @@ static INTEGER_SCHEMA: EntitySchema = EntitySchema {
#[test]
fn text_pair_link_and_lookup() {
let s = Store::open_memory(&TEXT_SCHEMA).unwrap();
let s = Store::open_memory(&[&TEXT_SCHEMA]).unwrap();
link::run(
s.conn(),
&TEXT_SCHEMA,
@ -61,7 +61,7 @@ fn text_pair_link_and_lookup() {
#[test]
fn text_pair_link_idempotent() {
let s = Store::open_memory(&TEXT_SCHEMA).unwrap();
let s = Store::open_memory(&[&TEXT_SCHEMA]).unwrap();
for _ in 0..3 {
link::run(
s.conn(),
@ -79,7 +79,7 @@ fn text_pair_link_idempotent() {
#[test]
fn text_pair_rank_returns_string_ids() {
let s = Store::open_memory(&TEXT_SCHEMA).unwrap();
let s = Store::open_memory(&[&TEXT_SCHEMA]).unwrap();
let pairs = [("a.md", "b.md"), ("a.md", "c.md"), ("b.md", "c.md")];
for (from, to) in pairs {
link::run(s.conn(), &TEXT_SCHEMA, json!({ "from": from, "to": to })).unwrap();
@ -94,7 +94,7 @@ fn text_pair_rank_returns_string_ids() {
#[test]
fn text_pair_rejects_integer_input() {
let s = Store::open_memory(&TEXT_SCHEMA).unwrap();
let s = Store::open_memory(&[&TEXT_SCHEMA]).unwrap();
let err = link::run(s.conn(), &TEXT_SCHEMA, json!({ "from": 1, "to": 2 }))
.unwrap_err();
assert_eq!(err.exit_code(), 2);
@ -126,7 +126,7 @@ static META_EXTRAS_SCHEMA: EntitySchema = EntitySchema {
#[test]
fn text_pair_with_extras_roundtrip() {
let s = Store::open_memory(&META_EXTRAS_SCHEMA).unwrap();
let s = Store::open_memory(&[&META_EXTRAS_SCHEMA]).unwrap();
link::run(
s.conn(),
&META_EXTRAS_SCHEMA,
@ -156,7 +156,7 @@ fn text_pair_with_extras_roundtrip() {
#[test]
fn text_pair_with_custom_col_names_rank_uses_from_to_cols() {
let s = Store::open_memory(&META_EXTRAS_SCHEMA).unwrap();
let s = Store::open_memory(&[&META_EXTRAS_SCHEMA]).unwrap();
for (f, t) in [("a://x", "b://y"), ("a://x", "c://z"), ("b://y", "c://z")] {
link::run(
s.conn(),
@ -175,7 +175,7 @@ fn text_pair_with_custom_col_names_rank_uses_from_to_cols() {
#[test]
fn integer_pair_still_works_after_refactor() {
// Regression guard — kei-task uses IntegerPair implicitly.
let s = Store::open_memory(&INTEGER_SCHEMA).unwrap();
let s = Store::open_memory(&[&INTEGER_SCHEMA]).unwrap();
link::run(s.conn(), &INTEGER_SCHEMA, json!({ "from": 1, "to": 2 })).unwrap();
link::run(s.conn(), &INTEGER_SCHEMA, json!({ "from": 1, "to": 3 })).unwrap();
link::run(s.conn(), &INTEGER_SCHEMA, json!({ "from": 2, "to": 3 })).unwrap();

View file

@ -28,7 +28,7 @@ static SCHEMA: EntitySchema = EntitySchema {
custom_migrations: &[],
};
fn mk() -> Store { Store::open_memory(&SCHEMA).unwrap() }
fn mk() -> Store { Store::open_memory(&[&SCHEMA]).unwrap() }
fn create_one(s: &Store, title: &str) -> i64 {
let v = create::run(s.conn(), &SCHEMA, json!({ "title": title })).unwrap();
@ -155,7 +155,7 @@ fn disabled_verb_errors() {
archived_field: None,
custom_migrations: &[],
};
let s = Store::open_memory(&DISABLED).unwrap();
let s = Store::open_memory(&[&DISABLED]).unwrap();
let err = create::run(s.conn(), &DISABLED, json!({ "title": "x" })).unwrap_err();
assert_eq!(err.exit_code(), 2);
}

View file

@ -102,7 +102,7 @@ pub static SAGE_SCHEMA: EntitySchema = EntitySchema {
/// `SAGE_SCHEMA`. Preserved as a named entry point so downstream
/// callers and tests can still spell out the migration explicitly.
pub fn create_schema(conn: &Connection) -> Result<()> {
kei_entity_store::engine::run_migrations(conn, &SAGE_SCHEMA)
kei_entity_store::engine::run_migrations(conn, &[&SAGE_SCHEMA])
.map_err(|e| match e {
kei_entity_store::VerbError::Sqlite(sq) => sq,
other => rusqlite::Error::ToSqlConversionFailure(Box::new(

View file

@ -22,12 +22,12 @@ pub struct Store {
impl Store {
pub fn open(path: &Path) -> Result<Self> {
let engine = EngineStore::open(path, &SAGE_SCHEMA).context("engine store open")?;
let engine = EngineStore::open(path, &[&SAGE_SCHEMA]).context("engine store open")?;
Ok(Self { engine })
}
pub fn open_memory() -> Result<Self> {
let engine = EngineStore::open_memory(&SAGE_SCHEMA).context("engine store open_memory")?;
let engine = EngineStore::open_memory(&[&SAGE_SCHEMA]).context("engine store open_memory")?;
Ok(Self { engine })
}

View file

@ -18,12 +18,12 @@ pub struct Store {
impl Store {
pub fn open(path: &Path) -> Result<Self> {
let inner = EntityStore::open(path, &SOCIAL_SCHEMA)?;
let inner = EntityStore::open(path, &[&SOCIAL_SCHEMA])?;
Ok(Self { inner })
}
pub fn open_memory() -> Result<Self> {
let inner = EntityStore::open_memory(&SOCIAL_SCHEMA)?;
let inner = EntityStore::open_memory(&[&SOCIAL_SCHEMA])?;
Ok(Self { inner })
}

View file

@ -21,12 +21,12 @@ pub struct Store {
impl Store {
pub fn open(path: &Path) -> Result<Self> {
let inner = EntityStore::open(path, &TASK_SCHEMA)?;
let inner = EntityStore::open(path, &[&TASK_SCHEMA])?;
Ok(Self { inner })
}
pub fn open_memory() -> Result<Self> {
let inner = EntityStore::open_memory(&TASK_SCHEMA)?;
let inner = EntityStore::open_memory(&[&TASK_SCHEMA])?;
Ok(Self { inner })
}