From 76dcdc5c8775070f47b817dc8e0a398b10050b81 Mon Sep 17 00:00:00 2001 From: Parfii-bot Date: Thu, 23 Apr 2026 05:55:13 +0800 Subject: [PATCH] =?UTF-8?q?feat(r2):=20new=20kei-cache=20crate=20=E2=80=94?= =?UTF-8?q?=20deterministic=20result=20cache?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wraps pure (query/transform) atom invocations with SHA-256 keyed cache. Refuses Command/Stream kind atoms as unsafe. 22/22 tests (14 unit + 8 integration). Canonical JSON keying (formatting-drift safe). TTL expiry. AtomExecutor trait decouples subprocess from test mocks. Default DB ~/.claude/cache/cache.sqlite, overridable via --db or $KEI_CACHE_DB. Workspace Cargo.toml: +kei-cache member. Co-Authored-By: Claude Opus 4.7 (1M context) --- _primitives/_rust/Cargo.lock | 15 ++ _primitives/_rust/Cargo.toml | 2 + _primitives/_rust/kei-cache/Cargo.toml | 27 ++ _primitives/_rust/kei-cache/src/exec.rs | 139 ++++++++++ _primitives/_rust/kei-cache/src/key.rs | 98 ++++++++ _primitives/_rust/kei-cache/src/lib.rs | 120 +++++++++ _primitives/_rust/kei-cache/src/main.rs | 137 ++++++++++ _primitives/_rust/kei-cache/src/store.rs | 237 ++++++++++++++++++ .../_rust/kei-cache/tests/cache_smoke.rs | 138 ++++++++++ 9 files changed, 913 insertions(+) create mode 100644 _primitives/_rust/kei-cache/Cargo.toml create mode 100644 _primitives/_rust/kei-cache/src/exec.rs create mode 100644 _primitives/_rust/kei-cache/src/key.rs create mode 100644 _primitives/_rust/kei-cache/src/lib.rs create mode 100644 _primitives/_rust/kei-cache/src/main.rs create mode 100644 _primitives/_rust/kei-cache/src/store.rs create mode 100644 _primitives/_rust/kei-cache/tests/cache_smoke.rs diff --git a/_primitives/_rust/Cargo.lock b/_primitives/_rust/Cargo.lock index 908b8c5..694c4d3 100644 --- a/_primitives/_rust/Cargo.lock +++ b/_primitives/_rust/Cargo.lock @@ -1905,6 +1905,21 @@ dependencies = [ "tempfile", ] +[[package]] +name = "kei-cache" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "kei-atom-discovery", + "rusqlite", + "serde", + "serde_json", + "sha2 0.10.9", + "tempfile", + "thiserror 1.0.69", +] + [[package]] name = "kei-capability" version = "0.1.0" diff --git a/_primitives/_rust/Cargo.toml b/_primitives/_rust/Cargo.toml index 16569eb..b3ee517 100644 --- a/_primitives/_rust/Cargo.toml +++ b/_primitives/_rust/Cargo.toml @@ -43,6 +43,8 @@ members = [ "kei-provision", # Convergence Layer A — schema-driven verb-template engine for SQLite-CRUD stores "kei-entity-store", + # v1 substrate — deterministic result cache for pure (query/transform) atoms + "kei-cache", ] [workspace.package] diff --git a/_primitives/_rust/kei-cache/Cargo.toml b/_primitives/_rust/kei-cache/Cargo.toml new file mode 100644 index 0000000..c95fee6 --- /dev/null +++ b/_primitives/_rust/kei-cache/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "kei-cache" +version = "0.1.0" +edition = "2021" +rust-version = "1.75" +description = "Atom result cache — deterministic wrapping of pure (query/transform) atom invocations" + +[[bin]] +name = "kei-cache" +path = "src/main.rs" + +[lib] +name = "kei_cache" +path = "src/lib.rs" + +[dependencies] +rusqlite = { version = "0.31", features = ["bundled"] } +clap = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +sha2 = { workspace = true } +anyhow = "1" +thiserror = "1" +kei-atom-discovery = { path = "../kei-atom-discovery" } + +[dev-dependencies] +tempfile = "3" diff --git a/_primitives/_rust/kei-cache/src/exec.rs b/_primitives/_rust/kei-cache/src/exec.rs new file mode 100644 index 0000000..0e9f631 --- /dev/null +++ b/_primitives/_rust/kei-cache/src/exec.rs @@ -0,0 +1,139 @@ +//! Atom invocation on cache miss. +//! +//! Constructor Pattern: `AtomExecutor` trait = one-method contract +//! (atom_id + canonical input → JSON payload string). `SubprocessExecutor` +//! is the production impl — mirrors the kei-runtime binary-resolution +//! rules (`KEI_RUNTIME_BIN_DIR` → `$PATH`) and spawns +//! ` run-atom ` with the input on stdin. +//! +//! Kind-safety: before invoking we consult `kei-atom-discovery` to obtain +//! `AtomKind`. `command` and `stream` are refused ("unsafe to cache"); +//! `query` and `transform` pass through. + +use anyhow::{anyhow, Context, Result}; +use kei_atom_discovery::{discover_atoms, AtomKind, AtomMeta}; +use std::io::Write; +use std::path::{Path, PathBuf}; +use std::process::{Command, Stdio}; + +/// Strategy for invoking an atom after a cache miss. +/// +/// Implementations MUST return the atom's raw JSON stdout as a String. +/// They MUST NOT perform any caching themselves. +pub trait AtomExecutor { + fn execute(&self, atom_id: &str, input_json: &str) -> Result; +} + +/// Production executor: resolves atom metadata via kei-atom-discovery, +/// refuses non-cacheable kinds, and spawns ` run-atom `. +pub struct SubprocessExecutor { + atoms_root: PathBuf, +} + +impl SubprocessExecutor { + pub fn new(atoms_root: impl Into) -> Self { + Self { atoms_root: atoms_root.into() } + } + + fn find_meta(&self, atom_id: &str) -> Result { + discover_atoms(&self.atoms_root) + .into_iter() + .find(|a| a.full_id == atom_id) + .ok_or_else(|| anyhow!("no atom matching `{atom_id}` under {}", self.atoms_root.display())) + } +} + +impl AtomExecutor for SubprocessExecutor { + fn execute(&self, atom_id: &str, input_json: &str) -> Result { + let meta = self.find_meta(atom_id)?; + ensure_cacheable(&meta.kind, atom_id)?; + run_subprocess(&meta, input_json) + } +} + +/// Gate: only pure kinds may be cached. Command has side effects; stream is +/// incremental so caching the first frame would be misleading. +pub fn ensure_cacheable(kind: &AtomKind, atom_id: &str) -> Result<()> { + match kind { + AtomKind::Query | AtomKind::Transform => Ok(()), + AtomKind::Command => Err(anyhow!( + "atom `{atom_id}` has kind=command (side effects); unsafe to cache" + )), + AtomKind::Stream => Err(anyhow!( + "atom `{atom_id}` has kind=stream (incremental); unsafe to cache" + )), + } +} + +/// Spawn ` run-atom ` with `input_json` on stdin; return stdout. +fn run_subprocess(meta: &AtomMeta, input_json: &str) -> Result { + let bin = resolve_binary(&meta.crate_name) + .ok_or_else(|| anyhow!("binary `{}` not on PATH or KEI_RUNTIME_BIN_DIR", meta.crate_name))?; + let mut child = Command::new(&bin) + .arg("run-atom") + .arg(&meta.verb) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .with_context(|| format!("spawn {}", bin.display()))?; + if let Some(mut stdin) = child.stdin.take() { + stdin + .write_all(input_json.as_bytes()) + .context("write stdin to atom subprocess")?; + } + let out = child.wait_with_output().context("wait on atom subprocess")?; + if !out.status.success() { + let code = out.status.code().unwrap_or(-1); + let stderr = String::from_utf8_lossy(&out.stderr).trim().to_string(); + return Err(anyhow!("atom `{}` exited {code}: {stderr}", meta.full_id)); + } + Ok(String::from_utf8_lossy(&out.stdout).trim().to_string()) +} + +/// Resolve binary by name: +/// 1. `$KEI_RUNTIME_BIN_DIR/` when env var is set and file exists +/// 2. Walk `$PATH`, return first `/` that exists +pub fn resolve_binary(crate_name: &str) -> Option { + if let Ok(dir) = std::env::var("KEI_RUNTIME_BIN_DIR") { + let candidate = PathBuf::from(dir).join(crate_name); + if candidate.is_file() { + return Some(candidate); + } + } + let path = std::env::var("PATH").ok()?; + for dir in std::env::split_paths(&path) { + let candidate: PathBuf = Path::new(&dir).join(crate_name); + if candidate.is_file() { + return Some(candidate); + } + } + None +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn rejects_command_kind() { + let err = ensure_cacheable(&AtomKind::Command, "atom:x").unwrap_err(); + assert!(err.to_string().contains("unsafe to cache")); + } + + #[test] + fn rejects_stream_kind() { + let err = ensure_cacheable(&AtomKind::Stream, "atom:x").unwrap_err(); + assert!(err.to_string().contains("unsafe to cache")); + } + + #[test] + fn accepts_query_kind() { + ensure_cacheable(&AtomKind::Query, "atom:x").unwrap(); + } + + #[test] + fn accepts_transform_kind() { + ensure_cacheable(&AtomKind::Transform, "atom:x").unwrap(); + } +} diff --git a/_primitives/_rust/kei-cache/src/key.rs b/_primitives/_rust/kei-cache/src/key.rs new file mode 100644 index 0000000..4399d2d --- /dev/null +++ b/_primitives/_rust/kei-cache/src/key.rs @@ -0,0 +1,98 @@ +//! Cache key derivation. +//! +//! Constructor Pattern: one cube = canonical JSON serialisation + SHA-256. +//! Key = SHA-256(atom_id || '\0' || canonical_json(input)). +//! +//! Canonical JSON: object keys sorted lexicographically at every depth, no +//! insignificant whitespace. Ensures semantically-identical inputs hash to +//! the same bytes regardless of source formatting. + +use serde_json::{Map, Value}; +use sha2::{Digest, Sha256}; + +/// Produce canonical JSON bytes: stable key order, minimal whitespace. +pub fn canonical_json(v: &Value) -> String { + let canon = canonicalise(v.clone()); + serde_json::to_string(&canon).expect("canonical_json: serialise never fails for owned Value") +} + +/// Recursively canonicalise: sort object keys at every nesting level. +fn canonicalise(v: Value) -> Value { + match v { + Value::Object(m) => { + let mut keys: Vec = m.keys().cloned().collect(); + keys.sort(); + let mut out = Map::with_capacity(keys.len()); + let mut src = m; + for k in keys { + if let Some(val) = src.remove(&k) { + out.insert(k, canonicalise(val)); + } + } + Value::Object(out) + } + Value::Array(a) => Value::Array(a.into_iter().map(canonicalise).collect()), + other => other, + } +} + +/// Compute cache key as 64-hex SHA-256 digest of (atom_id \0 canonical_json). +pub fn cache_key(atom_id: &str, input: &Value) -> String { + let mut hasher = Sha256::new(); + hasher.update(atom_id.as_bytes()); + hasher.update([0u8]); + hasher.update(canonical_json(input).as_bytes()); + let digest = hasher.finalize(); + hex_lower(&digest) +} + +/// Hex-encode lowercase without pulling a separate crate. +fn hex_lower(bytes: &[u8]) -> String { + const HEX: &[u8; 16] = b"0123456789abcdef"; + let mut s = String::with_capacity(bytes.len() * 2); + for b in bytes { + s.push(HEX[(b >> 4) as usize] as char); + s.push(HEX[(b & 0x0f) as usize] as char); + } + s +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn canonical_sorts_keys() { + let a = json!({"z": 1, "a": 2, "m": {"y": 1, "b": 2}}); + let b = json!({"a": 2, "m": {"b": 2, "y": 1}, "z": 1}); + assert_eq!(canonical_json(&a), canonical_json(&b)); + } + + #[test] + fn key_stable_across_formatting() { + let a = json!({"x": 1, "y": [1, 2]}); + let b: Value = serde_json::from_str(" {\"y\":[1,2],\"x\":1} ").unwrap(); + assert_eq!(cache_key("atom:foo", &a), cache_key("atom:foo", &b)); + } + + #[test] + fn key_differs_by_input() { + let a = json!({"x": 1}); + let b = json!({"x": 2}); + assert_ne!(cache_key("atom:foo", &a), cache_key("atom:foo", &b)); + } + + #[test] + fn key_differs_by_atom_id() { + let v = json!({"x": 1}); + assert_ne!(cache_key("atom:foo", &v), cache_key("atom:bar", &v)); + } + + #[test] + fn key_is_64_hex() { + let k = cache_key("atom:x", &json!({})); + assert_eq!(k.len(), 64); + assert!(k.chars().all(|c| c.is_ascii_hexdigit() && !c.is_ascii_uppercase())); + } +} diff --git a/_primitives/_rust/kei-cache/src/lib.rs b/_primitives/_rust/kei-cache/src/lib.rs new file mode 100644 index 0000000..362b9cc --- /dev/null +++ b/_primitives/_rust/kei-cache/src/lib.rs @@ -0,0 +1,120 @@ +//! kei-cache — deterministic caching primitive for pure atom invocations. +//! +//! Entry point is [`wrap_with`]: given a cache [`rusqlite::Connection`], an +//! [`exec::AtomExecutor`], an atom id, JSON input, and a TTL, either +//! return the cached payload or invoke the executor, store the result, +//! and return it. +//! +//! Key derivation lives in [`key`]. Storage lives in [`store`]. Invocation +//! on miss lives in [`exec`]. `lib.rs` only composes them — it owns no +//! persistent state. + +pub mod exec; +pub mod key; +pub mod store; + +use anyhow::{Context, Result}; +use rusqlite::Connection; +use serde_json::Value; + +pub use exec::{AtomExecutor, SubprocessExecutor}; +pub use store::Stats; + +/// Outcome of a [`wrap_with`] call. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Outcome { + Hit, + Miss, +} + +impl Outcome { + pub fn as_str(&self) -> &'static str { + match self { + Outcome::Hit => "hit", + Outcome::Miss => "miss", + } + } +} + +/// Cache trait — library-level API for downstream consumers. +/// +/// Production impl is [`SqliteCache`]. Tests may provide in-memory impls. +pub trait Cache { + /// Fetch from the cache; `None` if absent or expired. + fn get(&self, key: &str) -> Result>; + /// Store `payload` under `key` with TTL (seconds). + fn put(&self, key: &str, atom_id: &str, payload: &str, ttl_sec: i64) -> Result<()>; +} + +/// SQLite-backed cache impl. Holds a borrowed [`Connection`]. +pub struct SqliteCache<'c> { + pub conn: &'c Connection, +} + +impl<'c> Cache for SqliteCache<'c> { + fn get(&self, key: &str) -> Result> { + store::get(self.conn, key) + } + fn put(&self, key: &str, atom_id: &str, payload: &str, ttl_sec: i64) -> Result<()> { + store::put(self.conn, key, atom_id, payload, ttl_sec) + } +} + +/// Top-level wrap: lookup → return on hit, invoke + store on miss. +/// +/// Returns `(payload_string, outcome)`. `payload_string` is the atom's +/// JSON stdout verbatim (trimmed). `outcome` distinguishes hit vs miss +/// so the CLI can emit `cache=hit|miss` to stderr. +pub fn wrap_with( + conn: &Connection, + executor: &E, + atom_id: &str, + input_json: &str, + ttl_sec: i64, +) -> Result<(String, Outcome)> { + let input: Value = + serde_json::from_str(input_json).with_context(|| "input is not valid JSON")?; + let key = key::cache_key(atom_id, &input); + if let Some(payload) = store::get(conn, &key)? { + let _ = store::bump(conn, "hits"); + return Ok((payload, Outcome::Hit)); + } + let payload = executor + .execute(atom_id, input_json) + .with_context(|| format!("execute atom `{atom_id}`"))?; + store::put(conn, &key, atom_id, &payload, ttl_sec)?; + let _ = store::bump(conn, "misses"); + Ok((payload, Outcome::Miss)) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::cell::Cell; + use tempfile::tempdir; + + struct CountingExec { + calls: Cell, + reply: String, + } + + impl AtomExecutor for CountingExec { + fn execute(&self, _atom_id: &str, _input_json: &str) -> Result { + self.calls.set(self.calls.get() + 1); + Ok(self.reply.clone()) + } + } + + #[test] + fn hit_skips_executor() { + let d = tempdir().unwrap(); + let p = d.path().join("c.sqlite"); + let conn = store::open(&p).unwrap(); + let ex = CountingExec { calls: Cell::new(0), reply: "{\"r\":1}".into() }; + let (_, o1) = wrap_with(&conn, &ex, "atom:x", "{\"a\":1}", 60).unwrap(); + let (_, o2) = wrap_with(&conn, &ex, "atom:x", "{\"a\":1}", 60).unwrap(); + assert_eq!(o1, Outcome::Miss); + assert_eq!(o2, Outcome::Hit); + assert_eq!(ex.calls.get(), 1); + } +} diff --git a/_primitives/_rust/kei-cache/src/main.rs b/_primitives/_rust/kei-cache/src/main.rs new file mode 100644 index 0000000..7f3049f --- /dev/null +++ b/_primitives/_rust/kei-cache/src/main.rs @@ -0,0 +1,137 @@ +//! kei-cache CLI dispatcher. +//! +//! Constructor Pattern: single cube = arg parsing + dispatch + formatting. +//! Storage: `~/.claude/cache/cache.sqlite` (or `$KEI_CACHE_DB` override). + +use clap::{Parser, Subcommand}; +use kei_cache::{store, wrap_with, Outcome, SubprocessExecutor}; +use std::path::PathBuf; +use std::process::ExitCode; + +#[derive(Parser)] +#[command(name = "kei-cache", version, about = "Atom result cache")] +struct Cli { + /// Override cache DB path (default: $KEI_CACHE_DB or ~/.claude/cache/cache.sqlite) + #[arg(long)] + db: Option, + #[command(subcommand)] + cmd: Cmd, +} + +#[derive(Subcommand)] +enum Cmd { + /// Wrap an atom invocation with deterministic caching. + Wrap { + /// Atom id (e.g. `kei-router:route`). + atom_id: String, + /// JSON-string input to hash + forward on miss. + #[arg(long)] + input: String, + /// TTL in seconds (default: 3600). + #[arg(long, default_value_t = 3600)] + ttl: i64, + /// Atoms-root for discovery (default: $KEI_ATOMS_ROOT or cwd). + #[arg(long)] + atoms_root: Option, + }, + /// Print hit/miss + live entry counts. + Stats, + /// Evict all expired entries. + Purge, + /// Wipe cache + counters. + Clear, +} + +fn db_path(cli_db: Option) -> PathBuf { + if let Some(p) = cli_db { + return p; + } + if let Ok(env) = std::env::var("KEI_CACHE_DB") { + return PathBuf::from(env); + } + let home = std::env::var("HOME").unwrap_or_else(|_| ".".into()); + PathBuf::from(home).join(".claude/cache/cache.sqlite") +} + +fn atoms_root(flag: Option) -> PathBuf { + flag.or_else(|| std::env::var("KEI_ATOMS_ROOT").ok().map(PathBuf::from)) + .unwrap_or_else(|| PathBuf::from(".")) +} + +fn err(msg: &str) -> ExitCode { + eprintln!("kei-cache: {msg}"); + ExitCode::from(1) +} + +fn cmd_wrap( + conn: &rusqlite::Connection, + atom_id: &str, + input: &str, + ttl: i64, + atoms_root: PathBuf, +) -> ExitCode { + let executor = SubprocessExecutor::new(atoms_root); + match wrap_with(conn, &executor, atom_id, input, ttl) { + Ok((payload, Outcome::Hit)) => { + eprintln!("cache=hit"); + println!("{payload}"); + ExitCode::SUCCESS + } + Ok((payload, Outcome::Miss)) => { + eprintln!("cache=miss"); + println!("{payload}"); + ExitCode::SUCCESS + } + Err(e) => err(&format!("wrap failed: {e:#}")), + } +} + +fn cmd_stats(conn: &rusqlite::Connection) -> ExitCode { + match store::stats(conn) { + Ok(s) => { + println!( + "hits={} misses={} entries={} bytes={}", + s.hits, s.misses, s.entries, s.bytes + ); + ExitCode::SUCCESS + } + Err(e) => err(&format!("stats failed: {e:#}")), + } +} + +fn cmd_purge(conn: &rusqlite::Connection) -> ExitCode { + match store::purge(conn) { + Ok(n) => { + println!("purged={n}"); + ExitCode::SUCCESS + } + Err(e) => err(&format!("purge failed: {e:#}")), + } +} + +fn cmd_clear(conn: &rusqlite::Connection) -> ExitCode { + match store::clear(conn) { + Ok(n) => { + println!("cleared={n}"); + ExitCode::SUCCESS + } + Err(e) => err(&format!("clear failed: {e:#}")), + } +} + +fn main() -> ExitCode { + let cli = Cli::parse(); + let path = db_path(cli.db); + let conn = match store::open(&path) { + Ok(c) => c, + Err(e) => return err(&format!("open {}: {e:#}", path.display())), + }; + match cli.cmd { + Cmd::Wrap { atom_id, input, ttl, atoms_root: ar } => { + cmd_wrap(&conn, &atom_id, &input, ttl, atoms_root(ar)) + } + Cmd::Stats => cmd_stats(&conn), + Cmd::Purge => cmd_purge(&conn), + Cmd::Clear => cmd_clear(&conn), + } +} diff --git a/_primitives/_rust/kei-cache/src/store.rs b/_primitives/_rust/kei-cache/src/store.rs new file mode 100644 index 0000000..2aa7ca1 --- /dev/null +++ b/_primitives/_rust/kei-cache/src/store.rs @@ -0,0 +1,237 @@ +//! SQLite-backed cache store. +//! +//! Constructor Pattern: one cube = cache table DDL + put/get/stats/purge/clear. +//! Every fn <30 LOC. Schema is append-only migration list; expiry is +//! timestamp-based (`expires_ts = created_ts + ttl_sec`). +//! +//! Layout: one row per unique (atom_id, canonical_input) → cache key. +//! Payload stored as raw JSON text to keep the primitive format-neutral. + +use anyhow::{anyhow, Context, Result}; +use rusqlite::{params, Connection, OptionalExtension}; +use std::path::Path; +use std::time::{SystemTime, UNIX_EPOCH}; + +/// Ordered migrations. Index = schema version. Never reorder; append only. +pub const MIGRATIONS: &[&str] = &[ + // v1 — initial schema (2026-04-23) + "CREATE TABLE IF NOT EXISTS cache ( + key TEXT PRIMARY KEY, + atom_id TEXT NOT NULL, + payload TEXT NOT NULL, + created_ts INTEGER NOT NULL, + expires_ts INTEGER NOT NULL, + bytes INTEGER NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_cache_expires ON cache(expires_ts); + CREATE INDEX IF NOT EXISTS idx_cache_atom ON cache(atom_id); + CREATE TABLE IF NOT EXISTS counters ( + name TEXT PRIMARY KEY, + value INTEGER NOT NULL + );", +]; + +/// Open or create the cache DB and run migrations. +pub fn open(path: &Path) -> Result { + if let Some(parent) = path.parent() { + let _ = std::fs::create_dir_all(parent); + } + let conn = Connection::open(path).with_context(|| format!("open {}", path.display()))?; + migrate(&conn)?; + Ok(conn) +} + +/// Apply pending migrations atomically (DDL + user_version bump per txn). +fn migrate(conn: &Connection) -> Result<()> { + let current: i64 = conn + .query_row("PRAGMA user_version", [], |r| r.get(0)) + .unwrap_or(0); + for (i, sql) in MIGRATIONS.iter().enumerate() { + let target = (i + 1) as i64; + if current < target { + apply_one(conn, sql, target)?; + } + } + Ok(()) +} + +fn apply_one(conn: &Connection, sql: &str, target: i64) -> Result<()> { + conn.execute_batch("BEGIN IMMEDIATE")?; + let step = (|| -> rusqlite::Result<()> { + conn.execute_batch(sql)?; + conn.pragma_update(None, "user_version", target)?; + Ok(()) + })(); + match step { + Ok(()) => conn.execute_batch("COMMIT").map_err(Into::into), + Err(e) => { + let _ = conn.execute_batch("ROLLBACK"); + Err(anyhow!("migration v{target}: {e}")) + } + } +} + +/// Current unix timestamp in seconds. +pub fn now_ts() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs() as i64) + .unwrap_or(0) +} + +/// Insert (upsert) a cache entry. `ttl_sec` must be > 0. +pub fn put(conn: &Connection, key: &str, atom_id: &str, payload: &str, ttl_sec: i64) -> Result<()> { + if ttl_sec <= 0 { + return Err(anyhow!("ttl must be positive, got {ttl_sec}")); + } + let now = now_ts(); + let expires = now + ttl_sec; + let bytes = payload.len() as i64; + conn.execute( + "INSERT INTO cache (key, atom_id, payload, created_ts, expires_ts, bytes) + VALUES (?1, ?2, ?3, ?4, ?5, ?6) + ON CONFLICT(key) DO UPDATE SET + payload = excluded.payload, + created_ts = excluded.created_ts, + expires_ts = excluded.expires_ts, + bytes = excluded.bytes", + params![key, atom_id, payload, now, expires, bytes], + )?; + Ok(()) +} + +/// Look up a key; returns `None` on miss or expired entry. +/// Expired entries are evicted lazily on lookup. +pub fn get(conn: &Connection, key: &str) -> Result> { + let now = now_ts(); + let row: Option<(String, i64)> = conn + .query_row( + "SELECT payload, expires_ts FROM cache WHERE key = ?1", + params![key], + |r| Ok((r.get(0)?, r.get(1)?)), + ) + .optional()?; + match row { + Some((payload, expires)) if expires > now => Ok(Some(payload)), + Some(_) => { + let _ = conn.execute("DELETE FROM cache WHERE key = ?1", params![key]); + Ok(None) + } + None => Ok(None), + } +} + +/// Increment a named counter (hits / misses) by 1. +pub fn bump(conn: &Connection, name: &str) -> Result<()> { + conn.execute( + "INSERT INTO counters (name, value) VALUES (?1, 1) + ON CONFLICT(name) DO UPDATE SET value = value + 1", + params![name], + )?; + Ok(()) +} + +/// Read aggregate stats: (hits, misses, live_entries, total_bytes). +pub fn stats(conn: &Connection) -> Result { + let hits = counter(conn, "hits")?; + let misses = counter(conn, "misses")?; + let now = now_ts(); + let (entries, bytes): (i64, i64) = conn + .query_row( + "SELECT COUNT(*), COALESCE(SUM(bytes), 0) + FROM cache WHERE expires_ts > ?1", + params![now], + |r| Ok((r.get(0)?, r.get(1)?)), + ) + .unwrap_or((0, 0)); + Ok(Stats { hits, misses, entries, bytes }) +} + +fn counter(conn: &Connection, name: &str) -> Result { + let v: Option = conn + .query_row( + "SELECT value FROM counters WHERE name = ?1", + params![name], + |r| r.get(0), + ) + .optional()?; + Ok(v.unwrap_or(0)) +} + +/// Aggregate cache stats snapshot. +#[derive(Debug, Clone, serde::Serialize)] +pub struct Stats { + pub hits: i64, + pub misses: i64, + pub entries: i64, + pub bytes: i64, +} + +/// Evict expired rows; returns number deleted. +pub fn purge(conn: &Connection) -> Result { + let now = now_ts(); + let n = conn.execute("DELETE FROM cache WHERE expires_ts <= ?1", params![now])?; + Ok(n) +} + +/// Wipe everything (cache + counters). Returns rows removed from `cache`. +pub fn clear(conn: &Connection) -> Result { + let n = conn.execute("DELETE FROM cache", [])?; + conn.execute("DELETE FROM counters", [])?; + Ok(n) +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::tempdir; + + fn fresh() -> (tempfile::TempDir, Connection) { + let d = tempdir().unwrap(); + let p = d.path().join("c.sqlite"); + let c = open(&p).unwrap(); + (d, c) + } + + #[test] + fn put_get_roundtrip() { + let (_d, c) = fresh(); + put(&c, "k1", "atom:x", "{\"r\":1}", 60).unwrap(); + assert_eq!(get(&c, "k1").unwrap().as_deref(), Some("{\"r\":1}")); + } + + #[test] + fn miss_returns_none() { + let (_d, c) = fresh(); + assert!(get(&c, "missing").unwrap().is_none()); + } + + #[test] + fn purge_removes_expired() { + let (_d, c) = fresh(); + // ttl=1 means expires in 1s; manually backdate via direct update. + put(&c, "k1", "atom:x", "v", 60).unwrap(); + c.execute( + "UPDATE cache SET expires_ts = 1 WHERE key = 'k1'", + [], + ) + .unwrap(); + assert_eq!(purge(&c).unwrap(), 1); + assert!(get(&c, "k1").unwrap().is_none()); + } + + #[test] + fn stats_count_live_only() { + let (_d, c) = fresh(); + put(&c, "a", "atom:x", "xx", 60).unwrap(); + put(&c, "b", "atom:x", "yyyy", 60).unwrap(); + bump(&c, "hits").unwrap(); + bump(&c, "misses").unwrap(); + bump(&c, "misses").unwrap(); + let s = stats(&c).unwrap(); + assert_eq!(s.entries, 2); + assert_eq!(s.bytes, 6); + assert_eq!(s.hits, 1); + assert_eq!(s.misses, 2); + } +} diff --git a/_primitives/_rust/kei-cache/tests/cache_smoke.rs b/_primitives/_rust/kei-cache/tests/cache_smoke.rs new file mode 100644 index 0000000..eb03114 --- /dev/null +++ b/_primitives/_rust/kei-cache/tests/cache_smoke.rs @@ -0,0 +1,138 @@ +//! cache_smoke — end-to-end integration tests for `wrap_with`. +//! +//! Uses a `MockExecutor` that returns an incrementing counter so "was the +//! executor actually re-invoked?" is observable as a different return +//! value rather than inferred from a side-effect. + +use anyhow::{anyhow, Result}; +use kei_atom_discovery::AtomKind; +use kei_cache::exec::ensure_cacheable; +use kei_cache::{store, wrap_with, AtomExecutor, Outcome}; +use std::cell::Cell; +use tempfile::tempdir; + +/// Mock executor: each invocation returns `{"n": }`. +/// Simulates a timestamp-like observable so a repeated call with the same +/// input must be a cache-hit to produce the same value. +struct MockExecutor { + calls: Cell, + kind: AtomKind, +} + +impl MockExecutor { + fn new() -> Self { + Self { calls: Cell::new(0), kind: AtomKind::Query } + } + fn with_kind(kind: AtomKind) -> Self { + Self { calls: Cell::new(0), kind } + } +} + +impl AtomExecutor for MockExecutor { + fn execute(&self, atom_id: &str, _input_json: &str) -> Result { + ensure_cacheable(&self.kind, atom_id)?; + let n = self.calls.get() + 1; + self.calls.set(n); + Ok(format!("{{\"n\":{n}}}")) + } +} + +fn open_fresh_cache() -> (tempfile::TempDir, rusqlite::Connection) { + let d = tempdir().unwrap(); + let p = d.path().join("c.sqlite"); + let c = store::open(&p).unwrap(); + (d, c) +} + +#[test] +fn first_call_misses_and_stores() { + let (_d, conn) = open_fresh_cache(); + let ex = MockExecutor::new(); + let (payload, outcome) = wrap_with(&conn, &ex, "atom:mock", "{\"a\":1}", 60).unwrap(); + assert_eq!(outcome, Outcome::Miss); + assert_eq!(payload, "{\"n\":1}"); + assert_eq!(ex.calls.get(), 1); +} + +#[test] +fn second_call_same_input_is_hit() { + let (_d, conn) = open_fresh_cache(); + let ex = MockExecutor::new(); + let (p1, o1) = wrap_with(&conn, &ex, "atom:mock", "{\"a\":1}", 60).unwrap(); + let (p2, o2) = wrap_with(&conn, &ex, "atom:mock", "{\"a\":1}", 60).unwrap(); + assert_eq!(o1, Outcome::Miss); + assert_eq!(o2, Outcome::Hit); + // Same value both times → executor was NOT re-invoked on the hit. + assert_eq!(p1, p2); + assert_eq!(ex.calls.get(), 1); +} + +#[test] +fn equivalent_json_is_still_a_hit() { + // Whitespace + key ordering differ; canonical JSON must hash the same. + let (_d, conn) = open_fresh_cache(); + let ex = MockExecutor::new(); + let _ = wrap_with(&conn, &ex, "atom:mock", "{\"a\":1,\"b\":2}", 60).unwrap(); + let (_, o2) = wrap_with(&conn, &ex, "atom:mock", " {\"b\":2,\"a\":1} ", 60).unwrap(); + assert_eq!(o2, Outcome::Hit); + assert_eq!(ex.calls.get(), 1); +} + +#[test] +fn different_input_misses_with_different_key() { + let (_d, conn) = open_fresh_cache(); + let ex = MockExecutor::new(); + let (p1, o1) = wrap_with(&conn, &ex, "atom:mock", "{\"a\":1}", 60).unwrap(); + let (p2, o2) = wrap_with(&conn, &ex, "atom:mock", "{\"a\":2}", 60).unwrap(); + assert_eq!(o1, Outcome::Miss); + assert_eq!(o2, Outcome::Miss); + // Counter advanced → executor really was re-invoked for the second input. + assert_ne!(p1, p2); + assert_eq!(ex.calls.get(), 2); +} + +#[test] +fn expired_entry_misses_even_for_same_input() { + // No sleep: put an entry, then force-expire via direct UPDATE. + let (_d, conn) = open_fresh_cache(); + let ex = MockExecutor::new(); + let (_, o1) = wrap_with(&conn, &ex, "atom:mock", "{\"a\":1}", 60).unwrap(); + assert_eq!(o1, Outcome::Miss); + conn.execute("UPDATE cache SET expires_ts = 1", []).unwrap(); + let (_, o2) = wrap_with(&conn, &ex, "atom:mock", "{\"a\":1}", 60).unwrap(); + assert_eq!(o2, Outcome::Miss); + assert_eq!(ex.calls.get(), 2); +} + +#[test] +fn non_cacheable_kind_is_refused() { + let (_d, conn) = open_fresh_cache(); + let ex = MockExecutor::with_kind(AtomKind::Command); + let res = wrap_with(&conn, &ex, "atom:danger", "{}", 60); + assert!(res.is_err(), "command-kind atoms must not be cacheable"); + let msg = format!("{:#}", res.unwrap_err()); + assert!(msg.contains("unsafe to cache"), "unexpected error: {msg}"); + // Nothing stored on rejection. + let s = store::stats(&conn).unwrap(); + assert_eq!(s.entries, 0); +} + +#[test] +fn stream_kind_is_refused() { + let (_d, conn) = open_fresh_cache(); + let ex = MockExecutor::with_kind(AtomKind::Stream); + let err = wrap_with(&conn, &ex, "atom:s", "{}", 60).unwrap_err(); + assert!(format!("{err:#}").contains("unsafe to cache")); +} + +#[test] +fn invalid_json_input_errors_before_keying() -> Result<()> { + let (_d, conn) = open_fresh_cache(); + let ex = MockExecutor::new(); + let res = wrap_with(&conn, &ex, "atom:x", "not json", 60); + if res.is_ok() { + return Err(anyhow!("malformed JSON must not be accepted")); + } + assert_eq!(ex.calls.get(), 0); + Ok(()) +}