Merge P-pipe-cache — kei-cache wired into kei-pipe DAG executor

This commit is contained in:
Parfii-bot 2026-04-23 14:27:15 +08:00
commit 7e2e5c642c
9 changed files with 518 additions and 103 deletions

View file

@ -2098,6 +2098,8 @@ version = "0.1.0"
dependencies = [
"anyhow",
"clap",
"kei-cache",
"rusqlite",
"serde",
"serde_json",
"tempfile",

View file

@ -20,6 +20,8 @@ serde_json = "1"
toml = "0.8"
anyhow = "1"
thiserror = "1"
kei-cache = { path = "../kei-cache" }
rusqlite = { version = "0.31", features = ["bundled"] }
[dev-dependencies]
tempfile = "3"

View file

@ -0,0 +1,82 @@
//! Per-step and DAG-level cache configuration types + TOML parsers.
//!
//! Kept separate from `dag.rs` so the core DAG cube stays under the 200-LOC
//! Constructor Pattern limit. Everything here is a pure value type or a
//! small string-validation helper — no I/O, no side effects.
use serde::Deserialize;
use crate::dag::DagError;
/// Per-step or DAG-level cache opt-in. Both fields required when present.
#[derive(Debug, Clone, Copy)]
pub struct CacheConfig {
pub enabled: bool,
pub ttl_sec: i64,
}
/// Atom kind as declared in the DAG. Only `Query` and `Transform` are
/// cacheable (pure); `Command` and `Stream` bypass the cache gate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StepKind {
Query,
Transform,
Command,
Stream,
}
impl StepKind {
pub fn is_cacheable(self) -> bool {
matches!(self, StepKind::Query | StepKind::Transform)
}
}
/// Internal TOML surface for the `[pipe]` block.
#[derive(Debug, Deserialize, Default)]
pub(crate) struct RawPipe {
#[serde(default)]
pub cache: Option<RawCache>,
}
/// Internal TOML surface for per-step or DAG-level `cache = { ... }`.
#[derive(Debug, Deserialize, Default)]
pub(crate) struct RawCache {
#[serde(default)]
pub enabled: Option<bool>,
#[serde(default, rename = "ttl_sec")]
pub ttl_sec: Option<i64>,
#[serde(default)]
pub db: Option<String>,
}
impl RawCache {
/// Flatten the TOML view into the public [`CacheConfig`] shape. `db`
/// is dropped — the caller reads it separately for DAG-level routing.
pub(crate) fn into_config(self) -> CacheConfig {
CacheConfig {
enabled: self.enabled.unwrap_or(false),
ttl_sec: self.ttl_sec.unwrap_or(0),
}
}
}
/// Split the optional `[pipe]` block into `(cache_config, cache_db_path)`.
pub(crate) fn split_pipe_cache(
raw: Option<RawPipe>,
) -> (Option<CacheConfig>, Option<String>) {
let Some(p) = raw else { return (None, None); };
let Some(c) = p.cache else { return (None, None); };
let db = c.db.clone();
(Some(c.into_config()), db)
}
/// Parse a `kind = "..."` string into a typed [`StepKind`].
pub(crate) fn parse_kind(step_id: &str, s: &str) -> Result<StepKind, DagError> {
match s {
"query" => Ok(StepKind::Query),
"transform" => Ok(StepKind::Transform),
"command" => Ok(StepKind::Command),
"stream" => Ok(StepKind::Stream),
other => Err(DagError::BadKind(step_id.into(), other.into())),
}
}

View file

