fix(perf): bound per-user lock LRU + stream-cap atom subprocess output

Two resource-exhaustion fixes from Opus Rust + Sonnet Rust audits.

1. kei-cortex per_user_locks DashMap unbounded growth (HIGH)
   File: kei-cortex/src/state.rs
   Bug: per_user_locks: DashMap<String, Arc<Mutex<()>>> inserted on every
   distinct user_id; never evicted. Auth'd attacker with 1M unique user_ids
   could OOM the daemon (~150 bytes/entry = 15GB at 100M entries).

   Fix: replaced DashMap with tokio::sync::Mutex<LruCache<String,
   Arc<TokioMutex<()>>>> capped at PER_USER_LOCK_CAP = 1024. Eviction is
   safe because callers hold their own Arc clone for their critical section;
   dropping the registry slot retires only the registry's reference. Used
   tokio::sync::Mutex for the registry because LruCache::get mutates the
   recency list and requires &mut self.

   Constructor Pattern: state.rs split into state.rs (184 LOC) +
   state_factories.rs (64 LOC, new). Tests added: user_lock_evicts_past_cap
   (registry stays ≤1024 after 2048 inserts), user_lock_keeps_most_recent
   (LRU recency preserved). Existing user_lock_is_stable_per_user +
   user_lock_differs_per_user updated to async — sole call site
   (handlers/portrait.rs) gains .await.

2. kei-runtime stdout/stderr cap was post-hoc (HIGH)
   File: kei-runtime/src/invoke.rs
   Bug: wait_with_output() buffered ALL child stdout/stderr; only cap_bytes
   truncated AFTER the child finished. A malicious atom writing 10 GB stdout
   (or a buggy one looping infinitely) OOM'd the runtime BEFORE the cap fired.

   Fix: replaced wait_with_output() with two reader threads sharing
   KillHandle = Arc<Mutex<Option<Child>>>. Each reader appends bytes up to
   STREAM_CAP = 16 MiB; on cap exceedance the reader KILLS the child from
   inside the reader thread (critical — otherwise the unbounded writer would
   never EOF and a post-hoc kill would never fire). Both readers drain the
   closing pipe to EOF and return. Truncation surfaces as
   InvokeError::SubprocessError with explicit "exceeded N byte cap" message.

   Constructor Pattern: invoke.rs decomposed into invoke.rs (159 LOC) +
   invoke_io.rs (146 LOC, new) + invoke_error.rs (54 LOC, new). Test added:
   invoke_kills_runaway_atom — stages a kei-flood script running cat
   /dev/zero, verifies (a) non-zero exit, (b) stdout < 18 MiB, (c)
   "cap"/"subprocess" in stderr.

cargo check --workspace: clean. cargo test -p kei-cortex -p kei-runtime
--test-threads=1: 471 pass / 0 fail. Pre-existing openai_loop_wiring.rs
parallel-run flake (state collision when test-threads>1) is unrelated and
unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Parfii-bot 2026-05-03 15:39:50 +08:00
parent 06ff2f8ed4
commit 4e99057d2b
10 changed files with 495 additions and 139 deletions

View file

