feat(r2): new kei-cache crate — deterministic result cache

Wraps pure (query/transform) atom invocations with SHA-256 keyed
cache. Refuses Command/Stream kind atoms as unsafe.

22/22 tests (14 unit + 8 integration). Canonical JSON keying
(formatting-drift safe). TTL expiry. AtomExecutor trait decouples
subprocess from test mocks.

Default DB ~/.claude/cache/cache.sqlite, overridable via --db or
$KEI_CACHE_DB.

Workspace Cargo.toml: +kei-cache member.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Parfii-bot 2026-04-23 05:55:13 +08:00
parent e075ae8df1
commit 76dcdc5c87
9 changed files with 913 additions and 0 deletions

View file

@ -1905,6 +1905,21 @@ dependencies = [
"tempfile",
]
[[package]]
name = "kei-cache"
version = "0.1.0"
dependencies = [
"anyhow",
"clap",
"kei-atom-discovery",
"rusqlite",
"serde",
"serde_json",
"sha2 0.10.9",
"tempfile",
"thiserror 1.0.69",
]
[[package]]
name = "kei-capability"
version = "0.1.0"

View file

@ -43,6 +43,8 @@ members = [
"kei-provision",
# Convergence Layer A — schema-driven verb-template engine for SQLite-CRUD stores
"kei-entity-store",
# v1 substrate — deterministic result cache for pure (query/transform) atoms
"kei-cache",
]
[workspace.package]

View file

@ -0,0 +1,27 @@
[package]
name = "kei-cache"
version = "0.1.0"
edition = "2021"
rust-version = "1.75"
description = "Atom result cache — deterministic wrapping of pure (query/transform) atom invocations"
[[bin]]
name = "kei-cache"
path = "src/main.rs"
[lib]
name = "kei_cache"
path = "src/lib.rs"
[dependencies]
rusqlite = { version = "0.31", features = ["bundled"] }
clap = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
sha2 = { workspace = true }
anyhow = "1"
thiserror = "1"
kei-atom-discovery = { path = "../kei-atom-discovery" }
[dev-dependencies]
tempfile = "3"

View file