@ -1,7 +1,9 @@
//! DAG spec parsing + topological sort.
//!
//! TOML shape — `[[steps]]` array with fields `id`, `atom`, optional
//! `depends-on = [ids...]`, optional `input = { ... }`.
//! `depends-on = [ids...]`, optional `input = { ... }`. Optional per-step
//! `kind = "query|transform|command|stream"` and `cache = { enabled, ttl_sec }`.
//! Optional DAG-level `[pipe] cache = { enabled, ttl_sec, db = "..." }`.
//!
//! Invariants:
//! - `id` and `atom` must be non-empty strings
@ -11,7 +13,9 @@
use serde::Deserialize;
use serde_json::Value;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::HashSet;
pub use crate::topo::topo_sort;
/// Error cases raised while parsing or sorting a DAG.
#[derive(Debug, thiserror::Error)]
@ -28,8 +32,13 @@ pub enum DagError {
Cycle(String),
#[error("input for step `{0}` must be a TOML table")]
BadInput(String),
#[error("step `{0}` has invalid kind `{1}` (expected query|transform|command|stream)")]
BadKind(String, String),
}
pub use crate::config::{CacheConfig, StepKind};
use crate::config::{parse_kind, split_pipe_cache, RawCache, RawPipe};
/// One atom invocation in a DAG. `input` is retained as `serde_json::Value`
/// so the resolver can walk it uniformly (strings, objects, arrays).
#[derive(Debug, Clone)]
@ -38,13 +47,18 @@ pub struct Step {
pub atom: String,
pub depends_on: Vec<String>,
pub input: Value,
pub kind: Option<StepKind>,
pub cache: Option<CacheConfig>,
}
/// Parsed DAG. `steps` preserves declaration order so error messages line
/// up with the TOML source.
/// up with the TOML source. `cache` is the DAG-level default applied to
/// any cacheable step that lacks its own `cache` override.
#[derive(Debug, Clone, Default)]
pub struct DagSpec {
pub steps: Vec<Step>,
pub cache: Option<CacheConfig>,
pub cache_db: Option<String>,
}
/// Internal TOML surface — kept private so callers only see the cleaned
@ -53,6 +67,8 @@ pub struct DagSpec {
struct RawDag {
#[serde(default)]
steps: Vec<RawStep>,
#[serde(default)]
pipe: Option<RawPipe>,
}
#[derive(Debug, Deserialize)]
@ -63,6 +79,10 @@ struct RawStep {
depends_on: Vec<String>,
#[serde(default)]
input: Option<toml::Value>,
#[serde(default)]
kind: Option<String>,
#[serde(default)]
cache: Option<RawCache>,
}
/// Parse TOML text into a cleaned `DagSpec` with per-step validation.
@ -74,7 +94,8 @@ pub fn parse_dag(text: &str) -> Result<DagSpec, DagError> {
let step = build_step(idx, rs, &mut seen)?;
steps.push(step);
}
Ok(DagSpec { steps })
let (cache, cache_db) = split_pipe_cache(raw.pipe);
Ok(DagSpec { steps, cache, cache_db })
}
fn build_step(idx: usize, rs: RawStep, seen: &mut HashSet<String>) -> Result<Step, DagError> {
@ -90,7 +111,12 @@ fn build_step(idx: usize, rs: RawStep, seen: &mut HashSet<String>) -> Result<Ste
.filter(|s| !s.is_empty())
.ok_or_else(|| DagError::MissingField(id.clone(), "atom"))?;
let input = normalize_input(&id, rs.input)?;
Ok(Step { id, atom, depends_on: rs.depends_on, input })
let kind = match rs.kind {
None => None,
Some(s) => Some(parse_kind(&id, &s)?),
};
let cache = rs.cache.map(|c| c.into_config());
Ok(Step { id, atom, depends_on: rs.depends_on, input, kind, cache })
}
fn normalize_input(id: &str, raw: Option<toml::Value>) -> Result<Value, DagError> {
@ -102,93 +128,3 @@ fn normalize_input(id: &str, raw: Option<toml::Value>) -> Result<Value, DagError
Ok(s)
}
/// Topologically sort the DAG. Returns `&Step` references in execution
/// order. Stable — ties are broken by declaration order so reports are
/// deterministic.
pub fn topo_sort(spec: &DagSpec) -> Result<Vec<&Step>, DagError> {
let index_by_id = index_by_id(spec)?;
validate_edges(spec, &index_by_id)?;
let (in_deg, adj) = build_graph(spec, &index_by_id);
let ordered_indices = kahn_sort(spec, in_deg, adj)?;
Ok(ordered_indices.iter().map(|i| &spec.steps[*i]).collect())
}
fn index_by_id(spec: &DagSpec) -> Result<HashMap<&str, usize>, DagError> {
let mut m: HashMap<&str, usize> = HashMap::with_capacity(spec.steps.len());
for (i, s) in spec.steps.iter().enumerate() {
m.insert(s.id.as_str(), i);
}
Ok(m)
}
fn validate_edges(spec: &DagSpec, idx: &HashMap<&str, usize>) -> Result<(), DagError> {
for s in &spec.steps {
for dep in &s.depends_on {
if !idx.contains_key(dep.as_str()) {
return Err(DagError::UnknownDep(s.id.clone(), dep.clone()));
}
}
}
Ok(())
}
fn build_graph(
spec: &DagSpec,
idx: &HashMap<&str, usize>,
) -> (Vec<usize>, Vec<Vec<usize>>) {
let n = spec.steps.len();
let mut in_deg = vec![0usize; n];
let mut adj: Vec<Vec<usize>> = vec![Vec::new(); n];
for (i, s) in spec.steps.iter().enumerate() {
for dep in &s.depends_on {
let src = idx[dep.as_str()];
adj[src].push(i);
in_deg[i] += 1;
}
}
(in_deg, adj)
}
fn kahn_sort(
spec: &DagSpec,
mut in_deg: Vec<usize>,
adj: Vec<Vec<usize>>,
) -> Result<Vec<usize>, DagError> {
let n = spec.steps.len();
let mut ready: BTreeMap<usize, ()> = BTreeMap::new();
seed_ready(&in_deg, &mut ready);
let mut out: Vec<usize> = Vec::with_capacity(n);
while let Some((&i, _)) = ready.iter().next() {
ready.remove(&i);
out.push(i);
for &j in &adj[i] {
in_deg[j] -= 1;
if in_deg[j] == 0 {
ready.insert(j, ());
}
}
}
if out.len() != n {
return Err(DagError::Cycle(unresolved_ids(spec, &out)));
}
Ok(out)
}
/// Seed the `ready` set with every node whose in-degree is zero.
fn seed_ready(in_deg: &[usize], ready: &mut BTreeMap<usize, ()>) {
for (i, deg) in in_deg.iter().enumerate() {
if *deg == 0 {
ready.insert(i, ());
}
}
}
fn unresolved_ids(spec: &DagSpec, resolved: &[usize]) -> String {
spec.steps
.iter()
.enumerate()
.filter(|(i, _)| !resolved.contains(i))
.map(|(_, s)| s.id.as_str())
.collect::<Vec<_>>()
.join(", ")
}