@ -168,7 +168,7 @@ async fn install_rig(
.config() .config()
.live2d_samples_dir .live2d_samples_dir
.join(format!("custom-{user_id}")); .join(format!("custom-{user_id}"));
let lock = state.user_lock(user_id); let lock = state.user_lock(user_id).await;
let _guard = lock.lock().await; let _guard = lock.lock().await;
let stylized_owned = stylized.to_vec(); let stylized_owned = stylized.to_vec();
let base_owned = base_dir.clone(); let base_owned = base_dir.clone();

View file

@ -31,6 +31,7 @@ pub mod routes;
pub(crate) mod routes_auth; pub(crate) mod routes_auth;
pub mod sentiment; pub mod sentiment;
pub mod state; pub mod state;
pub(crate) mod state_factories;
pub mod tool; pub mod tool;
pub mod validate; pub mod validate;

View file

@ -11,16 +11,25 @@
//! Anthropic invoker per call so the API key is read fresh (env-rotation //! Anthropic invoker per call so the API key is read fresh (env-rotation
//! friendly, mirrors `anthropic::open_stream`). //! friendly, mirrors `anthropic::open_stream`).
use crate::agent::anthropic_memory_invoker::AnthropicMemoryInvoker;
use crate::agent::memory_nudge::{default_scheduler, MemoryNudgeScheduler}; use crate::agent::memory_nudge::{default_scheduler, MemoryNudgeScheduler};
use crate::agent::memory_review_task::Invoker; use crate::agent::memory_review_task::Invoker;
use crate::config::AppConfig; use crate::config::AppConfig;
use dashmap::DashMap; use crate::state_factories::{default_invoker_factory, open_token_tracker};
use kei_router::LlmRouter; use kei_router::LlmRouter;
use kei_token_tracker::Store as TokenTracker; use kei_token_tracker::Store as TokenTracker;
use lru::LruCache;
use std::num::NonZeroUsize;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::Mutex; use tokio::sync::Mutex;
/// Hard cap on how many distinct `user_id` mutexes we keep alive. Anything
/// past this is LRU-evicted. Memory bound: cap × (~80 bytes per entry +
/// `Arc<Mutex<()>>`). At 1024 the registry stays well under 200 KiB even
/// if every slot is hot. The eviction is safe because callers hold their
/// own `Arc<Mutex<()>>` clone for the duration of the critical section —
/// dropping the cache entry only retires the *registry's* reference.
const PER_USER_LOCK_CAP: usize = 1024;
/// Type alias for the per-call invoker factory. Each `()` invocation /// Type alias for the per-call invoker factory. Each `()` invocation
/// returns a freshly-built invoker so env mutations between turns /// returns a freshly-built invoker so env mutations between turns
/// (`ANTHROPIC_API_KEY` rotation) are picked up automatically. /// (`ANTHROPIC_API_KEY` rotation) are picked up automatically.
@ -36,7 +45,12 @@ struct Inner {
config: AppConfig, config: AppConfig,
token: String, token: String,
router: Arc<LlmRouter>, router: Arc<LlmRouter>,
per_user_locks: DashMap<String, Arc<Mutex<()>>>, /// Bounded LRU registry of per-user mutexes. Capped at
/// `PER_USER_LOCK_CAP` to prevent unbounded growth from auth'd
/// attackers who present a fresh `user_id` on every call. Wrapped
/// in `Mutex` because `LruCache::get` mutates the recency list and
/// is `!Sync` by design.
per_user_locks: Mutex<LruCache<String, Arc<Mutex<()>>>>,
scheduler: Arc<MemoryNudgeScheduler>, scheduler: Arc<MemoryNudgeScheduler>,
invoker_factory: InvokerFactory, invoker_factory: InvokerFactory,
/// Token-event store. `None` when the configured path could not be /// Token-event store. `None` when the configured path could not be
@ -97,7 +111,10 @@ impl AppState {
config, config,
token, token,
router, router,
per_user_locks: DashMap::new(), per_user_locks: Mutex::new(LruCache::new(
NonZeroUsize::new(PER_USER_LOCK_CAP)
.expect("PER_USER_LOCK_CAP > 0"),
)),
scheduler: Arc::new(default_scheduler()), scheduler: Arc::new(default_scheduler()),
invoker_factory, invoker_factory,
token_tracker, token_tracker,
@ -139,60 +156,26 @@ impl AppState {
} }
/// Return the per-user mutex, creating it on first access. The returned /// Return the per-user mutex, creating it on first access. The returned
/// `Arc<Mutex<()>>` is cloned — the entry stays alive in the map so /// `Arc<Mutex<()>>` is cloned — when the LRU has spare capacity the
/// subsequent calls for the same `user_id` share it. /// entry stays in the registry so the next call for the same `user_id`
pub fn user_lock(&self, user_id: &str) -> Arc<Mutex<()>> { /// shares it. If the registry has evicted the slot under load, a new
self.inner /// mutex is created (acceptable: the prior critical section is already
.per_user_locks /// fenced by the caller's own `Arc` clone).
.entry(user_id.to_string()) pub async fn user_lock(&self, user_id: &str) -> Arc<Mutex<()>> {
.or_insert_with(|| Arc::new(Mutex::new(()))) let mut cache = self.inner.per_user_locks.lock().await;
.clone() if let Some(existing) = cache.get(user_id) {
} return existing.clone();
}
/// Default Anthropic-backed invoker factory. Each call rebuilds the
/// invoker so the API key is re-read fresh — same discipline as
/// `anthropic::open_stream` (no client caching). The system slot
/// uses the review prompt's persona stub; the actual review prompt
/// is appended by `run_review` as a trailing user message.
fn default_invoker_factory() -> InvokerFactory {
Arc::new(|| Arc::new(AnthropicMemoryInvoker::new(default_review_system())) as Arc<dyn Invoker>)
}
/// System-slot text for memory-review calls. Kept short and stable
/// across reviews so the model response is dominated by the snapshot
/// + review prompt rather than persona drift.
fn default_review_system() -> String {
"You are a quiet observer reviewing a chat to surface memory-worthy facts."
.to_string()
}
/// Try to open the token-event store at the configured path. Returns
/// `None` when the parent directory does not exist or the open fails —
/// startup must NOT fail just because telemetry isn't ready (a fresh
/// host without `~/.keisei/` is normal). Errors are logged to stderr
/// once at startup so an operator notices, but the daemon keeps running.
fn open_token_tracker(
path: &std::path::Path,
) -> Option<Arc<std::sync::Mutex<TokenTracker>>> {
if let Some(parent) = path.parent() {
if !parent.exists() {
eprintln!(
"kei-cortex: token-tracker parent dir {:?} missing — skipping store open",
parent
);
return None;
} }
let new_lock = Arc::new(Mutex::new(()));
cache.put(user_id.to_string(), new_lock.clone());
new_lock
} }
match TokenTracker::open(path) {
Ok(s) => Some(Arc::new(std::sync::Mutex::new(s))), /// Test-only accessor: current size of the per-user-lock registry.
Err(e) => { /// Exposed for the eviction integration test (`state_test.rs`).
eprintln!( #[cfg(test)]
"kei-cortex: token-tracker open {} failed: {e} — telemetry disabled", pub async fn user_lock_count(&self) -> usize {
path.display() self.inner.per_user_locks.lock().await.len()
);
None
}
} }
} }

