feat(r1): new kei-pipe crate — atom DAG runtime

The critical missing substrate composition layer.

kei-pipe run <dag.toml> — reads DAG spec, topo-sorts atoms, executes
sequentially, pipes JSON between steps via $step.path.to.field
resolver. 6 Constructor-Pattern cubes: dag/resolve/exec/report/lib/main.

5/5 smoke tests: happy path + cycle detection + unknown dep +
nested path resolver + unreadable file.

Resolver envelope matches kei-runtime Output — atoms round-trip
identically through either runtime.

Workspace Cargo.toml: +kei-pipe member.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Parfii-bot 2026-04-23 05:55:13 +08:00
parent e075ae8df1
commit 0b948ca07c
10 changed files with 902 additions and 0 deletions

View file

@ -2073,6 +2073,19 @@ dependencies = [
"tokio",
]
[[package]]
name = "kei-pipe"
version = "0.1.0"
dependencies = [
"anyhow",
"clap",
"serde",
"serde_json",
"tempfile",
"thiserror 1.0.69",
"toml",
]
[[package]]
name = "kei-provision"
version = "0.1.0"

View file

@ -43,6 +43,8 @@ members = [
"kei-provision",
# Convergence Layer A — schema-driven verb-template engine for SQLite-CRUD stores
"kei-entity-store",
# v1 substrate — atom DAG pipe runtime (topo-sort + JSON piping between atoms)
"kei-pipe",
]
[workspace.package]

View file

