From 0b948ca07c1a62e682fcffadcaabb66315232947 Mon Sep 17 00:00:00 2001 From: Parfii-bot Date: Thu, 23 Apr 2026 05:55:13 +0800 Subject: [PATCH] =?UTF-8?q?feat(r1):=20new=20kei-pipe=20crate=20=E2=80=94?= =?UTF-8?q?=20atom=20DAG=20runtime?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The critical missing substrate composition layer. kei-pipe run — reads DAG spec, topo-sorts atoms, executes sequentially, pipes JSON between steps via $step.path.to.field resolver. 6 Constructor-Pattern cubes: dag/resolve/exec/report/lib/main. 5/5 smoke tests: happy path + cycle detection + unknown dep + nested path resolver + unreadable file. Resolver envelope matches kei-runtime Output — atoms round-trip identically through either runtime. Workspace Cargo.toml: +kei-pipe member. Co-Authored-By: Claude Opus 4.7 (1M context) --- _primitives/_rust/Cargo.lock | 13 ++ _primitives/_rust/Cargo.toml | 2 + _primitives/_rust/kei-pipe/Cargo.toml | 29 +++ _primitives/_rust/kei-pipe/src/dag.rs | 194 ++++++++++++++++++ _primitives/_rust/kei-pipe/src/exec.rs | 110 ++++++++++ _primitives/_rust/kei-pipe/src/lib.rs | 91 ++++++++ _primitives/_rust/kei-pipe/src/main.rs | 72 +++++++ _primitives/_rust/kei-pipe/src/report.rs | 89 ++++++++ _primitives/_rust/kei-pipe/src/resolve.rs | 128 ++++++++++++ .../_rust/kei-pipe/tests/pipe_smoke.rs | 174 ++++++++++++++++ 10 files changed, 902 insertions(+) create mode 100644 _primitives/_rust/kei-pipe/Cargo.toml create mode 100644 _primitives/_rust/kei-pipe/src/dag.rs create mode 100644 _primitives/_rust/kei-pipe/src/exec.rs create mode 100644 _primitives/_rust/kei-pipe/src/lib.rs create mode 100644 _primitives/_rust/kei-pipe/src/main.rs create mode 100644 _primitives/_rust/kei-pipe/src/report.rs create mode 100644 _primitives/_rust/kei-pipe/src/resolve.rs create mode 100644 _primitives/_rust/kei-pipe/tests/pipe_smoke.rs diff --git a/_primitives/_rust/Cargo.lock b/_primitives/_rust/Cargo.lock index 908b8c5..3eb59da 100644 --- 
a/_primitives/_rust/Cargo.lock +++ b/_primitives/_rust/Cargo.lock @@ -2073,6 +2073,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "kei-pipe" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "serde", + "serde_json", + "tempfile", + "thiserror 1.0.69", + "toml", +] + [[package]] name = "kei-provision" version = "0.1.0" diff --git a/_primitives/_rust/Cargo.toml b/_primitives/_rust/Cargo.toml index 16569eb..fb7f8de 100644 --- a/_primitives/_rust/Cargo.toml +++ b/_primitives/_rust/Cargo.toml @@ -43,6 +43,8 @@ members = [ "kei-provision", # Convergence Layer A — schema-driven verb-template engine for SQLite-CRUD stores "kei-entity-store", + # v1 substrate — atom DAG pipe runtime (topo-sort + JSON piping between atoms) + "kei-pipe", ] [workspace.package] diff --git a/_primitives/_rust/kei-pipe/Cargo.toml b/_primitives/_rust/kei-pipe/Cargo.toml new file mode 100644 index 0000000..419ee29 --- /dev/null +++ b/_primitives/_rust/kei-pipe/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "kei-pipe" +version = "0.1.0" +edition = "2021" +rust-version = "1.75" +description = "Atom DAG pipe runtime — topo-sorts steps, pipes JSON between atoms." + +[[bin]] +name = "kei-pipe" +path = "src/main.rs" + +[lib] +name = "kei_pipe" +path = "src/lib.rs" + +[dependencies] +clap = { version = "4", features = ["derive"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +toml = "0.8" +anyhow = "1" +thiserror = "1" + +[dev-dependencies] +tempfile = "3" + +[package.metadata.keisei] +backend = "none" +description = "Atom DAG pipe runtime — topo-sorts steps, pipes JSON between atoms." diff --git a/_primitives/_rust/kei-pipe/src/dag.rs b/_primitives/_rust/kei-pipe/src/dag.rs new file mode 100644 index 0000000..75bfaec --- /dev/null +++ b/_primitives/_rust/kei-pipe/src/dag.rs @@ -0,0 +1,194 @@ +//! DAG spec parsing + topological sort. +//! +//! TOML shape — `[[steps]]` array with fields `id`, `atom`, optional +//! `depends-on = [ids...]`, optional `input = { ... 
}`. +//! +//! Invariants: +//! - `id` and `atom` must be non-empty strings +//! - `id` must be unique across the DAG +//! - every `depends-on` entry must reference a known step id +//! - the dependency graph must be acyclic + +use serde::Deserialize; +use serde_json::Value; +use std::collections::{BTreeMap, HashMap, HashSet}; + +/// Error cases raised while parsing or sorting a DAG. +#[derive(Debug, thiserror::Error)] +pub enum DagError { + #[error("toml parse: {0}")] + Toml(String), + #[error("step {0} missing required field `{1}`")] + MissingField(String, &'static str), + #[error("duplicate step id: {0}")] + DuplicateId(String), + #[error("step `{0}` depends on unknown id `{1}`")] + UnknownDep(String, String), + #[error("cycle detected involving: {0}")] + Cycle(String), + #[error("input for step `{0}` must be a TOML table")] + BadInput(String), +} + +/// One atom invocation in a DAG. `input` is retained as `serde_json::Value` +/// so the resolver can walk it uniformly (strings, objects, arrays). +#[derive(Debug, Clone)] +pub struct Step { + pub id: String, + pub atom: String, + pub depends_on: Vec, + pub input: Value, +} + +/// Parsed DAG. `steps` preserves declaration order so error messages line +/// up with the TOML source. +#[derive(Debug, Clone, Default)] +pub struct DagSpec { + pub steps: Vec, +} + +/// Internal TOML surface — kept private so callers only see the cleaned +/// `DagSpec` / `Step` shape. +#[derive(Debug, Deserialize)] +struct RawDag { + #[serde(default)] + steps: Vec, +} + +#[derive(Debug, Deserialize)] +struct RawStep { + id: Option, + atom: Option, + #[serde(rename = "depends-on", default)] + depends_on: Vec, + #[serde(default)] + input: Option, +} + +/// Parse TOML text into a cleaned `DagSpec` with per-step validation. 
+pub fn parse_dag(text: &str) -> Result { + let raw: RawDag = toml::from_str(text).map_err(|e| DagError::Toml(e.to_string()))?; + let mut seen: HashSet = HashSet::new(); + let mut steps: Vec = Vec::with_capacity(raw.steps.len()); + for (idx, rs) in raw.steps.into_iter().enumerate() { + let step = build_step(idx, rs, &mut seen)?; + steps.push(step); + } + Ok(DagSpec { steps }) +} + +fn build_step(idx: usize, rs: RawStep, seen: &mut HashSet) -> Result { + let id = rs + .id + .filter(|s| !s.is_empty()) + .ok_or_else(|| DagError::MissingField(format!("#{idx}"), "id"))?; + if !seen.insert(id.clone()) { + return Err(DagError::DuplicateId(id)); + } + let atom = rs + .atom + .filter(|s| !s.is_empty()) + .ok_or_else(|| DagError::MissingField(id.clone(), "atom"))?; + let input = normalize_input(&id, rs.input)?; + Ok(Step { id, atom, depends_on: rs.depends_on, input }) +} + +fn normalize_input(id: &str, raw: Option) -> Result { + let v = raw.unwrap_or(toml::Value::Table(toml::map::Map::new())); + if !matches!(v, toml::Value::Table(_)) { + return Err(DagError::BadInput(id.into())); + } + let s = serde_json::to_value(v).map_err(|e| DagError::Toml(e.to_string()))?; + Ok(s) +} + +/// Topologically sort the DAG. Returns `&Step` references in execution +/// order. Stable — ties are broken by declaration order so reports are +/// deterministic. 
+pub fn topo_sort(spec: &DagSpec) -> Result, DagError> { + let index_by_id = index_by_id(spec)?; + validate_edges(spec, &index_by_id)?; + let (in_deg, adj) = build_graph(spec, &index_by_id); + let ordered_indices = kahn_sort(spec, in_deg, adj)?; + Ok(ordered_indices.iter().map(|i| &spec.steps[*i]).collect()) +} + +fn index_by_id(spec: &DagSpec) -> Result, DagError> { + let mut m: HashMap<&str, usize> = HashMap::with_capacity(spec.steps.len()); + for (i, s) in spec.steps.iter().enumerate() { + m.insert(s.id.as_str(), i); + } + Ok(m) +} + +fn validate_edges(spec: &DagSpec, idx: &HashMap<&str, usize>) -> Result<(), DagError> { + for s in &spec.steps { + for dep in &s.depends_on { + if !idx.contains_key(dep.as_str()) { + return Err(DagError::UnknownDep(s.id.clone(), dep.clone())); + } + } + } + Ok(()) +} + +fn build_graph( + spec: &DagSpec, + idx: &HashMap<&str, usize>, +) -> (Vec, Vec>) { + let n = spec.steps.len(); + let mut in_deg = vec![0usize; n]; + let mut adj: Vec> = vec![Vec::new(); n]; + for (i, s) in spec.steps.iter().enumerate() { + for dep in &s.depends_on { + let src = idx[dep.as_str()]; + adj[src].push(i); + in_deg[i] += 1; + } + } + (in_deg, adj) +} + +fn kahn_sort( + spec: &DagSpec, + mut in_deg: Vec, + adj: Vec>, +) -> Result, DagError> { + let n = spec.steps.len(); + let mut ready: BTreeMap = BTreeMap::new(); + seed_ready(&in_deg, &mut ready); + let mut out: Vec = Vec::with_capacity(n); + while let Some((&i, _)) = ready.iter().next() { + ready.remove(&i); + out.push(i); + for &j in &adj[i] { + in_deg[j] -= 1; + if in_deg[j] == 0 { + ready.insert(j, ()); + } + } + } + if out.len() != n { + return Err(DagError::Cycle(unresolved_ids(spec, &out))); + } + Ok(out) +} + +/// Seed the `ready` set with every node whose in-degree is zero. 
+fn seed_ready(in_deg: &[usize], ready: &mut BTreeMap) { + for (i, deg) in in_deg.iter().enumerate() { + if *deg == 0 { + ready.insert(i, ()); + } + } +} + +fn unresolved_ids(spec: &DagSpec, resolved: &[usize]) -> String { + spec.steps + .iter() + .enumerate() + .filter(|(i, _)| !resolved.contains(i)) + .map(|(_, s)| s.id.as_str()) + .collect::>() + .join(", ") +} diff --git a/_primitives/_rust/kei-pipe/src/exec.rs b/_primitives/_rust/kei-pipe/src/exec.rs new file mode 100644 index 0000000..039894b --- /dev/null +++ b/_primitives/_rust/kei-pipe/src/exec.rs @@ -0,0 +1,110 @@ +//! Spawn an atom subprocess and return its JSON output. +//! +//! Atom IDs are `::` — e.g. `kei-task::create`. The +//! crate name resolves to an executable using the same contract as +//! `kei-runtime`: first `$KEI_RUNTIME_BIN_DIR/`, then walk `PATH`. +//! +//! The atom is invoked as ` run-atom `, JSON on stdin, JSON +//! on stdout. Exit code 0 = ok; anything else = `AtomFailed`. Tests can +//! substitute a mock binary by pointing `KEI_RUNTIME_BIN_DIR` at a temp +//! dir whose `` is a shell script that echoes its stdin (see +//! `tests/pipe_smoke.rs`). + +use serde_json::Value; +use std::io::Write; +use std::path::PathBuf; +use std::process::{Command, Stdio}; + +#[derive(Debug, thiserror::Error)] +pub enum ExecError { + #[error("atom id `{0}` must be `::`")] + BadAtomId(String), + #[error("binary `{0}` not found on PATH or KEI_RUNTIME_BIN_DIR")] + BinaryNotFound(String), + #[error("atom `{atom}` exited {code}: {stderr}")] + AtomFailed { atom: String, code: i32, stderr: String }, + #[error("subprocess {0}: {1}")] + Subprocess(String, std::io::Error), + #[error("atom `{atom}` stdout not JSON: {err}; stdout was: {stdout}")] + OutputParse { atom: String, err: String, stdout: String }, + #[error("serialize input: {0}")] + InputSerialize(String), +} + +/// Parse an atom id into `(crate, verb)`. Rejects empty halves. 
+pub fn split_atom(atom: &str) -> Result<(&str, &str), ExecError> {
+    match atom.split_once("::") {
+        Some((c, v)) if !c.is_empty() && !v.is_empty() => Ok((c, v)),
+        _ => Err(ExecError::BadAtomId(atom.into())),
+    }
+}
+
+/// Invoke an atom, returning the parsed JSON result (the atom's own
+/// stdout — callers decide how to slot it under `{"atom":..., "result":...}`).
+///
+/// # Errors
+/// `BadAtomId` for a malformed id, `BinaryNotFound` when the crate binary
+/// cannot be located, `InputSerialize` when `input` cannot be encoded,
+/// `Subprocess` for spawn/stdin/wait I/O failures, `AtomFailed` for a
+/// non-zero exit and `OutputParse` when stdout is not JSON.
+pub fn run_atom(atom: &str, input: &Value) -> Result<Value, ExecError> {
+    let (crate_name, verb) = split_atom(atom)?;
+    let bin = resolve_binary(crate_name)
+        .ok_or_else(|| ExecError::BinaryNotFound(crate_name.into()))?;
+    let stdin_bytes = serde_json::to_vec(input)
+        .map_err(|e| ExecError::InputSerialize(e.to_string()))?;
+    let output = spawn_and_wait(&bin, verb, &stdin_bytes, atom)?;
+    parse_output(atom, output)
+}
+
+/// Spawn `bin run-atom <verb>`, feed it `input_bytes` on stdin and collect
+/// its exit status, stdout and stderr.
+///
+/// stdin is written from a scoped helper thread while this thread drains
+/// stdout/stderr. Writing everything up front can deadlock: once the
+/// child's stdout pipe is full it stops reading stdin, and the parent is
+/// then stuck in `write_all` without ever reading stdout.
+///
+/// The child is waited on before any stdin error is reported, so a failed
+/// write never leaves an un-reaped process behind. A broken pipe on stdin
+/// (the child exited or closed stdin without consuming all input) is not
+/// treated as an error here; the exit status and stdout decide the outcome.
+fn spawn_and_wait(
+    bin: &PathBuf,
+    verb: &str,
+    input_bytes: &[u8],
+    atom: &str,
+) -> Result<std::process::Output, ExecError> {
+    let mut child = Command::new(bin)
+        .arg("run-atom")
+        .arg(verb)
+        .stdin(Stdio::piped())
+        .stdout(Stdio::piped())
+        .stderr(Stdio::piped())
+        .spawn()
+        .map_err(|e| ExecError::Subprocess(format!("spawn {atom}"), e))?;
+    let stdin = child.stdin.take();
+    std::thread::scope(|scope| {
+        // The pipe is moved into the writer thread and dropped when the
+        // write finishes, which closes stdin and lets the child see EOF.
+        let writer = stdin.map(|mut pipe| scope.spawn(move || pipe.write_all(input_bytes)));
+        let output = child
+            .wait_with_output()
+            .map_err(|e| ExecError::Subprocess(format!("wait {atom}"), e));
+        let written = match writer {
+            Some(handle) => handle.join(),
+            None => Ok(Ok(())),
+        };
+        let output = output?;
+        match written {
+            Ok(Ok(())) => Ok(output),
+            // Child stopped reading early; defer to its exit status.
+            Ok(Err(e)) if e.kind() == std::io::ErrorKind::BrokenPipe => Ok(output),
+            Ok(Err(e)) => Err(ExecError::Subprocess(format!("stdin {atom}"), e)),
+            Err(panic) => std::panic::resume_unwind(panic),
+        }
+    })
+}
+
+/// Turn a finished subprocess into the atom's JSON result.
+///
+/// A non-success exit status yields `AtomFailed` carrying the exit code
+/// (`-1` when the process has no code, e.g. killed by a signal) and the
+/// trimmed stderr. Otherwise stdout is trimmed and parsed as JSON; a parse
+/// failure yields `OutputParse` with the raw stdout attached.
+fn parse_output(atom: &str, out: std::process::Output) -> Result<Value, ExecError> {
+    if !out.status.success() {
+        return Err(ExecError::AtomFailed {
+            atom: atom.into(),
+            code: out.status.code().unwrap_or(-1),
+            stderr: String::from_utf8_lossy(&out.stderr).trim().into(),
+        });
+    }
+    let stdout = String::from_utf8_lossy(&out.stdout).into_owned();
+    serde_json::from_str(stdout.trim()).map_err(|e| ExecError::OutputParse {
+        atom: atom.into(),
+        err: e.to_string(),
+        stdout,
+    })
+}
+
+/// Resolve `<crate>` as an executable. Mirrors `kei-runtime::invoke`.
+fn resolve_binary(crate_name: &str) -> Option { + if let Ok(bin_dir) = std::env::var("KEI_RUNTIME_BIN_DIR") { + let candidate = PathBuf::from(bin_dir).join(crate_name); + if candidate.is_file() { + return Some(candidate); + } + } + let path = std::env::var("PATH").ok()?; + for dir in std::env::split_paths(&path) { + let candidate = dir.join(crate_name); + if candidate.is_file() { + return Some(candidate); + } + } + None +} diff --git a/_primitives/_rust/kei-pipe/src/lib.rs b/_primitives/_rust/kei-pipe/src/lib.rs new file mode 100644 index 0000000..882853e --- /dev/null +++ b/_primitives/_rust/kei-pipe/src/lib.rs @@ -0,0 +1,91 @@ +//! `kei-pipe` — atom DAG runtime. +//! +//! Reads a TOML DAG spec, topologically sorts the steps, then runs each +//! atom sequentially. JSON output of a step can be referenced by a later +//! step via `$step-id.path.to.field` in its input block. +//! +//! Atom invocation: spawn ` run-atom ` with JSON on +//! stdin, parse stdout as JSON. Binary resolution honours +//! `KEI_RUNTIME_BIN_DIR` first, then `PATH` (same contract as +//! `kei-runtime`). +//! +//! Public surface: +//! - [`dag::DagSpec`] / [`dag::Step`] — parsed DAG structures +//! - [`dag::parse_dag`] / [`dag::topo_sort`] — DAG pipeline +//! - [`resolve::resolve_input`] — substitute `$step.path` in input values +//! - [`exec::run_atom`] — invoke one atom via subprocess +//! - [`report::DagReport`] / [`report::StepReport`] — run outcome +//! - [`run_dag`] / [`validate_dag`] — top-level entry points + +pub mod dag; +pub mod exec; +pub mod report; +pub mod resolve; + +use std::path::Path; + +use crate::dag::{parse_dag, topo_sort, DagError}; +use crate::exec::{run_atom, ExecError}; +use crate::report::{DagReport, StepReport}; +use crate::resolve::{resolve_input, ResolveError}; + +/// Top-level errors from running a DAG. 
+#[derive(Debug, thiserror::Error)] +pub enum PipeError { + #[error("read {0}: {1}")] + Read(String, std::io::Error), + #[error(transparent)] + Dag(#[from] DagError), + #[error(transparent)] + Resolve(#[from] ResolveError), + #[error(transparent)] + Exec(#[from] ExecError), +} + +/// Parse + topo-sort a DAG file without running any atoms. Returns Ok +/// with the ordered list of step IDs when the DAG is well-formed. +pub fn validate_dag(path: &Path) -> Result, PipeError> { + let text = std::fs::read_to_string(path) + .map_err(|e| PipeError::Read(path.display().to_string(), e))?; + let spec = parse_dag(&text)?; + let order = topo_sort(&spec)?; + Ok(order.into_iter().map(|s| s.id.clone()).collect()) +} + +/// Parse + execute a DAG file. On success returns a full report; on the +/// first step failure the report still contains every step processed up +/// to (and including) the failing one, with `ok=false` on that step. +pub fn run_dag(path: &Path) -> Result { + let text = std::fs::read_to_string(path) + .map_err(|e| PipeError::Read(path.display().to_string(), e))?; + let spec = parse_dag(&text)?; + let ordered = topo_sort(&spec)?; + Ok(execute_sorted(&ordered)) +} + +fn execute_sorted(steps: &[&dag::Step]) -> DagReport { + let mut report = DagReport::new(); + for step in steps { + match run_one_step(step, &report) { + Ok(sr) => { + report.push(sr); + } + Err(sr) => { + report.push(sr); + break; + } + } + } + report +} + +fn run_one_step(step: &dag::Step, report: &DagReport) -> Result { + let input_value = match resolve_input(&step.input, report.results()) { + Ok(v) => v, + Err(e) => return Err(StepReport::fail(&step.id, &step.atom, format!("resolve: {e}"))), + }; + match run_atom(&step.atom, &input_value) { + Ok(result) => Ok(StepReport::ok(&step.id, &step.atom, result)), + Err(e) => Err(StepReport::fail(&step.id, &step.atom, format!("exec: {e}"))), + } +} diff --git a/_primitives/_rust/kei-pipe/src/main.rs b/_primitives/_rust/kei-pipe/src/main.rs new file mode 
100644 index 0000000..573a568 --- /dev/null +++ b/_primitives/_rust/kei-pipe/src/main.rs @@ -0,0 +1,72 @@ +//! `kei-pipe` CLI — `run ` and `validate `. +//! +//! Exit codes: +//! - 0 — ok (run or validate) +//! - 1 — CLI / IO / parse failure (DAG couldn't be loaded) +//! - 2 — one or more steps failed during `run` + +use clap::{Parser, Subcommand}; +use kei_pipe::{run_dag, validate_dag}; +use std::path::PathBuf; +use std::process::ExitCode; + +#[derive(Parser)] +#[command(name = "kei-pipe", version, about = "Atom DAG pipe runtime")] +struct Cli { + #[command(subcommand)] + cmd: Cmd, +} + +#[derive(Subcommand)] +enum Cmd { + /// Run a DAG — execute every step in topo order, print final report. + Run { path: PathBuf }, + /// Parse + topo-sort without executing; prints the resolved order. + Validate { path: PathBuf }, +} + +fn main() -> ExitCode { + let cli = Cli::parse(); + match cli.cmd { + Cmd::Run { path } => cmd_run(&path), + Cmd::Validate { path } => cmd_validate(&path), + } +} + +fn cmd_run(path: &std::path::Path) -> ExitCode { + match run_dag(path) { + Ok(report) => { + match serde_json::to_string_pretty(&report) { + Ok(s) => println!("{s}"), + Err(e) => { + eprintln!("serialize report: {e}"); + return ExitCode::from(1); + } + } + if report.final_ok() { + ExitCode::SUCCESS + } else { + ExitCode::from(2) + } + } + Err(e) => { + eprintln!("{e}"); + ExitCode::from(1) + } + } +} + +fn cmd_validate(path: &std::path::Path) -> ExitCode { + match validate_dag(path) { + Ok(order) => { + for id in order { + println!("{id}"); + } + ExitCode::SUCCESS + } + Err(e) => { + eprintln!("{e}"); + ExitCode::from(1) + } + } +} diff --git a/_primitives/_rust/kei-pipe/src/report.rs b/_primitives/_rust/kei-pipe/src/report.rs new file mode 100644 index 0000000..894867a --- /dev/null +++ b/_primitives/_rust/kei-pipe/src/report.rs @@ -0,0 +1,89 @@ +//! Per-step and DAG-level run reports. +//! +//! A [`StepReport`] is emitted for every step actually attempted, in +//! execution order. 
A [`DagReport`] aggregates them and exposes the +//! resolver lookup map so later steps can reference earlier outputs. +//! +//! When a step fails, execution halts (sequential runtime) and the +//! failing step is the last entry in `steps`. Callers can check +//! `final_ok()` and inspect `steps.last()` for the error. + +use serde::Serialize; +use serde_json::{json, Value}; +use std::collections::HashMap; + +/// One step's outcome. +#[derive(Debug, Clone, Serialize)] +pub struct StepReport { + pub id: String, + pub atom: String, + pub ok: bool, + pub result: Option, + pub error: Option, +} + +impl StepReport { + pub fn ok(id: &str, atom: &str, result: Value) -> Self { + Self { + id: id.into(), + atom: atom.into(), + ok: true, + result: Some(result), + error: None, + } + } + pub fn fail(id: &str, atom: &str, error: String) -> Self { + Self { + id: id.into(), + atom: atom.into(), + ok: false, + result: None, + error: Some(error), + } + } +} + +/// Full-DAG outcome. `final_result` is the `result` of the last +/// successful step, or `null` when nothing ran successfully. +#[derive(Debug, Clone, Default, Serialize)] +pub struct DagReport { + pub steps: Vec, + pub final_result: Value, + /// Resolver lookup — envelope shape `{"atom":..., "result":...}`. + #[serde(skip)] + resolver: HashMap, +} + +impl DagReport { + pub fn new() -> Self { + Self { + steps: Vec::new(), + final_result: Value::Null, + resolver: HashMap::new(), + } + } + + /// Append one step's report. On success, also updates the resolver + /// map so downstream `$step.result.foo` references work. + pub fn push(&mut self, step: StepReport) { + if step.ok { + let envelope = json!({ "atom": step.atom, "result": step.result }); + self.resolver.insert(step.id.clone(), envelope); + if let Some(ref r) = step.result { + self.final_result = r.clone(); + } + } + self.steps.push(step); + } + + /// Borrow the resolver map for downstream `$step.path` lookups. 
+ pub fn results(&self) -> &HashMap { + &self.resolver + } + + /// True when no attempted step failed, i.e. every recorded step has + /// `ok = true`. An empty DAG (no steps recorded) also counts as ok. + pub fn final_ok(&self) -> bool { + self.steps.iter().all(|s| s.ok) + } +} diff --git a/_primitives/_rust/kei-pipe/src/resolve.rs b/_primitives/_rust/kei-pipe/src/resolve.rs new file mode 100644 index 0000000..e2d63b8 --- /dev/null +++ b/_primitives/_rust/kei-pipe/src/resolve.rs @@ -0,0 +1,128 @@ +//! `$step-id.path.to.field` resolver. +//! +//! A reference is a string of the form `$<step-id>[.<segment>]*` where a +//! segment is either a dotted key (`foo`) or a numeric index (`0`). The +//! leading segment after the step id is matched against the step's +//! returned `{"atom":..., "result":...}` envelope the same way a caller +//! would type it — including an explicit `result.` prefix. +//! +//! A string that starts with `$` is treated as a reference and replaced by +//! the referenced value (which may itself be a JSON object, array, number). +//! Strings that merely contain a `$` somewhere else are left alone. + +use serde_json::{Map, Value}; +use std::collections::HashMap; + +#[derive(Debug, thiserror::Error)] +pub enum ResolveError { + #[error("unknown step in reference `{0}`")] + UnknownStep(String), + #[error("path `{0}` not found in result of step `{1}`")] + MissingPath(String, String), + #[error("path segment `{0}` expects an object/array but got `{1}`")] + WrongKind(String, String), +} + +/// Walk `input` recursively, replacing every `$step.path` string with the +/// resolved JSON value from `previous`. 
+pub fn resolve_input( + input: &Value, + previous: &HashMap, +) -> Result { + match input { + Value::String(s) => resolve_string(s, previous), + Value::Array(items) => resolve_array(items, previous), + Value::Object(map) => resolve_object(map, previous), + other => Ok(other.clone()), + } +} + +fn resolve_string(s: &str, previous: &HashMap) -> Result { + if let Some(stripped) = s.strip_prefix('$') { + return lookup_reference(stripped, previous); + } + Ok(Value::String(s.to_string())) +} + +fn resolve_array( + items: &[Value], + previous: &HashMap, +) -> Result { + let mut out: Vec = Vec::with_capacity(items.len()); + for v in items { + out.push(resolve_input(v, previous)?); + } + Ok(Value::Array(out)) +} + +fn resolve_object( + map: &Map, + previous: &HashMap, +) -> Result { + let mut out = Map::with_capacity(map.len()); + for (k, v) in map { + out.insert(k.clone(), resolve_input(v, previous)?); + } + Ok(Value::Object(out)) +} + +fn lookup_reference( + stripped: &str, + previous: &HashMap, +) -> Result { + let (step_id, remainder) = split_head(stripped); + let envelope = previous + .get(step_id) + .ok_or_else(|| ResolveError::UnknownStep(step_id.to_string()))?; + walk_path(envelope, remainder, step_id) +} + +fn split_head(s: &str) -> (&str, &str) { + match s.find('.') { + Some(i) => (&s[..i], &s[i + 1..]), + None => (s, ""), + } +} + +fn walk_path(root: &Value, path: &str, step_id: &str) -> Result { + if path.is_empty() { + return Ok(root.clone()); + } + let mut current = root; + for seg in path.split('.') { + current = descend(current, seg, path, step_id)?; + } + Ok(current.clone()) +} + +fn descend<'a>( + current: &'a Value, + seg: &str, + path: &str, + step_id: &str, +) -> Result<&'a Value, ResolveError> { + match current { + Value::Object(m) => m + .get(seg) + .ok_or_else(|| ResolveError::MissingPath(path.into(), step_id.into())), + Value::Array(a) => { + let idx: usize = seg + .parse() + .map_err(|_| ResolveError::MissingPath(path.into(), step_id.into()))?; + 
+ a.get(idx) + .ok_or_else(|| ResolveError::MissingPath(path.into(), step_id.into())) + } + other => Err(ResolveError::WrongKind(seg.into(), kind_of(other).into())), + } +} + +fn kind_of(v: &Value) -> &'static str { + match v { + Value::Null => "null", + Value::Bool(_) => "bool", + Value::Number(_) => "number", + Value::String(_) => "string", + Value::Array(_) => "array", + Value::Object(_) => "object", + } +} diff --git a/_primitives/_rust/kei-pipe/tests/pipe_smoke.rs b/_primitives/_rust/kei-pipe/tests/pipe_smoke.rs new file mode 100644 index 0000000..c1d6345 --- /dev/null +++ b/_primitives/_rust/kei-pipe/tests/pipe_smoke.rs @@ -0,0 +1,174 @@ +//! Smoke tests for the `kei-pipe` DAG runtime. +//! +//! Five scenarios: +//! 1. Happy path — 2 steps with `$step.result.id` substitution. +//! 2. Cycle detection — errors clearly. +//! 3. Unknown dependency — errors clearly. +//! 4. Resolver walks `$step.nested.sub.field` into deep paths. +//! 5. Unreadable DAG file — `run_dag` returns `PipeError::Read`. +//! +//! Mock atom: a shell script named `mockcrate` that echoes its stdin back +//! as `{"input": <stdin>}`, so atom ids look like `mockcrate::echo`. + +use kei_pipe::dag::{parse_dag, topo_sort, DagError}; +use kei_pipe::resolve::resolve_input; +use kei_pipe::{run_dag, PipeError}; +use serde_json::{json, Value}; +use std::collections::HashMap; +use std::fs; +use std::os::unix::fs::PermissionsExt; +use std::path::PathBuf; + +/// Create a temp dir holding a POSIX shell script `mockcrate` that, for +/// any `run-atom <verb>` invocation, echoes back `{"input": <stdin>}`. +fn mock_bin_dir() -> tempfile::TempDir { + let dir = tempfile::tempdir().expect("tempdir"); + let bin = dir.path().join("mockcrate"); + // Read stdin verbatim and wrap in {"input": ...}. Python3 is cross- + // platform enough for CI; we avoid jq/node to keep the dep surface + // minimal. Smoke tests skip when python3 is missing. 
+ let script = r#"#!/bin/sh +exec python3 -c 'import sys, json; d = sys.stdin.read(); print(json.dumps({"input": json.loads(d)}))' +"#; + fs::write(&bin, script).unwrap(); + let mut perms = fs::metadata(&bin).unwrap().permissions(); + perms.set_mode(0o755); + fs::set_permissions(&bin, perms).unwrap(); + dir +} + +fn python3_available() -> bool { + std::process::Command::new("python3") + .arg("-c") + .arg("print(1)") + .output() + .map(|o| o.status.success()) + .unwrap_or(false) +} + +fn write_toml(dir: &std::path::Path, name: &str, body: &str) -> PathBuf { + let path = dir.join(name); + fs::write(&path, body).unwrap(); + path +} + +#[test] +fn happy_path_runs_two_steps_with_substitution() { + if !python3_available() { + eprintln!("skipping: python3 not available"); + return; + } + let bin = mock_bin_dir(); + let work = tempfile::tempdir().unwrap(); + let dag = r#" +[[steps]] +id = "first" +atom = "mockcrate::echo" +input = { title = "Fix bug" } + +[[steps]] +id = "second" +atom = "mockcrate::echo" +depends-on = ["first"] +input = { prior = "$first.result.input.title", literal = 42 } +"#; + let path = write_toml(work.path(), "dag.toml", dag); + std::env::set_var("KEI_RUNTIME_BIN_DIR", bin.path()); + let report = run_dag(&path).expect("run"); + std::env::remove_var("KEI_RUNTIME_BIN_DIR"); + assert!(report.final_ok(), "all steps ok; got {:?}", report.steps); + assert_eq!(report.steps.len(), 2); + assert_eq!(report.steps[0].id, "first"); + assert_eq!(report.steps[1].id, "second"); + let second_result = report.steps[1] + .result + .as_ref() + .expect("second.result"); + // The mock echoes the RESOLVED input, so `prior` should equal "Fix bug" + // after substitution from first.result.input.title. 
+ assert_eq!( + second_result["input"]["prior"], + Value::String("Fix bug".to_string()) + ); + assert_eq!(second_result["input"]["literal"], json!(42)); +} + +#[test] +fn cycle_is_detected_with_step_ids() { + let dag = r#" +[[steps]] +id = "a" +atom = "mockcrate::echo" +depends-on = ["b"] + +[[steps]] +id = "b" +atom = "mockcrate::echo" +depends-on = ["a"] +"#; + let spec = parse_dag(dag).expect("parse"); + let err = topo_sort(&spec).expect_err("cycle"); + match err { + DagError::Cycle(ids) => { + assert!(ids.contains('a') && ids.contains('b'), "ids: {ids}"); + } + other => panic!("expected Cycle, got {other:?}"), + } +} + +#[test] +fn unknown_dependency_errors_with_step_name() { + let dag = r#" +[[steps]] +id = "only" +atom = "mockcrate::echo" +depends-on = ["ghost"] +"#; + let spec = parse_dag(dag).expect("parse"); + let err = topo_sort(&spec).expect_err("unknown dep"); + match err { + DagError::UnknownDep(step, dep) => { + assert_eq!(step, "only"); + assert_eq!(dep, "ghost"); + } + other => panic!("expected UnknownDep, got {other:?}"), + } +} + +#[test] +fn resolver_walks_nested_paths_and_array_indices() { + // Simulate an envelope produced by a prior step. 
+ let mut prev: HashMap = HashMap::new(); + prev.insert( + "first".into(), + json!({ + "atom": "mockcrate::echo", + "result": { + "id": 17, + "nested": { "sub": { "field": "deep" } }, + "items": ["alpha", "beta", { "tag": "third" }] + } + }), + ); + let input = json!({ + "direct_id": "$first.result.id", + "deep": "$first.result.nested.sub.field", + "array_index": "$first.result.items.1", + "array_object": "$first.result.items.2.tag", + "root": "$first", + "untouched": "plain string" + }); + let out = resolve_input(&input, &prev).expect("resolve"); + assert_eq!(out["direct_id"], json!(17)); + assert_eq!(out["deep"], json!("deep")); + assert_eq!(out["array_index"], json!("beta")); + assert_eq!(out["array_object"], json!("third")); + assert_eq!(out["root"]["atom"], json!("mockcrate::echo")); + assert_eq!(out["untouched"], json!("plain string")); +} + +#[test] +fn run_dag_rejects_unreadable_file() { + let err = run_dag(std::path::Path::new("/no/such/file-a8f3.toml")) + .expect_err("io error"); + assert!(matches!(err, PipeError::Read(_, _)), "got {err:?}"); +}