View file

@ -0,0 +1,64 @@
//! Construction helpers extracted from `state.rs` to keep that file under
//! the Constructor-Pattern 200-LOC ceiling after the Wave 44d resource-cap
//! work added an LRU registry for per-user locks.
//!
//! Hosts:
//! - `default_invoker_factory` — builds the Anthropic memory-review
//! invoker fresh on every call so env-rotated API keys take effect.
//! - `default_review_system` — short, stable system slot for review.
//! - `open_token_tracker` — opens the token-event SQLite store and
//! gracefully degrades to `None` when the host has no telemetry dir.
use crate::agent::anthropic_memory_invoker::AnthropicMemoryInvoker;
use crate::agent::memory_review_task::Invoker;
use crate::state::InvokerFactory;
use kei_token_tracker::Store as TokenTracker;
use std::sync::Arc;
/// Default Anthropic-backed invoker factory. Each call rebuilds the
/// invoker so the API key is re-read fresh — same discipline as
/// `anthropic::open_stream` (no client caching). The system slot
/// uses the review prompt's persona stub; the actual review prompt
/// is appended by `run_review` as a trailing user message.
pub(crate) fn default_invoker_factory() -> InvokerFactory {
Arc::new(|| {
Arc::new(AnthropicMemoryInvoker::new(default_review_system())) as Arc<dyn Invoker>
})
}
/// System-slot text for memory-review calls. Kept short and stable
/// across reviews so the model response is dominated by the snapshot
/// + review prompt rather than persona drift.
fn default_review_system() -> String {
"You are a quiet observer reviewing a chat to surface memory-worthy facts."
.to_string()
}
/// Try to open the token-event store at the configured path. Returns
/// `None` when the parent directory does not exist or the open fails —
/// startup must NOT fail just because telemetry isn't ready (a fresh
/// host without `~/.keisei/` is normal). Errors are logged to stderr
/// once at startup so an operator notices, but the daemon keeps running.
pub(crate) fn open_token_tracker(
path: &std::path::Path,
) -> Option<Arc<std::sync::Mutex<TokenTracker>>> {
if let Some(parent) = path.parent() {
if !parent.exists() {
eprintln!(
"kei-cortex: token-tracker parent dir {:?} missing — skipping store open",
parent
);
return None;
}
}
match TokenTracker::open(path) {
Ok(s) => Some(Arc::new(std::sync::Mutex::new(s))),
Err(e) => {
eprintln!(
"kei-cortex: token-tracker open {} failed: {e} — telemetry disabled",
path.display()
);
None
}
}
}

View file