@ -0,0 +1,139 @@
//! Atom invocation on cache miss.
//!
//! Constructor Pattern: `AtomExecutor` trait = one-method contract
//! (atom_id + canonical input → JSON payload string). `SubprocessExecutor`
//! is the production impl — mirrors the kei-runtime binary-resolution
//! rules (`KEI_RUNTIME_BIN_DIR` → `$PATH`) and spawns
//! `<crate> run-atom <verb>` with the input on stdin.
//!
//! Kind-safety: before invoking we consult `kei-atom-discovery` to obtain
//! `AtomKind`. `command` and `stream` are refused ("unsafe to cache");
//! `query` and `transform` pass through.
use anyhow::{anyhow, Context, Result};
use kei_atom_discovery::{discover_atoms, AtomKind, AtomMeta};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
/// Strategy for invoking an atom after a cache miss.
///
/// Implementations MUST return the atom's raw JSON stdout as a String.
/// They MUST NOT perform any caching themselves.
pub trait AtomExecutor {
fn execute(&self, atom_id: &str, input_json: &str) -> Result<String>;
}
/// Production executor: resolves atom metadata via kei-atom-discovery,
/// refuses non-cacheable kinds, and spawns `<crate> run-atom <verb>`.
pub struct SubprocessExecutor {
atoms_root: PathBuf,
}
impl SubprocessExecutor {
pub fn new(atoms_root: impl Into<PathBuf>) -> Self {
Self { atoms_root: atoms_root.into() }
}
fn find_meta(&self, atom_id: &str) -> Result<AtomMeta> {
discover_atoms(&self.atoms_root)
.into_iter()
.find(|a| a.full_id == atom_id)
.ok_or_else(|| anyhow!("no atom matching `{atom_id}` under {}", self.atoms_root.display()))
}
}
impl AtomExecutor for SubprocessExecutor {
fn execute(&self, atom_id: &str, input_json: &str) -> Result<String> {
let meta = self.find_meta(atom_id)?;
ensure_cacheable(&meta.kind, atom_id)?;
run_subprocess(&meta, input_json)
}
}
/// Gate: only pure kinds may be cached. Command has side effects; stream is
/// incremental so caching the first frame would be misleading.
pub fn ensure_cacheable(kind: &AtomKind, atom_id: &str) -> Result<()> {
match kind {
AtomKind::Query | AtomKind::Transform => Ok(()),
AtomKind::Command => Err(anyhow!(
"atom `{atom_id}` has kind=command (side effects); unsafe to cache"
)),
AtomKind::Stream => Err(anyhow!(
"atom `{atom_id}` has kind=stream (incremental); unsafe to cache"
)),
}
}
/// Spawn `<crate> run-atom <verb>` with `input_json` on stdin; return stdout.
fn run_subprocess(meta: &AtomMeta, input_json: &str) -> Result<String> {
let bin = resolve_binary(&meta.crate_name)
.ok_or_else(|| anyhow!("binary `{}` not on PATH or KEI_RUNTIME_BIN_DIR", meta.crate_name))?;
let mut child = Command::new(&bin)
.arg("run-atom")
.arg(&meta.verb)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.with_context(|| format!("spawn {}", bin.display()))?;
if let Some(mut stdin) = child.stdin.take() {
stdin
.write_all(input_json.as_bytes())
.context("write stdin to atom subprocess")?;
}
let out = child.wait_with_output().context("wait on atom subprocess")?;
if !out.status.success() {
let code = out.status.code().unwrap_or(-1);
let stderr = String::from_utf8_lossy(&out.stderr).trim().to_string();
return Err(anyhow!("atom `{}` exited {code}: {stderr}", meta.full_id));
}
Ok(String::from_utf8_lossy(&out.stdout).trim().to_string())
}
/// Resolve binary by name:
/// 1. `$KEI_RUNTIME_BIN_DIR/<crate>` when env var is set and file exists
/// 2. Walk `$PATH`, return first `<dir>/<crate>` that exists
pub fn resolve_binary(crate_name: &str) -> Option<PathBuf> {
if let Ok(dir) = std::env::var("KEI_RUNTIME_BIN_DIR") {
let candidate = PathBuf::from(dir).join(crate_name);
if candidate.is_file() {
return Some(candidate);
}
}
let path = std::env::var("PATH").ok()?;
for dir in std::env::split_paths(&path) {
let candidate: PathBuf = Path::new(&dir).join(crate_name);
if candidate.is_file() {
return Some(candidate);
}
}
None
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn rejects_command_kind() {
let err = ensure_cacheable(&AtomKind::Command, "atom:x").unwrap_err();
assert!(err.to_string().contains("unsafe to cache"));
}
#[test]
fn rejects_stream_kind() {
let err = ensure_cacheable(&AtomKind::Stream, "atom:x").unwrap_err();
assert!(err.to_string().contains("unsafe to cache"));
}
#[test]
fn accepts_query_kind() {
ensure_cacheable(&AtomKind::Query, "atom:x").unwrap();
}
#[test]
fn accepts_transform_kind() {
ensure_cacheable(&AtomKind::Transform, "atom:x").unwrap();
}
}

View file