View file

@ -29,6 +29,8 @@ pub enum ExecError {
OutputParse { atom: String, err: String, stdout: String },
#[error("serialize input: {0}")]
InputSerialize(String),
#[error("cache error: {0}")]
Cache(String),
}
/// Parse an atom id into `(crate, verb)`. Rejects empty halves.
@ -51,6 +53,73 @@ pub fn run_atom(atom: &str, input: &Value) -> Result<Value, ExecError> {
parse_output(atom, output)
}
/// Outcome label accompanying a cache-aware invocation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CacheOutcome {
/// Returned from the cache; atom was NOT invoked.
Hit,
/// Cache miss; atom was invoked and the result stored.
Fresh,
}
impl CacheOutcome {
pub fn as_str(self) -> &'static str {
match self {
CacheOutcome::Hit => "cache",
CacheOutcome::Fresh => "fresh",
}
}
}
/// Cache-aware atom invocation. On hit returns cached JSON; on miss calls
/// [`run_atom`], stores the serialised result under the computed key with
/// `ttl_sec`, and returns `Fresh`. Cache I/O errors are surfaced via
/// [`ExecError::Cache`] so the caller can distinguish from atom failures.
pub fn run_atom_cached(
conn: &rusqlite::Connection,
atom: &str,
input: &Value,
ttl_sec: i64,
) -> Result<(Value, CacheOutcome), ExecError> {
let key = kei_cache::key::cache_key(atom, input);
let hit = kei_cache::store::get(conn, &key).map_err(|e| ExecError::Cache(e.to_string()))?;
match hit {
Some(payload) => load_hit(conn, atom, payload),
None => load_miss(conn, atom, input, &key, ttl_sec),
}
}
fn load_hit(
conn: &rusqlite::Connection,
atom: &str,
payload: String,
) -> Result<(Value, CacheOutcome), ExecError> {
let _ = kei_cache::store::bump(conn, "hits");
let value: Value =
serde_json::from_str(&payload).map_err(|e| ExecError::OutputParse {
atom: atom.into(),
err: e.to_string(),
stdout: payload,
})?;
Ok((value, CacheOutcome::Hit))
}
fn load_miss(
conn: &rusqlite::Connection,
atom: &str,
input: &Value,
key: &str,
ttl_sec: i64,
) -> Result<(Value, CacheOutcome), ExecError> {
let result = run_atom(atom, input)?;
let payload =
serde_json::to_string(&result).map_err(|e| ExecError::InputSerialize(e.to_string()))?;
kei_cache::store::put(conn, key, atom, &payload, ttl_sec)
.map_err(|e| ExecError::Cache(e.to_string()))?;
let _ = kei_cache::store::bump(conn, "misses");
Ok((result, CacheOutcome::Fresh))
}
fn spawn_and_wait(
bin: &PathBuf,
verb: &str,

View file

@ -17,15 +17,17 @@
//! - [`report::DagReport`] / [`report::StepReport`] — run outcome
//! - [`run_dag`] / [`validate_dag`] — top-level entry points
pub mod config;
pub mod dag;
pub mod exec;
pub mod report;
pub mod resolve;
pub mod topo;
use std::path::Path;
use std::path::{Path, PathBuf};
use crate::dag::{parse_dag, topo_sort, DagError};
use crate::exec::{run_atom, ExecError};
use crate::dag::{parse_dag, topo_sort, CacheConfig, DagError, DagSpec, Step};
use crate::exec::{run_atom, run_atom_cached, CacheOutcome, ExecError};
use crate::report::{DagReport, StepReport};
use crate::resolve::{resolve_input, ResolveError};
@ -40,6 +42,8 @@ pub enum PipeError {
Resolve(#[from] ResolveError),
#[error(transparent)]
Exec(#[from] ExecError),
#[error("open cache db {0}: {1}")]
CacheOpen(String, String),
}
/// Parse + topo-sort a DAG file without running any atoms. Returns Ok
@ -60,13 +64,47 @@ pub fn run_dag(path: &Path) -> Result<DagReport, PipeError> {
.map_err(|e| PipeError::Read(path.display().to_string(), e))?;
let spec = parse_dag(&text)?;
let ordered = topo_sort(&spec)?;
Ok(execute_sorted(&ordered))
let conn = open_cache_if_needed(&spec)?;
Ok(execute_sorted(&spec, &ordered, conn.as_ref()))
}
fn execute_sorted(steps: &[&dag::Step]) -> DagReport {
/// Open the SQLite cache Connection only if the DAG declares a path AND
/// at least one step would actually use the cache. Otherwise returns None
/// so the runtime skips the cache layer entirely.
fn open_cache_if_needed(spec: &DagSpec) -> Result<Option<rusqlite::Connection>, PipeError> {
let Some(db_path) = spec.cache_db.as_ref() else { return Ok(None); };
let any_cacheable = spec.steps.iter().any(|s| effective_cache(spec, s).is_some());
if !any_cacheable {
return Ok(None);
}
let conn = kei_cache::store::open(&PathBuf::from(db_path))
.map_err(|e| PipeError::CacheOpen(db_path.clone(), e.to_string()))?;
Ok(Some(conn))
}
/// Resolve the effective cache config for a step: per-step wins over
/// DAG-level. Returns None when caching is disabled or the step's kind
/// is not cacheable (only `query` / `transform` are).
fn effective_cache(spec: &DagSpec, step: &Step) -> Option<CacheConfig> {
let cfg = step.cache.or(spec.cache)?;
if !cfg.enabled || cfg.ttl_sec <= 0 {
return None;
}
let kind = step.kind?;
if !kind.is_cacheable() {
return None;
}
Some(cfg)
}
fn execute_sorted(
spec: &DagSpec,
steps: &[&Step],
conn: Option<&rusqlite::Connection>,
) -> DagReport {
let mut report = DagReport::new();
for step in steps {
match run_one_step(step, &report) {
match run_one_step(spec, step, &report, conn) {
Ok(sr) => {
report.push(sr);
}
@ -79,13 +117,46 @@ fn execute_sorted(steps: &[&dag::Step]) -> DagReport {
report
}
fn run_one_step(step: &dag::Step, report: &DagReport) -> Result<StepReport, StepReport> {
fn run_one_step(
spec: &DagSpec,
step: &Step,
report: &DagReport,
conn: Option<&rusqlite::Connection>,
) -> Result<StepReport, StepReport> {
let input_value = match resolve_input(&step.input, report.results()) {
Ok(v) => v,
Err(e) => return Err(StepReport::fail(&step.id, &step.atom, format!("resolve: {e}"))),
};
match run_atom(&step.atom, &input_value) {
let cache_cfg = conn.and_then(|_| effective_cache(spec, step));
match (conn, cache_cfg) {
(Some(c), Some(cfg)) => invoke_with_cache(step, &input_value, c, cfg),
_ => invoke_direct(step, &input_value),
}
}
fn invoke_direct(step: &Step, input: &serde_json::Value) -> Result<StepReport, StepReport> {
match run_atom(&step.atom, input) {
Ok(result) => Ok(StepReport::ok(&step.id, &step.atom, result)),
Err(e) => Err(StepReport::fail(&step.id, &step.atom, format!("exec: {e}"))),
}
}
fn invoke_with_cache(
step: &Step,
input: &serde_json::Value,
conn: &rusqlite::Connection,
cfg: CacheConfig,
) -> Result<StepReport, StepReport> {
match run_atom_cached(conn, &step.atom, input, cfg.ttl_sec) {
Ok((result, outcome)) => Ok(StepReport::ok(&step.id, &step.atom, result)
.with_source(label(outcome))),
Err(e) => Err(StepReport::fail(&step.id, &step.atom, format!("exec: {e}"))),
}
}
fn label(o: CacheOutcome) -> &'static str {
match o {
CacheOutcome::Hit => "cache",
CacheOutcome::Fresh => "fresh",
}
}

View file

@ -13,6 +13,11 @@ use serde_json::{json, Value};
use std::collections::HashMap;
/// One step's outcome.
///
/// `source` is set only when caching was active for the step:
/// `Some("cache")` on a cache hit, `Some("fresh")` on a cache miss (atom
/// was invoked and its result stored), `None` when caching was disabled
/// or the atom kind gated it out.
#[derive(Debug, Clone, Serialize)]
pub struct StepReport {
pub id: String,
@ -20,6 +25,8 @@ pub struct StepReport {
pub ok: bool,
pub result: Option<Value>,
pub error: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub source: Option<String>,
}
impl StepReport {
@ -30,6 +37,7 @@ impl StepReport {
ok: true,
result: Some(result),
error: None,
source: None,
}
}
pub fn fail(id: &str, atom: &str, error: String) -> Self {
@ -39,8 +47,14 @@ impl StepReport {
ok: false,
result: None,
error: Some(error),
source: None,
}
}
/// Builder-style: attach a cache source label (`"cache"` or `"fresh"`).
pub fn with_source(mut self, source: &str) -> Self {
self.source = Some(source.into());
self
}
}
/// Full-DAG outcome. `final_result` is the `result` of the last

View file

@ -0,0 +1,98 @@
//! Kahn-style topological sort for the parsed DAG.
//!
//! Split out from `dag.rs` to stay under the Constructor Pattern 200-LOC
//! limit. Stable — ties are broken by declaration order so reports are
//! deterministic across runs.
use std::collections::{BTreeMap, HashMap};
use crate::dag::{DagError, DagSpec, Step};
/// Topologically sort the DAG. Returns `&Step` references in execution
/// order.
pub fn topo_sort(spec: &DagSpec) -> Result<Vec<&Step>, DagError> {
let idx = index_by_id(spec);
validate_edges(spec, &idx)?;
let (in_deg, adj) = build_graph(spec, &idx);
let ordered = kahn_sort(spec, in_deg, adj)?;
Ok(ordered.iter().map(|i| &spec.steps[*i]).collect())
}
fn index_by_id(spec: &DagSpec) -> HashMap<&str, usize> {
let mut m: HashMap<&str, usize> = HashMap::with_capacity(spec.steps.len());
for (i, s) in spec.steps.iter().enumerate() {
m.insert(s.id.as_str(), i);
}
m
}
fn validate_edges(spec: &DagSpec, idx: &HashMap<&str, usize>) -> Result<(), DagError> {
for s in &spec.steps {
for dep in &s.depends_on {
if !idx.contains_key(dep.as_str()) {
return Err(DagError::UnknownDep(s.id.clone(), dep.clone()));
}
}
}
Ok(())
}
fn build_graph(
spec: &DagSpec,
idx: &HashMap<&str, usize>,
) -> (Vec<usize>, Vec<Vec<usize>>) {
let n = spec.steps.len();
let mut in_deg = vec![0usize; n];
let mut adj: Vec<Vec<usize>> = vec![Vec::new(); n];
for (i, s) in spec.steps.iter().enumerate() {
for dep in &s.depends_on {
let src = idx[dep.as_str()];
adj[src].push(i);
in_deg[i] += 1;
}
}
(in_deg, adj)
}
fn kahn_sort(
spec: &DagSpec,
mut in_deg: Vec<usize>,
adj: Vec<Vec<usize>>,
) -> Result<Vec<usize>, DagError> {
let n = spec.steps.len();
let mut ready: BTreeMap<usize, ()> = BTreeMap::new();
seed_ready(&in_deg, &mut ready);
let mut out: Vec<usize> = Vec::with_capacity(n);
while let Some((&i, _)) = ready.iter().next() {
ready.remove(&i);
out.push(i);
for &j in &adj[i] {
in_deg[j] -= 1;
if in_deg[j] == 0 {
ready.insert(j, ());
}
}
}
if out.len() != n {
return Err(DagError::Cycle(unresolved_ids(spec, &out)));
}
Ok(out)
}
fn seed_ready(in_deg: &[usize], ready: &mut BTreeMap<usize, ()>) {
for (i, deg) in in_deg.iter().enumerate() {
if *deg == 0 {
ready.insert(i, ());
}
}
}
fn unresolved_ids(spec: &DagSpec, resolved: &[usize]) -> String {
spec.steps
.iter()
.enumerate()
.filter(|(i, _)| !resolved.contains(i))
.map(|(_, s)| s.id.as_str())
.collect::<Vec<_>>()
.join(", ")
}

View file

@ -17,6 +17,14 @@ use std::collections::HashMap;
use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::path::PathBuf;
use std::sync::{Mutex, OnceLock};
/// Serialize every test that reads/writes `KEI_RUNTIME_BIN_DIR`. Without
/// this the cache/non-cache tests race and pick up each other's mock dir.
fn env_lock() -> &'static Mutex<()> {
static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
LOCK.get_or_init(|| Mutex::new(()))
}
/// Create a temp dir holding a POSIX shell script `mock-atom` that, for
/// any `run-atom <verb>` invocation, echoes back `{"input": <stdin>}`.
@ -57,6 +65,7 @@ fn happy_path_runs_two_steps_with_substitution() {
eprintln!("skipping: python3 not available");
return;
}
let _guard = env_lock().lock().unwrap_or_else(|p| p.into_inner());
let bin = mock_bin_dir();
let work = tempfile::tempdir().unwrap();
let dag = r#"
@ -172,3 +181,135 @@ fn run_dag_rejects_unreadable_file() {
.expect_err("io error");
assert!(matches!(err, PipeError::Read(_, _)), "got {err:?}");
}
/// Counting mock: the mock script increments a counter file every run so
/// the test can prove the cache actually bypassed the subprocess.
fn counting_mock_bin_dir(counter_path: &std::path::Path) -> tempfile::TempDir {
let dir = tempfile::tempdir().expect("tempdir");
let bin = dir.path().join("mockcrate");
let script = format!(
r#"#!/bin/sh
COUNTER='{}'
N=$(cat "$COUNTER" 2>/dev/null || echo 0)
echo $((N + 1)) > "$COUNTER"
exec python3 -c 'import sys, json; d = sys.stdin.read(); print(json.dumps({{"input": json.loads(d)}}))'
"#,
counter_path.display()
);
fs::write(&bin, script).unwrap();
let mut perms = fs::metadata(&bin).unwrap().permissions();
perms.set_mode(0o755);
fs::set_permissions(&bin, perms).unwrap();
dir
}
fn read_counter(p: &std::path::Path) -> u32 {
fs::read_to_string(p)
.ok()
.and_then(|s| s.trim().parse().ok())
.unwrap_or(0)
}
#[test]
fn cache_enabled_query_step_reuses_result_on_second_run() {
if !python3_available() {
eprintln!("skipping: python3 not available");
return;
}
let _guard = env_lock().lock().unwrap_or_else(|p| p.into_inner());
let work = tempfile::tempdir().unwrap();
let counter = work.path().join("calls.txt");
let bin = counting_mock_bin_dir(&counter);
let db = work.path().join("cache.sqlite");
let dag_text = format!(
r#"
[pipe]
cache = {{ enabled = true, ttl_sec = 3600, db = "{}" }}
[[steps]]
id = "only"
atom = "mockcrate::echo"
kind = "query"
input = {{ q = "same" }}
"#,
db.display()
);
let path = write_toml(work.path(), "dag.toml", &dag_text);
std::env::set_var("KEI_RUNTIME_BIN_DIR", bin.path());
let r1 = run_dag(&path).expect("run1");
let r2 = run_dag(&path).expect("run2");
std::env::remove_var("KEI_RUNTIME_BIN_DIR");
assert!(r1.final_ok() && r2.final_ok());
// First run is a miss (source="fresh"), second a hit (source="cache").
assert_eq!(r1.steps[0].source.as_deref(), Some("fresh"));
assert_eq!(r2.steps[0].source.as_deref(), Some("cache"));
// Atom was invoked exactly once across both runs.
assert_eq!(read_counter(&counter), 1, "atom should have been called once");
}
#[test]
fn cache_disabled_always_invokes_atom() {
if !python3_available() {
eprintln!("skipping: python3 not available");
return;
}
let _guard = env_lock().lock().unwrap_or_else(|p| p.into_inner());
let work = tempfile::tempdir().unwrap();
let counter = work.path().join("calls.txt");
let bin = counting_mock_bin_dir(&counter);
let dag_text = r#"
[[steps]]
id = "only"
atom = "mockcrate::echo"
kind = "query"
input = { q = "same" }
"#;
let path = write_toml(work.path(), "dag.toml", dag_text);
std::env::set_var("KEI_RUNTIME_BIN_DIR", bin.path());
let r1 = run_dag(&path).expect("r1");
let r2 = run_dag(&path).expect("r2");
std::env::remove_var("KEI_RUNTIME_BIN_DIR");
assert!(r1.final_ok() && r2.final_ok());
// No cache → source is None on both runs.
assert!(r1.steps[0].source.is_none());
assert!(r2.steps[0].source.is_none());
// Atom was invoked on every run.
assert_eq!(read_counter(&counter), 2);
}
#[test]
fn cache_command_kind_is_not_cached_even_when_enabled() {
if !python3_available() {
eprintln!("skipping: python3 not available");
return;
}
let _guard = env_lock().lock().unwrap_or_else(|p| p.into_inner());
let work = tempfile::tempdir().unwrap();
let counter = work.path().join("calls.txt");
let bin = counting_mock_bin_dir(&counter);
let db = work.path().join("cache.sqlite");
let dag_text = format!(
r#"
[pipe]
cache = {{ enabled = true, ttl_sec = 3600, db = "{}" }}
[[steps]]
id = "only"
atom = "mockcrate::echo"
kind = "command"
input = {{ q = "same" }}
"#,
db.display()
);
let path = write_toml(work.path(), "dag.toml", &dag_text);
std::env::set_var("KEI_RUNTIME_BIN_DIR", bin.path());
let r1 = run_dag(&path).expect("r1");
let r2 = run_dag(&path).expect("r2");
std::env::remove_var("KEI_RUNTIME_BIN_DIR");
assert!(r1.final_ok() && r2.final_ok());
// Cache gate: command kind → no source label on either run.
assert!(r1.steps[0].source.is_none(), "r1 source: {:?}", r1.steps[0].source);
assert!(r2.steps[0].source.is_none(), "r2 source: {:?}", r2.steps[0].source);
// Atom invoked on every run because cache gate refused it.
assert_eq!(read_counter(&counter), 2);
}