@ -2,7 +2,8 @@
//! //!
//! Constructor Pattern: extracted to a sibling so the parent stays //! Constructor Pattern: extracted to a sibling so the parent stays
//! ≤200 LOC after the Hermes P2.2.b additions (scheduler + invoker //! ≤200 LOC after the Hermes P2.2.b additions (scheduler + invoker
//! factory plumbing). //! factory plumbing) and Wave 44d resource-cap hardening (per-user
//! lock LRU eviction).
use super::*; use super::*;
use std::path::PathBuf; use std::path::PathBuf;
@ -33,19 +34,19 @@ impl Invoker for Counter {
} }
} }
#[test] #[tokio::test]
fn user_lock_is_stable_per_user() { async fn user_lock_is_stable_per_user() {
let state = AppState::new(dummy_config(), "tok".into()); let state = AppState::new(dummy_config(), "tok".into());
let a = state.user_lock("alice"); let a = state.user_lock("alice").await;
let b = state.user_lock("alice"); let b = state.user_lock("alice").await;
assert!(Arc::ptr_eq(&a, &b)); assert!(Arc::ptr_eq(&a, &b));
} }
#[test] #[tokio::test]
fn user_lock_differs_per_user() { async fn user_lock_differs_per_user() {
let state = AppState::new(dummy_config(), "tok".into()); let state = AppState::new(dummy_config(), "tok".into());
let a = state.user_lock("alice"); let a = state.user_lock("alice").await;
let b = state.user_lock("bob"); let b = state.user_lock("bob").await;
assert!(!Arc::ptr_eq(&a, &b)); assert!(!Arc::ptr_eq(&a, &b));
} }
@ -79,3 +80,43 @@ fn invoker_factory_yields_distinct_arcs() {
// factory does NOT memoise). // factory does NOT memoise).
assert!(!Arc::ptr_eq(&a, &b)); assert!(!Arc::ptr_eq(&a, &b));
} }
/// Resource-exhaustion guard: the registry MUST cap at
/// `PER_USER_LOCK_CAP` (1024). After inserting 2× cap distinct user_ids
/// the registry size stays ≤ cap, proving LRU eviction kicked in. Without
/// the cap an auth'd attacker with 1M unique user_ids would OOM the
/// daemon — this test pins that the bound holds.
#[tokio::test]
async fn user_lock_evicts_past_cap() {
let state = AppState::new(dummy_config(), "tok".into());
let cap = super::PER_USER_LOCK_CAP;
for i in 0..(cap * 2) {
let _ = state.user_lock(&format!("user-{i}")).await;
}
let count = state.user_lock_count().await;
assert!(
count <= cap,
"registry must stay ≤ {cap} after {} inserts; got {count}",
cap * 2
);
assert!(count > 0, "registry should retain the most recent inserts");
}
/// LRU recency: the most recent inserts MUST survive eviction. After
/// pushing 2× cap entries, the very last one we inserted should still
/// be in the registry (cheap pointer-equality check via `user_lock`
/// returning the same `Arc`).
#[tokio::test]
async fn user_lock_keeps_most_recent() {
let state = AppState::new(dummy_config(), "tok".into());
let cap = super::PER_USER_LOCK_CAP;
for i in 0..(cap * 2) {
let _ = state.user_lock(&format!("user-{i}")).await;
}
// The very last user inserted in the loop above must still be cached:
// re-fetching returns a clone of the same Arc.
let last_key = format!("user-{}", cap * 2 - 1);
let a = state.user_lock(&last_key).await;
let b = state.user_lock(&last_key).await;
assert!(Arc::ptr_eq(&a, &b));
}

View file