@ -0,0 +1,98 @@
//! Cache key derivation.
//!
//! Constructor Pattern: one cube = canonical JSON serialisation + SHA-256.
//! Key = SHA-256(atom_id || '\0' || canonical_json(input)).
//!
//! Canonical JSON: object keys sorted lexicographically at every depth, no
//! insignificant whitespace. Ensures semantically-identical inputs hash to
//! the same bytes regardless of source formatting.
use serde_json::{Map, Value};
use sha2::{Digest, Sha256};
/// Produce canonical JSON bytes: stable key order, minimal whitespace.
pub fn canonical_json(v: &Value) -> String {
let canon = canonicalise(v.clone());
serde_json::to_string(&canon).expect("canonical_json: serialise never fails for owned Value")
}
/// Recursively canonicalise: sort object keys at every nesting level.
fn canonicalise(v: Value) -> Value {
match v {
Value::Object(m) => {
let mut keys: Vec<String> = m.keys().cloned().collect();
keys.sort();
let mut out = Map::with_capacity(keys.len());
let mut src = m;
for k in keys {
if let Some(val) = src.remove(&k) {
out.insert(k, canonicalise(val));
}
}
Value::Object(out)
}
Value::Array(a) => Value::Array(a.into_iter().map(canonicalise).collect()),
other => other,
}
}
/// Compute cache key as 64-hex SHA-256 digest of (atom_id \0 canonical_json).
pub fn cache_key(atom_id: &str, input: &Value) -> String {
let mut hasher = Sha256::new();
hasher.update(atom_id.as_bytes());
hasher.update([0u8]);
hasher.update(canonical_json(input).as_bytes());
let digest = hasher.finalize();
hex_lower(&digest)
}
/// Hex-encode lowercase without pulling a separate crate.
fn hex_lower(bytes: &[u8]) -> String {
const HEX: &[u8; 16] = b"0123456789abcdef";
let mut s = String::with_capacity(bytes.len() * 2);
for b in bytes {
s.push(HEX[(b >> 4) as usize] as char);
s.push(HEX[(b & 0x0f) as usize] as char);
}
s
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn canonical_sorts_keys() {
let a = json!({"z": 1, "a": 2, "m": {"y": 1, "b": 2}});
let b = json!({"a": 2, "m": {"b": 2, "y": 1}, "z": 1});
assert_eq!(canonical_json(&a), canonical_json(&b));
}
#[test]
fn key_stable_across_formatting() {
let a = json!({"x": 1, "y": [1, 2]});
let b: Value = serde_json::from_str(" {\"y\":[1,2],\"x\":1} ").unwrap();
assert_eq!(cache_key("atom:foo", &a), cache_key("atom:foo", &b));
}
#[test]
fn key_differs_by_input() {
let a = json!({"x": 1});
let b = json!({"x": 2});
assert_ne!(cache_key("atom:foo", &a), cache_key("atom:foo", &b));
}
#[test]
fn key_differs_by_atom_id() {
let v = json!({"x": 1});
assert_ne!(cache_key("atom:foo", &v), cache_key("atom:bar", &v));
}
#[test]
fn key_is_64_hex() {
let k = cache_key("atom:x", &json!({}));
assert_eq!(k.len(), 64);
assert!(k.chars().all(|c| c.is_ascii_hexdigit() && !c.is_ascii_uppercase()));
}
}

View file

