From 4a9dd98fde26111fdf56921f90962c5bad4dee17 Mon Sep 17 00:00:00 2001 From: Parfii-bot Date: Thu, 23 Apr 2026 14:26:11 +0800 Subject: [PATCH] feat(p-pipe-cache): wire kei-cache into kei-pipe DAG executor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Optional per-step and DAG-level cache config in dag.toml: [[steps]] cache = { enabled = true, ttl_sec = 3600 } OR [pipe] cache = { enabled = true, ttl_sec = 3600, db = "cache.sqlite" } The cache is only opened when [pipe] cache.db is set; without it every step runs uncached. Cache gated by step kind (StepKind) — only query/transform cacheable; command/stream (and steps with no kind) always re-invoke even with cache.enabled=true. StepReport.source: Some('cache'|'fresh') | None shows cache outcome. Constructor Pattern: extracted src/config.rs (CacheConfig + StepKind + TOML raw types + split_pipe_cache parser) + src/topo.rs (topo-sort) to keep dag.rs under 200 LOC. Tests: 8/8 (was 5, +3: cache-hit reuse, cache-disabled always invokes, command-kind not cached even if enabled). kei-cache 22/22 preserved. Co-Authored-By: Claude Opus 4.7 (1M context) --- _primitives/_rust/Cargo.lock | 2 + _primitives/_rust/kei-pipe/Cargo.toml | 2 + _primitives/_rust/kei-pipe/src/config.rs | 82 ++++++++++ _primitives/_rust/kei-pipe/src/dag.rs | 126 ++++------------ _primitives/_rust/kei-pipe/src/exec.rs | 69 +++++++++ _primitives/_rust/kei-pipe/src/lib.rs | 87 ++++++++++- _primitives/_rust/kei-pipe/src/report.rs | 14 ++ _primitives/_rust/kei-pipe/src/topo.rs | 98 ++++++++++++ .../_rust/kei-pipe/tests/pipe_smoke.rs | 141 ++++++++++++++++++ 9 files changed, 518 insertions(+), 103 deletions(-) create mode 100644 _primitives/_rust/kei-pipe/src/config.rs create mode 100644 _primitives/_rust/kei-pipe/src/topo.rs diff --git a/_primitives/_rust/Cargo.lock b/_primitives/_rust/Cargo.lock index 380efe3..40fad4b 100644 --- a/_primitives/_rust/Cargo.lock +++ b/_primitives/_rust/Cargo.lock @@ -2098,6 +2098,8 @@ version = "0.1.0" dependencies = [ "anyhow", "clap", + "kei-cache", + "rusqlite", "serde", "serde_json", "tempfile", diff --git 
a/_primitives/_rust/kei-pipe/Cargo.toml b/_primitives/_rust/kei-pipe/Cargo.toml index 419ee29..ce46ba8 100644 --- a/_primitives/_rust/kei-pipe/Cargo.toml +++ b/_primitives/_rust/kei-pipe/Cargo.toml @@ -20,6 +20,8 @@ serde_json = "1" toml = "0.8" anyhow = "1" thiserror = "1" +kei-cache = { path = "../kei-cache" } +rusqlite = { version = "0.31", features = ["bundled"] } [dev-dependencies] tempfile = "3" diff --git a/_primitives/_rust/kei-pipe/src/config.rs b/_primitives/_rust/kei-pipe/src/config.rs new file mode 100644 index 0000000..1177e94 --- /dev/null +++ b/_primitives/_rust/kei-pipe/src/config.rs @@ -0,0 +1,82 @@ +//! Per-step and DAG-level cache configuration types + TOML parsers. +//! +//! Kept separate from `dag.rs` so the core DAG cube stays under the 200-LOC +//! Constructor Pattern limit. Everything here is a pure value type or a +//! small string-validation helper — no I/O, no side effects. + +use serde::Deserialize; + +use crate::dag::DagError; + +/// Per-step or DAG-level cache opt-in. Omitted TOML fields default to `false` / `0` (cache off). +#[derive(Debug, Clone, Copy)] +pub struct CacheConfig { + pub enabled: bool, + pub ttl_sec: i64, +} + +/// Atom kind as declared in the DAG. Only `Query` and `Transform` are +/// cacheable (pure); `Command` and `Stream` are never cached. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum StepKind { + Query, + Transform, + Command, + Stream, +} + +impl StepKind { + pub fn is_cacheable(self) -> bool { + matches!(self, StepKind::Query | StepKind::Transform) + } +} + +/// Internal TOML surface for the `[pipe]` block. +#[derive(Debug, Deserialize, Default)] +pub(crate) struct RawPipe { + #[serde(default)] + pub cache: Option, +} + +/// Internal TOML surface for per-step or DAG-level `cache = { ... }`. 
+#[derive(Debug, Deserialize, Default)] +pub(crate) struct RawCache { + #[serde(default)] + pub enabled: Option, + #[serde(default, rename = "ttl_sec")] + pub ttl_sec: Option, + #[serde(default)] + pub db: Option, +} + +impl RawCache { + /// Flatten the TOML view into the public [`CacheConfig`] shape. `db` + /// is dropped — the caller reads it separately for DAG-level routing. + pub(crate) fn into_config(self) -> CacheConfig { + CacheConfig { + enabled: self.enabled.unwrap_or(false), + ttl_sec: self.ttl_sec.unwrap_or(0), + } + } +} + +/// Split the optional `[pipe]` block into `(cache_config, cache_db_path)`. +pub(crate) fn split_pipe_cache( + raw: Option, +) -> (Option, Option) { + let Some(p) = raw else { return (None, None); }; + let Some(c) = p.cache else { return (None, None); }; + let db = c.db.clone(); + (Some(c.into_config()), db) +} + +/// Parse a `kind = "..."` string into a typed [`StepKind`]. +pub(crate) fn parse_kind(step_id: &str, s: &str) -> Result { + match s { + "query" => Ok(StepKind::Query), + "transform" => Ok(StepKind::Transform), + "command" => Ok(StepKind::Command), + "stream" => Ok(StepKind::Stream), + other => Err(DagError::BadKind(step_id.into(), other.into())), + } +} diff --git a/_primitives/_rust/kei-pipe/src/dag.rs b/_primitives/_rust/kei-pipe/src/dag.rs index 75bfaec..c3f702a 100644 --- a/_primitives/_rust/kei-pipe/src/dag.rs +++ b/_primitives/_rust/kei-pipe/src/dag.rs @@ -1,7 +1,9 @@ //! DAG spec parsing + topological sort. //! //! TOML shape — `[[steps]]` array with fields `id`, `atom`, optional -//! `depends-on = [ids...]`, optional `input = { ... }`. +//! `depends-on = [ids...]`, optional `input = { ... }`. Optional per-step +//! `kind = "query|transform|command|stream"` and `cache = { enabled, ttl_sec }`. +//! Optional DAG-level `[pipe] cache = { enabled, ttl_sec, db = "..." }`. //! //! Invariants: //! 
- `id` and `atom` must be non-empty strings @@ -11,7 +13,9 @@ use serde::Deserialize; use serde_json::Value; -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::HashSet; + +pub use crate::topo::topo_sort; /// Error cases raised while parsing or sorting a DAG. #[derive(Debug, thiserror::Error)] @@ -28,8 +32,13 @@ pub enum DagError { Cycle(String), #[error("input for step `{0}` must be a TOML table")] BadInput(String), + #[error("step `{0}` has invalid kind `{1}` (expected query|transform|command|stream)")] + BadKind(String, String), } +pub use crate::config::{CacheConfig, StepKind}; +use crate::config::{parse_kind, split_pipe_cache, RawCache, RawPipe}; + /// One atom invocation in a DAG. `input` is retained as `serde_json::Value` /// so the resolver can walk it uniformly (strings, objects, arrays). #[derive(Debug, Clone)] @@ -38,13 +47,18 @@ pub struct Step { pub atom: String, pub depends_on: Vec, pub input: Value, + pub kind: Option, + pub cache: Option, } /// Parsed DAG. `steps` preserves declaration order so error messages line -/// up with the TOML source. +/// up with the TOML source. `cache` is the DAG-level default applied to +/// any cacheable step that lacks its own `cache` override. #[derive(Debug, Clone, Default)] pub struct DagSpec { pub steps: Vec, + pub cache: Option, + pub cache_db: Option, } /// Internal TOML surface — kept private so callers only see the cleaned @@ -53,6 +67,8 @@ pub struct DagSpec { struct RawDag { #[serde(default)] steps: Vec, + #[serde(default)] + pipe: Option, } #[derive(Debug, Deserialize)] @@ -63,6 +79,10 @@ struct RawStep { depends_on: Vec, #[serde(default)] input: Option, + #[serde(default)] + kind: Option, + #[serde(default)] + cache: Option, } /// Parse TOML text into a cleaned `DagSpec` with per-step validation. 
@@ -74,7 +94,8 @@ pub fn parse_dag(text: &str) -> Result { let step = build_step(idx, rs, &mut seen)?; steps.push(step); } - Ok(DagSpec { steps }) + let (cache, cache_db) = split_pipe_cache(raw.pipe); + Ok(DagSpec { steps, cache, cache_db }) } fn build_step(idx: usize, rs: RawStep, seen: &mut HashSet) -> Result { @@ -90,7 +111,12 @@ fn build_step(idx: usize, rs: RawStep, seen: &mut HashSet) -> Result None, + Some(s) => Some(parse_kind(&id, &s)?), + }; + let cache = rs.cache.map(|c| c.into_config()); + Ok(Step { id, atom, depends_on: rs.depends_on, input, kind, cache }) } fn normalize_input(id: &str, raw: Option) -> Result { @@ -102,93 +128,3 @@ fn normalize_input(id: &str, raw: Option) -> Result Result, DagError> { - let index_by_id = index_by_id(spec)?; - validate_edges(spec, &index_by_id)?; - let (in_deg, adj) = build_graph(spec, &index_by_id); - let ordered_indices = kahn_sort(spec, in_deg, adj)?; - Ok(ordered_indices.iter().map(|i| &spec.steps[*i]).collect()) -} - -fn index_by_id(spec: &DagSpec) -> Result, DagError> { - let mut m: HashMap<&str, usize> = HashMap::with_capacity(spec.steps.len()); - for (i, s) in spec.steps.iter().enumerate() { - m.insert(s.id.as_str(), i); - } - Ok(m) -} - -fn validate_edges(spec: &DagSpec, idx: &HashMap<&str, usize>) -> Result<(), DagError> { - for s in &spec.steps { - for dep in &s.depends_on { - if !idx.contains_key(dep.as_str()) { - return Err(DagError::UnknownDep(s.id.clone(), dep.clone())); - } - } - } - Ok(()) -} - -fn build_graph( - spec: &DagSpec, - idx: &HashMap<&str, usize>, -) -> (Vec, Vec>) { - let n = spec.steps.len(); - let mut in_deg = vec![0usize; n]; - let mut adj: Vec> = vec![Vec::new(); n]; - for (i, s) in spec.steps.iter().enumerate() { - for dep in &s.depends_on { - let src = idx[dep.as_str()]; - adj[src].push(i); - in_deg[i] += 1; - } - } - (in_deg, adj) -} - -fn kahn_sort( - spec: &DagSpec, - mut in_deg: Vec, - adj: Vec>, -) -> Result, DagError> { - let n = spec.steps.len(); - let mut ready: BTreeMap = 
BTreeMap::new(); - seed_ready(&in_deg, &mut ready); - let mut out: Vec = Vec::with_capacity(n); - while let Some((&i, _)) = ready.iter().next() { - ready.remove(&i); - out.push(i); - for &j in &adj[i] { - in_deg[j] -= 1; - if in_deg[j] == 0 { - ready.insert(j, ()); - } - } - } - if out.len() != n { - return Err(DagError::Cycle(unresolved_ids(spec, &out))); - } - Ok(out) -} - -/// Seed the `ready` set with every node whose in-degree is zero. -fn seed_ready(in_deg: &[usize], ready: &mut BTreeMap) { - for (i, deg) in in_deg.iter().enumerate() { - if *deg == 0 { - ready.insert(i, ()); - } - } -} - -fn unresolved_ids(spec: &DagSpec, resolved: &[usize]) -> String { - spec.steps - .iter() - .enumerate() - .filter(|(i, _)| !resolved.contains(i)) - .map(|(_, s)| s.id.as_str()) - .collect::>() - .join(", ") -} diff --git a/_primitives/_rust/kei-pipe/src/exec.rs b/_primitives/_rust/kei-pipe/src/exec.rs index 039894b..2b370d0 100644 --- a/_primitives/_rust/kei-pipe/src/exec.rs +++ b/_primitives/_rust/kei-pipe/src/exec.rs @@ -29,6 +29,8 @@ pub enum ExecError { OutputParse { atom: String, err: String, stdout: String }, #[error("serialize input: {0}")] InputSerialize(String), + #[error("cache error: {0}")] + Cache(String), } /// Parse an atom id into `(crate, verb)`. Rejects empty halves. @@ -51,6 +53,73 @@ pub fn run_atom(atom: &str, input: &Value) -> Result { parse_output(atom, output) } +/// Outcome label accompanying a cache-aware invocation. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum CacheOutcome { + /// Returned from the cache; atom was NOT invoked. + Hit, + /// Cache miss; atom was invoked and the result stored. + Fresh, +} + +impl CacheOutcome { + pub fn as_str(self) -> &'static str { + match self { + CacheOutcome::Hit => "cache", + CacheOutcome::Fresh => "fresh", + } + } +} + +/// Cache-aware atom invocation. 
On hit returns cached JSON; on miss calls +/// [`run_atom`], stores the serialised result under the computed key with +/// `ttl_sec`, and returns `Fresh`. Cache I/O errors are surfaced via +/// [`ExecError::Cache`] so the caller can distinguish from atom failures. +pub fn run_atom_cached( + conn: &rusqlite::Connection, + atom: &str, + input: &Value, + ttl_sec: i64, +) -> Result<(Value, CacheOutcome), ExecError> { + let key = kei_cache::key::cache_key(atom, input); + let hit = kei_cache::store::get(conn, &key).map_err(|e| ExecError::Cache(e.to_string()))?; + match hit { + Some(payload) => load_hit(conn, atom, payload), + None => load_miss(conn, atom, input, &key, ttl_sec), + } +} + +fn load_hit( + conn: &rusqlite::Connection, + atom: &str, + payload: String, +) -> Result<(Value, CacheOutcome), ExecError> { + let _ = kei_cache::store::bump(conn, "hits"); + let value: Value = + serde_json::from_str(&payload).map_err(|e| ExecError::OutputParse { + atom: atom.into(), + err: e.to_string(), + stdout: payload, + })?; + Ok((value, CacheOutcome::Hit)) +} + +fn load_miss( + conn: &rusqlite::Connection, + atom: &str, + input: &Value, + key: &str, + ttl_sec: i64, +) -> Result<(Value, CacheOutcome), ExecError> { + let result = run_atom(atom, input)?; + let payload = + serde_json::to_string(&result).map_err(|e| ExecError::InputSerialize(e.to_string()))?; + kei_cache::store::put(conn, key, atom, &payload, ttl_sec) + .map_err(|e| ExecError::Cache(e.to_string()))?; + let _ = kei_cache::store::bump(conn, "misses"); + Ok((result, CacheOutcome::Fresh)) +} + fn spawn_and_wait( bin: &PathBuf, verb: &str, diff --git a/_primitives/_rust/kei-pipe/src/lib.rs b/_primitives/_rust/kei-pipe/src/lib.rs index 882853e..2de5843 100644 --- a/_primitives/_rust/kei-pipe/src/lib.rs +++ b/_primitives/_rust/kei-pipe/src/lib.rs @@ -17,15 +17,17 @@ //! - [`report::DagReport`] / [`report::StepReport`] — run outcome //! 
- [`run_dag`] / [`validate_dag`] — top-level entry points +pub mod config; pub mod dag; pub mod exec; pub mod report; pub mod resolve; +pub mod topo; -use std::path::Path; +use std::path::{Path, PathBuf}; -use crate::dag::{parse_dag, topo_sort, DagError}; -use crate::exec::{run_atom, ExecError}; +use crate::dag::{parse_dag, topo_sort, CacheConfig, DagError, DagSpec, Step}; +use crate::exec::{run_atom, run_atom_cached, CacheOutcome, ExecError}; use crate::report::{DagReport, StepReport}; use crate::resolve::{resolve_input, ResolveError}; @@ -40,6 +42,8 @@ pub enum PipeError { Resolve(#[from] ResolveError), #[error(transparent)] Exec(#[from] ExecError), + #[error("open cache db {0}: {1}")] + CacheOpen(String, String), } /// Parse + topo-sort a DAG file without running any atoms. Returns Ok @@ -60,13 +64,47 @@ pub fn run_dag(path: &Path) -> Result { .map_err(|e| PipeError::Read(path.display().to_string(), e))?; let spec = parse_dag(&text)?; let ordered = topo_sort(&spec)?; - Ok(execute_sorted(&ordered)) + let conn = open_cache_if_needed(&spec)?; + Ok(execute_sorted(&spec, &ordered, conn.as_ref())) } -fn execute_sorted(steps: &[&dag::Step]) -> DagReport { +/// Open the SQLite cache Connection only if `[pipe] cache.db` is set AND +/// at least one step would actually use the cache. Otherwise returns None +/// so the runtime skips the cache layer entirely. +fn open_cache_if_needed(spec: &DagSpec) -> Result, PipeError> { + let Some(db_path) = spec.cache_db.as_ref() else { return Ok(None); }; + let any_cacheable = spec.steps.iter().any(|s| effective_cache(spec, s).is_some()); + if !any_cacheable { + return Ok(None); + } + let conn = kei_cache::store::open(&PathBuf::from(db_path)) + .map_err(|e| PipeError::CacheOpen(db_path.clone(), e.to_string()))?; + Ok(Some(conn)) +} + +/// Resolve the effective cache config for a step: per-step wins over +/// DAG-level. Returns None when caching is disabled, `ttl_sec <= 0`, the +/// step has no `kind`, or its kind is not cacheable (only `query` / `transform` are). 
+fn effective_cache(spec: &DagSpec, step: &Step) -> Option { + let cfg = step.cache.or(spec.cache)?; + if !cfg.enabled || cfg.ttl_sec <= 0 { + return None; + } + let kind = step.kind?; + if !kind.is_cacheable() { + return None; + } + Some(cfg) +} + +fn execute_sorted( + spec: &DagSpec, + steps: &[&Step], + conn: Option<&rusqlite::Connection>, +) -> DagReport { let mut report = DagReport::new(); for step in steps { - match run_one_step(step, &report) { + match run_one_step(spec, step, &report, conn) { Ok(sr) => { report.push(sr); } @@ -79,13 +117,46 @@ fn execute_sorted(steps: &[&dag::Step]) -> DagReport { report } -fn run_one_step(step: &dag::Step, report: &DagReport) -> Result { +fn run_one_step( + spec: &DagSpec, + step: &Step, + report: &DagReport, + conn: Option<&rusqlite::Connection>, +) -> Result { let input_value = match resolve_input(&step.input, report.results()) { Ok(v) => v, Err(e) => return Err(StepReport::fail(&step.id, &step.atom, format!("resolve: {e}"))), }; - match run_atom(&step.atom, &input_value) { + let cache_cfg = conn.and_then(|_| effective_cache(spec, step)); + match (conn, cache_cfg) { + (Some(c), Some(cfg)) => invoke_with_cache(step, &input_value, c, cfg), + _ => invoke_direct(step, &input_value), + } +} + +fn invoke_direct(step: &Step, input: &serde_json::Value) -> Result { + match run_atom(&step.atom, input) { Ok(result) => Ok(StepReport::ok(&step.id, &step.atom, result)), Err(e) => Err(StepReport::fail(&step.id, &step.atom, format!("exec: {e}"))), } } + +fn invoke_with_cache( + step: &Step, + input: &serde_json::Value, + conn: &rusqlite::Connection, + cfg: CacheConfig, +) -> Result { + match run_atom_cached(conn, &step.atom, input, cfg.ttl_sec) { + Ok((result, outcome)) => Ok(StepReport::ok(&step.id, &step.atom, result) + .with_source(label(outcome))), + Err(e) => Err(StepReport::fail(&step.id, &step.atom, format!("exec: {e}"))), + } +} + +fn label(o: CacheOutcome) -> &'static str { + match o { + CacheOutcome::Hit => "cache", + 
CacheOutcome::Fresh => "fresh", + } +} diff --git a/_primitives/_rust/kei-pipe/src/report.rs b/_primitives/_rust/kei-pipe/src/report.rs index 894867a..e030cbc 100644 --- a/_primitives/_rust/kei-pipe/src/report.rs +++ b/_primitives/_rust/kei-pipe/src/report.rs @@ -13,6 +13,11 @@ use serde_json::{json, Value}; use std::collections::HashMap; /// One step's outcome. +/// +/// `source` is set only when caching was active for the step: +/// `Some("cache")` on a cache hit, `Some("fresh")` on a cache miss (atom +/// was invoked and its result stored), `None` when caching was disabled +/// or the atom kind gated it out. #[derive(Debug, Clone, Serialize)] pub struct StepReport { pub id: String, @@ -20,6 +25,8 @@ pub struct StepReport { pub ok: bool, pub result: Option, pub error: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub source: Option, } impl StepReport { @@ -30,6 +37,7 @@ impl StepReport { ok: true, result: Some(result), error: None, + source: None, } } pub fn fail(id: &str, atom: &str, error: String) -> Self { @@ -39,8 +47,14 @@ impl StepReport { ok: false, result: None, error: Some(error), + source: None, } } + /// Builder-style: attach a cache source label (`"cache"` or `"fresh"`). + pub fn with_source(mut self, source: &str) -> Self { + self.source = Some(source.into()); + self + } } /// Full-DAG outcome. `final_result` is the `result` of the last diff --git a/_primitives/_rust/kei-pipe/src/topo.rs b/_primitives/_rust/kei-pipe/src/topo.rs new file mode 100644 index 0000000..f74f467 --- /dev/null +++ b/_primitives/_rust/kei-pipe/src/topo.rs @@ -0,0 +1,98 @@ +//! Kahn-style topological sort for the parsed DAG. +//! +//! Split out from `dag.rs` to stay under the Constructor Pattern 200-LOC +//! limit. Stable — ties are broken by declaration order so reports are +//! deterministic across runs. + +use std::collections::{BTreeMap, HashMap}; + +use crate::dag::{DagError, DagSpec, Step}; + +/// Topologically sort the DAG. 
Returns `&Step` references in execution +/// order. +pub fn topo_sort(spec: &DagSpec) -> Result, DagError> { + let idx = index_by_id(spec); + validate_edges(spec, &idx)?; + let (in_deg, adj) = build_graph(spec, &idx); + let ordered = kahn_sort(spec, in_deg, adj)?; + Ok(ordered.iter().map(|i| &spec.steps[*i]).collect()) +} + +fn index_by_id(spec: &DagSpec) -> HashMap<&str, usize> { + let mut m: HashMap<&str, usize> = HashMap::with_capacity(spec.steps.len()); + for (i, s) in spec.steps.iter().enumerate() { + m.insert(s.id.as_str(), i); + } + m +} + +fn validate_edges(spec: &DagSpec, idx: &HashMap<&str, usize>) -> Result<(), DagError> { + for s in &spec.steps { + for dep in &s.depends_on { + if !idx.contains_key(dep.as_str()) { + return Err(DagError::UnknownDep(s.id.clone(), dep.clone())); + } + } + } + Ok(()) +} + +fn build_graph( + spec: &DagSpec, + idx: &HashMap<&str, usize>, +) -> (Vec, Vec>) { + let n = spec.steps.len(); + let mut in_deg = vec![0usize; n]; + let mut adj: Vec> = vec![Vec::new(); n]; + for (i, s) in spec.steps.iter().enumerate() { + for dep in &s.depends_on { + let src = idx[dep.as_str()]; + adj[src].push(i); + in_deg[i] += 1; + } + } + (in_deg, adj) +} + +fn kahn_sort( + spec: &DagSpec, + mut in_deg: Vec, + adj: Vec>, +) -> Result, DagError> { + let n = spec.steps.len(); + let mut ready: BTreeMap = BTreeMap::new(); + seed_ready(&in_deg, &mut ready); + let mut out: Vec = Vec::with_capacity(n); + while let Some((&i, _)) = ready.iter().next() { + ready.remove(&i); + out.push(i); + for &j in &adj[i] { + in_deg[j] -= 1; + if in_deg[j] == 0 { + ready.insert(j, ()); + } + } + } + if out.len() != n { + return Err(DagError::Cycle(unresolved_ids(spec, &out))); + } + Ok(out) +} + +fn seed_ready(in_deg: &[usize], ready: &mut BTreeMap) { + for (i, deg) in in_deg.iter().enumerate() { + if *deg == 0 { + ready.insert(i, ()); + } + } +} + +fn unresolved_ids(spec: &DagSpec, resolved: &[usize]) -> String { + spec.steps + .iter() + .enumerate() + .filter(|(i, _)| 
!resolved.contains(i)) + .map(|(_, s)| s.id.as_str()) + .collect::>() + .join(", ") +} diff --git a/_primitives/_rust/kei-pipe/tests/pipe_smoke.rs b/_primitives/_rust/kei-pipe/tests/pipe_smoke.rs index c1d6345..4a32c3e 100644 --- a/_primitives/_rust/kei-pipe/tests/pipe_smoke.rs +++ b/_primitives/_rust/kei-pipe/tests/pipe_smoke.rs @@ -17,6 +17,14 @@ use std::collections::HashMap; use std::fs; use std::os::unix::fs::PermissionsExt; use std::path::PathBuf; +use std::sync::{Mutex, OnceLock}; + +/// Serialize every test that reads/writes `KEI_RUNTIME_BIN_DIR`. Without +/// this the cache/non-cache tests race and pick up each other's mock dir. +fn env_lock() -> &'static Mutex<()> { + static LOCK: OnceLock> = OnceLock::new(); + LOCK.get_or_init(|| Mutex::new(())) +} /// Create a temp dir holding a POSIX shell script `mock-atom` that, for /// any `run-atom ` invocation, echoes back `{"input": }`. @@ -57,6 +65,7 @@ fn happy_path_runs_two_steps_with_substitution() { eprintln!("skipping: python3 not available"); return; } + let _guard = env_lock().lock().unwrap_or_else(|p| p.into_inner()); let bin = mock_bin_dir(); let work = tempfile::tempdir().unwrap(); let dag = r#" @@ -172,3 +181,135 @@ fn run_dag_rejects_unreadable_file() { .expect_err("io error"); assert!(matches!(err, PipeError::Read(_, _)), "got {err:?}"); } + +/// Counting mock: the mock script increments a counter file every run so +/// the test can prove the cache actually bypassed the subprocess. 
+fn counting_mock_bin_dir(counter_path: &std::path::Path) -> tempfile::TempDir { + let dir = tempfile::tempdir().expect("tempdir"); + let bin = dir.path().join("mockcrate"); + let script = format!( + r#"#!/bin/sh +COUNTER='{}' +N=$(cat "$COUNTER" 2>/dev/null || echo 0) +echo $((N + 1)) > "$COUNTER" +exec python3 -c 'import sys, json; d = sys.stdin.read(); print(json.dumps({{"input": json.loads(d)}}))' +"#, + counter_path.display() + ); + fs::write(&bin, script).unwrap(); + let mut perms = fs::metadata(&bin).unwrap().permissions(); + perms.set_mode(0o755); + fs::set_permissions(&bin, perms).unwrap(); + dir +} + +fn read_counter(p: &std::path::Path) -> u32 { + fs::read_to_string(p) + .ok() + .and_then(|s| s.trim().parse().ok()) + .unwrap_or(0) +} + +#[test] +fn cache_enabled_query_step_reuses_result_on_second_run() { + if !python3_available() { + eprintln!("skipping: python3 not available"); + return; + } + let _guard = env_lock().lock().unwrap_or_else(|p| p.into_inner()); + let work = tempfile::tempdir().unwrap(); + let counter = work.path().join("calls.txt"); + let bin = counting_mock_bin_dir(&counter); + let db = work.path().join("cache.sqlite"); + let dag_text = format!( + r#" +[pipe] +cache = {{ enabled = true, ttl_sec = 3600, db = "{}" }} + +[[steps]] +id = "only" +atom = "mockcrate::echo" +kind = "query" +input = {{ q = "same" }} +"#, + db.display() + ); + let path = write_toml(work.path(), "dag.toml", &dag_text); + std::env::set_var("KEI_RUNTIME_BIN_DIR", bin.path()); + let r1 = run_dag(&path).expect("run1"); + let r2 = run_dag(&path).expect("run2"); + std::env::remove_var("KEI_RUNTIME_BIN_DIR"); + assert!(r1.final_ok() && r2.final_ok()); + // First run is a miss (source="fresh"), second a hit (source="cache"). + assert_eq!(r1.steps[0].source.as_deref(), Some("fresh")); + assert_eq!(r2.steps[0].source.as_deref(), Some("cache")); + // Atom was invoked exactly once across both runs. 
+ assert_eq!(read_counter(&counter), 1, "atom should have been called once"); +} + +#[test] +fn cache_disabled_always_invokes_atom() { + if !python3_available() { + eprintln!("skipping: python3 not available"); + return; + } + let _guard = env_lock().lock().unwrap_or_else(|p| p.into_inner()); + let work = tempfile::tempdir().unwrap(); + let counter = work.path().join("calls.txt"); + let bin = counting_mock_bin_dir(&counter); + let dag_text = r#" +[[steps]] +id = "only" +atom = "mockcrate::echo" +kind = "query" +input = { q = "same" } +"#; + let path = write_toml(work.path(), "dag.toml", dag_text); + std::env::set_var("KEI_RUNTIME_BIN_DIR", bin.path()); + let r1 = run_dag(&path).expect("r1"); + let r2 = run_dag(&path).expect("r2"); + std::env::remove_var("KEI_RUNTIME_BIN_DIR"); + assert!(r1.final_ok() && r2.final_ok()); + // No cache → source is None on both runs. + assert!(r1.steps[0].source.is_none()); + assert!(r2.steps[0].source.is_none()); + // Atom was invoked on every run. + assert_eq!(read_counter(&counter), 2); +} + +#[test] +fn cache_command_kind_is_not_cached_even_when_enabled() { + if !python3_available() { + eprintln!("skipping: python3 not available"); + return; + } + let _guard = env_lock().lock().unwrap_or_else(|p| p.into_inner()); + let work = tempfile::tempdir().unwrap(); + let counter = work.path().join("calls.txt"); + let bin = counting_mock_bin_dir(&counter); + let db = work.path().join("cache.sqlite"); + let dag_text = format!( + r#" +[pipe] +cache = {{ enabled = true, ttl_sec = 3600, db = "{}" }} + +[[steps]] +id = "only" +atom = "mockcrate::echo" +kind = "command" +input = {{ q = "same" }} +"#, + db.display() + ); + let path = write_toml(work.path(), "dag.toml", &dag_text); + std::env::set_var("KEI_RUNTIME_BIN_DIR", bin.path()); + let r1 = run_dag(&path).expect("r1"); + let r2 = run_dag(&path).expect("r2"); + std::env::remove_var("KEI_RUNTIME_BIN_DIR"); + assert!(r1.final_ok() && r2.final_ok()); + // Cache gate: command kind → no source label 
on either run. + assert!(r1.steps[0].source.is_none(), "r1 source: {:?}", r1.steps[0].source); + assert!(r2.steps[0].source.is_none(), "r2 source: {:?}", r2.steps[0].source); + // Atom invoked on every run because cache gate refused it. + assert_eq!(read_counter(&counter), 2); +}