@ -12,6 +12,7 @@
//! whose crate has not yet been migrated to the `run-atom` protocol). //! whose crate has not yet been migrated to the `run-atom` protocol).
use crate::discover::{walk_atoms, AtomMeta}; use crate::discover::{walk_atoms, AtomMeta};
use crate::invoke_io::{capture_with_cap, Captured};
use crate::validate::validate_input; use crate::validate::validate_input;
use serde::Serialize; use serde::Serialize;
use serde_json::Value; use serde_json::Value;
@ -19,55 +20,13 @@ use std::io::Write;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::process::{Command, Stdio}; use std::process::{Command, Stdio};
/// Max bytes we read from subprocess stdout/stderr to guard against runaway output. pub use crate::invoke_error::InvokeError;
const OUTPUT_CAP: usize = 16 * 1024 * 1024; // 16 MiB
#[derive(Debug)] /// Max bytes we read from subprocess stdout/stderr to guard against
pub enum InvokeError { /// runaway output. Streamed reads enforce this DURING capture (not
AtomNotFound(String), /// post-hoc) — see `invoke_io.rs`. The constant is kept here as the
InputParse(String), /// public name documented in `lib.rs`.
InputInvalid(String), pub const OUTPUT_CAP: usize = 16 * 1024 * 1024; // 16 MiB
MissingInputSchema(String),
/// `crate_name` in atom YAML failed the `kei-*` allowlist check.
InvalidAtom(String),
/// Crate binary is missing from both `KEI_RUNTIME_BIN_DIR` and `PATH`.
BinaryNotFound { crate_name: String },
/// Subprocess exited non-zero — propagate the atom's own exit code.
AtomFailed { atom: String, code: i32, stderr: String },
/// IO / spawn failure (not a non-zero exit from the child).
SubprocessError(String),
/// Atom's stdout was not parseable as JSON.
OutputParse(String),
/// Legacy escape — atom not yet migrated to `run-atom` protocol.
NotImplemented { atom: String },
}
impl std::fmt::Display for InvokeError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::AtomNotFound(id) => write!(f, "no atom matching {id}"),
Self::InputParse(e) => write!(f, "input rejected: {e}"),
Self::InputInvalid(e) => write!(f, "input rejected: {e}"),
Self::MissingInputSchema(id) => write!(f, "atom `{id}` declares no input schema"),
Self::InvalidAtom(msg) => write!(f, "invalid atom crate_name: {msg}"),
Self::BinaryNotFound { crate_name } => write!(
f,
"binary `{crate_name}` not found on PATH or KEI_RUNTIME_BIN_DIR"
),
Self::AtomFailed { atom, code, stderr } => {
write!(f, "atom `{atom}` exited {code}: {stderr}")
}
Self::SubprocessError(e) => write!(f, "subprocess: {e}"),
Self::OutputParse(e) => write!(f, "atom stdout not JSON: {e}"),
Self::NotImplemented { atom } => write!(
f,
"invoke not yet wired for this atom ({atom}); use the underlying CLI directly"
),
}
}
}
impl std::error::Error for InvokeError {}
/// Parsed output of an invoked atom. `result` is the raw JSON the atom wrote. /// Parsed output of an invoked atom. `result` is the raw JSON the atom wrote.
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
@ -139,40 +98,43 @@ fn exec_atom(meta: &AtomMeta, input_json: &str) -> Result<Output, InvokeError> {
.stderr(Stdio::piped()) .stderr(Stdio::piped())
.spawn() .spawn()
.map_err(|e| InvokeError::SubprocessError(format!("spawn {}: {e}", bin.display())))?; .map_err(|e| InvokeError::SubprocessError(format!("spawn {}: {e}", bin.display())))?;
write_stdin(&mut child, input_json)?;
let captured = capture_with_cap(child)
.map_err(|e| InvokeError::SubprocessError(format!("wait: {e}")))?;
handle_captured(meta, captured)
}
/// Write the atom's input JSON to the child's stdin, dropping the handle
/// so the writer side closes (otherwise the child would block on EOF).
fn write_stdin(child: &mut std::process::Child, input_json: &str) -> Result<(), InvokeError> {
if let Some(mut stdin) = child.stdin.take() { if let Some(mut stdin) = child.stdin.take() {
stdin stdin
.write_all(input_json.as_bytes()) .write_all(input_json.as_bytes())
.map_err(|e| InvokeError::SubprocessError(format!("write stdin: {e}")))?; .map_err(|e| InvokeError::SubprocessError(format!("write stdin: {e}")))?;
} }
let out = child Ok(())
.wait_with_output()
.map_err(|e| InvokeError::SubprocessError(format!("wait: {e}")))?;
handle_subprocess_output(meta, out)
} }
fn cap_bytes(data: Vec<u8>, label: &str) -> Vec<u8> { /// Map a `Captured` (potentially-truncated) child result into our typed
if data.len() > OUTPUT_CAP { /// `Output` / `InvokeError`. Truncation is surfaced in `SubprocessError`
let mut v = data; /// when the child was killed for exceeding the cap; the parent decides
v.truncate(OUTPUT_CAP); /// what exit code to return.
eprintln!("[kei-runtime] {label} truncated at {OUTPUT_CAP} bytes"); fn handle_captured(meta: &AtomMeta, c: Captured) -> Result<Output, InvokeError> {
v if c.truncated {
} else { return Err(InvokeError::SubprocessError(format!(
data "atom `{}` killed: stdout/stderr exceeded {OUTPUT_CAP} byte cap",
meta.full_id
)));
} }
} if c.status_code != 0 {
let stderr = String::from_utf8_lossy(&c.stderr).trim().to_string();
fn handle_subprocess_output( return Err(InvokeError::AtomFailed {
meta: &AtomMeta, atom: meta.full_id.clone(),
mut out: std::process::Output, code: c.status_code,
) -> Result<Output, InvokeError> { stderr,
out.stdout = cap_bytes(out.stdout, "stdout"); });
out.stderr = cap_bytes(out.stderr, "stderr");
let code = out.status.code().unwrap_or(-1);
if !out.status.success() {
let stderr = String::from_utf8_lossy(&out.stderr).trim().to_string();
return Err(InvokeError::AtomFailed { atom: meta.full_id.clone(), code, stderr });
} }
let stdout = String::from_utf8_lossy(&out.stdout); let stdout = String::from_utf8_lossy(&c.stdout);
let result: Value = serde_json::from_str(stdout.trim()) let result: Value = serde_json::from_str(stdout.trim())
.map_err(|e| InvokeError::OutputParse(format!("{e}; stdout was: {stdout}")))?; .map_err(|e| InvokeError::OutputParse(format!("{e}; stdout was: {stdout}")))?;
Ok(Output { atom: meta.full_id.clone(), result }) Ok(Output { atom: meta.full_id.clone(), result })

View file

@ -0,0 +1,54 @@
//! Typed errors for the atom-invocation runtime.
//!
//! Constructor Pattern: extracted from `invoke.rs` so the runtime
//! parent file stays under 200 LOC after Wave 44d added bounded-read
//! capture + truncation handling.
#[derive(Debug)]
pub enum InvokeError {
AtomNotFound(String),
InputParse(String),
InputInvalid(String),
MissingInputSchema(String),
/// `crate_name` in atom YAML failed the `kei-*` allowlist check.
InvalidAtom(String),
/// Crate binary is missing from both `KEI_RUNTIME_BIN_DIR` and `PATH`.
BinaryNotFound { crate_name: String },
/// Subprocess exited non-zero — propagate the atom's own exit code.
AtomFailed { atom: String, code: i32, stderr: String },
/// IO / spawn failure (not a non-zero exit from the child).
SubprocessError(String),
/// Atom's stdout was not parseable as JSON.
OutputParse(String),
/// Legacy escape — atom not yet migrated to `run-atom` protocol.
NotImplemented { atom: String },
}
impl std::fmt::Display for InvokeError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::AtomNotFound(id) => write!(f, "no atom matching {id}"),
Self::InputParse(e) => write!(f, "input rejected: {e}"),
Self::InputInvalid(e) => write!(f, "input rejected: {e}"),
Self::MissingInputSchema(id) => {
write!(f, "atom `{id}` declares no input schema")
}
Self::InvalidAtom(msg) => write!(f, "invalid atom crate_name: {msg}"),
Self::BinaryNotFound { crate_name } => write!(
f,
"binary `{crate_name}` not found on PATH or KEI_RUNTIME_BIN_DIR"
),
Self::AtomFailed { atom, code, stderr } => {
write!(f, "atom `{atom}` exited {code}: {stderr}")
}
Self::SubprocessError(e) => write!(f, "subprocess: {e}"),
Self::OutputParse(e) => write!(f, "atom stdout not JSON: {e}"),
Self::NotImplemented { atom } => write!(
f,
"invoke not yet wired for this atom ({atom}); use the underlying CLI directly"
),
}
}
}
impl std::error::Error for InvokeError {}