@ -0,0 +1,120 @@
//! kei-cache — deterministic caching primitive for pure atom invocations.
//!
//! Entry point is [`wrap_with`]: given a cache [`rusqlite::Connection`], an
//! [`exec::AtomExecutor`], an atom id, JSON input, and a TTL, either
//! return the cached payload or invoke the executor, store the result,
//! and return it.
//!
//! Key derivation lives in [`key`]. Storage lives in [`store`]. Invocation
//! on miss lives in [`exec`]. `lib.rs` only composes them — it owns no
//! persistent state.
pub mod exec;
pub mod key;
pub mod store;
use anyhow::{Context, Result};
use rusqlite::Connection;
use serde_json::Value;
pub use exec::{AtomExecutor, SubprocessExecutor};
pub use store::Stats;
/// Outcome of a [`wrap_with`] call.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
Hit,
Miss,
}
impl Outcome {
pub fn as_str(&self) -> &'static str {
match self {
Outcome::Hit => "hit",
Outcome::Miss => "miss",
}
}
}
/// Cache trait — library-level API for downstream consumers.
///
/// Production impl is [`SqliteCache`]. Tests may provide in-memory impls.
pub trait Cache {
/// Fetch from the cache; `None` if absent or expired.
fn get(&self, key: &str) -> Result<Option<String>>;
/// Store `payload` under `key` with TTL (seconds).
fn put(&self, key: &str, atom_id: &str, payload: &str, ttl_sec: i64) -> Result<()>;
}
/// SQLite-backed cache impl. Holds a borrowed [`Connection`].
pub struct SqliteCache<'c> {
pub conn: &'c Connection,
}
impl<'c> Cache for SqliteCache<'c> {
fn get(&self, key: &str) -> Result<Option<String>> {
store::get(self.conn, key)
}
fn put(&self, key: &str, atom_id: &str, payload: &str, ttl_sec: i64) -> Result<()> {
store::put(self.conn, key, atom_id, payload, ttl_sec)
}
}
/// Top-level wrap: lookup → return on hit, invoke + store on miss.
///
/// Returns `(payload_string, outcome)`. `payload_string` is the atom's
/// JSON stdout verbatim (trimmed). `outcome` distinguishes hit vs miss
/// so the CLI can emit `cache=hit|miss` to stderr.
pub fn wrap_with<E: AtomExecutor>(
conn: &Connection,
executor: &E,
atom_id: &str,
input_json: &str,
ttl_sec: i64,
) -> Result<(String, Outcome)> {
let input: Value =
serde_json::from_str(input_json).with_context(|| "input is not valid JSON")?;
let key = key::cache_key(atom_id, &input);
if let Some(payload) = store::get(conn, &key)? {
let _ = store::bump(conn, "hits");
return Ok((payload, Outcome::Hit));
}
let payload = executor
.execute(atom_id, input_json)
.with_context(|| format!("execute atom `{atom_id}`"))?;
store::put(conn, &key, atom_id, &payload, ttl_sec)?;
let _ = store::bump(conn, "misses");
Ok((payload, Outcome::Miss))
}
#[cfg(test)]
mod tests {
use super::*;
use std::cell::Cell;
use tempfile::tempdir;
struct CountingExec {
calls: Cell<u32>,
reply: String,
}
impl AtomExecutor for CountingExec {
fn execute(&self, _atom_id: &str, _input_json: &str) -> Result<String> {
self.calls.set(self.calls.get() + 1);
Ok(self.reply.clone())
}
}
#[test]
fn hit_skips_executor() {
let d = tempdir().unwrap();
let p = d.path().join("c.sqlite");
let conn = store::open(&p).unwrap();
let ex = CountingExec { calls: Cell::new(0), reply: "{\"r\":1}".into() };
let (_, o1) = wrap_with(&conn, &ex, "atom:x", "{\"a\":1}", 60).unwrap();
let (_, o2) = wrap_with(&conn, &ex, "atom:x", "{\"a\":1}", 60).unwrap();
assert_eq!(o1, Outcome::Miss);
assert_eq!(o2, Outcome::Hit);
assert_eq!(ex.calls.get(), 1);
}
}

View file

