feat(convergence/p1): kei-entity-store engine + schema fragments + kei-task pilot

Layer A + B of convergence wave. Extract common SQLite-CRUD + graph
logic into kei-entity-store engine; introduce JSON Schema fragments;
pilot-migrate kei-task to verify parity.

New crate _primitives/_rust/kei-entity-store/ (1151 LOC):
- src/schema.rs — EntitySchema + FieldDef + enabled_verbs + fts_columns
  + edge_table + custom_migrations
- src/engine.rs — Store::open with WAL pragma + migration runner
- src/verbs/ — 8 data-driven verb modules (create/get/list/search/
  update/delete/link/rank) uniform JSON-in/JSON-out signature
- src/error.rs — typed VerbError enum
- tests/verb_smoke.rs — 10/10 green

New _schemas/fragments/ (83 LOC JSON):
- entity-base.json, titled.json, titled-content.json, edge.json

kei-task pilot migration:
- TASK_SCHEMA: EntitySchema static (67 LOC, was 58)
- store.rs becomes thin shim over engine::Store
- atoms/create.rs + atoms/search.rs delegate to engine verbs
- atoms/schemas/*.json use $ref to _schemas/fragments/ (DRY)
- Task-specific secondary tables (milestones, task_deps) stay via
  schema.custom_migrations; cycle-detection in deps.rs stays
  hand-rolled (domain logic, not generic CRUD)
- 9/9 tests green — full behavioural parity

Convergence delta:
- kei-task touched files: 342 → 389 LOC (+47 for JSON marshalling
  boundary; net wash on pilot)
- BUT each remaining 5 sibling crate can shrink ~400-500 LOC on migration
- Expected total reduction when all 6 migrated: ~2500 LOC across the cluster

Follow-ups declared:
- Migrate kei-chat-store, kei-content-store, kei-social-store to engine
- Migrate kei-sage (needs string-id edge variant; currently generic
  link/rank assume int ids)
- Migrate kei-crossdomain
- Expose list/delete atoms in kei-task (engine supports, atoms not yet)
- Fold kei-curator as engine::hygiene module (per P4 audit)
- Fold kei-search-core entities, keep workflow as thin kei-search-pipeline

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Parfii-bot 2026-04-23 04:46:47 +08:00
parent 652d6a369b
commit 793b91bc43
30 changed files with 1468 additions and 153 deletions

View file

@ -1993,6 +1993,19 @@ dependencies = [
"tempfile",
]
[[package]]
name = "kei-entity-store"
version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"rusqlite",
"serde",
"serde_json",
"tempfile",
"thiserror 1.0.69",
]
[[package]]
name = "kei-forge"
version = "0.1.0"
@ -2174,6 +2187,7 @@ dependencies = [
"anyhow",
"chrono",
"clap",
"kei-entity-store",
"rusqlite",
"serde",
"serde_json",

View file

@ -41,6 +41,8 @@ members = [
"kei-capability",
# v0.24 unification — unified VPS provisioner (supersedes provision-{hetzner,vultr}.sh)
"kei-provision",
# Convergence Layer A — schema-driven verb-template engine for SQLite-CRUD stores
"kei-entity-store",
]
[workspace.package]

View file

@ -0,0 +1,21 @@
[package]
name = "kei-entity-store"
version = "0.1.0"
edition = "2021"
rust-version = "1.75"
description = "Convergence-Layer-A engine: schema-driven SQLite CRUD + graph verbs shared across kei-*-store crates (kei-task pilot)."
[lib]
name = "kei_entity_store"
path = "src/lib.rs"
[dependencies]
rusqlite = { version = "0.31", features = ["bundled"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
anyhow = "1"
thiserror = "1"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
[dev-dependencies]
tempfile = "3"

View file

@ -0,0 +1,121 @@
//! Store — thin wrapper over `rusqlite::Connection` that runs the
//! schema's migration DDL on open.
//!
//! The engine does NOT take ownership of verb dispatch. Sibling crates
//! call verb modules directly (e.g. `verbs::create::run(&mut conn,
//! &SCHEMA, input)`). This keeps the engine a passive provider of
//! connection + schema-aware DDL.
use crate::error::VerbError;
use crate::schema::{EntitySchema, FieldDef, FieldKind};
use anyhow::{Context, Result};
use rusqlite::Connection;
use std::path::Path;
pub struct Store {
conn: Connection,
}
impl Store {
/// Open (creates parent dirs, enables WAL, runs migrations for this
/// schema). Same sequence the 5 original sibling crates ran byte-
/// identically.
pub fn open(path: &Path, schema: &EntitySchema) -> Result<Self> {
if let Some(parent) = path.parent() {
let _ = std::fs::create_dir_all(parent);
}
let conn = Connection::open(path).context("open sqlite")?;
conn.pragma_update(None, "journal_mode", "WAL").ok();
run_migrations(&conn, schema)?;
Ok(Self { conn })
}
/// In-memory store — unit-test constructor. Same migrations, no FS.
pub fn open_memory(schema: &EntitySchema) -> Result<Self> {
let conn = Connection::open_in_memory()?;
run_migrations(&conn, schema)?;
Ok(Self { conn })
}
pub fn conn(&self) -> &Connection { &self.conn }
pub fn conn_mut(&mut self) -> &mut Connection { &mut self.conn }
/// Escape hatch: consume the Store and return the raw Connection.
/// Callers that still need direct SQL (kei-task milestones,
/// cycle-detection) can use this during migration.
pub fn into_conn(self) -> Connection { self.conn }
}
/// Run: create primary table, indexes, FTS virtual table, edge table,
/// then any custom DDL. Idempotent (all statements use IF NOT EXISTS).
pub fn run_migrations(conn: &Connection, schema: &EntitySchema) -> Result<(), VerbError> {
conn.execute_batch(&ddl_primary_table(schema))?;
conn.execute_batch(&ddl_indexes(schema))?;
if let Some(cols) = schema.fts_columns {
conn.execute_batch(&ddl_fts_table(schema.table, cols))?;
}
if let Some(edge) = schema.edge_table {
conn.execute_batch(&ddl_edge_table(edge))?;
}
for stmt in schema.custom_migrations {
conn.execute_batch(stmt)?;
}
Ok(())
}
fn ddl_primary_table(schema: &EntitySchema) -> String {
let cols: Vec<String> = schema.fields.iter().map(ddl_column).collect();
format!(
"CREATE TABLE IF NOT EXISTS {} (\n {}\n);",
schema.table,
cols.join(",\n ")
)
}
fn ddl_column(f: &FieldDef) -> String {
match f.kind {
FieldKind::IntegerPk => format!("{} INTEGER PRIMARY KEY", f.name),
FieldKind::IntegerNotNull => format!("{} INTEGER NOT NULL", f.name),
FieldKind::Integer => format!("{} INTEGER DEFAULT 0", f.name),
FieldKind::TextNotNull => format!("{} TEXT NOT NULL", f.name),
FieldKind::Text => format!("{} TEXT DEFAULT ''", f.name),
FieldKind::TextDefault => {
let d = f.default.unwrap_or("");
format!("{} TEXT NOT NULL DEFAULT '{}'", f.name, d)
}
FieldKind::TimestampCreated => format!("{} INTEGER NOT NULL", f.name),
FieldKind::TimestampUpdated => format!("{} INTEGER NOT NULL", f.name),
}
}
fn ddl_indexes(schema: &EntitySchema) -> String {
let mut out = String::new();
for f in schema.fields.iter().filter(|f| f.indexed) {
out.push_str(&format!(
"CREATE INDEX IF NOT EXISTS idx_{t}_{c} ON {t}({c});\n",
t = schema.table,
c = f.name
));
}
out
}
fn ddl_fts_table(table: &str, cols: &[&str]) -> String {
let col_list = cols.join(", ");
format!(
"CREATE VIRTUAL TABLE IF NOT EXISTS fts_{table} \
USING fts5({table}_id UNINDEXED, {col_list}, tokenize='porter unicode61');"
)
}
fn ddl_edge_table(edge: &str) -> String {
format!(
"CREATE TABLE IF NOT EXISTS {edge} (\n \
from_id INTEGER NOT NULL,\n \
to_id INTEGER NOT NULL,\n \
edge_type TEXT NOT NULL DEFAULT 'links',\n \
PRIMARY KEY(from_id, to_id, edge_type)\n\
);\n\
CREATE INDEX IF NOT EXISTS idx_{edge}_to ON {edge}(to_id);"
)
}

View file

@ -0,0 +1,36 @@
//! Verb error type. Distinguishes user-input / validation failures
//! (map to CLI exit 2 in callers) from storage / IO failures (exit 1).
use thiserror::Error;
#[derive(Debug, Error)]
pub enum VerbError {
#[error("InvalidInput: {0}")]
InvalidInput(String),
#[error("VerbDisabled: {verb} not enabled on schema {schema}")]
VerbDisabled { verb: String, schema: String },
#[error("NotFound: {entity} id {id}")]
NotFound { entity: String, id: i64 },
#[error("Sqlite: {0}")]
Sqlite(#[from] rusqlite::Error),
#[error("Serde: {0}")]
Serde(#[from] serde_json::Error),
#[error("Storage: {0}")]
Storage(String),
}
impl VerbError {
/// Exit code contract — 2 for validation / unknown verb / not found;
/// 1 for storage / IO.
pub fn exit_code(&self) -> u8 {
match self {
Self::InvalidInput(_) | Self::VerbDisabled { .. } | Self::NotFound { .. } => 2,
Self::Sqlite(_) | Self::Serde(_) | Self::Storage(_) => 1,
}
}
}

View file

@ -0,0 +1,24 @@
//! kei-entity-store — Layer A verb-template engine.
//!
//! Provides a schema-driven store that 6 sibling kei-*-store crates can
//! plug into instead of hand-rolling their own `Store::open` + CRUD
//! helpers. An `EntitySchema` declaratively describes one entity table
//! (fields, FTS columns, edge table, enabled verbs); verb modules
//! (`create`, `get`, `list`, `search`, `update`, `delete`, `link`,
//! `rank`) consume the schema and run parameterized SQL.
//!
//! Pilot target: `kei-task` (see its `schema.rs` for an example usage).
//! Follow-up waves: kei-chat-store, kei-content-store, kei-social-store,
//! kei-sage, kei-crossdomain.
//!
//! Per substrate schema v1 this crate stays library-only — no CLI, no
//! `bin`. Each sibling crate remains the user-facing binary.
pub mod engine;
pub mod error;
pub mod schema;
pub mod verbs;
pub use engine::Store;
pub use error::VerbError;
pub use schema::{EntitySchema, FieldDef, FieldKind};

View file

@ -0,0 +1,120 @@
//! EntitySchema — declarative description of one entity table.
//!
//! A sibling crate (e.g. kei-task) defines a `static EntitySchema` and
//! passes a reference into every verb call. The engine reads this
//! structure to know: table name, fields to INSERT/SELECT, FTS columns,
//! edge table (for link/rank), and which verbs are enabled.
/// Field kinds the engine knows how to bind for INSERT / UPDATE and
/// how to read in SELECT. A field's `kind` also drives the CREATE TABLE
/// DDL produced by the engine's migration runner.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FieldKind {
/// INTEGER PRIMARY KEY — exactly one per schema (name = "id").
IntegerPk,
/// INTEGER NOT NULL (with optional DEFAULT 0).
IntegerNotNull,
/// INTEGER, default 0.
Integer,
/// TEXT NOT NULL (no default).
TextNotNull,
/// TEXT with empty-string default.
Text,
/// TEXT NOT NULL with explicit default value (held in `default`).
TextDefault,
/// Unix-timestamp INTEGER auto-stamped on insert (created_at).
TimestampCreated,
/// Unix-timestamp INTEGER auto-stamped on insert + update (updated_at).
TimestampUpdated,
}
/// One column in an EntitySchema.
#[derive(Debug, Clone, Copy)]
pub struct FieldDef {
pub name: &'static str,
pub kind: FieldKind,
/// Default literal for TextDefault / IntegerNotNull (as SQL literal
/// WITHOUT surrounding quotes — engine quotes TEXT automatically).
pub default: Option<&'static str>,
/// Emit a single-column index `idx_<table>_<name>`.
pub indexed: bool,
}
impl FieldDef {
pub const fn pk(name: &'static str) -> Self {
Self { name, kind: FieldKind::IntegerPk, default: None, indexed: false }
}
pub const fn text(name: &'static str) -> Self {
Self { name, kind: FieldKind::Text, default: None, indexed: false }
}
pub const fn text_nn(name: &'static str) -> Self {
Self { name, kind: FieldKind::TextNotNull, default: None, indexed: false }
}
pub const fn text_default(name: &'static str, default: &'static str) -> Self {
Self { name, kind: FieldKind::TextDefault, default: Some(default), indexed: false }
}
pub const fn integer(name: &'static str) -> Self {
Self { name, kind: FieldKind::Integer, default: None, indexed: false }
}
pub const fn integer_nn(name: &'static str) -> Self {
Self { name, kind: FieldKind::IntegerNotNull, default: None, indexed: false }
}
pub const fn created_at() -> Self {
Self { name: "created_at", kind: FieldKind::TimestampCreated,
default: None, indexed: false }
}
pub const fn updated_at() -> Self {
Self { name: "updated_at", kind: FieldKind::TimestampUpdated,
default: None, indexed: false }
}
pub const fn with_index(mut self) -> Self {
self.indexed = true;
self
}
}
/// Declarative schema for one entity.
#[derive(Debug, Clone, Copy)]
pub struct EntitySchema {
/// Human-readable entity name — used in error messages.
pub name: &'static str,
/// SQL table name for the primary entity rows.
pub table: &'static str,
/// Column order — MUST start with the PK.
pub fields: &'static [FieldDef],
/// Verb whitelist — e.g. ["create","get","search","update","delete"].
pub enabled_verbs: &'static [&'static str],
/// If `Some`, engine creates an FTS5 virtual table `fts_<table>`
/// with the listed non-id columns and keeps it in sync on create
/// + update. `search` verb uses it.
pub fts_columns: Option<&'static [&'static str]>,
/// If `Some`, engine creates `<edge_table>(from_id, to_id, edge_type)`
/// for the `link` verb. `rank` verb runs PageRank over it.
pub edge_table: Option<&'static str>,
/// Arbitrary DDL statements run after the primary table + FTS +
/// edge table have been created. Used for secondary tables
/// (milestones, task_deps) that piggy-back on the same DB but are
/// task-specific (not generic-CRUD).
pub custom_migrations: &'static [&'static str],
}
impl EntitySchema {
/// Returns the PK column (always "id" by convention).
pub fn pk(&self) -> &FieldDef {
self.fields
.iter()
.find(|f| f.kind == FieldKind::IntegerPk)
.expect("EntitySchema MUST have exactly one IntegerPk field")
}
/// Returns true if `verb` appears in `enabled_verbs`.
pub fn verb_enabled(&self, verb: &str) -> bool {
self.enabled_verbs.iter().any(|v| *v == verb)
}
/// Returns the list of non-PK field names, in order. Used by the
/// `create` verb to build the INSERT column-list.
pub fn writable_fields(&self) -> impl Iterator<Item = &FieldDef> {
self.fields.iter().filter(|f| f.kind != FieldKind::IntegerPk)
}
}

View file

@ -0,0 +1,137 @@
//! `create` verb — INSERT one row using fields declared on the schema.
//!
//! Input JSON shape: `{ "<field_name>": <value>, ... }`. Only fields
//! declared on the EntitySchema are copied; extras are silently ignored
//! (the atom layer above is responsible for rejecting them if desired).
//! Output: `{ "id": <rowid>, "created_at": <unix ts> }`.
use crate::error::VerbError;
use crate::schema::{EntitySchema, FieldDef, FieldKind};
use chrono::Utc;
use rusqlite::{types::Value as SqlValue, Connection};
use serde_json::{json, Value};
pub fn run(
conn: &Connection,
schema: &EntitySchema,
input: Value,
) -> Result<Value, VerbError> {
if !schema.verb_enabled("create") {
return Err(VerbError::VerbDisabled {
verb: "create".into(),
schema: schema.name.into(),
});
}
let obj = as_object(&input, "create")?;
let now = Utc::now().timestamp();
let id = exec_insert(conn, schema, obj, now)?;
if let Some(cols) = schema.fts_columns {
reindex_fts(conn, schema.table, cols, id, obj)?;
}
let created_at = read_created_at(conn, schema, id).unwrap_or(now);
Ok(json!({ "id": id, "created_at": created_at }))
}
fn exec_insert(
conn: &Connection,
schema: &EntitySchema,
obj: &serde_json::Map<String, Value>,
now: i64,
) -> Result<i64, VerbError> {
let (cols, values) = build_insert(schema, obj, now);
let placeholders: Vec<String> = (1..=cols.len()).map(|i| format!("?{i}")).collect();
let sql = format!(
"INSERT INTO {} ({}) VALUES ({})",
schema.table,
cols.join(","),
placeholders.join(","),
);
let params: Vec<&dyn rusqlite::ToSql> =
values.iter().map(|v| v as &dyn rusqlite::ToSql).collect();
conn.execute(&sql, params.as_slice())?;
Ok(conn.last_insert_rowid())
}
fn as_object<'a>(v: &'a Value, verb: &str) -> Result<&'a serde_json::Map<String, Value>, VerbError> {
v.as_object()
.ok_or_else(|| VerbError::InvalidInput(format!("{verb}: expected JSON object")))
}
fn build_insert(
schema: &EntitySchema,
input: &serde_json::Map<String, Value>,
now: i64,
) -> (Vec<&'static str>, Vec<SqlValue>) {
let mut cols: Vec<&'static str> = Vec::new();
let mut values: Vec<SqlValue> = Vec::new();
for f in schema.writable_fields() {
cols.push(f.name);
values.push(field_value_for_insert(f, input, now));
}
(cols, values)
}
fn field_value_for_insert(f: &FieldDef, input: &serde_json::Map<String, Value>, now: i64) -> SqlValue {
match f.kind {
FieldKind::TimestampCreated | FieldKind::TimestampUpdated => {
match input.get(f.name).and_then(|v| v.as_i64()) {
Some(ts) if ts > 0 => SqlValue::Integer(ts),
_ => SqlValue::Integer(now),
}
}
FieldKind::IntegerNotNull | FieldKind::Integer => SqlValue::Integer(
input.get(f.name).and_then(|v| v.as_i64()).unwrap_or(0),
),
FieldKind::TextNotNull | FieldKind::Text => SqlValue::Text(
input.get(f.name).and_then(|v| v.as_str()).unwrap_or("").to_string(),
),
FieldKind::TextDefault => {
let raw = input.get(f.name).and_then(|v| v.as_str()).unwrap_or("");
let final_v = if raw.is_empty() { f.default.unwrap_or("") } else { raw };
SqlValue::Text(final_v.to_string())
}
FieldKind::IntegerPk => SqlValue::Null, // filtered by writable_fields
}
}
fn reindex_fts(
conn: &Connection,
table: &str,
cols: &[&str],
id: i64,
input: &serde_json::Map<String, Value>,
) -> Result<(), VerbError> {
conn.execute(
&format!("DELETE FROM fts_{table} WHERE {table}_id=?1"),
rusqlite::params![id],
)?;
let placeholders: Vec<String> = (2..=(cols.len() + 1)).map(|i| format!("?{i}")).collect();
let sql = format!(
"INSERT INTO fts_{table} ({table}_id, {}) VALUES (?1, {})",
cols.join(", "),
placeholders.join(", "),
);
let mut values: Vec<SqlValue> = vec![SqlValue::Integer(id)];
for c in cols {
let v = input.get(*c).and_then(|v| v.as_str()).unwrap_or("").to_string();
values.push(SqlValue::Text(v));
}
let params: Vec<&dyn rusqlite::ToSql> =
values.iter().map(|v| v as &dyn rusqlite::ToSql).collect();
conn.execute(&sql, params.as_slice())?;
Ok(())
}
fn read_created_at(conn: &Connection, schema: &EntitySchema, id: i64) -> Option<i64> {
let has_created = schema
.fields
.iter()
.any(|f| f.kind == FieldKind::TimestampCreated);
if !has_created { return None; }
conn.query_row(
&format!("SELECT created_at FROM {} WHERE id=?1", schema.table),
rusqlite::params![id],
|r| r.get::<_, i64>(0),
)
.ok()
}

View file

@ -0,0 +1,52 @@
//! `delete` verb — hard DELETE by id, OR soft (if schema has an
//! `archived` integer field, flips it to 1).
use crate::error::VerbError;
use crate::schema::EntitySchema;
use rusqlite::Connection;
use serde_json::{json, Value};
pub fn run(
conn: &Connection,
schema: &EntitySchema,
input: Value,
) -> Result<Value, VerbError> {
if !schema.verb_enabled("delete") {
return Err(VerbError::VerbDisabled {
verb: "delete".into(),
schema: schema.name.into(),
});
}
let id = input
.get("id")
.and_then(|v| v.as_i64())
.ok_or_else(|| VerbError::InvalidInput("delete: missing `id` integer".into()))?;
let soft = input.get("soft").and_then(|v| v.as_bool()).unwrap_or(false);
let rows = if soft && has_archived_field(schema) {
conn.execute(
&format!("UPDATE {} SET archived = 1 WHERE id=?1", schema.table),
rusqlite::params![id],
)?
} else {
if let Some(cols) = schema.fts_columns {
let _ = cols; // silence unused warning if fts disabled
conn.execute(
&format!("DELETE FROM fts_{} WHERE {}_id=?1", schema.table, schema.table),
rusqlite::params![id],
)?;
}
conn.execute(
&format!("DELETE FROM {} WHERE id=?1", schema.table),
rusqlite::params![id],
)?
};
if rows == 0 {
return Err(VerbError::NotFound { entity: schema.name.into(), id });
}
Ok(json!({ "ok": true, "id": id }))
}
fn has_archived_field(schema: &EntitySchema) -> bool {
schema.fields.iter().any(|f| f.name == "archived")
}

View file

@ -0,0 +1,68 @@
//! `get` verb — SELECT one row by id, returning a JSON object with
//! every declared field.
use crate::error::VerbError;
use crate::schema::{EntitySchema, FieldDef, FieldKind};
use rusqlite::Connection;
use serde_json::{Map, Value};
pub fn run(
conn: &Connection,
schema: &EntitySchema,
input: Value,
) -> Result<Value, VerbError> {
if !schema.verb_enabled("get") {
return Err(VerbError::VerbDisabled {
verb: "get".into(),
schema: schema.name.into(),
});
}
let id = input
.get("id")
.and_then(|v| v.as_i64())
.ok_or_else(|| VerbError::InvalidInput("get: missing `id` integer".into()))?;
let cols: Vec<&str> = schema.fields.iter().map(|f| f.name).collect();
let sql = format!(
"SELECT {} FROM {} WHERE id=?1",
cols.join(","),
schema.table
);
let mut stmt = conn.prepare(&sql)?;
let mut rows = stmt.query(rusqlite::params![id])?;
match rows.next()? {
Some(r) => Ok(row_to_json(schema, r)?),
None => Err(VerbError::NotFound {
entity: schema.name.into(),
id,
}),
}
}
pub(crate) fn row_to_json(
schema: &EntitySchema,
row: &rusqlite::Row,
) -> Result<Value, VerbError> {
let mut obj = Map::new();
for (idx, f) in schema.fields.iter().enumerate() {
obj.insert(f.name.to_string(), field_to_json(f, row, idx)?);
}
Ok(Value::Object(obj))
}
fn field_to_json(f: &FieldDef, row: &rusqlite::Row, idx: usize) -> Result<Value, VerbError> {
Ok(match f.kind {
FieldKind::IntegerPk
| FieldKind::IntegerNotNull
| FieldKind::Integer
| FieldKind::TimestampCreated
| FieldKind::TimestampUpdated => {
let n: i64 = row.get(idx)?;
Value::from(n)
}
FieldKind::TextNotNull | FieldKind::Text | FieldKind::TextDefault => {
let s: String = row.get(idx)?;
Value::from(s)
}
})
}

View file

@ -0,0 +1,49 @@
//! `link` verb — INSERT edge into `<edge_table>` (idempotent via
//! INSERT OR IGNORE). Caller is responsible for higher-level semantic
//! checks (cycle detection, self-loop) — those live in the sibling
//! crate (e.g. kei-task::deps).
use crate::error::VerbError;
use crate::schema::EntitySchema;
use rusqlite::Connection;
use serde_json::{json, Value};
pub fn run(
conn: &Connection,
schema: &EntitySchema,
input: Value,
) -> Result<Value, VerbError> {
if !schema.verb_enabled("link") {
return Err(VerbError::VerbDisabled {
verb: "link".into(),
schema: schema.name.into(),
});
}
let edge = schema.edge_table.ok_or_else(|| {
VerbError::InvalidInput(format!(
"link: schema {} has no edge_table configured",
schema.name
))
})?;
let from = input
.get("from")
.and_then(|v| v.as_i64())
.ok_or_else(|| VerbError::InvalidInput("link: missing `from` integer".into()))?;
let to = input
.get("to")
.and_then(|v| v.as_i64())
.ok_or_else(|| VerbError::InvalidInput("link: missing `to` integer".into()))?;
let edge_type = input
.get("edge_type")
.and_then(|v| v.as_str())
.unwrap_or("links")
.to_string();
conn.execute(
&format!(
"INSERT OR IGNORE INTO {edge} (from_id, to_id, edge_type) VALUES (?1, ?2, ?3)"
),
rusqlite::params![from, to, edge_type],
)?;
Ok(json!({ "ok": true }))
}

View file

@ -0,0 +1,48 @@
//! `list` verb — paginated SELECT, ordered by id DESC.
//!
//! Input: `{ "limit": <int = 50>, "offset": <int = 0> }`. Both optional.
use crate::error::VerbError;
use crate::schema::EntitySchema;
use crate::verbs::get::row_to_json;
use rusqlite::Connection;
use serde_json::{json, Value};
const DEFAULT_LIMIT: i64 = 50;
const MAX_LIMIT: i64 = 10_000;
pub fn run(
conn: &Connection,
schema: &EntitySchema,
input: Value,
) -> Result<Value, VerbError> {
if !schema.verb_enabled("list") {
return Err(VerbError::VerbDisabled {
verb: "list".into(),
schema: schema.name.into(),
});
}
let limit = clamp(input.get("limit").and_then(|v| v.as_i64()), DEFAULT_LIMIT);
let offset = input.get("offset").and_then(|v| v.as_i64()).unwrap_or(0).max(0);
let cols: Vec<&str> = schema.fields.iter().map(|f| f.name).collect();
let sql = format!(
"SELECT {} FROM {} ORDER BY id DESC LIMIT ?1 OFFSET ?2",
cols.join(","),
schema.table
);
let mut stmt = conn.prepare(&sql)?;
let mut rows = stmt.query(rusqlite::params![limit, offset])?;
let mut results: Vec<Value> = Vec::new();
while let Some(r) = rows.next()? {
results.push(row_to_json(schema, r)?);
}
Ok(json!({ "results": results }))
}
fn clamp(raw: Option<i64>, default: i64) -> i64 {
match raw {
Some(n) if n > 0 && n <= MAX_LIMIT => n,
_ => default,
}
}

View file

@ -0,0 +1,25 @@
//! Verb templates — one module per generic CRUD / graph verb.
//!
//! Each verb exposes `pub fn run(conn, schema, input) -> Result<Value,
//! VerbError>` with JSON in / JSON out. Sibling crates wrap these in
//! their typed atom `Input` / `Output` structs via `serde_json::from_value`.
//!
//! The `input` arg is always a `serde_json::Value`. Verbs extract fields
//! they need and ignore everything else, except `create` / `update` which
//! only copy declared schema fields into SQL (defence against
//! unexpected keys).
pub mod create;
pub mod delete;
pub mod get;
pub mod link;
pub mod list;
pub mod rank;
pub mod search;
pub mod update;
/// Full list of supported verbs — SSoT for documentation + schema
/// validation. `EntitySchema.enabled_verbs` entries MUST appear here.
pub const ALL_VERBS: &[&str] = &[
"create", "get", "list", "search", "update", "delete", "link", "rank",
];

View file

@ -0,0 +1,89 @@
//! `rank` verb — PageRank (power iteration, d=0.85, 50 iter) over the
//! schema's `edge_table`. Returns `{ results: [{id, score}, ...] }`
//! sorted by score descending.
//!
//! Ported from `kei-sage/src/pagerank.rs` but generalised to operate on
//! the integer `(from_id, to_id)` edge table this engine provisions.
use crate::error::VerbError;
use crate::schema::EntitySchema;
use rusqlite::Connection;
use serde_json::{json, Value};
use std::collections::{HashMap, HashSet};
const DAMPING: f64 = 0.85;
const ITERATIONS: usize = 50;
pub fn run(
conn: &Connection,
schema: &EntitySchema,
_input: Value,
) -> Result<Value, VerbError> {
if !schema.verb_enabled("rank") {
return Err(VerbError::VerbDisabled {
verb: "rank".into(),
schema: schema.name.into(),
});
}
let edge = schema.edge_table.ok_or_else(|| {
VerbError::InvalidInput(format!(
"rank: schema {} has no edge_table configured",
schema.name
))
})?;
let (nodes, out_edges) = collect_graph(conn, edge)?;
if nodes.is_empty() {
return Ok(json!({ "results": [] }));
}
let mut rank: HashMap<i64, f64> = nodes
.iter()
.map(|n| (*n, 1.0 / nodes.len() as f64))
.collect();
for _ in 0..ITERATIONS {
rank = one_iteration(&nodes, &out_edges, &rank);
}
let mut out: Vec<(i64, f64)> = rank.into_iter().collect();
out.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
let results: Vec<Value> =
out.into_iter().map(|(id, score)| json!({ "id": id, "score": score })).collect();
Ok(json!({ "results": results }))
}
fn collect_graph(
conn: &Connection,
edge: &str,
) -> Result<(Vec<i64>, HashMap<i64, Vec<i64>>), VerbError> {
let sql = format!("SELECT from_id, to_id FROM {edge}");
let mut stmt = conn.prepare(&sql)?;
let rows = stmt.query_map([], |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?)))?;
let mut nodes: HashSet<i64> = HashSet::new();
let mut out_edges: HashMap<i64, Vec<i64>> = HashMap::new();
for row in rows {
let (src, dst) = row?;
nodes.insert(src);
nodes.insert(dst);
out_edges.entry(src).or_default().push(dst);
}
Ok((nodes.into_iter().collect(), out_edges))
}
fn one_iteration(
nodes: &[i64],
out_edges: &HashMap<i64, Vec<i64>>,
prev: &HashMap<i64, f64>,
) -> HashMap<i64, f64> {
let n = nodes.len() as f64;
let base = (1.0 - DAMPING) / n;
let mut next: HashMap<i64, f64> = nodes.iter().map(|k| (*k, base)).collect();
for (src, dsts) in out_edges {
if dsts.is_empty() { continue; }
let share = DAMPING * prev.get(src).copied().unwrap_or(0.0) / dsts.len() as f64;
for dst in dsts {
if let Some(slot) = next.get_mut(dst) {
*slot += share;
}
}
}
next
}

View file

@ -0,0 +1,63 @@
//! `search` verb — FTS5 match over `fts_<table>`, JOIN back to entity
//! table, ORDER BY rank.
//!
//! Requires `EntitySchema.fts_columns` to be `Some`.
use crate::error::VerbError;
use crate::schema::EntitySchema;
use crate::verbs::get::row_to_json;
use rusqlite::Connection;
use serde_json::{json, Value};
const DEFAULT_LIMIT: i64 = 20;
const MAX_LIMIT: i64 = 10_000;
pub fn run(
conn: &Connection,
schema: &EntitySchema,
input: Value,
) -> Result<Value, VerbError> {
if !schema.verb_enabled("search") {
return Err(VerbError::VerbDisabled {
verb: "search".into(),
schema: schema.name.into(),
});
}
if schema.fts_columns.is_none() {
return Err(VerbError::InvalidInput(format!(
"search: schema {} has no fts_columns configured",
schema.name
)));
}
let query = input
.get("query")
.and_then(|v| v.as_str())
.ok_or_else(|| VerbError::InvalidInput("search: missing `query` string".into()))?;
if query.trim().is_empty() {
return Err(VerbError::InvalidInput("search: query must be non-empty".into()));
}
let limit = clamp(input.get("limit").and_then(|v| v.as_i64()));
let cols: Vec<String> = schema.fields.iter().map(|f| format!("t.{}", f.name)).collect();
let sql = format!(
"SELECT {cols_sel} FROM fts_{table} f \
JOIN {table} t ON t.id = f.{table}_id \
WHERE fts_{table} MATCH ?1 ORDER BY rank LIMIT ?2",
cols_sel = cols.join(","),
table = schema.table
);
let mut stmt = conn.prepare(&sql)?;
let mut rows = stmt.query(rusqlite::params![query, limit])?;
let mut results: Vec<Value> = Vec::new();
while let Some(r) = rows.next()? {
results.push(row_to_json(schema, r)?);
}
Ok(json!({ "results": results }))
}
fn clamp(raw: Option<i64>) -> i64 {
match raw {
Some(n) if n > 0 && n <= MAX_LIMIT => n,
_ => DEFAULT_LIMIT,
}
}

View file

@ -0,0 +1,155 @@
//! `update` verb — partial update by id. Only keys that appear in
//! the input JSON and that are declared on the schema are written.
use crate::error::VerbError;
use crate::schema::{EntitySchema, FieldDef, FieldKind};
use chrono::Utc;
use rusqlite::{types::Value as SqlValue, Connection};
use serde_json::{json, Value};
pub fn run(
conn: &Connection,
schema: &EntitySchema,
input: Value,
) -> Result<Value, VerbError> {
if !schema.verb_enabled("update") {
return Err(VerbError::VerbDisabled {
verb: "update".into(),
schema: schema.name.into(),
});
}
let obj = input
.as_object()
.ok_or_else(|| VerbError::InvalidInput("update: expected JSON object".into()))?;
let id = obj
.get("id")
.and_then(|v| v.as_i64())
.ok_or_else(|| VerbError::InvalidInput("update: missing `id` integer".into()))?;
let now = Utc::now().timestamp();
let (set_cols, values) = build_set(schema, obj, now);
if set_cols.is_empty() {
return Err(VerbError::InvalidInput("update: no writable fields supplied".into()));
}
exec_update(conn, schema, id, &set_cols, values)?;
if let Some(cols) = schema.fts_columns {
reindex_fts(conn, schema.table, cols, id, obj)?;
}
Ok(json!({ "ok": true, "id": id }))
}
fn exec_update(
conn: &Connection,
schema: &EntitySchema,
id: i64,
set_cols: &[&'static str],
values: Vec<SqlValue>,
) -> Result<(), VerbError> {
let placeholders: Vec<String> =
(1..=set_cols.len()).map(|i| format!("{} = ?{i}", set_cols[i - 1])).collect();
let id_idx = set_cols.len() + 1;
let sql = format!(
"UPDATE {} SET {} WHERE id=?{}",
schema.table,
placeholders.join(", "),
id_idx
);
let mut all: Vec<SqlValue> = values;
all.push(SqlValue::Integer(id));
let params: Vec<&dyn rusqlite::ToSql> =
all.iter().map(|v| v as &dyn rusqlite::ToSql).collect();
let rows = conn.execute(&sql, params.as_slice())?;
if rows == 0 {
return Err(VerbError::NotFound { entity: schema.name.into(), id });
}
Ok(())
}
fn build_set(
schema: &EntitySchema,
input: &serde_json::Map<String, Value>,
now: i64,
) -> (Vec<&'static str>, Vec<SqlValue>) {
let mut cols: Vec<&'static str> = Vec::new();
let mut values: Vec<SqlValue> = Vec::new();
for f in schema.writable_fields() {
if f.kind == FieldKind::TimestampUpdated {
cols.push(f.name);
values.push(SqlValue::Integer(now));
continue;
}
if let Some(sql_val) = value_from_input(f, input) {
cols.push(f.name);
values.push(sql_val);
}
}
(cols, values)
}
fn value_from_input(f: &FieldDef, input: &serde_json::Map<String, Value>) -> Option<SqlValue> {
let raw = input.get(f.name)?;
Some(match f.kind {
FieldKind::TextNotNull | FieldKind::Text | FieldKind::TextDefault => {
SqlValue::Text(raw.as_str().unwrap_or("").to_string())
}
FieldKind::IntegerNotNull
| FieldKind::Integer
| FieldKind::TimestampCreated
| FieldKind::TimestampUpdated => SqlValue::Integer(raw.as_i64().unwrap_or(0)),
FieldKind::IntegerPk => return None,
})
}
fn reindex_fts(
conn: &Connection,
table: &str,
cols: &[&str],
id: i64,
input: &serde_json::Map<String, Value>,
) -> Result<(), VerbError> {
// Pull existing values, overlay supplied ones, re-insert.
let existing: serde_json::Map<String, Value> = read_existing_fts(conn, table, cols, id)?;
conn.execute(
&format!("DELETE FROM fts_{table} WHERE {table}_id=?1"),
rusqlite::params![id],
)?;
let placeholders: Vec<String> = (2..=(cols.len() + 1)).map(|i| format!("?{i}")).collect();
let sql = format!(
"INSERT INTO fts_{table} ({table}_id, {}) VALUES (?1, {})",
cols.join(", "),
placeholders.join(", "),
);
let mut values: Vec<SqlValue> = vec![SqlValue::Integer(id)];
for c in cols {
let val = input
.get(*c)
.and_then(|v| v.as_str())
.or_else(|| existing.get(*c).and_then(|v| v.as_str()))
.unwrap_or("")
.to_string();
values.push(SqlValue::Text(val));
}
let params: Vec<&dyn rusqlite::ToSql> =
values.iter().map(|v| v as &dyn rusqlite::ToSql).collect();
conn.execute(&sql, params.as_slice())?;
Ok(())
}
fn read_existing_fts(
conn: &Connection,
table: &str,
cols: &[&str],
id: i64,
) -> Result<serde_json::Map<String, Value>, VerbError> {
let col_list = cols.join(",");
let sql = format!("SELECT {col_list} FROM {table} WHERE id=?1");
let mut stmt = conn.prepare(&sql)?;
let mut rows = stmt.query(rusqlite::params![id])?;
let mut out = serde_json::Map::new();
if let Some(r) = rows.next()? {
for (i, c) in cols.iter().enumerate() {
let v: String = r.get(i).unwrap_or_default();
out.insert((*c).to_string(), Value::from(v));
}
}
Ok(out)
}

View file

@ -0,0 +1,157 @@
//! Per-verb integration smoke tests on a fixture schema.
use kei_entity_store::schema::{EntitySchema, FieldDef};
use kei_entity_store::verbs::{create, delete, get, link, list, rank, search, update};
use kei_entity_store::Store;
use serde_json::json;
static FIELDS: &[FieldDef] = &[
FieldDef::pk("id"),
FieldDef::text_nn("title"),
FieldDef::text("description"),
FieldDef::text_default("status", "pending"),
FieldDef::text_default("priority", "medium"),
FieldDef::integer("parent_id"),
FieldDef::created_at(),
FieldDef::updated_at(),
];
static SCHEMA: EntitySchema = EntitySchema {
name: "note",
table: "notes",
fields: FIELDS,
enabled_verbs: &["create", "get", "list", "search", "update", "delete", "link", "rank"],
fts_columns: Some(&["title", "description"]),
edge_table: Some("note_edges"),
custom_migrations: &[],
};
fn mk() -> Store { Store::open_memory(&SCHEMA).unwrap() }
fn create_one(s: &Store, title: &str) -> i64 {
let v = create::run(s.conn(), &SCHEMA, json!({ "title": title })).unwrap();
v["id"].as_i64().unwrap()
}
#[test]
fn create_then_get() {
let s = mk();
let id = create_one(&s, "alpha");
let out = get::run(s.conn(), &SCHEMA, json!({ "id": id })).unwrap();
assert_eq!(out["title"], "alpha");
assert_eq!(out["status"], "pending");
assert_eq!(out["priority"], "medium");
assert!(out["created_at"].as_i64().unwrap() > 0);
}
#[test]
fn create_returns_id_and_created_at() {
let s = mk();
let v = create::run(s.conn(), &SCHEMA, json!({ "title": "x" })).unwrap();
assert!(v["id"].as_i64().unwrap() >= 1);
assert!(v["created_at"].as_i64().unwrap() > 0);
}
#[test]
fn list_paginated() {
let s = mk();
for i in 0..5 {
create_one(&s, &format!("n{i}"));
}
let v = list::run(s.conn(), &SCHEMA, json!({ "limit": 3 })).unwrap();
assert_eq!(v["results"].as_array().unwrap().len(), 3);
}
#[test]
fn search_fts() {
let s = mk();
create::run(
s.conn(),
&SCHEMA,
json!({ "title": "refactor router", "description": "split monolith" }),
)
.unwrap();
create_one(&s, "unrelated");
let v = search::run(s.conn(), &SCHEMA, json!({ "query": "refactor" })).unwrap();
assert_eq!(v["results"].as_array().unwrap().len(), 1);
}
#[test]
fn update_partial() {
let s = mk();
let id = create_one(&s, "orig");
update::run(
s.conn(),
&SCHEMA,
json!({ "id": id, "status": "in_progress" }),
)
.unwrap();
let out = get::run(s.conn(), &SCHEMA, json!({ "id": id })).unwrap();
assert_eq!(out["status"], "in_progress");
assert_eq!(out["title"], "orig");
}
#[test]
fn update_not_found_errors() {
let s = mk();
let err = update::run(
s.conn(),
&SCHEMA,
json!({ "id": 9999, "status": "x" }),
)
.unwrap_err();
assert_eq!(err.exit_code(), 2);
}
#[test]
fn delete_removes_row() {
let s = mk();
let id = create_one(&s, "gone");
delete::run(s.conn(), &SCHEMA, json!({ "id": id })).unwrap();
let err = get::run(s.conn(), &SCHEMA, json!({ "id": id })).unwrap_err();
assert_eq!(err.exit_code(), 2);
}
#[test]
fn link_and_rank() {
let s = mk();
let a = create_one(&s, "a");
let b = create_one(&s, "b");
let c = create_one(&s, "c");
link::run(s.conn(), &SCHEMA, json!({ "from": a, "to": b })).unwrap();
link::run(s.conn(), &SCHEMA, json!({ "from": a, "to": c })).unwrap();
link::run(s.conn(), &SCHEMA, json!({ "from": b, "to": c })).unwrap();
let v = rank::run(s.conn(), &SCHEMA, json!({})).unwrap();
let results = v["results"].as_array().unwrap();
assert_eq!(results.len(), 3);
// c receives 2 inbound edges → highest rank.
assert_eq!(results[0]["id"], c);
}
#[test]
fn create_missing_title_becomes_empty_string() {
let s = mk();
// engine does NOT validate NOT NULL content — the sibling atom does
// (e.g. kei-task::atoms::create rejects empty title). Engine only
// enforces SQL-level NOT NULL; defaults to empty string.
let v = create::run(s.conn(), &SCHEMA, json!({})).unwrap();
let id = v["id"].as_i64().unwrap();
let out = get::run(s.conn(), &SCHEMA, json!({ "id": id })).unwrap();
assert_eq!(out["title"], "");
}
#[test]
fn disabled_verb_errors() {
static DISABLED: EntitySchema = EntitySchema {
name: "ro",
table: "ro_items",
fields: FIELDS,
enabled_verbs: &["get", "list"],
fts_columns: None,
edge_table: None,
custom_migrations: &[],
};
let s = Store::open_memory(&DISABLED).unwrap();
let err = create::run(s.conn(), &DISABLED, json!({ "title": "x" })).unwrap_err();
assert_eq!(err.exit_code(), 2);
}

View file

@ -21,6 +21,7 @@ name = "kei_task"
path = "src/lib.rs"
[dependencies]
kei-entity-store = { path = "../kei-entity-store" }
rusqlite = { version = "0.31", features = ["bundled"] }
clap = { version = "4", features = ["derive"] }
serde = { version = "1", features = ["derive"] }

View file

@ -2,6 +2,7 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "kei-task/atoms/schemas/add-dependency-input.json",
"title": "kei-task::add-dependency input",
"description": "Writes one edge into the task DAG. Uses the shared `edge` fragment; restricts `edge_type` (`dep_type` here) to the task-specific enum and renames to `dep_type` for backwards compatibility.",
"type": "object",
"required": ["from", "to"],
"properties": {
@ -25,7 +26,8 @@
"assigned_to",
"depends_on"
],
"default": "blocks"
"default": "blocks",
"description": "Task-specific refinement of the generic `edge.edge_type`"
}
},
"additionalProperties": false,

View file

@ -2,11 +2,12 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "kei-task/atoms/schemas/create-input.json",
"title": "kei-task::create input",
"description": "Create a new task row. Extends shared `titled-content` fragment with task-specific `priority` enum and optional `milestone_id`.",
"type": "object",
"required": ["title"],
"allOf": [
{ "$ref": "../../../../../_schemas/fragments/titled-content.json" }
],
"properties": {
"title": { "type": "string", "minLength": 1, "maxLength": 500 },
"description": { "type": "string", "maxLength": 8000, "default": "" },
"priority": {
"type": "string",
"enum": ["critical", "high", "medium", "low"],

View file

@ -2,15 +2,10 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "kei-task/atoms/schemas/create-output.json",
"title": "kei-task::create output",
"type": "object",
"required": ["id", "created_at"],
"properties": {
"id": { "type": "integer", "minimum": 1 },
"created_at": {
"type": "integer",
"description": "Unix timestamp (seconds since epoch) when the row was inserted"
}
},
"description": "Uses the shared `entity-base` fragment — every kei-entity-store create verb returns `{id, created_at}`.",
"allOf": [
{ "$ref": "../../../../../_schemas/fragments/entity-base.json" }
],
"additionalProperties": false,
"examples": [
{ "id": 42, "created_at": 1714000000 }

View file

@ -1,8 +1,16 @@
//! kei-task::create atom — see atoms/create.md for contract.
//! kei-task::create atom — contract in atoms/create.md.
//!
//! Layer-A pilot: validates task-specific input (title non-empty,
//! priority enum) then delegates the INSERT + FTS reindex to
//! `kei_entity_store::verbs::create` through the crate-level
//! `TASK_SCHEMA`.
use crate::schema::TASK_SCHEMA;
use crate::store::Store;
use crate::types::{is_valid_priority, Task};
use crate::types::is_valid_priority;
use kei_entity_store::verbs::create as v_create;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::fmt;
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -44,14 +52,18 @@ impl std::error::Error for Error {}
pub fn run(store: &Store, input: Input) -> Result<Output, Error> {
validate(&input)?;
let priority = normalize_priority(&input.priority);
let task = Task {
title: input.title,
description: input.description,
priority,
..Default::default()
};
let id = store.create_task(&task).map_err(Error::StoreError)?;
let created = read_created_at(store, id)?;
let payload = json!({
"title": input.title,
"description": input.description,
"priority": priority,
"status": "pending",
});
let v = v_create::run(store.conn(), &TASK_SCHEMA, payload)
.map_err(|e| Error::StoreError(anyhow::anyhow!("{e}")))?;
let id = v["id"].as_i64()
.ok_or_else(|| Error::StoreError(anyhow::anyhow!("missing id")))?;
let created = v["created_at"].as_i64()
.ok_or_else(|| Error::StoreError(anyhow::anyhow!("missing created_at")))?;
Ok(Output { id, created_at: created })
}
@ -68,11 +80,3 @@ fn validate(input: &Input) -> Result<(), Error> {
fn normalize_priority(raw: &str) -> String {
if raw.is_empty() { "medium".into() } else { raw.to_string() }
}
fn read_created_at(store: &Store, id: i64) -> Result<i64, Error> {
let t = store
.get_task(id)
.map_err(Error::StoreError)?
.ok_or_else(|| Error::StoreError(anyhow::anyhow!("task {id} not found after insert")))?;
Ok(t.created_at)
}

View file

@ -1,9 +1,14 @@
//! kei-task::search atom — see atoms/search.md for contract.
//! kei-task::search atom — contract in atoms/search.md.
//!
//! Layer-A pilot: input validation stays here, FTS5 JOIN + row
//! assembly run through `kei_entity_store::verbs::search` using
//! `TASK_SCHEMA`.
use crate::search as search_impl;
use crate::schema::TASK_SCHEMA;
use crate::store::Store;
use crate::types::Task;
use kei_entity_store::verbs::search as v_search;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::fmt;
const DEFAULT_LIMIT: i64 = 20;
@ -58,8 +63,11 @@ pub fn run(store: &Store, input: Input) -> Result<Output, Error> {
return Err(Error::InvalidQuery);
}
let limit = normalize_limit(input.limit);
let hits = search_impl::search(store, &input.query, limit).map_err(Error::StoreError)?;
Ok(Output { results: hits.into_iter().map(task_to_hit).collect() })
let payload = json!({ "query": input.query, "limit": limit });
let v = v_search::run(store.conn(), &TASK_SCHEMA, payload)
.map_err(|e| Error::StoreError(anyhow::anyhow!("{e}")))?;
let results = hits_from_value(&v).map_err(Error::StoreError)?;
Ok(Output { results })
}
fn normalize_limit(raw: Option<i64>) -> i64 {
@ -69,11 +77,26 @@ fn normalize_limit(raw: Option<i64>) -> i64 {
}
}
fn task_to_hit(t: Task) -> SearchHit {
SearchHit {
id: t.id, title: t.title, description: t.description, status: t.status,
priority: t.priority, task_type: t.task_type, parent_id: t.parent_id,
assigned_to: t.assigned_to, due_date: t.due_date, completed_at: t.completed_at,
created_at: t.created_at, updated_at: t.updated_at,
}
fn hits_from_value(v: &Value) -> Result<Vec<SearchHit>, anyhow::Error> {
let arr = v["results"]
.as_array()
.ok_or_else(|| anyhow::anyhow!("search: results not an array"))?;
arr.iter().map(hit_from_row).collect()
}
fn hit_from_row(row: &Value) -> Result<SearchHit, anyhow::Error> {
Ok(SearchHit {
id: row["id"].as_i64().unwrap_or(0),
title: row["title"].as_str().unwrap_or("").to_string(),
description: row["description"].as_str().unwrap_or("").to_string(),
status: row["status"].as_str().unwrap_or("").to_string(),
priority: row["priority"].as_str().unwrap_or("").to_string(),
task_type: row["task_type"].as_str().unwrap_or("").to_string(),
parent_id: row["parent_id"].as_i64().unwrap_or(0),
assigned_to: row["assigned_to"].as_str().unwrap_or("").to_string(),
due_date: row["due_date"].as_i64().unwrap_or(0),
completed_at: row["completed_at"].as_i64().unwrap_or(0),
created_at: row["created_at"].as_i64().unwrap_or(0),
updated_at: row["updated_at"].as_i64().unwrap_or(0),
})
}

View file

@ -1,22 +1,33 @@
//! SQLite schema for tasks + milestones + deps. Port of LBM internal/task/schema.go.
//! kei-task EntitySchema — declarative spec consumed by
//! `kei_entity_store::Store` and its verb templates.
//!
//! Columns match the legacy `CREATE TABLE tasks` DDL byte-for-byte so
//! on-disk databases created before the convergence layer continue to
//! work.
//!
//! Task-specific secondary tables (`milestones`, `task_deps`,
//! `task_milestones`) ride the engine's `custom_migrations` slot — they
//! are not generic CRUD and keep their existing column names so
//! `deps.rs` / `milestones.rs` / `graph.rs` don't need to change.
use rusqlite::{Connection, Result};
use kei_entity_store::schema::{EntitySchema, FieldDef};
const DDL_MAIN: &str = r#"
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY,
title TEXT NOT NULL,
description TEXT DEFAULT '',
status TEXT NOT NULL DEFAULT 'pending',
priority TEXT DEFAULT 'medium',
task_type TEXT DEFAULT '',
parent_id INTEGER DEFAULT 0,
assigned_to TEXT DEFAULT '',
due_date INTEGER DEFAULT 0,
completed_at INTEGER DEFAULT 0,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
static FIELDS: &[FieldDef] = &[
FieldDef::pk("id"),
FieldDef::text_nn("title"),
FieldDef::text("description"),
FieldDef::text_default("status", "pending"),
FieldDef::text_default("priority", "medium"),
FieldDef::text("task_type"),
FieldDef::integer("parent_id"),
FieldDef::text("assigned_to"),
FieldDef::integer("due_date"),
FieldDef::integer("completed_at"),
FieldDef::created_at(),
FieldDef::updated_at(),
];
const DDL_SECONDARY: &str = r#"
CREATE INDEX IF NOT EXISTS idx_task_status ON tasks(status);
CREATE INDEX IF NOT EXISTS idx_task_priority ON tasks(priority);
CREATE INDEX IF NOT EXISTS idx_task_parent ON tasks(parent_id);
@ -45,13 +56,12 @@ const DDL_MAIN: &str = r#"
);
"#;
const DDL_FTS: &str = r#"
CREATE VIRTUAL TABLE IF NOT EXISTS fts_tasks
USING fts5(task_id UNINDEXED, title, description, tokenize='porter unicode61');
"#;
pub fn create_schema(conn: &Connection) -> Result<()> {
conn.execute_batch(DDL_MAIN)?;
conn.execute_batch(DDL_FTS)?;
Ok(())
}
pub static TASK_SCHEMA: EntitySchema = EntitySchema {
name: "task",
table: "tasks",
fields: FIELDS,
enabled_verbs: &["create", "get", "list", "search", "update", "delete"],
fts_columns: Some(&["title", "description"]),
edge_table: None, // task_deps has bespoke column names — managed by deps.rs
custom_migrations: &[DDL_SECONDARY],
};

View file

@ -1,29 +1,37 @@
//! FTS5 search over tasks (title + description).
//!
//! Thin shim over `kei_entity_store::verbs::search` preserved for
//! callers (integration tests, CLI `cmd_search`) that still want the
//! strongly-typed `Vec<Task>` surface.
use crate::schema::TASK_SCHEMA;
use crate::store::Store;
use crate::types::Task;
use anyhow::Result;
use rusqlite::params;
use anyhow::{anyhow, Result};
use kei_entity_store::verbs::search as v_search;
use serde_json::{json, Value};
pub fn search(store: &Store, query: &str, limit: i64) -> Result<Vec<Task>> {
let lim = if limit <= 0 { 20 } else { limit };
let mut stmt = store.conn().prepare(
"SELECT t.id, t.title, t.description, t.status, t.priority, t.task_type,
t.parent_id, t.assigned_to, t.due_date, t.completed_at,
t.created_at, t.updated_at
FROM fts_tasks f
JOIN tasks t ON t.id = f.task_id
WHERE fts_tasks MATCH ?1 ORDER BY rank LIMIT ?2",
)?;
let rows = stmt.query_map(params![query, lim], |r| {
Ok(Task {
id: r.get(0)?, title: r.get(1)?, description: r.get(2)?, status: r.get(3)?,
priority: r.get(4)?, task_type: r.get(5)?, parent_id: r.get(6)?,
assigned_to: r.get(7)?, due_date: r.get(8)?, completed_at: r.get(9)?,
created_at: r.get(10)?, updated_at: r.get(11)?,
})
})?;
let mut out = Vec::new();
for r in rows { out.push(r?); }
Ok(out)
let v = v_search::run(store.conn(), &TASK_SCHEMA, json!({ "query": query, "limit": lim }))
.map_err(|e| anyhow!("{e}"))?;
let arr = v["results"].as_array().ok_or_else(|| anyhow!("results missing"))?;
arr.iter().map(row_to_task).collect()
}
fn row_to_task(r: &Value) -> Result<Task> {
Ok(Task {
id: r["id"].as_i64().unwrap_or(0),
title: r["title"].as_str().unwrap_or("").into(),
description: r["description"].as_str().unwrap_or("").into(),
status: r["status"].as_str().unwrap_or("").into(),
priority: r["priority"].as_str().unwrap_or("").into(),
task_type: r["task_type"].as_str().unwrap_or("").into(),
parent_id: r["parent_id"].as_i64().unwrap_or(0),
assigned_to: r["assigned_to"].as_str().unwrap_or("").into(),
due_date: r["due_date"].as_i64().unwrap_or(0),
completed_at: r["completed_at"].as_i64().unwrap_or(0),
created_at: r["created_at"].as_i64().unwrap_or(0),
updated_at: r["updated_at"].as_i64().unwrap_or(0),
})
}

View file

@ -1,94 +1,101 @@
//! Task store — open, CRUD, FTS reindex.
//! Task store — thin shim over `kei_entity_store::Store`.
//!
//! Layer-A convergence pilot (2026-04-23): generic CRUD (create / get /
//! update) now runs through `kei_entity_store::verbs::*` using the
//! declarative `TASK_SCHEMA`. Public surface is preserved byte-for-byte
//! so existing integration tests and callers (`atoms::create`,
//! `milestones`, `deps`, `search`) compile unchanged.
use crate::schema::create_schema;
use crate::schema::TASK_SCHEMA;
use crate::types::Task;
use anyhow::{Context, Result};
use chrono::Utc;
use rusqlite::{params, Connection};
use anyhow::{anyhow, Result};
use kei_entity_store::verbs::{create as v_create, get as v_get, update as v_update};
use kei_entity_store::Store as EntityStore;
use rusqlite::Connection;
use serde_json::{json, Value};
use std::path::Path;
pub struct Store {
conn: Connection,
inner: EntityStore,
}
impl Store {
pub fn open(path: &Path) -> Result<Self> {
if let Some(parent) = path.parent() {
let _ = std::fs::create_dir_all(parent);
}
let conn = Connection::open(path).context("open sqlite")?;
conn.pragma_update(None, "journal_mode", "WAL").ok();
create_schema(&conn)?;
Ok(Self { conn })
let inner = EntityStore::open(path, &TASK_SCHEMA)?;
Ok(Self { inner })
}
pub fn open_memory() -> Result<Self> {
let conn = Connection::open_in_memory()?;
create_schema(&conn)?;
Ok(Self { conn })
let inner = EntityStore::open_memory(&TASK_SCHEMA)?;
Ok(Self { inner })
}
pub fn conn(&self) -> &Connection { &self.conn }
pub fn conn(&self) -> &Connection { self.inner.conn() }
pub fn create_task(&self, t: &Task) -> Result<i64> {
let now = Utc::now().timestamp();
let created = if t.created_at == 0 { now } else { t.created_at };
let status = if t.status.is_empty() { "pending" } else { &t.status };
let priority = if t.priority.is_empty() { "medium" } else { &t.priority };
self.conn.execute(
"INSERT INTO tasks (title, description, status, priority, task_type,
parent_id, assigned_to, due_date, completed_at, created_at, updated_at)
VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11)",
params![t.title, t.description, status, priority, t.task_type,
t.parent_id, t.assigned_to, t.due_date, t.completed_at, created, now],
)?;
let id = self.conn.last_insert_rowid();
self.reindex_fts(id, &t.title, &t.description)?;
Ok(id)
let input = json!({
"title": t.title,
"description": t.description,
"status": status,
"priority": priority,
"task_type": t.task_type,
"parent_id": t.parent_id,
"assigned_to": t.assigned_to,
"due_date": t.due_date,
"completed_at": t.completed_at,
"created_at": t.created_at,
});
let v = v_create::run(self.inner.conn(), &TASK_SCHEMA, input)
.map_err(|e| anyhow!("{e}"))?;
v["id"].as_i64().ok_or_else(|| anyhow!("missing id in create response"))
}
pub fn get_task(&self, id: i64) -> Result<Option<Task>> {
let mut stmt = self.conn.prepare(
"SELECT id, title, description, status, priority, task_type, parent_id,
assigned_to, due_date, completed_at, created_at, updated_at
FROM tasks WHERE id=?1",
)?;
let mut rows = stmt.query(params![id])?;
if let Some(r) = rows.next()? {
return Ok(Some(row_to_task(r)?));
match v_get::run(self.inner.conn(), &TASK_SCHEMA, json!({ "id": id })) {
Ok(v) => Ok(Some(task_from_json(v)?)),
Err(e) if e.exit_code() == 2 => Ok(None),
Err(e) => Err(anyhow!("{e}")),
}
Ok(None)
}
pub fn update_task(&self, t: &Task) -> Result<()> {
let now = Utc::now().timestamp();
let completed = if t.status == "completed" && t.completed_at == 0 { now } else { t.completed_at };
self.conn.execute(
"UPDATE tasks SET title=?1, description=?2, status=?3, priority=?4,
task_type=?5, parent_id=?6, assigned_to=?7, due_date=?8,
completed_at=?9, updated_at=?10 WHERE id=?11",
params![t.title, t.description, t.status, t.priority, t.task_type,
t.parent_id, t.assigned_to, t.due_date, completed, now, t.id],
)?;
self.reindex_fts(t.id, &t.title, &t.description)?;
Ok(())
}
fn reindex_fts(&self, id: i64, title: &str, description: &str) -> Result<()> {
self.conn.execute("DELETE FROM fts_tasks WHERE task_id=?1", params![id])?;
self.conn.execute(
"INSERT INTO fts_tasks (task_id, title, description) VALUES (?1,?2,?3)",
params![id, title, description],
)?;
let now = chrono::Utc::now().timestamp();
let completed = if t.status == "completed" && t.completed_at == 0 { now }
else { t.completed_at };
let input = json!({
"id": t.id,
"title": t.title,
"description": t.description,
"status": t.status,
"priority": t.priority,
"task_type": t.task_type,
"parent_id": t.parent_id,
"assigned_to": t.assigned_to,
"due_date": t.due_date,
"completed_at": completed,
});
v_update::run(self.inner.conn(), &TASK_SCHEMA, input)
.map_err(|e| anyhow!("{e}"))?;
Ok(())
}
}
fn row_to_task(r: &rusqlite::Row) -> rusqlite::Result<Task> {
fn task_from_json(v: Value) -> Result<Task> {
let obj = v.as_object().ok_or_else(|| anyhow!("expected object in get response"))?;
Ok(Task {
id: r.get(0)?, title: r.get(1)?, description: r.get(2)?, status: r.get(3)?,
priority: r.get(4)?, task_type: r.get(5)?, parent_id: r.get(6)?,
assigned_to: r.get(7)?, due_date: r.get(8)?, completed_at: r.get(9)?,
created_at: r.get(10)?, updated_at: r.get(11)?,
id: obj.get("id").and_then(|x| x.as_i64()).unwrap_or(0),
title: obj.get("title").and_then(|x| x.as_str()).unwrap_or("").to_string(),
description: obj.get("description").and_then(|x| x.as_str()).unwrap_or("").to_string(),
status: obj.get("status").and_then(|x| x.as_str()).unwrap_or("").to_string(),
priority: obj.get("priority").and_then(|x| x.as_str()).unwrap_or("").to_string(),
task_type: obj.get("task_type").and_then(|x| x.as_str()).unwrap_or("").to_string(),
parent_id: obj.get("parent_id").and_then(|x| x.as_i64()).unwrap_or(0),
assigned_to: obj.get("assigned_to").and_then(|x| x.as_str()).unwrap_or("").to_string(),
due_date: obj.get("due_date").and_then(|x| x.as_i64()).unwrap_or(0),
completed_at: obj.get("completed_at").and_then(|x| x.as_i64()).unwrap_or(0),
created_at: obj.get("created_at").and_then(|x| x.as_i64()).unwrap_or(0),
updated_at: obj.get("updated_at").and_then(|x| x.as_i64()).unwrap_or(0),
})
}

View file

@ -0,0 +1,30 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "_schemas/fragments/edge.json",
"title": "Edge input",
"description": "Input mixin — one directed edge in a graph entity. Consumed by `link` / `rank` verbs in kei-entity-store.",
"type": "object",
"required": ["from", "to"],
"properties": {
"from": {
"type": "integer",
"minimum": 1,
"description": "Source entity id"
},
"to": {
"type": "integer",
"minimum": 1,
"description": "Target entity id"
},
"edge_type": {
"type": "string",
"default": "links",
"description": "Semantic label for the edge (e.g. 'blocks', 'depends_on', 'links')"
},
"weight": {
"type": "number",
"minimum": 0,
"description": "Optional edge weight (defaults to 1.0 if absent)"
}
}
}

View file

@ -0,0 +1,19 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "_schemas/fragments/entity-base.json",
"title": "Entity base fields",
"description": "Common output fields emitted by every kei-entity-store verb. Referenced by `create-output`, `get-output` etc across kei-*-store crates.",
"type": "object",
"required": ["id", "created_at"],
"properties": {
"id": {
"type": "integer",
"minimum": 1,
"description": "SQLite rowid assigned at INSERT"
},
"created_at": {
"type": "integer",
"description": "Unix timestamp (seconds since epoch) at row creation"
}
}
}

View file

@ -0,0 +1,18 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "_schemas/fragments/titled-content.json",
"title": "Titled + content input",
"description": "Input mixin — titled entity with an optional body/description/content string. Composition of `titled.json` + an additional string field.",
"type": "object",
"allOf": [
{ "$ref": "titled.json" }
],
"properties": {
"description": {
"type": "string",
"maxLength": 8000,
"default": "",
"description": "Free-form body text. Indexed by FTS5 when the entity has fts_columns configured."
}
}
}

View file

@ -0,0 +1,16 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "_schemas/fragments/titled.json",
"title": "Titled entity input",
"description": "Input mixin — non-empty bounded title. Extends `entity-base` with a required `title` string.",
"type": "object",
"required": ["title"],
"properties": {
"title": {
"type": "string",
"minLength": 1,
"maxLength": 500,
"description": "Human-readable label"
}
}
}