View file

@ -0,0 +1,146 @@
//! Bounded-read child-process IO for `invoke.rs`.
//!
//! Wave 44d resource-cap hardening: the previous `wait_with_output()`
//! buffered ALL stdout/stderr before any size check ran — a malicious
//! atom writing 10 GiB of zeros would OOM the runtime before truncation.
//! This module replaces that with size-tracked stream readers that KILL
//! the child the moment a cap is exceeded — the kill is issued from
//! INSIDE the reader thread so an unbounded writer cannot deadlock the
//! parent (post-hoc kill would never happen because the reader would
//! never return on infinite stdout).
//!
//! Constructor Pattern: extracted as a sibling module so `invoke.rs`
//! stays under 200 LOC and per-function under 30 LOC.
use std::io::Read;
use std::process::{Child, ChildStderr, ChildStdout};
use std::sync::{Arc, Mutex};
use std::thread;
/// Hard cap on stdout/stderr each. Mirrors the public `OUTPUT_CAP`
/// constant in `invoke.rs` (16 MiB). Kept as a module-private mirror
/// so this file can be reasoned about in isolation.
const STREAM_CAP: usize = 16 * 1024 * 1024;
/// Per-read chunk size. 8 KiB is the typical pipe buffer granularity
/// on Linux/macOS; smaller would inflate syscall count, larger would
/// risk overshooting the cap by up to one chunk.
const CHUNK: usize = 8 * 1024;
/// Captured child output. `truncated` is true when EITHER stream hit
/// its cap; the caller is expected to surface the truncation in any
/// error message it returns to the user.
pub(crate) struct Captured {
pub stdout: Vec<u8>,
pub stderr: Vec<u8>,
pub status_code: i32,
pub truncated: bool,
}
/// Shared kill-handle: the reader thread that trips the cap takes the
/// `Mutex`, calls `kill()`, drops the lock. Wrapped so both readers
/// race-safely — only one wins, the other sees the child already gone.
type KillHandle = Arc<Mutex<Option<Child>>>;
/// Read both streams concurrently with size caps. If either reader
/// trips its cap, that reader kills the child IMMEDIATELY (from inside
/// the reader thread) so an unbounded writer cannot deadlock us. The
/// other reader then sees the pipe close and returns. Finally we reap.
pub(crate) fn capture_with_cap(mut child: Child) -> std::io::Result<Captured> {
let stdout = child.stdout.take().expect("stdout piped");
let stderr = child.stderr.take().expect("stderr piped");
let kill: KillHandle = Arc::new(Mutex::new(Some(child)));
let stdout_handle = spawn_reader_stdout(stdout, kill.clone());
let stderr_handle = spawn_reader_stderr(stderr, kill.clone());
let (out_buf, out_trunc) = stdout_handle.join().expect("stdout thread");
let (err_buf, err_trunc) = stderr_handle.join().expect("stderr thread");
let status_code = reap_child(&kill);
Ok(Captured {
stdout: out_buf,
stderr: err_buf,
status_code,
truncated: out_trunc || err_trunc,
})
}
/// Reap the child (waiting on it) and return its exit code. If a reader
/// already killed the child the lock will hold `None` — fall through to
/// -1 (signaled). Otherwise we still call `wait()` to avoid a zombie.
fn reap_child(kill: &KillHandle) -> i32 {
let mut guard = kill.lock().expect("kill mutex");
if let Some(mut c) = guard.take() {
c.wait().ok().and_then(|s| s.code()).unwrap_or(-1)
} else {
-1
}
}
/// Spawn the stdout reader thread. Returns `(buffer, truncated)`.
fn spawn_reader_stdout(
stream: ChildStdout,
kill: KillHandle,
) -> thread::JoinHandle<(Vec<u8>, bool)> {
thread::spawn(move || read_capped(stream, kill))
}
/// Spawn the stderr reader thread. Returns `(buffer, truncated)`.
fn spawn_reader_stderr(
stream: ChildStderr,
kill: KillHandle,
) -> thread::JoinHandle<(Vec<u8>, bool)> {
thread::spawn(move || read_capped(stream, kill))
}
/// Read until EOF or cap exceeded. On cap exceedance, kill the child
/// IMMEDIATELY via the shared handle then keep draining the pipe so
/// the OS-level pipe buffer empties cleanly (would-be writes by the
/// already-killed child error out fast, EOF arrives quickly).
fn read_capped<R: Read>(mut stream: R, kill: KillHandle) -> (Vec<u8>, bool) {
let mut buf = Vec::with_capacity(CHUNK);
let mut chunk = [0u8; CHUNK];
let mut truncated = false;
loop {
match stream.read(&mut chunk) {
Ok(0) => break,
Ok(n) => append_or_truncate(&mut buf, &chunk[..n], &mut truncated, &kill),
Err(_) => break,
}
}
(buf, truncated)
}
/// Append-or-mark logic, extracted so `read_capped` stays under the 30-LOC
/// ceiling. Side effect: on the cap-crossing call, sends a kill via the
/// shared handle before returning.
fn append_or_truncate(
buf: &mut Vec<u8>,
incoming: &[u8],
truncated: &mut bool,
kill: &KillHandle,
) {
if *truncated {
return; // already past cap; drain silently so writer doesn't block
}
if buf.len() + incoming.len() > STREAM_CAP {
let take = STREAM_CAP.saturating_sub(buf.len());
buf.extend_from_slice(&incoming[..take]);
*truncated = true;
kill_child(kill);
} else {
buf.extend_from_slice(incoming);
}
}
/// Kill the child via the shared handle. Best-effort: if another
/// reader has already killed + reaped, the lock holds `None` and this
/// is a no-op.
fn kill_child(kill: &KillHandle) {
if let Ok(mut guard) = kill.lock() {
if let Some(c) = guard.as_mut() {
let _ = c.kill();
eprintln!(
"[kei-runtime] child killed: stdout/stderr exceeded {STREAM_CAP} byte cap"
);
}
}
}