@ -0,0 +1,137 @@
//! kei-cache CLI dispatcher.
//!
//! Constructor Pattern: single cube = arg parsing + dispatch + formatting.
//! Storage: `~/.claude/cache/cache.sqlite` (or `$KEI_CACHE_DB` override).
use clap::{Parser, Subcommand};
use kei_cache::{store, wrap_with, Outcome, SubprocessExecutor};
use std::path::PathBuf;
use std::process::ExitCode;
#[derive(Parser)]
#[command(name = "kei-cache", version, about = "Atom result cache")]
struct Cli {
/// Override cache DB path (default: $KEI_CACHE_DB or ~/.claude/cache/cache.sqlite)
#[arg(long)]
db: Option<PathBuf>,
#[command(subcommand)]
cmd: Cmd,
}
#[derive(Subcommand)]
enum Cmd {
/// Wrap an atom invocation with deterministic caching.
Wrap {
/// Atom id (e.g. `kei-router:route`).
atom_id: String,
/// JSON-string input to hash + forward on miss.
#[arg(long)]
input: String,
/// TTL in seconds (default: 3600).
#[arg(long, default_value_t = 3600)]
ttl: i64,
/// Atoms-root for discovery (default: $KEI_ATOMS_ROOT or cwd).
#[arg(long)]
atoms_root: Option<PathBuf>,
},
/// Print hit/miss + live entry counts.
Stats,
/// Evict all expired entries.
Purge,
/// Wipe cache + counters.
Clear,
}
fn db_path(cli_db: Option<PathBuf>) -> PathBuf {
if let Some(p) = cli_db {
return p;
}
if let Ok(env) = std::env::var("KEI_CACHE_DB") {
return PathBuf::from(env);
}
let home = std::env::var("HOME").unwrap_or_else(|_| ".".into());
PathBuf::from(home).join(".claude/cache/cache.sqlite")
}
fn atoms_root(flag: Option<PathBuf>) -> PathBuf {
flag.or_else(|| std::env::var("KEI_ATOMS_ROOT").ok().map(PathBuf::from))
.unwrap_or_else(|| PathBuf::from("."))
}
fn err(msg: &str) -> ExitCode {
eprintln!("kei-cache: {msg}");
ExitCode::from(1)
}
fn cmd_wrap(
conn: &rusqlite::Connection,
atom_id: &str,
input: &str,
ttl: i64,
atoms_root: PathBuf,
) -> ExitCode {
let executor = SubprocessExecutor::new(atoms_root);
match wrap_with(conn, &executor, atom_id, input, ttl) {
Ok((payload, Outcome::Hit)) => {
eprintln!("cache=hit");
println!("{payload}");
ExitCode::SUCCESS
}
Ok((payload, Outcome::Miss)) => {
eprintln!("cache=miss");
println!("{payload}");
ExitCode::SUCCESS
}
Err(e) => err(&format!("wrap failed: {e:#}")),
}
}
fn cmd_stats(conn: &rusqlite::Connection) -> ExitCode {
match store::stats(conn) {
Ok(s) => {
println!(
"hits={} misses={} entries={} bytes={}",
s.hits, s.misses, s.entries, s.bytes
);
ExitCode::SUCCESS
}
Err(e) => err(&format!("stats failed: {e:#}")),
}
}
fn cmd_purge(conn: &rusqlite::Connection) -> ExitCode {
match store::purge(conn) {
Ok(n) => {
println!("purged={n}");
ExitCode::SUCCESS
}
Err(e) => err(&format!("purge failed: {e:#}")),
}
}
fn cmd_clear(conn: &rusqlite::Connection) -> ExitCode {
match store::clear(conn) {
Ok(n) => {
println!("cleared={n}");
ExitCode::SUCCESS
}
Err(e) => err(&format!("clear failed: {e:#}")),
}
}
fn main() -> ExitCode {
let cli = Cli::parse();
let path = db_path(cli.db);
let conn = match store::open(&path) {
Ok(c) => c,
Err(e) => return err(&format!("open {}: {e:#}", path.display())),
};
match cli.cmd {
Cmd::Wrap { atom_id, input, ttl, atoms_root: ar } => {
cmd_wrap(&conn, &atom_id, &input, ttl, atoms_root(ar))
}
Cmd::Stats => cmd_stats(&conn),
Cmd::Purge => cmd_purge(&conn),
Cmd::Clear => cmd_clear(&conn),
}
}

View file

