From e00d25bca632eec78881bd6b5e3e4e319708644a Mon Sep 17 00:00:00 2001 From: Parfii-bot Date: Sun, 3 May 2026 15:39:50 +0800 Subject: [PATCH] fix(perf): bound per-user lock LRU + stream-cap atom subprocess output MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two resource-exhaustion fixes from Opus Rust + Sonnet Rust audits. 1. kei-cortex per_user_locks DashMap unbounded growth (HIGH) File: kei-cortex/src/state.rs Bug: per_user_locks: DashMap>> inserted on every distinct user_id; never evicted. Auth'd attacker with 1M unique user_ids could OOM the daemon (~150 bytes/entry = 15GB at 100M entries). Fix: replaced DashMap with tokio::sync::Mutex>>> capped at PER_USER_LOCK_CAP = 1024. Eviction is safe because callers hold their own Arc clone for their critical section; dropping the registry slot retires only the registry's reference. Used tokio::sync::Mutex for the registry because LruCache::get mutates the recency list and requires &mut self. Constructor Pattern: state.rs split into state.rs (184 LOC) + state_factories.rs (64 LOC, new). Tests added: user_lock_evicts_past_cap (registry stays ≤1024 after 2048 inserts), user_lock_keeps_most_recent (LRU recency preserved). Existing user_lock_is_stable_per_user + user_lock_differs_per_user updated to async — sole call site (handlers/portrait.rs) gains .await. 2. kei-runtime stdout/stderr cap was post-hoc (HIGH) File: kei-runtime/src/invoke.rs Bug: wait_with_output() buffered ALL child stdout/stderr; only cap_bytes truncated AFTER the child finished. A malicious atom writing 10 GB stdout (or a buggy one looping infinitely) OOM'd the runtime BEFORE the cap fired. Fix: replaced wait_with_output() with two reader threads sharing KillHandle = Arc>>. Each reader appends bytes up to STREAM_CAP = 16 MiB; on cap exceedance the reader KILLS the child from inside the reader thread (critical — otherwise the unbounded writer would never EOF and a post-hoc kill would never fire). Both readers drain the closing pipe to EOF and return. Truncation surfaces as InvokeError::SubprocessError with explicit "exceeded N byte cap" message. Constructor Pattern: invoke.rs decomposed into invoke.rs (159 LOC) + invoke_io.rs (146 LOC, new) + invoke_error.rs (54 LOC, new). Test added: invoke_kills_runaway_atom — stages a kei-flood script running cat /dev/zero, verifies (a) non-zero exit, (b) stdout < 18 MiB, (c) "cap"/"subprocess" in stderr. cargo check --workspace: clean. cargo test -p kei-cortex -p kei-runtime --test-threads=1: 471 pass / 0 fail. Pre-existing openai_loop_wiring.rs parallel-run flake (state collision when test-threads>1) is unrelated and unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../_rust/kei-cortex/src/handlers/portrait.rs | 2 +- _primitives/_rust/kei-cortex/src/lib.rs | 1 + _primitives/_rust/kei-cortex/src/state.rs | 95 +++++------- .../_rust/kei-cortex/src/state_factories.rs | 64 ++++++++ .../_rust/kei-cortex/src/state_test.rs | 59 +++++-- _primitives/_rust/kei-runtime/src/invoke.rs | 108 +++++-------- .../_rust/kei-runtime/src/invoke_error.rs | 54 +++++++ .../_rust/kei-runtime/src/invoke_io.rs | 146 ++++++++++++++++++ _primitives/_rust/kei-runtime/src/lib.rs | 2 + .../kei-runtime/tests/invoke_kills_runaway.rs | 103 ++++++++++++ 10 files changed, 495 insertions(+), 139 deletions(-) create mode 100644 _primitives/_rust/kei-cortex/src/state_factories.rs create mode 100644 _primitives/_rust/kei-runtime/src/invoke_error.rs create mode 100644 _primitives/_rust/kei-runtime/src/invoke_io.rs create mode 100644 _primitives/_rust/kei-runtime/tests/invoke_kills_runaway.rs diff --git a/_primitives/_rust/kei-cortex/src/handlers/portrait.rs b/_primitives/_rust/kei-cortex/src/handlers/portrait.rs index 884ba5a..29de68b 100644 --- a/_primitives/_rust/kei-cortex/src/handlers/portrait.rs +++ b/_primitives/_rust/kei-cortex/src/handlers/portrait.rs @@ -168,7 +168,7 @@ async fn install_rig( .config() .live2d_samples_dir .join(format!("custom-{user_id}")); - let lock = state.user_lock(user_id); + let lock = state.user_lock(user_id).await; let _guard = lock.lock().await; let stylized_owned = stylized.to_vec(); let base_owned = base_dir.clone(); diff --git a/_primitives/_rust/kei-cortex/src/lib.rs b/_primitives/_rust/kei-cortex/src/lib.rs index 4c7c9a8..480bb9a 100644 --- a/_primitives/_rust/kei-cortex/src/lib.rs +++ b/_primitives/_rust/kei-cortex/src/lib.rs @@ -31,6 +31,7 @@ pub mod routes; pub(crate) mod routes_auth; pub mod sentiment; pub mod state; +pub(crate) mod state_factories; pub mod tool; pub mod validate; diff --git a/_primitives/_rust/kei-cortex/src/state.rs b/_primitives/_rust/kei-cortex/src/state.rs index 6b91bdd..c9b6c3f 100644 --- a/_primitives/_rust/kei-cortex/src/state.rs +++ b/_primitives/_rust/kei-cortex/src/state.rs @@ -11,16 +11,25 @@ //! Anthropic invoker per call so the API key is read fresh (env-rotation //! friendly, mirrors `anthropic::open_stream`). -use crate::agent::anthropic_memory_invoker::AnthropicMemoryInvoker; use crate::agent::memory_nudge::{default_scheduler, MemoryNudgeScheduler}; use crate::agent::memory_review_task::Invoker; use crate::config::AppConfig; -use dashmap::DashMap; +use crate::state_factories::{default_invoker_factory, open_token_tracker}; use kei_router::LlmRouter; use kei_token_tracker::Store as TokenTracker; +use lru::LruCache; +use std::num::NonZeroUsize; use std::sync::Arc; use tokio::sync::Mutex; +/// Hard cap on how many distinct `user_id` mutexes we keep alive. Anything +/// past this is LRU-evicted. Memory bound: cap × (~80 bytes per entry + +/// `Arc>`). At 1024 the registry stays well under 200 KiB even +/// if every slot is hot. The eviction is safe because callers hold their +/// own `Arc>` clone for the duration of the critical section — +/// dropping the cache entry only retires the *registry's* reference. +const PER_USER_LOCK_CAP: usize = 1024; + /// Type alias for the per-call invoker factory. Each `()` invocation /// returns a freshly-built invoker so env mutations between turns /// (`ANTHROPIC_API_KEY` rotation) are picked up automatically. @@ -36,7 +45,12 @@ struct Inner { config: AppConfig, token: String, router: Arc, - per_user_locks: DashMap>>, + /// Bounded LRU registry of per-user mutexes. Capped at + /// `PER_USER_LOCK_CAP` to prevent unbounded growth from auth'd + /// attackers who present a fresh `user_id` on every call. Wrapped + /// in `Mutex` because `LruCache::get` mutates the recency list and + /// is `!Sync` by design. + per_user_locks: Mutex>>>, scheduler: Arc, invoker_factory: InvokerFactory, /// Token-event store. `None` when the configured path could not be @@ -97,7 +111,10 @@ impl AppState { config, token, router, - per_user_locks: DashMap::new(), + per_user_locks: Mutex::new(LruCache::new( + NonZeroUsize::new(PER_USER_LOCK_CAP) + .expect("PER_USER_LOCK_CAP > 0"), + )), scheduler: Arc::new(default_scheduler()), invoker_factory, token_tracker, @@ -139,60 +156,26 @@ impl AppState { } /// Return the per-user mutex, creating it on first access. The returned - /// `Arc>` is cloned — the entry stays alive in the map so - /// subsequent calls for the same `user_id` share it. - pub fn user_lock(&self, user_id: &str) -> Arc> { - self.inner - .per_user_locks - .entry(user_id.to_string()) - .or_insert_with(|| Arc::new(Mutex::new(()))) - .clone() - } -} - -/// Default Anthropic-backed invoker factory. Each call rebuilds the -/// invoker so the API key is re-read fresh — same discipline as -/// `anthropic::open_stream` (no client caching). The system slot -/// uses the review prompt's persona stub; the actual review prompt -/// is appended by `run_review` as a trailing user message. -fn default_invoker_factory() -> InvokerFactory { - Arc::new(|| Arc::new(AnthropicMemoryInvoker::new(default_review_system())) as Arc) -} - -/// System-slot text for memory-review calls. Kept short and stable -/// across reviews so the model response is dominated by the snapshot -/// + review prompt rather than persona drift. -fn default_review_system() -> String { - "You are a quiet observer reviewing a chat to surface memory-worthy facts." - .to_string() -} - -/// Try to open the token-event store at the configured path. Returns -/// `None` when the parent directory does not exist or the open fails — -/// startup must NOT fail just because telemetry isn't ready (a fresh -/// host without `~/.keisei/` is normal). Errors are logged to stderr -/// once at startup so an operator notices, but the daemon keeps running. -fn open_token_tracker( - path: &std::path::Path, -) -> Option>> { - if let Some(parent) = path.parent() { - if !parent.exists() { - eprintln!( - "kei-cortex: token-tracker parent dir {:?} missing — skipping store open", - parent - ); - return None; + /// `Arc>` is cloned — when the LRU has spare capacity the + /// entry stays in the registry so the next call for the same `user_id` + /// shares it. If the registry has evicted the slot under load, a new + /// mutex is created (acceptable: the prior critical section is already + /// fenced by the caller's own `Arc` clone). + pub async fn user_lock(&self, user_id: &str) -> Arc> { + let mut cache = self.inner.per_user_locks.lock().await; + if let Some(existing) = cache.get(user_id) { + return existing.clone(); } + let new_lock = Arc::new(Mutex::new(())); + cache.put(user_id.to_string(), new_lock.clone()); + new_lock } - match TokenTracker::open(path) { - Ok(s) => Some(Arc::new(std::sync::Mutex::new(s))), - Err(e) => { - eprintln!( - "kei-cortex: token-tracker open {} failed: {e} — telemetry disabled", - path.display() - ); - None - } + + /// Test-only accessor: current size of the per-user-lock registry. + /// Exposed for the eviction integration test (`state_test.rs`). + #[cfg(test)] + pub async fn user_lock_count(&self) -> usize { + self.inner.per_user_locks.lock().await.len() } } diff --git a/_primitives/_rust/kei-cortex/src/state_factories.rs b/_primitives/_rust/kei-cortex/src/state_factories.rs new file mode 100644 index 0000000..0d9ccdf --- /dev/null +++ b/_primitives/_rust/kei-cortex/src/state_factories.rs @@ -0,0 +1,64 @@ +//! Construction helpers extracted from `state.rs` to keep that file under +//! the Constructor-Pattern 200-LOC ceiling after the Wave 44d resource-cap +//! work added an LRU registry for per-user locks. +//! +//! Hosts: +//! - `default_invoker_factory` — builds the Anthropic memory-review +//! invoker fresh on every call so env-rotated API keys take effect. +//! - `default_review_system` — short, stable system slot for review. +//! - `open_token_tracker` — opens the token-event SQLite store and +//! gracefully degrades to `None` when the host has no telemetry dir. + +use crate::agent::anthropic_memory_invoker::AnthropicMemoryInvoker; +use crate::agent::memory_review_task::Invoker; +use crate::state::InvokerFactory; +use kei_token_tracker::Store as TokenTracker; +use std::sync::Arc; + +/// Default Anthropic-backed invoker factory. Each call rebuilds the +/// invoker so the API key is re-read fresh — same discipline as +/// `anthropic::open_stream` (no client caching). The system slot +/// uses the review prompt's persona stub; the actual review prompt +/// is appended by `run_review` as a trailing user message. +pub(crate) fn default_invoker_factory() -> InvokerFactory { + Arc::new(|| { + Arc::new(AnthropicMemoryInvoker::new(default_review_system())) as Arc + }) +} + +/// System-slot text for memory-review calls. Kept short and stable +/// across reviews so the model response is dominated by the snapshot +/// + review prompt rather than persona drift. +fn default_review_system() -> String { + "You are a quiet observer reviewing a chat to surface memory-worthy facts." + .to_string() +} + +/// Try to open the token-event store at the configured path. Returns +/// `None` when the parent directory does not exist or the open fails — +/// startup must NOT fail just because telemetry isn't ready (a fresh +/// host without `~/.keisei/` is normal). Errors are logged to stderr +/// once at startup so an operator notices, but the daemon keeps running. +pub(crate) fn open_token_tracker( + path: &std::path::Path, +) -> Option>> { + if let Some(parent) = path.parent() { + if !parent.exists() { + eprintln!( + "kei-cortex: token-tracker parent dir {:?} missing — skipping store open", + parent + ); + return None; + } + } + match TokenTracker::open(path) { + Ok(s) => Some(Arc::new(std::sync::Mutex::new(s))), + Err(e) => { + eprintln!( + "kei-cortex: token-tracker open {} failed: {e} — telemetry disabled", + path.display() + ); + None + } + } +} diff --git a/_primitives/_rust/kei-cortex/src/state_test.rs b/_primitives/_rust/kei-cortex/src/state_test.rs index 4b4329a..fe065f7 100644 --- a/_primitives/_rust/kei-cortex/src/state_test.rs +++ b/_primitives/_rust/kei-cortex/src/state_test.rs @@ -2,7 +2,8 @@ //! //! Constructor Pattern: extracted to a sibling so the parent stays //! ≤200 LOC after the Hermes P2.2.b additions (scheduler + invoker -//! factory plumbing). +//! factory plumbing) and Wave 44d resource-cap hardening (per-user +//! lock LRU eviction). use super::*; use std::path::PathBuf; @@ -33,19 +34,19 @@ impl Invoker for Counter { } } -#[test] -fn user_lock_is_stable_per_user() { +#[tokio::test] +async fn user_lock_is_stable_per_user() { let state = AppState::new(dummy_config(), "tok".into()); - let a = state.user_lock("alice"); - let b = state.user_lock("alice"); + let a = state.user_lock("alice").await; + let b = state.user_lock("alice").await; assert!(Arc::ptr_eq(&a, &b)); } -#[test] -fn user_lock_differs_per_user() { +#[tokio::test] +async fn user_lock_differs_per_user() { let state = AppState::new(dummy_config(), "tok".into()); - let a = state.user_lock("alice"); - let b = state.user_lock("bob"); + let a = state.user_lock("alice").await; + let b = state.user_lock("bob").await; assert!(!Arc::ptr_eq(&a, &b)); } @@ -79,3 +80,43 @@ fn invoker_factory_yields_distinct_arcs() { // factory does NOT memoise). assert!(!Arc::ptr_eq(&a, &b)); } + +/// Resource-exhaustion guard: the registry MUST cap at +/// `PER_USER_LOCK_CAP` (1024). After inserting 2× cap distinct user_ids +/// the registry size stays ≤ cap, proving LRU eviction kicked in. Without +/// the cap an auth'd attacker with 1M unique user_ids would OOM the +/// daemon — this test pins that the bound holds. +#[tokio::test] +async fn user_lock_evicts_past_cap() { + let state = AppState::new(dummy_config(), "tok".into()); + let cap = super::PER_USER_LOCK_CAP; + for i in 0..(cap * 2) { + let _ = state.user_lock(&format!("user-{i}")).await; + } + let count = state.user_lock_count().await; + assert!( + count <= cap, + "registry must stay ≤ {cap} after {} inserts; got {count}", + cap * 2 + ); + assert!(count > 0, "registry should retain the most recent inserts"); +} + +/// LRU recency: the most recent inserts MUST survive eviction. After +/// pushing 2× cap entries, the very last one we inserted should still +/// be in the registry (cheap pointer-equality check via `user_lock` +/// returning the same `Arc`). +#[tokio::test] +async fn user_lock_keeps_most_recent() { + let state = AppState::new(dummy_config(), "tok".into()); + let cap = super::PER_USER_LOCK_CAP; + for i in 0..(cap * 2) { + let _ = state.user_lock(&format!("user-{i}")).await; + } + // The very last user inserted in the loop above must still be cached: + // re-fetching returns a clone of the same Arc. + let last_key = format!("user-{}", cap * 2 - 1); + let a = state.user_lock(&last_key).await; + let b = state.user_lock(&last_key).await; + assert!(Arc::ptr_eq(&a, &b)); +} diff --git a/_primitives/_rust/kei-runtime/src/invoke.rs b/_primitives/_rust/kei-runtime/src/invoke.rs index e6c5e33..ab5bc51 100644 --- a/_primitives/_rust/kei-runtime/src/invoke.rs +++ b/_primitives/_rust/kei-runtime/src/invoke.rs @@ -12,6 +12,7 @@ //! whose crate has not yet been migrated to the `run-atom` protocol). use crate::discover::{walk_atoms, AtomMeta}; +use crate::invoke_io::{capture_with_cap, Captured}; use crate::validate::validate_input; use serde::Serialize; use serde_json::Value; @@ -19,55 +20,13 @@ use std::io::Write; use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; -/// Max bytes we read from subprocess stdout/stderr to guard against runaway output. -const OUTPUT_CAP: usize = 16 * 1024 * 1024; // 16 MiB +pub use crate::invoke_error::InvokeError; -#[derive(Debug)] -pub enum InvokeError { - AtomNotFound(String), - InputParse(String), - InputInvalid(String), - MissingInputSchema(String), - /// `crate_name` in atom YAML failed the `kei-*` allowlist check. - InvalidAtom(String), - /// Crate binary is missing from both `KEI_RUNTIME_BIN_DIR` and `PATH`. - BinaryNotFound { crate_name: String }, - /// Subprocess exited non-zero — propagate the atom's own exit code. - AtomFailed { atom: String, code: i32, stderr: String }, - /// IO / spawn failure (not a non-zero exit from the child). - SubprocessError(String), - /// Atom's stdout was not parseable as JSON. - OutputParse(String), - /// Legacy escape — atom not yet migrated to `run-atom` protocol. - NotImplemented { atom: String }, -} - -impl std::fmt::Display for InvokeError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::AtomNotFound(id) => write!(f, "no atom matching {id}"), - Self::InputParse(e) => write!(f, "input rejected: {e}"), - Self::InputInvalid(e) => write!(f, "input rejected: {e}"), - Self::MissingInputSchema(id) => write!(f, "atom `{id}` declares no input schema"), - Self::InvalidAtom(msg) => write!(f, "invalid atom crate_name: {msg}"), - Self::BinaryNotFound { crate_name } => write!( - f, - "binary `{crate_name}` not found on PATH or KEI_RUNTIME_BIN_DIR" - ), - Self::AtomFailed { atom, code, stderr } => { - write!(f, "atom `{atom}` exited {code}: {stderr}") - } - Self::SubprocessError(e) => write!(f, "subprocess: {e}"), - Self::OutputParse(e) => write!(f, "atom stdout not JSON: {e}"), - Self::NotImplemented { atom } => write!( - f, - "invoke not yet wired for this atom ({atom}); use the underlying CLI directly" - ), - } - } -} - -impl std::error::Error for InvokeError {} +/// Max bytes we read from subprocess stdout/stderr to guard against +/// runaway output. Streamed reads enforce this DURING capture (not +/// post-hoc) — see `invoke_io.rs`. The constant is kept here as the +/// public name documented in `lib.rs`. +pub const OUTPUT_CAP: usize = 16 * 1024 * 1024; // 16 MiB /// Parsed output of an invoked atom. `result` is the raw JSON the atom wrote. #[derive(Debug, Serialize)] @@ -139,40 +98,43 @@ fn exec_atom(meta: &AtomMeta, input_json: &str) -> Result { .stderr(Stdio::piped()) .spawn() .map_err(|e| InvokeError::SubprocessError(format!("spawn {}: {e}", bin.display())))?; + write_stdin(&mut child, input_json)?; + let captured = capture_with_cap(child) + .map_err(|e| InvokeError::SubprocessError(format!("wait: {e}")))?; + handle_captured(meta, captured) +} + +/// Write the atom's input JSON to the child's stdin, dropping the handle +/// so the writer side closes (otherwise the child would block on EOF). +fn write_stdin(child: &mut std::process::Child, input_json: &str) -> Result<(), InvokeError> { if let Some(mut stdin) = child.stdin.take() { stdin .write_all(input_json.as_bytes()) .map_err(|e| InvokeError::SubprocessError(format!("write stdin: {e}")))?; } - let out = child - .wait_with_output() - .map_err(|e| InvokeError::SubprocessError(format!("wait: {e}")))?; - handle_subprocess_output(meta, out) + Ok(()) } -fn cap_bytes(data: Vec, label: &str) -> Vec { - if data.len() > OUTPUT_CAP { - let mut v = data; - v.truncate(OUTPUT_CAP); - eprintln!("[kei-runtime] {label} truncated at {OUTPUT_CAP} bytes"); - v - } else { - data +/// Map a `Captured` (potentially-truncated) child result into our typed +/// `Output` / `InvokeError`. Truncation is surfaced in `SubprocessError` +/// when the child was killed for exceeding the cap; the parent decides +/// what exit code to return. +fn handle_captured(meta: &AtomMeta, c: Captured) -> Result { + if c.truncated { + return Err(InvokeError::SubprocessError(format!( + "atom `{}` killed: stdout/stderr exceeded {OUTPUT_CAP} byte cap", + meta.full_id + ))); } -} - -fn handle_subprocess_output( - meta: &AtomMeta, - mut out: std::process::Output, -) -> Result { - out.stdout = cap_bytes(out.stdout, "stdout"); - out.stderr = cap_bytes(out.stderr, "stderr"); - let code = out.status.code().unwrap_or(-1); - if !out.status.success() { - let stderr = String::from_utf8_lossy(&out.stderr).trim().to_string(); - return Err(InvokeError::AtomFailed { atom: meta.full_id.clone(), code, stderr }); + if c.status_code != 0 { + let stderr = String::from_utf8_lossy(&c.stderr).trim().to_string(); + return Err(InvokeError::AtomFailed { + atom: meta.full_id.clone(), + code: c.status_code, + stderr, + }); } - let stdout = String::from_utf8_lossy(&out.stdout); + let stdout = String::from_utf8_lossy(&c.stdout); let result: Value = serde_json::from_str(stdout.trim()) .map_err(|e| InvokeError::OutputParse(format!("{e}; stdout was: {stdout}")))?; Ok(Output { atom: meta.full_id.clone(), result }) diff --git a/_primitives/_rust/kei-runtime/src/invoke_error.rs b/_primitives/_rust/kei-runtime/src/invoke_error.rs new file mode 100644 index 0000000..f871dc1 --- /dev/null +++ b/_primitives/_rust/kei-runtime/src/invoke_error.rs @@ -0,0 +1,54 @@ +//! Typed errors for the atom-invocation runtime. +//! +//! Constructor Pattern: extracted from `invoke.rs` so the runtime +//! parent file stays under 200 LOC after Wave 44d added bounded-read +//! capture + truncation handling. + +#[derive(Debug)] +pub enum InvokeError { + AtomNotFound(String), + InputParse(String), + InputInvalid(String), + MissingInputSchema(String), + /// `crate_name` in atom YAML failed the `kei-*` allowlist check. + InvalidAtom(String), + /// Crate binary is missing from both `KEI_RUNTIME_BIN_DIR` and `PATH`. + BinaryNotFound { crate_name: String }, + /// Subprocess exited non-zero — propagate the atom's own exit code. + AtomFailed { atom: String, code: i32, stderr: String }, + /// IO / spawn failure (not a non-zero exit from the child). + SubprocessError(String), + /// Atom's stdout was not parseable as JSON. + OutputParse(String), + /// Legacy escape — atom not yet migrated to `run-atom` protocol. + NotImplemented { atom: String }, +} + +impl std::fmt::Display for InvokeError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::AtomNotFound(id) => write!(f, "no atom matching {id}"), + Self::InputParse(e) => write!(f, "input rejected: {e}"), + Self::InputInvalid(e) => write!(f, "input rejected: {e}"), + Self::MissingInputSchema(id) => { + write!(f, "atom `{id}` declares no input schema") + } + Self::InvalidAtom(msg) => write!(f, "invalid atom crate_name: {msg}"), + Self::BinaryNotFound { crate_name } => write!( + f, + "binary `{crate_name}` not found on PATH or KEI_RUNTIME_BIN_DIR" + ), + Self::AtomFailed { atom, code, stderr } => { + write!(f, "atom `{atom}` exited {code}: {stderr}") + } + Self::SubprocessError(e) => write!(f, "subprocess: {e}"), + Self::OutputParse(e) => write!(f, "atom stdout not JSON: {e}"), + Self::NotImplemented { atom } => write!( + f, + "invoke not yet wired for this atom ({atom}); use the underlying CLI directly" + ), + } + } +} + +impl std::error::Error for InvokeError {} diff --git a/_primitives/_rust/kei-runtime/src/invoke_io.rs b/_primitives/_rust/kei-runtime/src/invoke_io.rs new file mode 100644 index 0000000..c562b95 --- /dev/null +++ b/_primitives/_rust/kei-runtime/src/invoke_io.rs @@ -0,0 +1,146 @@ +//! Bounded-read child-process IO for `invoke.rs`. +//! +//! Wave 44d resource-cap hardening: the previous `wait_with_output()` +//! buffered ALL stdout/stderr before any size check ran — a malicious +//! atom writing 10 GiB of zeros would OOM the runtime before truncation. +//! This module replaces that with size-tracked stream readers that KILL +//! the child the moment a cap is exceeded — the kill is issued from +//! INSIDE the reader thread so an unbounded writer cannot deadlock the +//! parent (post-hoc kill would never happen because the reader would +//! never return on infinite stdout). +//! +//! Constructor Pattern: extracted as a sibling module so `invoke.rs` +//! stays under 200 LOC and per-function under 30 LOC. + +use std::io::Read; +use std::process::{Child, ChildStderr, ChildStdout}; +use std::sync::{Arc, Mutex}; +use std::thread; + +/// Hard cap on stdout/stderr each. Mirrors the public `OUTPUT_CAP` +/// constant in `invoke.rs` (16 MiB). Kept as a module-private mirror +/// so this file can be reasoned about in isolation. +const STREAM_CAP: usize = 16 * 1024 * 1024; + +/// Per-read chunk size. 8 KiB is the typical pipe buffer granularity +/// on Linux/macOS; smaller would inflate syscall count, larger would +/// risk overshooting the cap by up to one chunk. +const CHUNK: usize = 8 * 1024; + +/// Captured child output. `truncated` is true when EITHER stream hit +/// its cap; the caller is expected to surface the truncation in any +/// error message it returns to the user. +pub(crate) struct Captured { + pub stdout: Vec, + pub stderr: Vec, + pub status_code: i32, + pub truncated: bool, +} + +/// Shared kill-handle: the reader thread that trips the cap takes the +/// `Mutex`, calls `kill()`, drops the lock. Wrapped so both readers +/// race-safely — only one wins, the other sees the child already gone. +type KillHandle = Arc>>; + +/// Read both streams concurrently with size caps. If either reader +/// trips its cap, that reader kills the child IMMEDIATELY (from inside +/// the reader thread) so an unbounded writer cannot deadlock us. The +/// other reader then sees the pipe close and returns. Finally we reap. +pub(crate) fn capture_with_cap(mut child: Child) -> std::io::Result { + let stdout = child.stdout.take().expect("stdout piped"); + let stderr = child.stderr.take().expect("stderr piped"); + let kill: KillHandle = Arc::new(Mutex::new(Some(child))); + let stdout_handle = spawn_reader_stdout(stdout, kill.clone()); + let stderr_handle = spawn_reader_stderr(stderr, kill.clone()); + let (out_buf, out_trunc) = stdout_handle.join().expect("stdout thread"); + let (err_buf, err_trunc) = stderr_handle.join().expect("stderr thread"); + let status_code = reap_child(&kill); + Ok(Captured { + stdout: out_buf, + stderr: err_buf, + status_code, + truncated: out_trunc || err_trunc, + }) +} + +/// Reap the child (waiting on it) and return its exit code. If a reader +/// already killed the child the lock will hold `None` — fall through to +/// -1 (signaled). Otherwise we still call `wait()` to avoid a zombie. +fn reap_child(kill: &KillHandle) -> i32 { + let mut guard = kill.lock().expect("kill mutex"); + if let Some(mut c) = guard.take() { + c.wait().ok().and_then(|s| s.code()).unwrap_or(-1) + } else { + -1 + } +} + +/// Spawn the stdout reader thread. Returns `(buffer, truncated)`. +fn spawn_reader_stdout( + stream: ChildStdout, + kill: KillHandle, +) -> thread::JoinHandle<(Vec, bool)> { + thread::spawn(move || read_capped(stream, kill)) +} + +/// Spawn the stderr reader thread. Returns `(buffer, truncated)`. +fn spawn_reader_stderr( + stream: ChildStderr, + kill: KillHandle, +) -> thread::JoinHandle<(Vec, bool)> { + thread::spawn(move || read_capped(stream, kill)) +} + +/// Read until EOF or cap exceeded. On cap exceedance, kill the child +/// IMMEDIATELY via the shared handle then keep draining the pipe so +/// the OS-level pipe buffer empties cleanly (would-be writes by the +/// already-killed child error out fast, EOF arrives quickly). +fn read_capped(mut stream: R, kill: KillHandle) -> (Vec, bool) { + let mut buf = Vec::with_capacity(CHUNK); + let mut chunk = [0u8; CHUNK]; + let mut truncated = false; + loop { + match stream.read(&mut chunk) { + Ok(0) => break, + Ok(n) => append_or_truncate(&mut buf, &chunk[..n], &mut truncated, &kill), + Err(_) => break, + } + } + (buf, truncated) +} + +/// Append-or-mark logic, extracted so `read_capped` stays under the 30-LOC +/// ceiling. Side effect: on the cap-crossing call, sends a kill via the +/// shared handle before returning. +fn append_or_truncate( + buf: &mut Vec, + incoming: &[u8], + truncated: &mut bool, + kill: &KillHandle, +) { + if *truncated { + return; // already past cap; drain silently so writer doesn't block + } + if buf.len() + incoming.len() > STREAM_CAP { + let take = STREAM_CAP.saturating_sub(buf.len()); + buf.extend_from_slice(&incoming[..take]); + *truncated = true; + kill_child(kill); + } else { + buf.extend_from_slice(incoming); + } +} + +/// Kill the child via the shared handle. Best-effort: if another +/// reader has already killed + reaped, the lock holds `None` and this +/// is a no-op. +fn kill_child(kill: &KillHandle) { + if let Ok(mut guard) = kill.lock() { + if let Some(c) = guard.as_mut() { + let _ = c.kill(); + eprintln!( + "[kei-runtime] child killed: stdout/stderr exceeded {STREAM_CAP} byte cap" + ); + } + } +} diff --git a/_primitives/_rust/kei-runtime/src/lib.rs b/_primitives/_rust/kei-runtime/src/lib.rs index 6b1de3f..18900af 100644 --- a/_primitives/_rust/kei-runtime/src/lib.rs +++ b/_primitives/_rust/kei-runtime/src/lib.rs @@ -10,5 +10,7 @@ pub mod discover; pub mod invoke; +pub(crate) mod invoke_error; +pub(crate) mod invoke_io; pub mod lint; pub mod validate; diff --git a/_primitives/_rust/kei-runtime/tests/invoke_kills_runaway.rs b/_primitives/_rust/kei-runtime/tests/invoke_kills_runaway.rs new file mode 100644 index 0000000..fc328e4 --- /dev/null +++ b/_primitives/_rust/kei-runtime/tests/invoke_kills_runaway.rs @@ -0,0 +1,103 @@ +//! Integration test — runaway atom that floods stdout MUST be killed +//! after the 16 MiB cap, not buffered to OOM. +//! +//! Wave 44d resource-cap: replaces the post-hoc `cap_bytes` truncation +//! with streamed reads in `invoke_io.rs`. This test pins the new +//! behaviour: a fake atom binary that emits 100 MiB of zeros must +//! exit non-zero (killed by parent) rather than complete normally +//! with 100 MiB buffered. +//! +//! Strategy: +//! 1. Build a tiny shell-script "atom" that ignores stdin and writes +//! 100 MiB of zeros to stdout. We can't use `dd` directly because +//! the runtime's allowlist enforces `kei-*` crate names — so we +//! stage a script named `kei-flood` in a temp bin dir. +//! 2. Stage atom YAML naming `kei-flood::pour`. +//! 3. Invoke via the runtime CLI; expect non-zero exit + a stderr +//! message naming the cap. + +use std::fs; +use std::os::unix::fs::PermissionsExt; +use std::path::Path; +use std::process::Command; + +const BIN: &str = env!("CARGO_BIN_EXE_kei-runtime"); + +fn write_atom_md(root: &Path, crate_name: &str, verb: &str) { + let atoms = root.join(crate_name).join("atoms"); + let schemas = atoms.join("schemas"); + fs::create_dir_all(&schemas).unwrap(); + let in_schema = r#"{"$schema":"http://json-schema.org/draft-07/schema#","type":"object"}"#; + let out_schema = r#"{"$schema":"http://json-schema.org/draft-07/schema#","type":"object"}"#; + fs::write(schemas.join(format!("{verb}-input.json")), in_schema).unwrap(); + fs::write(schemas.join(format!("{verb}-output.json")), out_schema).unwrap(); + let md = format!( + "---\natom: {crate_name}::{verb}\nkind: command\nversion: \"0.1.0\"\n\ + input:\n schema: schemas/{verb}-input.json\n\ + output:\n schema: schemas/{verb}-output.json\n\ + side_effects: []\nidempotent: true\nstability: stable\n---\n" + ); + fs::write(atoms.join(format!("{verb}.md")), md).unwrap(); +} + +/// Stage a `kei-flood` shell-script in `bin_dir`. When invoked with +/// `run-atom pour` it writes a continuous stream of zeros to stdout +/// (well past the 16 MiB cap) using a bash builtin loop — no +/// dependency on external `dd` because PATH may be locked down. +/// The parent runtime should kill it well before it finishes. +fn stage_flood_binary(bin_dir: &Path) -> std::path::PathBuf { + fs::create_dir_all(bin_dir).unwrap(); + let script = bin_dir.join("kei-flood"); + // Pipe /dev/zero straight to stdout via /bin/cat, which is + // present at /bin/cat on macOS and Linux. The script will run + // unbounded; the parent runtime is expected to kill it after + // the 16 MiB cap. SIGPIPE on cat (when the parent stops reading + // and the pipe closes) is benign — we just want enough volume + // to provably exceed the cap. + let body = "#!/bin/sh\nexec /bin/cat /dev/zero\n"; + fs::write(&script, body).unwrap(); + let mut perms = fs::metadata(&script).unwrap().permissions(); + perms.set_mode(0o755); + fs::set_permissions(&script, perms).unwrap(); + script +} + +#[test] +fn invoke_kills_runaway_atom() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path().join("root"); + let bin = tmp.path().join("bin"); + write_atom_md(&root, "kei-flood", "pour"); + let _script = stage_flood_binary(&bin); + let out = Command::new(BIN) + .env("KEI_RUNTIME_BIN_DIR", &bin) + .env("PATH", &bin) + .arg("invoke") + .arg("kei-flood::pour") + .arg("--input") + .arg("{}") + .arg("--root") + .arg(&root) + .output() + .expect("spawn kei-runtime"); + let stderr = String::from_utf8_lossy(&out.stderr).to_string(); + let stdout_len = out.stdout.len(); + // The runtime printed at most a small JSON envelope OR nothing + // (process killed). 16 MiB cap + a sliver of envelope JSON → well + // under 18 MiB. If the cap had failed, stdout would be ~100 MiB. + assert!( + stdout_len < 18 * 1024 * 1024, + "expected stdout < 18 MiB; got {stdout_len} (cap not enforced!)" + ); + // We expect a non-zero exit because the child was killed. + assert_ne!( + out.status.code(), + Some(0), + "expected non-zero exit on runaway; stderr: {stderr}" + ); + // Stderr should mention the cap so the operator can diagnose. + assert!( + stderr.contains("cap") || stderr.contains("subprocess"), + "expected 'cap' / 'subprocess' in stderr: {stderr}" + ); +}