@ -0,0 +1,29 @@
[package]
name = "kei-pipe"
version = "0.1.0"
edition = "2021"
rust-version = "1.75"
description = "Atom DAG pipe runtime — topo-sorts steps, pipes JSON between atoms."
[[bin]]
name = "kei-pipe"
path = "src/main.rs"
[lib]
name = "kei_pipe"
path = "src/lib.rs"
[dependencies]
clap = { version = "4", features = ["derive"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
toml = "0.8"
anyhow = "1"
thiserror = "1"
[dev-dependencies]
tempfile = "3"
[package.metadata.keisei]
backend = "none"
description = "Atom DAG pipe runtime — topo-sorts steps, pipes JSON between atoms."

View file

@ -0,0 +1,194 @@
//! DAG spec parsing + topological sort.
//!
//! TOML shape — `[[steps]]` array with fields `id`, `atom`, optional
//! `depends-on = [ids...]`, optional `input = { ... }`.
//!
//! Invariants:
//! - `id` and `atom` must be non-empty strings
//! - `id` must be unique across the DAG
//! - every `depends-on` entry must reference a known step id
//! - the dependency graph must be acyclic
use serde::Deserialize;
use serde_json::Value;
use std::collections::{BTreeMap, HashMap, HashSet};
/// Error cases raised while parsing or sorting a DAG.
#[derive(Debug, thiserror::Error)]
pub enum DagError {
#[error("toml parse: {0}")]
Toml(String),
#[error("step {0} missing required field `{1}`")]
MissingField(String, &'static str),
#[error("duplicate step id: {0}")]
DuplicateId(String),
#[error("step `{0}` depends on unknown id `{1}`")]
UnknownDep(String, String),
#[error("cycle detected involving: {0}")]
Cycle(String),
#[error("input for step `{0}` must be a TOML table")]
BadInput(String),
}
/// One atom invocation in a DAG. `input` is retained as `serde_json::Value`
/// so the resolver can walk it uniformly (strings, objects, arrays).
#[derive(Debug, Clone)]
pub struct Step {
pub id: String,
pub atom: String,
pub depends_on: Vec<String>,
pub input: Value,
}
/// Parsed DAG. `steps` preserves declaration order so error messages line
/// up with the TOML source.
#[derive(Debug, Clone, Default)]
pub struct DagSpec {
pub steps: Vec<Step>,
}
/// Internal TOML surface — kept private so callers only see the cleaned
/// `DagSpec` / `Step` shape.
#[derive(Debug, Deserialize)]
struct RawDag {
#[serde(default)]
steps: Vec<RawStep>,
}
#[derive(Debug, Deserialize)]
struct RawStep {
id: Option<String>,
atom: Option<String>,
#[serde(rename = "depends-on", default)]
depends_on: Vec<String>,
#[serde(default)]
input: Option<toml::Value>,
}
/// Parse TOML text into a cleaned `DagSpec` with per-step validation.
pub fn parse_dag(text: &str) -> Result<DagSpec, DagError> {
let raw: RawDag = toml::from_str(text).map_err(|e| DagError::Toml(e.to_string()))?;
let mut seen: HashSet<String> = HashSet::new();
let mut steps: Vec<Step> = Vec::with_capacity(raw.steps.len());
for (idx, rs) in raw.steps.into_iter().enumerate() {
let step = build_step(idx, rs, &mut seen)?;
steps.push(step);
}
Ok(DagSpec { steps })
}
fn build_step(idx: usize, rs: RawStep, seen: &mut HashSet<String>) -> Result<Step, DagError> {
let id = rs
.id
.filter(|s| !s.is_empty())
.ok_or_else(|| DagError::MissingField(format!("#{idx}"), "id"))?;
if !seen.insert(id.clone()) {
return Err(DagError::DuplicateId(id));
}
let atom = rs
.atom
.filter(|s| !s.is_empty())
.ok_or_else(|| DagError::MissingField(id.clone(), "atom"))?;
let input = normalize_input(&id, rs.input)?;
Ok(Step { id, atom, depends_on: rs.depends_on, input })
}
fn normalize_input(id: &str, raw: Option<toml::Value>) -> Result<Value, DagError> {
let v = raw.unwrap_or(toml::Value::Table(toml::map::Map::new()));
if !matches!(v, toml::Value::Table(_)) {
return Err(DagError::BadInput(id.into()));
}
let s = serde_json::to_value(v).map_err(|e| DagError::Toml(e.to_string()))?;
Ok(s)
}
/// Topologically sort the DAG. Returns `&Step` references in execution
/// order. Stable — ties are broken by declaration order so reports are
/// deterministic.
pub fn topo_sort(spec: &DagSpec) -> Result<Vec<&Step>, DagError> {
let index_by_id = index_by_id(spec)?;
validate_edges(spec, &index_by_id)?;
let (in_deg, adj) = build_graph(spec, &index_by_id);
let ordered_indices = kahn_sort(spec, in_deg, adj)?;
Ok(ordered_indices.iter().map(|i| &spec.steps[*i]).collect())
}
fn index_by_id(spec: &DagSpec) -> Result<HashMap<&str, usize>, DagError> {
let mut m: HashMap<&str, usize> = HashMap::with_capacity(spec.steps.len());
for (i, s) in spec.steps.iter().enumerate() {
m.insert(s.id.as_str(), i);
}
Ok(m)
}
fn validate_edges(spec: &DagSpec, idx: &HashMap<&str, usize>) -> Result<(), DagError> {
for s in &spec.steps {
for dep in &s.depends_on {
if !idx.contains_key(dep.as_str()) {
return Err(DagError::UnknownDep(s.id.clone(), dep.clone()));
}
}
}
Ok(())
}
fn build_graph(
spec: &DagSpec,
idx: &HashMap<&str, usize>,
) -> (Vec<usize>, Vec<Vec<usize>>) {
let n = spec.steps.len();
let mut in_deg = vec![0usize; n];
let mut adj: Vec<Vec<usize>> = vec![Vec::new(); n];
for (i, s) in spec.steps.iter().enumerate() {
for dep in &s.depends_on {
let src = idx[dep.as_str()];
adj[src].push(i);
in_deg[i] += 1;
}
}
(in_deg, adj)
}
fn kahn_sort(
spec: &DagSpec,
mut in_deg: Vec<usize>,
adj: Vec<Vec<usize>>,
) -> Result<Vec<usize>, DagError> {
let n = spec.steps.len();
let mut ready: BTreeMap<usize, ()> = BTreeMap::new();
seed_ready(&in_deg, &mut ready);
let mut out: Vec<usize> = Vec::with_capacity(n);
while let Some((&i, _)) = ready.iter().next() {
ready.remove(&i);
out.push(i);
for &j in &adj[i] {
in_deg[j] -= 1;
if in_deg[j] == 0 {
ready.insert(j, ());
}
}
}
if out.len() != n {
return Err(DagError::Cycle(unresolved_ids(spec, &out)));
}
Ok(out)
}
/// Seed the `ready` set with every node whose in-degree is zero.
fn seed_ready(in_deg: &[usize], ready: &mut BTreeMap<usize, ()>) {
for (i, deg) in in_deg.iter().enumerate() {
if *deg == 0 {
ready.insert(i, ());
}
}
}
fn unresolved_ids(spec: &DagSpec, resolved: &[usize]) -> String {
spec.steps
.iter()
.enumerate()
.filter(|(i, _)| !resolved.contains(i))
.map(|(_, s)| s.id.as_str())
.collect::<Vec<_>>()
.join(", ")
}

View file

@ -0,0 +1,110 @@
//! Spawn an atom subprocess and return its JSON output.
//!
//! Atom IDs are `<crate-name>::<verb>` — e.g. `kei-task::create`. The
//! crate name resolves to an executable using the same contract as
//! `kei-runtime`: first `$KEI_RUNTIME_BIN_DIR/<crate>`, then walk `PATH`.
//!
//! The atom is invoked as `<crate> run-atom <verb>`, JSON on stdin, JSON
//! on stdout. Exit code 0 = ok; anything else = `AtomFailed`. Tests can
//! substitute a mock binary by pointing `KEI_RUNTIME_BIN_DIR` at a temp
//! dir whose `<crate>` is a shell script that echoes its stdin (see
//! `tests/pipe_smoke.rs`).
use serde_json::Value;
use std::io::Write;
use std::path::PathBuf;
use std::process::{Command, Stdio};
#[derive(Debug, thiserror::Error)]
pub enum ExecError {
#[error("atom id `{0}` must be `<crate>::<verb>`")]
BadAtomId(String),
#[error("binary `{0}` not found on PATH or KEI_RUNTIME_BIN_DIR")]
BinaryNotFound(String),
#[error("atom `{atom}` exited {code}: {stderr}")]
AtomFailed { atom: String, code: i32, stderr: String },
#[error("subprocess {0}: {1}")]
Subprocess(String, std::io::Error),
#[error("atom `{atom}` stdout not JSON: {err}; stdout was: {stdout}")]
OutputParse { atom: String, err: String, stdout: String },
#[error("serialize input: {0}")]
InputSerialize(String),
}
/// Parse an atom id into `(crate, verb)`. Rejects empty halves.
pub fn split_atom(atom: &str) -> Result<(&str, &str), ExecError> {
match atom.split_once("::") {
Some((c, v)) if !c.is_empty() && !v.is_empty() => Ok((c, v)),
_ => Err(ExecError::BadAtomId(atom.into())),
}
}
/// Invoke an atom, returning the parsed JSON result (the atom's own
/// stdout — callers decide how to slot it under `{"atom":..., "result":...}`).
pub fn run_atom(atom: &str, input: &Value) -> Result<Value, ExecError> {
let (crate_name, verb) = split_atom(atom)?;
let bin = resolve_binary(crate_name)
.ok_or_else(|| ExecError::BinaryNotFound(crate_name.into()))?;
let stdin_bytes = serde_json::to_vec(input)
.map_err(|e| ExecError::InputSerialize(e.to_string()))?;
let output = spawn_and_wait(&bin, verb, &stdin_bytes, atom)?;
parse_output(atom, output)
}
fn spawn_and_wait(
bin: &PathBuf,
verb: &str,
input_bytes: &[u8],
atom: &str,
) -> Result<std::process::Output, ExecError> {
let mut child = Command::new(bin)
.arg("run-atom")
.arg(verb)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.map_err(|e| ExecError::Subprocess(format!("spawn {atom}"), e))?;
if let Some(mut stdin) = child.stdin.take() {
stdin
.write_all(input_bytes)
.map_err(|e| ExecError::Subprocess(format!("stdin {atom}"), e))?;
}
child
.wait_with_output()
.map_err(|e| ExecError::Subprocess(format!("wait {atom}"), e))
}
fn parse_output(atom: &str, out: std::process::Output) -> Result<Value, ExecError> {
if !out.status.success() {
return Err(ExecError::AtomFailed {
atom: atom.into(),
code: out.status.code().unwrap_or(-1),
stderr: String::from_utf8_lossy(&out.stderr).trim().into(),
});
}
let stdout = String::from_utf8_lossy(&out.stdout).into_owned();
serde_json::from_str(stdout.trim()).map_err(|e| ExecError::OutputParse {
atom: atom.into(),
err: e.to_string(),
stdout,
})
}
/// Resolve `<crate>` as an executable. Mirrors `kei-runtime::invoke`.
fn resolve_binary(crate_name: &str) -> Option<PathBuf> {
if let Ok(bin_dir) = std::env::var("KEI_RUNTIME_BIN_DIR") {
let candidate = PathBuf::from(bin_dir).join(crate_name);
if candidate.is_file() {
return Some(candidate);
}
}
let path = std::env::var("PATH").ok()?;
for dir in std::env::split_paths(&path) {
let candidate = dir.join(crate_name);
if candidate.is_file() {
return Some(candidate);
}
}
None
}

View file

@ -0,0 +1,91 @@
//! `kei-pipe` — atom DAG runtime.
//!
//! Reads a TOML DAG spec, topologically sorts the steps, then runs each
//! atom sequentially. JSON output of a step can be referenced by a later
//! step via `$step-id.path.to.field` in its input block.
//!
//! Atom invocation: spawn `<crate-name> run-atom <verb>` with JSON on
//! stdin, parse stdout as JSON. Binary resolution honours
//! `KEI_RUNTIME_BIN_DIR` first, then `PATH` (same contract as
//! `kei-runtime`).
//!
//! Public surface:
//! - [`dag::DagSpec`] / [`dag::Step`] — parsed DAG structures
//! - [`dag::parse_dag`] / [`dag::topo_sort`] — DAG pipeline
//! - [`resolve::resolve_input`] — substitute `$step.path` in input values
//! - [`exec::run_atom`] — invoke one atom via subprocess
//! - [`report::DagReport`] / [`report::StepReport`] — run outcome
//! - [`run_dag`] / [`validate_dag`] — top-level entry points
pub mod dag;
pub mod exec;
pub mod report;
pub mod resolve;
use std::path::Path;
use crate::dag::{parse_dag, topo_sort, DagError};
use crate::exec::{run_atom, ExecError};
use crate::report::{DagReport, StepReport};
use crate::resolve::{resolve_input, ResolveError};
/// Top-level errors from running a DAG.
#[derive(Debug, thiserror::Error)]
pub enum PipeError {
#[error("read {0}: {1}")]
Read(String, std::io::Error),
#[error(transparent)]
Dag(#[from] DagError),
#[error(transparent)]
Resolve(#[from] ResolveError),
#[error(transparent)]
Exec(#[from] ExecError),
}
/// Parse + topo-sort a DAG file without running any atoms. Returns Ok
/// with the ordered list of step IDs when the DAG is well-formed.
pub fn validate_dag(path: &Path) -> Result<Vec<String>, PipeError> {
let text = std::fs::read_to_string(path)
.map_err(|e| PipeError::Read(path.display().to_string(), e))?;
let spec = parse_dag(&text)?;
let order = topo_sort(&spec)?;
Ok(order.into_iter().map(|s| s.id.clone()).collect())
}
/// Parse + execute a DAG file. On success returns a full report; on the
/// first step failure the report still contains every step processed up
/// to (and including) the failing one, with `ok=false` on that step.
pub fn run_dag(path: &Path) -> Result<DagReport, PipeError> {
let text = std::fs::read_to_string(path)
.map_err(|e| PipeError::Read(path.display().to_string(), e))?;
let spec = parse_dag(&text)?;
let ordered = topo_sort(&spec)?;
Ok(execute_sorted(&ordered))
}
fn execute_sorted(steps: &[&dag::Step]) -> DagReport {
let mut report = DagReport::new();
for step in steps {
match run_one_step(step, &report) {
Ok(sr) => {
report.push(sr);
}
Err(sr) => {
report.push(sr);
break;
}
}
}
report
}
fn run_one_step(step: &dag::Step, report: &DagReport) -> Result<StepReport, StepReport> {
let input_value = match resolve_input(&step.input, report.results()) {
Ok(v) => v,
Err(e) => return Err(StepReport::fail(&step.id, &step.atom, format!("resolve: {e}"))),
};
match run_atom(&step.atom, &input_value) {
Ok(result) => Ok(StepReport::ok(&step.id, &step.atom, result)),
Err(e) => Err(StepReport::fail(&step.id, &step.atom, format!("exec: {e}"))),
}
}

View file

@ -0,0 +1,72 @@
//! `kei-pipe` CLI — `run <dag.toml>` and `validate <dag.toml>`.
//!
//! Exit codes:
//! - 0 — ok (run or validate)
//! - 1 — CLI / IO / parse failure (DAG couldn't be loaded)
//! - 2 — one or more steps failed during `run`
use clap::{Parser, Subcommand};
use kei_pipe::{run_dag, validate_dag};
use std::path::PathBuf;
use std::process::ExitCode;
#[derive(Parser)]
#[command(name = "kei-pipe", version, about = "Atom DAG pipe runtime")]
struct Cli {
#[command(subcommand)]
cmd: Cmd,
}
#[derive(Subcommand)]
enum Cmd {
/// Run a DAG — execute every step in topo order, print final report.
Run { path: PathBuf },
/// Parse + topo-sort without executing; prints the resolved order.
Validate { path: PathBuf },
}
fn main() -> ExitCode {
let cli = Cli::parse();
match cli.cmd {
Cmd::Run { path } => cmd_run(&path),
Cmd::Validate { path } => cmd_validate(&path),
}
}
fn cmd_run(path: &std::path::Path) -> ExitCode {
match run_dag(path) {
Ok(report) => {
match serde_json::to_string_pretty(&report) {
Ok(s) => println!("{s}"),
Err(e) => {
eprintln!("serialize report: {e}");
return ExitCode::from(1);
}
}
if report.final_ok() {
ExitCode::SUCCESS
} else {
ExitCode::from(2)
}
}
Err(e) => {
eprintln!("{e}");
ExitCode::from(1)
}
}
}
fn cmd_validate(path: &std::path::Path) -> ExitCode {
match validate_dag(path) {
Ok(order) => {
for id in order {
println!("{id}");
}
ExitCode::SUCCESS
}
Err(e) => {
eprintln!("{e}");
ExitCode::from(1)
}
}
}

View file

@ -0,0 +1,89 @@
//! Per-step and DAG-level run reports.
//!
//! A [`StepReport`] is emitted for every step actually attempted, in
//! execution order. A [`DagReport`] aggregates them and exposes the
//! resolver lookup map so later steps can reference earlier outputs.
//!
//! When a step fails, execution halts (sequential runtime) and the
//! failing step is the last entry in `steps`. Callers can check
//! `final_ok()` and inspect `steps.last()` for the error.
use serde::Serialize;
use serde_json::{json, Value};
use std::collections::HashMap;
/// One step's outcome.
#[derive(Debug, Clone, Serialize)]
pub struct StepReport {
pub id: String,
pub atom: String,
pub ok: bool,
pub result: Option<Value>,
pub error: Option<String>,
}
impl StepReport {
pub fn ok(id: &str, atom: &str, result: Value) -> Self {
Self {
id: id.into(),
atom: atom.into(),
ok: true,
result: Some(result),
error: None,
}
}
pub fn fail(id: &str, atom: &str, error: String) -> Self {
Self {
id: id.into(),
atom: atom.into(),
ok: false,
result: None,
error: Some(error),
}
}
}
/// Full-DAG outcome. `final_result` is the `result` of the last
/// successful step, or `null` when nothing ran successfully.
#[derive(Debug, Clone, Default, Serialize)]
pub struct DagReport {
pub steps: Vec<StepReport>,
pub final_result: Value,
/// Resolver lookup — envelope shape `{"atom":..., "result":...}`.
#[serde(skip)]
resolver: HashMap<String, Value>,
}
impl DagReport {
pub fn new() -> Self {
Self {
steps: Vec::new(),
final_result: Value::Null,
resolver: HashMap::new(),
}
}
/// Append one step's report. On success, also updates the resolver
/// map so downstream `$step.result.foo` references work.
pub fn push(&mut self, step: StepReport) {
if step.ok {
let envelope = json!({ "atom": step.atom, "result": step.result });
self.resolver.insert(step.id.clone(), envelope);
if let Some(ref r) = step.result {
self.final_result = r.clone();
}
}
self.steps.push(step);
}
/// Borrow the resolver map for downstream `$step.path` lookups.
pub fn results(&self) -> &HashMap<String, Value> {
&self.resolver
}
/// True when every step completed with `ok = true` AND at least one
/// step ran (an empty DAG counts as ok-but-empty).
pub fn final_ok(&self) -> bool {
self.steps.iter().all(|s| s.ok)
}
}

View file

@ -0,0 +1,128 @@
//! `$step-id.path.to.field` resolver.
//!
//! A reference is a string of the form `$<step-id>[.<segment>]*` where a
//! segment is either a dotted key (`foo`) or a numeric index (`0`). The
//! leading segment after the step id is matched against the step's
//! returned `{"atom":..., "result":...}` envelope the same way a caller
//! would type it — including an explicit `result.` prefix.
//!
//! A string that is `$step.path.to.field` verbatim is substituted by the
//! referenced value (which may itself be a JSON object, array, number).
//! Strings that merely contain a `$` somewhere else are left alone.
use serde_json::{Map, Value};
use std::collections::HashMap;
#[derive(Debug, thiserror::Error)]
pub enum ResolveError {
#[error("unknown step in reference `{0}`")]
UnknownStep(String),
#[error("path `{0}` not found in result of step `{1}`")]
MissingPath(String, String),
#[error("path segment `{0}` expects an object/array but got `{1}`")]
WrongKind(String, String),
}
/// Walk `input` recursively, replacing every `$step.path` string with the
/// resolved JSON value from `previous`.
pub fn resolve_input(
input: &Value,
previous: &HashMap<String, Value>,
) -> Result<Value, ResolveError> {
match input {
Value::String(s) => resolve_string(s, previous),
Value::Array(items) => resolve_array(items, previous),
Value::Object(map) => resolve_object(map, previous),
other => Ok(other.clone()),
}
}
fn resolve_string(s: &str, previous: &HashMap<String, Value>) -> Result<Value, ResolveError> {
if let Some(stripped) = s.strip_prefix('$') {
return lookup_reference(stripped, previous);
}
Ok(Value::String(s.to_string()))
}
fn resolve_array(
items: &[Value],
previous: &HashMap<String, Value>,
) -> Result<Value, ResolveError> {
let mut out: Vec<Value> = Vec::with_capacity(items.len());
for v in items {
out.push(resolve_input(v, previous)?);
}
Ok(Value::Array(out))
}
fn resolve_object(
map: &Map<String, Value>,
previous: &HashMap<String, Value>,
) -> Result<Value, ResolveError> {
let mut out = Map::with_capacity(map.len());
for (k, v) in map {
out.insert(k.clone(), resolve_input(v, previous)?);
}
Ok(Value::Object(out))
}
fn lookup_reference(
stripped: &str,
previous: &HashMap<String, Value>,
) -> Result<Value, ResolveError> {
let (step_id, remainder) = split_head(stripped);
let envelope = previous
.get(step_id)
.ok_or_else(|| ResolveError::UnknownStep(step_id.to_string()))?;
walk_path(envelope, remainder, step_id)
}
fn split_head(s: &str) -> (&str, &str) {
match s.find('.') {
Some(i) => (&s[..i], &s[i + 1..]),
None => (s, ""),
}
}
fn walk_path(root: &Value, path: &str, step_id: &str) -> Result<Value, ResolveError> {
if path.is_empty() {
return Ok(root.clone());
}
let mut current = root;
for seg in path.split('.') {
current = descend(current, seg, path, step_id)?;
}
Ok(current.clone())
}
fn descend<'a>(
current: &'a Value,
seg: &str,
path: &str,
step_id: &str,
) -> Result<&'a Value, ResolveError> {
match current {
Value::Object(m) => m
.get(seg)
.ok_or_else(|| ResolveError::MissingPath(path.into(), step_id.into())),
Value::Array(a) => {
let idx: usize = seg
.parse()
.map_err(|_| ResolveError::MissingPath(path.into(), step_id.into()))?;
a.get(idx)
.ok_or_else(|| ResolveError::MissingPath(path.into(), step_id.into()))
}
other => Err(ResolveError::WrongKind(seg.into(), kind_of(other).into())),
}
}
fn kind_of(v: &Value) -> &'static str {
match v {
Value::Null => "null",
Value::Bool(_) => "bool",
Value::Number(_) => "number",
Value::String(_) => "string",
Value::Array(_) => "array",
Value::Object(_) => "object",
}
}

View file

@ -0,0 +1,174 @@
//! Smoke tests for the `kei-pipe` DAG runtime.
//!
//! Four scenarios:
//! 1. Happy path — 2 steps with `$step.result.id` substitution.
//! 2. Cycle detection — errors clearly.
//! 3. Unknown dependency — errors clearly.
//! 4. Resolver walks `$step.nested.sub.field` into deep paths.
//!
//! Mock atom: a shell script `mock-atom` that echoes stdin as `result`.
//! The crate name is `mockcrate`, so atom ids look like `mockcrate::echo`.
use kei_pipe::dag::{parse_dag, topo_sort, DagError};
use kei_pipe::resolve::resolve_input;
use kei_pipe::{run_dag, PipeError};
use serde_json::{json, Value};
use std::collections::HashMap;
use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::path::PathBuf;
/// Create a temp dir holding a POSIX shell script `mock-atom` that, for
/// any `run-atom <verb>` invocation, echoes back `{"input": <stdin>}`.
fn mock_bin_dir() -> tempfile::TempDir {
let dir = tempfile::tempdir().expect("tempdir");
let bin = dir.path().join("mockcrate");
// Read stdin verbatim and wrap in {"input": ...}. Python3 is cross-
// platform enough for CI; we avoid jq/node to keep the dep surface
// minimal. Smoke tests skip when python3 is missing.
let script = r#"#!/bin/sh
exec python3 -c 'import sys, json; d = sys.stdin.read(); print(json.dumps({"input": json.loads(d)}))'
"#;
fs::write(&bin, script).unwrap();
let mut perms = fs::metadata(&bin).unwrap().permissions();
perms.set_mode(0o755);
fs::set_permissions(&bin, perms).unwrap();
dir
}
fn python3_available() -> bool {
std::process::Command::new("python3")
.arg("-c")
.arg("print(1)")
.output()
.map(|o| o.status.success())
.unwrap_or(false)
}
fn write_toml(dir: &std::path::Path, name: &str, body: &str) -> PathBuf {
let path = dir.join(name);
fs::write(&path, body).unwrap();
path
}
#[test]
fn happy_path_runs_two_steps_with_substitution() {
if !python3_available() {
eprintln!("skipping: python3 not available");
return;
}
let bin = mock_bin_dir();
let work = tempfile::tempdir().unwrap();
let dag = r#"
[[steps]]
id = "first"
atom = "mockcrate::echo"
input = { title = "Fix bug" }
[[steps]]
id = "second"
atom = "mockcrate::echo"
depends-on = ["first"]
input = { prior = "$first.result.input.title", literal = 42 }
"#;
let path = write_toml(work.path(), "dag.toml", dag);
std::env::set_var("KEI_RUNTIME_BIN_DIR", bin.path());
let report = run_dag(&path).expect("run");
std::env::remove_var("KEI_RUNTIME_BIN_DIR");
assert!(report.final_ok(), "all steps ok; got {:?}", report.steps);
assert_eq!(report.steps.len(), 2);
assert_eq!(report.steps[0].id, "first");
assert_eq!(report.steps[1].id, "second");
let second_result = report.steps[1]
.result
.as_ref()
.expect("second.result");
// The mock echoes the RESOLVED input, so `prior` should equal "Fix bug"
// after substitution from first.result.input.title.
assert_eq!(
second_result["input"]["prior"],
Value::String("Fix bug".to_string())
);
assert_eq!(second_result["input"]["literal"], json!(42));
}
#[test]
fn cycle_is_detected_with_step_ids() {
let dag = r#"
[[steps]]
id = "a"
atom = "mockcrate::echo"
depends-on = ["b"]
[[steps]]
id = "b"
atom = "mockcrate::echo"
depends-on = ["a"]
"#;
let spec = parse_dag(dag).expect("parse");
let err = topo_sort(&spec).expect_err("cycle");
match err {
DagError::Cycle(ids) => {
assert!(ids.contains('a') && ids.contains('b'), "ids: {ids}");
}
other => panic!("expected Cycle, got {other:?}"),
}
}
#[test]
fn unknown_dependency_errors_with_step_name() {
let dag = r#"
[[steps]]
id = "only"
atom = "mockcrate::echo"
depends-on = ["ghost"]
"#;
let spec = parse_dag(dag).expect("parse");
let err = topo_sort(&spec).expect_err("unknown dep");
match err {
DagError::UnknownDep(step, dep) => {
assert_eq!(step, "only");
assert_eq!(dep, "ghost");
}
other => panic!("expected UnknownDep, got {other:?}"),
}
}
#[test]
fn resolver_walks_nested_paths_and_array_indices() {
// Simulate an envelope produced by a prior step.
let mut prev: HashMap<String, Value> = HashMap::new();
prev.insert(
"first".into(),
json!({
"atom": "mockcrate::echo",
"result": {
"id": 17,
"nested": { "sub": { "field": "deep" } },
"items": ["alpha", "beta", { "tag": "third" }]
}
}),
);
let input = json!({
"direct_id": "$first.result.id",
"deep": "$first.result.nested.sub.field",
"array_index": "$first.result.items.1",
"array_object": "$first.result.items.2.tag",
"root": "$first",
"untouched": "plain string"
});
let out = resolve_input(&input, &prev).expect("resolve");
assert_eq!(out["direct_id"], json!(17));
assert_eq!(out["deep"], json!("deep"));
assert_eq!(out["array_index"], json!("beta"));
assert_eq!(out["array_object"], json!("third"));
assert_eq!(out["root"]["atom"], json!("mockcrate::echo"));
assert_eq!(out["untouched"], json!("plain string"));
}
#[test]
fn run_dag_rejects_unreadable_file() {
let err = run_dag(std::path::Path::new("/no/such/file-a8f3.toml"))
.expect_err("io error");
assert!(matches!(err, PipeError::Read(_, _)), "got {err:?}");
}