@ -0,0 +1,237 @@
//! SQLite-backed cache store.
//!
//! Constructor Pattern: one cube = cache table DDL + put/get/stats/purge/clear.
//! Every fn <30 LOC. Schema is append-only migration list; expiry is
//! timestamp-based (`expires_ts = created_ts + ttl_sec`).
//!
//! Layout: one row per unique (atom_id, canonical_input) → cache key.
//! Payload stored as raw JSON text to keep the primitive format-neutral.
use anyhow::{anyhow, Context, Result};
use rusqlite::{params, Connection, OptionalExtension};
use std::path::Path;
use std::time::{SystemTime, UNIX_EPOCH};
/// Ordered migrations. Index = schema version. Never reorder; append only.
pub const MIGRATIONS: &[&str] = &[
// v1 — initial schema (2026-04-23)
"CREATE TABLE IF NOT EXISTS cache (
key TEXT PRIMARY KEY,
atom_id TEXT NOT NULL,
payload TEXT NOT NULL,
created_ts INTEGER NOT NULL,
expires_ts INTEGER NOT NULL,
bytes INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_cache_expires ON cache(expires_ts);
CREATE INDEX IF NOT EXISTS idx_cache_atom ON cache(atom_id);
CREATE TABLE IF NOT EXISTS counters (
name TEXT PRIMARY KEY,
value INTEGER NOT NULL
);",
];
/// Open or create the cache DB and run migrations.
pub fn open(path: &Path) -> Result<Connection> {
if let Some(parent) = path.parent() {
let _ = std::fs::create_dir_all(parent);
}
let conn = Connection::open(path).with_context(|| format!("open {}", path.display()))?;
migrate(&conn)?;
Ok(conn)
}
/// Apply pending migrations atomically (DDL + user_version bump per txn).
fn migrate(conn: &Connection) -> Result<()> {
let current: i64 = conn
.query_row("PRAGMA user_version", [], |r| r.get(0))
.unwrap_or(0);
for (i, sql) in MIGRATIONS.iter().enumerate() {
let target = (i + 1) as i64;
if current < target {
apply_one(conn, sql, target)?;
}
}
Ok(())
}
fn apply_one(conn: &Connection, sql: &str, target: i64) -> Result<()> {
conn.execute_batch("BEGIN IMMEDIATE")?;
let step = (|| -> rusqlite::Result<()> {
conn.execute_batch(sql)?;
conn.pragma_update(None, "user_version", target)?;
Ok(())
})();
match step {
Ok(()) => conn.execute_batch("COMMIT").map_err(Into::into),
Err(e) => {
let _ = conn.execute_batch("ROLLBACK");
Err(anyhow!("migration v{target}: {e}"))
}
}
}
/// Current unix timestamp in seconds.
pub fn now_ts() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs() as i64)
.unwrap_or(0)
}
/// Insert (upsert) a cache entry. `ttl_sec` must be > 0.
pub fn put(conn: &Connection, key: &str, atom_id: &str, payload: &str, ttl_sec: i64) -> Result<()> {
if ttl_sec <= 0 {
return Err(anyhow!("ttl must be positive, got {ttl_sec}"));
}
let now = now_ts();
let expires = now + ttl_sec;
let bytes = payload.len() as i64;
conn.execute(
"INSERT INTO cache (key, atom_id, payload, created_ts, expires_ts, bytes)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)
ON CONFLICT(key) DO UPDATE SET
payload = excluded.payload,
created_ts = excluded.created_ts,
expires_ts = excluded.expires_ts,
bytes = excluded.bytes",
params![key, atom_id, payload, now, expires, bytes],
)?;
Ok(())
}
/// Look up a key; returns `None` on miss or expired entry.
/// Expired entries are evicted lazily on lookup.
pub fn get(conn: &Connection, key: &str) -> Result<Option<String>> {
let now = now_ts();
let row: Option<(String, i64)> = conn
.query_row(
"SELECT payload, expires_ts FROM cache WHERE key = ?1",
params![key],
|r| Ok((r.get(0)?, r.get(1)?)),
)
.optional()?;
match row {
Some((payload, expires)) if expires > now => Ok(Some(payload)),
Some(_) => {
let _ = conn.execute("DELETE FROM cache WHERE key = ?1", params![key]);
Ok(None)
}
None => Ok(None),
}
}
/// Increment a named counter (hits / misses) by 1.
pub fn bump(conn: &Connection, name: &str) -> Result<()> {
conn.execute(
"INSERT INTO counters (name, value) VALUES (?1, 1)
ON CONFLICT(name) DO UPDATE SET value = value + 1",
params![name],
)?;
Ok(())
}
/// Read aggregate stats: (hits, misses, live_entries, total_bytes).
pub fn stats(conn: &Connection) -> Result<Stats> {
let hits = counter(conn, "hits")?;
let misses = counter(conn, "misses")?;
let now = now_ts();
let (entries, bytes): (i64, i64) = conn
.query_row(
"SELECT COUNT(*), COALESCE(SUM(bytes), 0)
FROM cache WHERE expires_ts > ?1",
params![now],
|r| Ok((r.get(0)?, r.get(1)?)),
)
.unwrap_or((0, 0));
Ok(Stats { hits, misses, entries, bytes })
}
fn counter(conn: &Connection, name: &str) -> Result<i64> {
let v: Option<i64> = conn
.query_row(
"SELECT value FROM counters WHERE name = ?1",
params![name],
|r| r.get(0),
)
.optional()?;
Ok(v.unwrap_or(0))
}
/// Aggregate cache stats snapshot.
#[derive(Debug, Clone, serde::Serialize)]
pub struct Stats {
pub hits: i64,
pub misses: i64,
pub entries: i64,
pub bytes: i64,
}
/// Evict expired rows; returns number deleted.
pub fn purge(conn: &Connection) -> Result<usize> {
let now = now_ts();
let n = conn.execute("DELETE FROM cache WHERE expires_ts <= ?1", params![now])?;
Ok(n)
}
/// Wipe everything (cache + counters). Returns rows removed from `cache`.
pub fn clear(conn: &Connection) -> Result<usize> {
let n = conn.execute("DELETE FROM cache", [])?;
conn.execute("DELETE FROM counters", [])?;
Ok(n)
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
fn fresh() -> (tempfile::TempDir, Connection) {
let d = tempdir().unwrap();
let p = d.path().join("c.sqlite");
let c = open(&p).unwrap();
(d, c)
}
#[test]
fn put_get_roundtrip() {
let (_d, c) = fresh();
put(&c, "k1", "atom:x", "{\"r\":1}", 60).unwrap();
assert_eq!(get(&c, "k1").unwrap().as_deref(), Some("{\"r\":1}"));
}
#[test]
fn miss_returns_none() {
let (_d, c) = fresh();
assert!(get(&c, "missing").unwrap().is_none());
}
#[test]
fn purge_removes_expired() {
let (_d, c) = fresh();
// ttl=1 means expires in 1s; manually backdate via direct update.
put(&c, "k1", "atom:x", "v", 60).unwrap();
c.execute(
"UPDATE cache SET expires_ts = 1 WHERE key = 'k1'",
[],
)
.unwrap();
assert_eq!(purge(&c).unwrap(), 1);
assert!(get(&c, "k1").unwrap().is_none());
}
#[test]
fn stats_count_live_only() {
let (_d, c) = fresh();
put(&c, "a", "atom:x", "xx", 60).unwrap();
put(&c, "b", "atom:x", "yyyy", 60).unwrap();
bump(&c, "hits").unwrap();
bump(&c, "misses").unwrap();
bump(&c, "misses").unwrap();
let s = stats(&c).unwrap();
assert_eq!(s.entries, 2);
assert_eq!(s.bytes, 6);
assert_eq!(s.hits, 1);
assert_eq!(s.misses, 2);
}
}

