Add the critical missing substrate composition layer. `kei-pipe run <dag.toml>` reads a DAG spec, topo-sorts its atoms, executes them sequentially, and pipes JSON between steps via the `$step.path.to.field` resolver. Six Constructor-Pattern cubes: dag/resolve/exec/report/lib/main. Smoke tests (5/5 passing): happy path, cycle detection, unknown dependency, nested-path resolver, unreadable file. The resolver envelope matches kei-runtime's `Output`, so atoms round-trip identically through either runtime. Workspace Cargo.toml: add kei-pipe as a member. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
91 lines
3.2 KiB
Rust
91 lines
3.2 KiB
Rust
//! `kei-pipe` — atom DAG runtime.
|
|
//!
|
|
//! Reads a TOML DAG spec, topologically sorts the steps, then runs each
|
|
//! atom sequentially. JSON output of a step can be referenced by a later
|
|
//! step via `$step-id.path.to.field` in its input block.
|
|
//!
|
|
//! Atom invocation: spawn `<crate-name> run-atom <verb>` with JSON on
|
|
//! stdin, parse stdout as JSON. Binary resolution honours
|
|
//! `KEI_RUNTIME_BIN_DIR` first, then `PATH` (same contract as
|
|
//! `kei-runtime`).
|
|
//!
|
|
//! Public surface:
|
|
//! - [`dag::DagSpec`] / [`dag::Step`] — parsed DAG structures
|
|
//! - [`dag::parse_dag`] / [`dag::topo_sort`] — DAG pipeline
|
|
//! - [`resolve::resolve_input`] — substitute `$step.path` in input values
|
|
//! - [`exec::run_atom`] — invoke one atom via subprocess
|
|
//! - [`report::DagReport`] / [`report::StepReport`] — run outcome
|
|
//! - [`run_dag`] / [`validate_dag`] — top-level entry points
|
|
|
|
pub mod dag;
|
|
pub mod exec;
|
|
pub mod report;
|
|
pub mod resolve;
|
|
|
|
use std::path::Path;
|
|
|
|
use crate::dag::{parse_dag, topo_sort, DagError};
|
|
use crate::exec::{run_atom, ExecError};
|
|
use crate::report::{DagReport, StepReport};
|
|
use crate::resolve::{resolve_input, ResolveError};
|
|
|
|
/// Top-level errors from running a DAG.
|
|
#[derive(Debug, thiserror::Error)]
|
|
pub enum PipeError {
|
|
#[error("read {0}: {1}")]
|
|
Read(String, std::io::Error),
|
|
#[error(transparent)]
|
|
Dag(#[from] DagError),
|
|
#[error(transparent)]
|
|
Resolve(#[from] ResolveError),
|
|
#[error(transparent)]
|
|
Exec(#[from] ExecError),
|
|
}
|
|
|
|
/// Parse + topo-sort a DAG file without running any atoms. Returns Ok
|
|
/// with the ordered list of step IDs when the DAG is well-formed.
|
|
pub fn validate_dag(path: &Path) -> Result<Vec<String>, PipeError> {
|
|
let text = std::fs::read_to_string(path)
|
|
.map_err(|e| PipeError::Read(path.display().to_string(), e))?;
|
|
let spec = parse_dag(&text)?;
|
|
let order = topo_sort(&spec)?;
|
|
Ok(order.into_iter().map(|s| s.id.clone()).collect())
|
|
}
|
|
|
|
/// Parse + execute a DAG file. On success returns a full report; on the
|
|
/// first step failure the report still contains every step processed up
|
|
/// to (and including) the failing one, with `ok=false` on that step.
|
|
pub fn run_dag(path: &Path) -> Result<DagReport, PipeError> {
|
|
let text = std::fs::read_to_string(path)
|
|
.map_err(|e| PipeError::Read(path.display().to_string(), e))?;
|
|
let spec = parse_dag(&text)?;
|
|
let ordered = topo_sort(&spec)?;
|
|
Ok(execute_sorted(&ordered))
|
|
}
|
|
|
|
fn execute_sorted(steps: &[&dag::Step]) -> DagReport {
|
|
let mut report = DagReport::new();
|
|
for step in steps {
|
|
match run_one_step(step, &report) {
|
|
Ok(sr) => {
|
|
report.push(sr);
|
|
}
|
|
Err(sr) => {
|
|
report.push(sr);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
report
|
|
}
|
|
|
|
fn run_one_step(step: &dag::Step, report: &DagReport) -> Result<StepReport, StepReport> {
|
|
let input_value = match resolve_input(&step.input, report.results()) {
|
|
Ok(v) => v,
|
|
Err(e) => return Err(StepReport::fail(&step.id, &step.atom, format!("resolve: {e}"))),
|
|
};
|
|
match run_atom(&step.atom, &input_value) {
|
|
Ok(result) => Ok(StepReport::ok(&step.id, &step.atom, result)),
|
|
Err(e) => Err(StepReport::fail(&step.id, &step.atom, format!("exec: {e}"))),
|
|
}
|
|
}
|