View file

@ -10,5 +10,7 @@
pub mod discover; pub mod discover;
pub mod invoke; pub mod invoke;
pub(crate) mod invoke_error;
pub(crate) mod invoke_io;
pub mod lint; pub mod lint;
pub mod validate; pub mod validate;

View file

@ -0,0 +1,103 @@
//! Integration test — runaway atom that floods stdout MUST be killed
//! after the 16 MiB cap, not buffered to OOM.
//!
//! Wave 44d resource-cap: replaces the post-hoc `cap_bytes` truncation
//! with streamed reads in `invoke_io.rs`. This test pins the new
//! behaviour: a fake atom binary that emits 100 MiB of zeros must
//! exit non-zero (killed by parent) rather than complete normally
//! with 100 MiB buffered.
//!
//! Strategy:
//! 1. Build a tiny shell-script "atom" that ignores stdin and writes
//! 100 MiB of zeros to stdout. We can't use `dd` directly because
//! the runtime's allowlist enforces `kei-*` crate names — so we
//! stage a script named `kei-flood` in a temp bin dir.
//! 2. Stage atom YAML naming `kei-flood::pour`.
//! 3. Invoke via the runtime CLI; expect non-zero exit + a stderr
//! message naming the cap.
use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::process::Command;
const BIN: &str = env!("CARGO_BIN_EXE_kei-runtime");
fn write_atom_md(root: &Path, crate_name: &str, verb: &str) {
let atoms = root.join(crate_name).join("atoms");
let schemas = atoms.join("schemas");
fs::create_dir_all(&schemas).unwrap();
let in_schema = r#"{"$schema":"http://json-schema.org/draft-07/schema#","type":"object"}"#;
let out_schema = r#"{"$schema":"http://json-schema.org/draft-07/schema#","type":"object"}"#;
fs::write(schemas.join(format!("{verb}-input.json")), in_schema).unwrap();
fs::write(schemas.join(format!("{verb}-output.json")), out_schema).unwrap();
let md = format!(
"---\natom: {crate_name}::{verb}\nkind: command\nversion: \"0.1.0\"\n\
input:\n schema: schemas/{verb}-input.json\n\
output:\n schema: schemas/{verb}-output.json\n\
side_effects: []\nidempotent: true\nstability: stable\n---\n"
);
fs::write(atoms.join(format!("{verb}.md")), md).unwrap();
}
/// Stage a `kei-flood` shell-script in `bin_dir`. When invoked with
/// `run-atom pour` it writes a continuous stream of zeros to stdout
/// (well past the 16 MiB cap) using a bash builtin loop — no
/// dependency on external `dd` because PATH may be locked down.
/// The parent runtime should kill it well before it finishes.
fn stage_flood_binary(bin_dir: &Path) -> std::path::PathBuf {
fs::create_dir_all(bin_dir).unwrap();
let script = bin_dir.join("kei-flood");
// Pipe /dev/zero straight to stdout via /bin/cat, which is
// present at /bin/cat on macOS and Linux. The script will run
// unbounded; the parent runtime is expected to kill it after
// the 16 MiB cap. SIGPIPE on cat (when the parent stops reading
// and the pipe closes) is benign — we just want enough volume
// to provably exceed the cap.
let body = "#!/bin/sh\nexec /bin/cat /dev/zero\n";
fs::write(&script, body).unwrap();
let mut perms = fs::metadata(&script).unwrap().permissions();
perms.set_mode(0o755);
fs::set_permissions(&script, perms).unwrap();
script
}
#[test]
fn invoke_kills_runaway_atom() {
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path().join("root");
let bin = tmp.path().join("bin");
write_atom_md(&root, "kei-flood", "pour");
let _script = stage_flood_binary(&bin);
let out = Command::new(BIN)
.env("KEI_RUNTIME_BIN_DIR", &bin)
.env("PATH", &bin)
.arg("invoke")
.arg("kei-flood::pour")
.arg("--input")
.arg("{}")
.arg("--root")
.arg(&root)
.output()
.expect("spawn kei-runtime");
let stderr = String::from_utf8_lossy(&out.stderr).to_string();
let stdout_len = out.stdout.len();
// The runtime printed at most a small JSON envelope OR nothing
// (process killed). 16 MiB cap + a sliver of envelope JSON → well
// under 18 MiB. If the cap had failed, stdout would be ~100 MiB.
assert!(
stdout_len < 18 * 1024 * 1024,
"expected stdout < 18 MiB; got {stdout_len} (cap not enforced!)"
);
// We expect a non-zero exit because the child was killed.
assert_ne!(
out.status.code(),
Some(0),
"expected non-zero exit on runaway; stderr: {stderr}"
);
// Stderr should mention the cap so the operator can diagnose.
assert!(
stderr.contains("cap") || stderr.contains("subprocess"),
"expected 'cap' / 'subprocess' in stderr: {stderr}"
);
}