View file

@ -0,0 +1,138 @@
//! cache_smoke — end-to-end integration tests for `wrap_with`.
//!
//! Uses a `MockExecutor` that returns an incrementing counter so "was the
//! executor actually re-invoked?" is observable as a different return
//! value rather than inferred from a side-effect.
use anyhow::{anyhow, Result};
use kei_atom_discovery::AtomKind;
use kei_cache::exec::ensure_cacheable;
use kei_cache::{store, wrap_with, AtomExecutor, Outcome};
use std::cell::Cell;
use tempfile::tempdir;
/// Mock executor: each invocation returns `{"n": <call_count>}`.
/// Simulates a timestamp-like observable so a repeated call with the same
/// input must be a cache-hit to produce the same value.
struct MockExecutor {
calls: Cell<u32>,
kind: AtomKind,
}
impl MockExecutor {
fn new() -> Self {
Self { calls: Cell::new(0), kind: AtomKind::Query }
}
fn with_kind(kind: AtomKind) -> Self {
Self { calls: Cell::new(0), kind }
}
}
impl AtomExecutor for MockExecutor {
fn execute(&self, atom_id: &str, _input_json: &str) -> Result<String> {
ensure_cacheable(&self.kind, atom_id)?;
let n = self.calls.get() + 1;
self.calls.set(n);
Ok(format!("{{\"n\":{n}}}"))
}
}
fn open_fresh_cache() -> (tempfile::TempDir, rusqlite::Connection) {
let d = tempdir().unwrap();
let p = d.path().join("c.sqlite");
let c = store::open(&p).unwrap();
(d, c)
}
#[test]
fn first_call_misses_and_stores() {
let (_d, conn) = open_fresh_cache();
let ex = MockExecutor::new();
let (payload, outcome) = wrap_with(&conn, &ex, "atom:mock", "{\"a\":1}", 60).unwrap();
assert_eq!(outcome, Outcome::Miss);
assert_eq!(payload, "{\"n\":1}");
assert_eq!(ex.calls.get(), 1);
}
#[test]
fn second_call_same_input_is_hit() {
let (_d, conn) = open_fresh_cache();
let ex = MockExecutor::new();
let (p1, o1) = wrap_with(&conn, &ex, "atom:mock", "{\"a\":1}", 60).unwrap();
let (p2, o2) = wrap_with(&conn, &ex, "atom:mock", "{\"a\":1}", 60).unwrap();
assert_eq!(o1, Outcome::Miss);
assert_eq!(o2, Outcome::Hit);
// Same value both times → executor was NOT re-invoked on the hit.
assert_eq!(p1, p2);
assert_eq!(ex.calls.get(), 1);
}
#[test]
fn equivalent_json_is_still_a_hit() {
// Whitespace + key ordering differ; canonical JSON must hash the same.
let (_d, conn) = open_fresh_cache();
let ex = MockExecutor::new();
let _ = wrap_with(&conn, &ex, "atom:mock", "{\"a\":1,\"b\":2}", 60).unwrap();
let (_, o2) = wrap_with(&conn, &ex, "atom:mock", " {\"b\":2,\"a\":1} ", 60).unwrap();
assert_eq!(o2, Outcome::Hit);
assert_eq!(ex.calls.get(), 1);
}
#[test]
fn different_input_misses_with_different_key() {
let (_d, conn) = open_fresh_cache();
let ex = MockExecutor::new();
let (p1, o1) = wrap_with(&conn, &ex, "atom:mock", "{\"a\":1}", 60).unwrap();
let (p2, o2) = wrap_with(&conn, &ex, "atom:mock", "{\"a\":2}", 60).unwrap();
assert_eq!(o1, Outcome::Miss);
assert_eq!(o2, Outcome::Miss);
// Counter advanced → executor really was re-invoked for the second input.
assert_ne!(p1, p2);
assert_eq!(ex.calls.get(), 2);
}
#[test]
fn expired_entry_misses_even_for_same_input() {
// No sleep: put an entry, then force-expire via direct UPDATE.
let (_d, conn) = open_fresh_cache();
let ex = MockExecutor::new();
let (_, o1) = wrap_with(&conn, &ex, "atom:mock", "{\"a\":1}", 60).unwrap();
assert_eq!(o1, Outcome::Miss);
conn.execute("UPDATE cache SET expires_ts = 1", []).unwrap();
let (_, o2) = wrap_with(&conn, &ex, "atom:mock", "{\"a\":1}", 60).unwrap();
assert_eq!(o2, Outcome::Miss);
assert_eq!(ex.calls.get(), 2);
}
#[test]
fn non_cacheable_kind_is_refused() {
let (_d, conn) = open_fresh_cache();
let ex = MockExecutor::with_kind(AtomKind::Command);
let res = wrap_with(&conn, &ex, "atom:danger", "{}", 60);
assert!(res.is_err(), "command-kind atoms must not be cacheable");
let msg = format!("{:#}", res.unwrap_err());
assert!(msg.contains("unsafe to cache"), "unexpected error: {msg}");
// Nothing stored on rejection.
let s = store::stats(&conn).unwrap();
assert_eq!(s.entries, 0);
}
#[test]
fn stream_kind_is_refused() {
let (_d, conn) = open_fresh_cache();
let ex = MockExecutor::with_kind(AtomKind::Stream);
let err = wrap_with(&conn, &ex, "atom:s", "{}", 60).unwrap_err();
assert!(format!("{err:#}").contains("unsafe to cache"));
}
#[test]
fn invalid_json_input_errors_before_keying() -> Result<()> {
let (_d, conn) = open_fresh_cache();
let ex = MockExecutor::new();
let res = wrap_with(&conn, &ex, "atom:x", "not json", 60);
if res.is_ok() {
return Err(anyhow!("malformed JSON must not be accepted"));
}
assert_eq!(ex.calls.get(), 0);
Ok(())
}