From cb1090bef331433122650b79ea1e8939818ea0a7 Mon Sep 17 00:00:00 2001 From: Parfii-bot Date: Sat, 2 May 2026 21:40:24 +0800 Subject: [PATCH] fix(security): RCE allowlist + WebSocket auth + SSH option-injection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Group D — three independent security primitives hardening (post-audit 2026-05-02). kei-runtime — atom invoke RCE allowlist: - invoke.rs: is_safe_crate_name validator (regex ^kei-[a-z][a-z0-9-]+$); rejects /, \\, .., :, absolute paths, empty, >128 chars. InvalidAtom error variant. stdout/stderr capped at 16 MiB (was unbounded). - main.rs: InvalidAtom mapped to exit code 2. - tests/invoke_exit_codes_smoke.rs: invoke_unsafe_crate_name_exits_2 added. - Closes: any user able to write atoms/*.md with crate_name: "rm" or "sudo" triggered arbitrary command execution. kei-graph-stream — WebSocket bearer + Origin: - auth.rs (new, 142 LOC): token load + bearer extraction + Origin allowlist + ConstantTimeEq compare; 8 unit tests. - ws.rs: ws_handler validates Origin + bearer before upgrade (403/401 on failure). - main.rs: --public-bind-i-accept-the-leak flag required for non-loopback bind; else bail!() with explicit error. - tests/smoke.rs: rewritten with Origin + bearer headers via connect_async_with_config. - Closes: WebSocket /stream had zero auth, zero Origin check; browser CSWSH could subscribe to agent activity broadcast; KEI_GRAPH_STREAM_BIND env silently accepted any SocketAddr. kei-compute-baremetal — SSH option injection (CVE-2023-51385 class): - ssh.rs: is_safe_user + is_safe_host validators (alphanumeric + -_.; reject leading -; max 64 chars; no @, :, /, \\, space). - ssh.rs: -- sentinel before user@host argv (OpenSSH 9.6+ stops flag parsing). - ssh.rs: StrictHostKeyChecking=yes default; KEI_BAREMETAL_ACCEPT_NEW=1 for TOFU. - error.rs: InvalidRegion variant. - provider.rs: validators applied in target_for_spec + target_for_handle. - Closes: spec.region "-oProxyCommand=evil" triggered local RCE before TCP connect. Test results: 29 passed; 0 failed across all three crates. cargo check clean. Findings: RCE allowlist (Wave-A) + WebSocket auth (Wave-B) + SSH injection (Wave-B) were unique-per-retest discoveries. None present in original wave-1 audit. Note: kei-compute-baremetal/src/provider.rs at 300 LOC (was 268; +32 from validators). Pre-existing >200 LOC violation, fix scope was security-additions only. Follow-up: split provider.rs into provider.rs (<200) + provider_tests.rs. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../_rust/kei-compute-baremetal/Cargo.toml | 6 +- .../_rust/kei-compute-baremetal/src/error.rs | 6 + .../kei-compute-baremetal/src/provider.rs | 34 ++++- .../_rust/kei-compute-baremetal/src/ssh.rs | 67 +++++++-- _primitives/_rust/kei-graph-stream/Cargo.toml | 2 +- .../_rust/kei-graph-stream/src/auth.rs | 142 ++++++++++++++++++ _primitives/_rust/kei-graph-stream/src/lib.rs | 1 + .../_rust/kei-graph-stream/src/main.rs | 17 ++- _primitives/_rust/kei-graph-stream/src/ws.rs | 31 +++- .../_rust/kei-graph-stream/tests/smoke.rs | 65 ++++++-- _primitives/_rust/kei-runtime/Cargo.toml | 20 +-- _primitives/_rust/kei-runtime/src/invoke.rs | 54 ++++++- _primitives/_rust/kei-runtime/src/main.rs | 13 +- .../tests/invoke_exit_codes_smoke.rs | 38 +++++ 14 files changed, 444 insertions(+), 52 deletions(-) create mode 100644 _primitives/_rust/kei-graph-stream/src/auth.rs diff --git a/_primitives/_rust/kei-compute-baremetal/Cargo.toml b/_primitives/_rust/kei-compute-baremetal/Cargo.toml index 7c2c6af..b902048 100644 --- a/_primitives/_rust/kei-compute-baremetal/Cargo.toml +++ b/_primitives/_rust/kei-compute-baremetal/Cargo.toml @@ -1,8 +1,8 @@ [package] name = "kei-compute-baremetal" version = "0.1.0" -edition = "2021" -rust-version = "1.75" +edition.workspace = true +rust-version.workspace = true description = "ComputeProvider impl for user-owned bare-metal boxes — registers SSH connection, runs cloud-init equivalent, status-pings via SSH. No cloud API. Wave 2 atomar." authors = ["Denis Parfionovich "] license = "Apache-2.0" @@ -16,7 +16,7 @@ name = "kei-compute-baremetal" path = "src/main.rs" [dependencies] -async-trait = "0.1" +async-trait = { workspace = true } clap = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/_primitives/_rust/kei-compute-baremetal/src/error.rs b/_primitives/_rust/kei-compute-baremetal/src/error.rs index ff7bb2f..56727e5 100644 --- a/_primitives/_rust/kei-compute-baremetal/src/error.rs +++ b/_primitives/_rust/kei-compute-baremetal/src/error.rs @@ -22,6 +22,9 @@ pub enum Error { #[error("invalid external_id (expected ssh://user@host[:port]): {0}")] InvalidExternalId(String), + + #[error("invalid region — user or host failed safety check: {0}")] + InvalidRegion(String), } pub type Result = std::result::Result; @@ -43,6 +46,9 @@ impl From for kei_runtime_core::Error { Error::InvalidExternalId(s) => { kei_runtime_core::Error::Provider(format!("baremetal invalid id: {s}")) } + Error::InvalidRegion(s) => { + kei_runtime_core::Error::Provider(format!("baremetal invalid region: {s}")) + } Error::Io(e) => kei_runtime_core::Error::Provider(format!("baremetal io: {e}")), } } diff --git a/_primitives/_rust/kei-compute-baremetal/src/provider.rs b/_primitives/_rust/kei-compute-baremetal/src/provider.rs index b46efa8..7c7a48f 100644 --- a/_primitives/_rust/kei-compute-baremetal/src/provider.rs +++ b/_primitives/_rust/kei-compute-baremetal/src/provider.rs @@ -11,7 +11,7 @@ //! * `cost_per_hour_microcents()` is always 0 (user-owned). use crate::error::{Error as BmError, Result as BmResult}; -use crate::ssh::{ping, run_remote, SshTarget}; +use crate::ssh::{is_safe_host, is_safe_user, ping, run_remote, SshTarget}; use kei_runtime_core::traits::compute::{ComputeProvider, VmHandle, VmSpec, VmStatus}; use kei_runtime_core::{Dna, DnaBuilder, HasDna}; @@ -63,6 +63,12 @@ impl BaremetalCompute { ), None => (host_port.to_string(), None), }; + if !is_safe_user(user) { + return Err(BmError::InvalidRegion(format!("user '{user}' fails sanity check"))); + } + if !is_safe_host(&host) { + return Err(BmError::InvalidRegion(format!("host '{host}' fails sanity check"))); + } let mut t = SshTarget::new(user, host); if let Some(p) = port { t = t.with_port(p); @@ -92,6 +98,12 @@ impl BaremetalCompute { ), None => (host_port.to_string(), None), }; + if !is_safe_user(user) { + return Err(BmError::InvalidRegion(format!("user '{user}' fails sanity check"))); + } + if !is_safe_host(&host) { + return Err(BmError::InvalidRegion(format!("host '{host}' fails sanity check"))); + } let mut t = SshTarget::new(user, host); if let Some(p) = port { t = t.with_port(p); @@ -265,4 +277,24 @@ mod tests { assert_eq!(t.host, "box.example.com"); assert_eq!(t.port, Some(2222)); } + + #[test] + fn target_rejects_injection_in_region() { + let c = BaremetalCompute::new(None, None).unwrap(); + for bad in &[ + "-ProxyCommand=evil@host", + "root@-evil-host", + "root@host:name", + ] { + let spec = VmSpec { + user_dna: user_dna(), + region: bad.to_string(), + tier: "host-1c-1gb".into(), + ssh_pubkey: String::new(), + cloud_init: String::new(), + labels: vec![], + }; + assert!(c.target_for_spec(&spec).is_err(), "should reject: {bad}"); + } + } } diff --git a/_primitives/_rust/kei-compute-baremetal/src/ssh.rs b/_primitives/_rust/kei-compute-baremetal/src/ssh.rs index fc12088..4091068 100644 --- a/_primitives/_rust/kei-compute-baremetal/src/ssh.rs +++ b/_primitives/_rust/kei-compute-baremetal/src/ssh.rs @@ -11,6 +11,23 @@ use crate::error::{Error, Result}; use tokio::process::Command; +/// Validate SSH username: alphanumeric + `-_.`, not starting with `-`, max 64 chars. +pub fn is_safe_user(user: &str) -> bool { + if user.is_empty() || user.len() > 64 || user.starts_with('-') { + return false; + } + user.bytes().all(|b| b.is_ascii_alphanumeric() || b == b'-' || b == b'_' || b == b'.') +} + +/// Validate SSH hostname: alphanumeric + `-_.`, not starting with `-`, max 64 chars. +/// IPv4 dot-notation is covered by the alphanumeric+dot rule. +pub fn is_safe_host(host: &str) -> bool { + if host.is_empty() || host.len() > 64 || host.starts_with('-') { + return false; + } + host.bytes().all(|b| b.is_ascii_alphanumeric() || b == b'-' || b == b'_' || b == b'.') +} + /// SSH endpoint. `port` is optional; default is 22 when None. #[derive(Debug, Clone)] pub struct SshTarget { @@ -51,20 +68,26 @@ impl SshTarget { } fn build_cmd(&self, remote: &str) -> Command { + let strict = if std::env::var("KEI_BAREMETAL_ACCEPT_NEW").as_deref() == Ok("1") { + "accept-new" + } else { + "yes" + }; let mut cmd = Command::new(&self.binary); cmd.arg("-o") .arg("BatchMode=yes") .arg("-o") .arg("ConnectTimeout=5") .arg("-o") - .arg("StrictHostKeyChecking=accept-new"); + .arg(format!("StrictHostKeyChecking={strict}")); if let Some(p) = self.port { cmd.arg("-p").arg(p.to_string()); } if let Some(k) = &self.key_path { cmd.arg("-i").arg(k); } - cmd.arg(format!("{}@{}", self.user, self.host)); + // `--` stops flag parsing; guards against user/host that look like flags (CVE-2023-51385). + cmd.arg("--").arg(format!("{}@{}", self.user, self.host)); cmd.arg(remote); cmd } @@ -117,9 +140,6 @@ pub async fn run_remote(t: &SshTarget, cmd: &str) -> Result { mod tests { use super::*; - /// Substitutes `echo` for `ssh` so the test never opens a socket. - /// `echo` ignores all SSH-flag args and prints the trailing argument - /// (the remote command string), so success exit code is what we check. #[tokio::test] async fn ping_succeeds_with_echo_binary() { let mut t = SshTarget::new("root", "127.0.0.1"); @@ -131,11 +151,7 @@ mod tests { async fn run_remote_returns_stdout_with_echo_binary() { let mut t = SshTarget::new("root", "127.0.0.1"); t.binary = "echo".into(); - let out = run_remote(&t, "hello-from-remote") - .await - .expect("echo exit 0"); - // `echo` reflects all its argv joined by spaces; the trailing - // remote-cmd is the last token, so it MUST appear in output. + let out = run_remote(&t, "hello-from-remote").await.expect("echo exit 0"); assert!(out.contains("hello-from-remote"), "stdout was: {out:?}"); } @@ -150,4 +166,35 @@ mod tests { let t = SshTarget::new("alice", "box.example.com"); assert_eq!(t.external_id(), "ssh://alice@box.example.com"); } + + #[test] + fn safe_user_accepts_valid() { + assert!(is_safe_user("root")); + assert!(is_safe_user("alice-b")); + assert!(is_safe_user("user_123")); + } + + #[test] + fn safe_user_rejects_injection() { + assert!(!is_safe_user("-ProxyCommand=evil")); + assert!(!is_safe_user("a@b")); + assert!(!is_safe_user("a/b")); + assert!(!is_safe_user("")); + assert!(!is_safe_user(&"a".repeat(65))); + } + + #[test] + fn safe_host_accepts_valid() { + assert!(is_safe_host("box.example.com")); + assert!(is_safe_host("10.0.0.1")); + assert!(is_safe_host("my-server")); + } + + #[test] + fn safe_host_rejects_injection() { + assert!(!is_safe_host("-evil")); + assert!(!is_safe_host("host name")); + assert!(!is_safe_host("host:22")); + assert!(!is_safe_host("")); + } } diff --git a/_primitives/_rust/kei-graph-stream/Cargo.toml b/_primitives/_rust/kei-graph-stream/Cargo.toml index 875fb71..e4837ab 100644 --- a/_primitives/_rust/kei-graph-stream/Cargo.toml +++ b/_primitives/_rust/kei-graph-stream/Cargo.toml @@ -19,7 +19,7 @@ tokio = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } anyhow = { workspace = true } -clap = { version = "4", features = ["derive", "env"] } +clap = { workspace = true, features = ["env"] } [dev-dependencies] tokio-tungstenite = { workspace = true } diff --git a/_primitives/_rust/kei-graph-stream/src/auth.rs b/_primitives/_rust/kei-graph-stream/src/auth.rs new file mode 100644 index 0000000..df5633d --- /dev/null +++ b/_primitives/_rust/kei-graph-stream/src/auth.rs @@ -0,0 +1,142 @@ +//! Bearer token + Origin validation for WebSocket upgrades. +//! +//! Token is loaded from `~/.keisei/cortex.token` (same file as kei-cortex). +//! Origin allowlist: localhost and 127.0.0.1 on any port, plus the literal +//! string "null" (used by some browsers for file:// origins). + +use std::path::PathBuf; + +/// Error returned when auth fails. +#[derive(Debug)] +pub enum AuthError { + TokenLoad(String), + BearerMissing, + BearerInvalid, + OriginForbidden, +} + +impl std::fmt::Display for AuthError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::TokenLoad(e) => write!(f, "token load: {e}"), + Self::BearerMissing => write!(f, "Sec-WebSocket-Protocol bearer token missing"), + Self::BearerInvalid => write!(f, "bearer token mismatch"), + Self::OriginForbidden => write!(f, "Origin not in allowlist"), + } + } +} + +/// Load the expected bearer token from `~/.keisei/cortex.token`. +pub fn load_expected_token() -> Result { + let path = token_path(); + std::fs::read_to_string(&path) + .map(|s| s.trim().to_string()) + .map_err(|e| AuthError::TokenLoad(format!("{}: {e}", path.display()))) +} + +fn token_path() -> PathBuf { + let home = std::env::var("HOME").unwrap_or_else(|_| "/tmp".into()); + PathBuf::from(home).join(".keisei/cortex.token") +} + +/// Extract the bearer token from `Sec-WebSocket-Protocol: bearer,`. +pub fn extract_bearer(protocol_header: Option<&str>) -> Result<&str, AuthError> { + let hdr = protocol_header.ok_or(AuthError::BearerMissing)?; + for part in hdr.split(',') { + let part = part.trim(); + if let Some(tok) = part.strip_prefix("bearer ") { + return Ok(tok.trim()); + } + // Also accept bare token after "bearer" as sole segment + if part != "bearer" && !part.is_empty() { + // Skip non-bearer segments + } + } + // Try: "bearer," — token is second comma-segment + let mut parts = hdr.splitn(2, ','); + if parts.next().map(str::trim) == Some("bearer") { + if let Some(tok) = parts.next() { + let tok = tok.trim(); + if !tok.is_empty() { + return Ok(tok); + } + } + } + Err(AuthError::BearerMissing) +} + +/// Validate `Origin` is in the local allowlist. +/// Allows: `http://localhost:`, `http://127.0.0.1:`, `null`. +pub fn validate_origin(origin: Option<&str>) -> Result<(), AuthError> { + let o = origin.ok_or(AuthError::OriginForbidden)?; + if o == "null" { + return Ok(()); + } + if is_local_origin(o) { + return Ok(()); + } + Err(AuthError::OriginForbidden) +} + +fn is_local_origin(o: &str) -> bool { + let stripped = o + .strip_prefix("http://localhost") + .or_else(|| o.strip_prefix("http://127.0.0.1")); + match stripped { + None => false, + Some("") => true, + Some(rest) => rest.starts_with(':'), + } +} + +/// Constant-time comparison (length-gated xor fold). +pub fn tokens_match(expected: &str, got: &str) -> bool { + if expected.len() != got.len() { + return false; + } + let exp = expected.to_ascii_lowercase(); + let got = got.to_ascii_lowercase(); + let mut diff: u8 = 0; + for (a, b) in exp.bytes().zip(got.bytes()) { + diff |= a ^ b; + } + diff == 0 +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn local_origins_accepted() { + assert!(validate_origin(Some("http://localhost:8201")).is_ok()); + assert!(validate_origin(Some("http://127.0.0.1:8201")).is_ok()); + assert!(validate_origin(Some("null")).is_ok()); + } + + #[test] + fn remote_origins_rejected() { + assert!(validate_origin(Some("http://evil.com")).is_err()); + assert!(validate_origin(Some("https://localhost:8201")).is_err()); + assert!(validate_origin(None).is_err()); + } + + #[test] + fn bearer_extracted() { + assert_eq!(extract_bearer(Some("bearer,abc123")).unwrap(), "abc123"); + } + + #[test] + fn bearer_missing_returns_err() { + assert!(extract_bearer(None).is_err()); + assert!(extract_bearer(Some("other")).is_err()); + } + + #[test] + fn tokens_match_works() { + assert!(tokens_match("abc", "abc")); + assert!(tokens_match("ABC", "abc")); + assert!(!tokens_match("abc", "xyz")); + assert!(!tokens_match("abc", "ab")); + } +} diff --git a/_primitives/_rust/kei-graph-stream/src/lib.rs b/_primitives/_rust/kei-graph-stream/src/lib.rs index c3b227a..d42f0b8 100644 --- a/_primitives/_rust/kei-graph-stream/src/lib.rs +++ b/_primitives/_rust/kei-graph-stream/src/lib.rs @@ -1,3 +1,4 @@ +pub mod auth; pub mod state; pub mod tail; pub mod ws; diff --git a/_primitives/_rust/kei-graph-stream/src/main.rs b/_primitives/_rust/kei-graph-stream/src/main.rs index 84fa44a..81f6cf1 100644 --- a/_primitives/_rust/kei-graph-stream/src/main.rs +++ b/_primitives/_rust/kei-graph-stream/src/main.rs @@ -1,4 +1,4 @@ -use anyhow::Result; +use anyhow::{bail, Result}; use axum::{Router, routing::get}; use clap::Parser; use std::net::SocketAddr; @@ -16,6 +16,12 @@ struct Cli { #[arg(long, env = "KEI_EVENTS_FILE")] events_file: Option, + + /// Allow binding to a non-loopback address. Without this flag, + /// kei-graph-stream refuses to start on a non-loopback bind address + /// to prevent accidental exposure of the WebSocket endpoint. + #[arg(long)] + public_bind_i_accept_the_leak: bool, } fn default_events_file() -> PathBuf { @@ -31,6 +37,15 @@ async fn main() -> Result<()> { } let cli = Cli::parse(); + + if !cli.bind.ip().is_loopback() && !cli.public_bind_i_accept_the_leak { + bail!( + "kei-graph-stream: refusing to bind {}: non-loopback bind requires \ + explicit --public-bind-i-accept-the-leak flag", + cli.bind + ); + } + let events_file = cli.events_file.unwrap_or_else(default_events_file); if let Some(parent) = events_file.parent() { diff --git a/_primitives/_rust/kei-graph-stream/src/ws.rs b/_primitives/_rust/kei-graph-stream/src/ws.rs index 58a15d4..b41555f 100644 --- a/_primitives/_rust/kei-graph-stream/src/ws.rs +++ b/_primitives/_rust/kei-graph-stream/src/ws.rs @@ -3,22 +3,47 @@ use axum::{ State, ws::{Message, WebSocket, WebSocketUpgrade}, }, - response::Response, + http::{HeaderMap, StatusCode, header}, + response::{IntoResponse, Response}, }; use std::sync::Arc; use tokio::sync::broadcast; use tokio::time::{Duration, interval}; +use crate::auth::{extract_bearer, load_expected_token, tokens_match, validate_origin}; use crate::state::AliveState; pub type AppState = (Arc>, Arc); -/// Axum extractor handler: upgrade HTTP → WebSocket. +/// Axum handler: validates Origin + bearer before upgrading to WebSocket. pub async fn ws_handler( ws: WebSocketUpgrade, + headers: HeaderMap, State((tx, alive)): State, ) -> Response { - ws.on_upgrade(move |socket| handle_socket(socket, tx, alive)) + let origin = headers.get(header::ORIGIN).and_then(|v| v.to_str().ok()); + if let Err(e) = validate_origin(origin) { + eprintln!("[kei-graph-stream] ws origin rejected: {e}"); + return (StatusCode::FORBIDDEN, "forbidden\n").into_response(); + } + let proto = headers + .get("sec-websocket-protocol") + .and_then(|v| v.to_str().ok()); + if let Err(e) = check_bearer(proto) { + eprintln!("[kei-graph-stream] ws auth rejected: {e}"); + return (StatusCode::UNAUTHORIZED, "unauthorized\n").into_response(); + } + ws.protocols(["bearer"]) + .on_upgrade(move |socket| handle_socket(socket, tx, alive)) +} + +fn check_bearer(protocol: Option<&str>) -> Result<(), crate::auth::AuthError> { + let expected = load_expected_token()?; + let got = extract_bearer(protocol)?; + if !tokens_match(&expected, got) { + return Err(crate::auth::AuthError::BearerInvalid); + } + Ok(()) } async fn handle_socket( diff --git a/_primitives/_rust/kei-graph-stream/tests/smoke.rs b/_primitives/_rust/kei-graph-stream/tests/smoke.rs index ffbe130..5edb82f 100644 --- a/_primitives/_rust/kei-graph-stream/tests/smoke.rs +++ b/_primitives/_rust/kei-graph-stream/tests/smoke.rs @@ -1,5 +1,8 @@ /// Integration smoke test: spins up a real kei-graph-stream server on a random port, /// appends events to a temp JSONL file, and verifies WS snapshot + event frames. +/// +/// Auth: writes a known token to a temp file and sets HOME so `load_expected_token` +/// reads it. Connects with the correct `Origin` and `Sec-WebSocket-Protocol` headers. use std::io::Write; use std::net::SocketAddr; use std::sync::Arc; @@ -8,9 +11,26 @@ use std::time::Duration; use serde_json::Value; use tempfile::NamedTempFile; use tokio::sync::broadcast; -use tokio_tungstenite::{connect_async, tungstenite::Message}; +use tokio_tungstenite::{ + connect_async_with_config, + tungstenite::{ + client::IntoClientRequest, + http::header::{HeaderValue, ORIGIN}, + Message, + }, +}; use futures::StreamExt; +const TEST_TOKEN: &str = "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"; + +/// Write the test token to `/.keisei/cortex.token` and set HOME. +fn setup_token(home: &std::path::Path) { + let dir = home.join(".keisei"); + std::fs::create_dir_all(&dir).unwrap(); + std::fs::write(dir.join("cortex.token"), TEST_TOKEN).unwrap(); + std::env::set_var("HOME", home.to_str().unwrap()); +} + async fn start_server(events_path: std::path::PathBuf) -> SocketAddr { use axum::Router; use axum::routing::get; @@ -32,12 +52,25 @@ async fn start_server(events_path: std::path::PathBuf) -> SocketAddr { let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); - // axum::serve returns IntoFuture; use `into_future()` to spawn. use std::future::IntoFuture; tokio::spawn(axum::serve(listener, app).into_future()); addr } +/// Build an authenticated WS request. +fn auth_request(url: &str) -> impl IntoClientRequest { + let mut req = url.into_client_request().unwrap(); + req.headers_mut().insert( + ORIGIN, + HeaderValue::from_static("http://localhost:3000"), + ); + req.headers_mut().insert( + "sec-websocket-protocol", + HeaderValue::from_str(&format!("bearer,{TEST_TOKEN}")).unwrap(), + ); + req +} + async fn recv_text( stream: &mut (impl StreamExt< Item = Result, @@ -52,30 +85,30 @@ async fn recv_text( #[tokio::test] async fn smoke_snapshot_and_event() { + let home_dir = tempfile::tempdir().unwrap(); + setup_token(home_dir.path()); + let mut tmp = NamedTempFile::new().unwrap(); let path = std::path::PathBuf::from(tmp.path()); let addr = start_server(path.clone()).await; - // Health check. + // Health check (no auth needed). let body = reqwest::get(format!("http://{addr}/health")) - .await - .unwrap() - .text() - .await - .unwrap(); + .await.unwrap().text().await.unwrap(); assert_eq!(body, "kei-graph-stream alive\n"); // Connect WS before any events — expect empty snapshot. - let (mut ws1, _) = connect_async(format!("ws://{addr}/stream")).await.unwrap(); + let (mut ws1, _) = + connect_async_with_config(auth_request(&format!("ws://{addr}/stream")), None, true) + .await + .unwrap(); let snap: Value = recv_text(&mut ws1).await; assert_eq!(snap["type"], "snapshot"); assert!(snap["alive"].as_array().unwrap().is_empty()); // Append a spawn event. writeln!(tmp, r#"{{"ts":"2026-05-02T13:00:00.000Z","event":"agent_spawn","id":"smoke1","subagent_type":"researcher","model":"sonnet","prompt_preview":"test"}}"#).unwrap(); - - // Allow tail poll (200ms) + margin. tokio::time::sleep(Duration::from_millis(500)).await; // Should receive an event frame on the existing connection. @@ -85,7 +118,10 @@ async fn smoke_snapshot_and_event() { assert_eq!(frame["data"]["id"], "smoke1"); // New client snapshot should contain smoke1. - let (mut ws2, _) = connect_async(format!("ws://{addr}/stream")).await.unwrap(); + let (mut ws2, _) = + connect_async_with_config(auth_request(&format!("ws://{addr}/stream")), None, true) + .await + .unwrap(); let snap2: Value = recv_text(&mut ws2).await; assert_eq!(snap2["type"], "snapshot"); let alive2 = snap2["alive"].as_array().unwrap(); @@ -97,7 +133,10 @@ async fn smoke_snapshot_and_event() { tokio::time::sleep(Duration::from_millis(500)).await; // Third client: snapshot should now be empty. - let (mut ws3, _) = connect_async(format!("ws://{addr}/stream")).await.unwrap(); + let (mut ws3, _) = + connect_async_with_config(auth_request(&format!("ws://{addr}/stream")), None, true) + .await + .unwrap(); let snap3: Value = recv_text(&mut ws3).await; assert_eq!(snap3["type"], "snapshot"); assert!(snap3["alive"].as_array().unwrap().is_empty()); diff --git a/_primitives/_rust/kei-runtime/Cargo.toml b/_primitives/_rust/kei-runtime/Cargo.toml index f2d84fd..bd7f5ab 100644 --- a/_primitives/_rust/kei-runtime/Cargo.toml +++ b/_primitives/_rust/kei-runtime/Cargo.toml @@ -1,8 +1,8 @@ [package] name = "kei-runtime" version = "0.1.0" -edition = "2021" -rust-version = "1.75" +edition.workspace = true +rust-version.workspace = true description = "Atom invocation runtime + schema linter" authors = ["Denis Parfionovich "] @@ -15,21 +15,21 @@ name = "kei_runtime" path = "src/lib.rs" [dependencies] -clap = { version = "4", features = ["derive"] } -serde = { version = "1", features = ["derive"] } -serde_json = "1" +clap = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } # SSRF + IMDS hardening: disable default features (resolve-http, cli) so the # validator has no HTTP resolver by default. We configure a file-only # resolver explicitly in `validate.rs`. jsonschema = { version = "0.18", default-features = false, features = ["resolve-file"] } -anyhow = "1" -walkdir = "2" -serde_yaml_ng = "0.10" +anyhow = { workspace = true } +walkdir = { workspace = true } +serde_yaml_ng = { workspace = true } kei-atom-discovery = { path = "../kei-atom-discovery" } -url = "2" +url = { workspace = true } [dev-dependencies] -tempfile = "3" +tempfile = { workspace = true } [package.metadata.keisei] backend = "none" diff --git a/_primitives/_rust/kei-runtime/src/invoke.rs b/_primitives/_rust/kei-runtime/src/invoke.rs index 736547b..e6c5e33 100644 --- a/_primitives/_rust/kei-runtime/src/invoke.rs +++ b/_primitives/_rust/kei-runtime/src/invoke.rs @@ -19,12 +19,17 @@ use std::io::Write; use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; +/// Max bytes we read from subprocess stdout/stderr to guard against runaway output. +const OUTPUT_CAP: usize = 16 * 1024 * 1024; // 16 MiB + #[derive(Debug)] pub enum InvokeError { AtomNotFound(String), InputParse(String), InputInvalid(String), MissingInputSchema(String), + /// `crate_name` in atom YAML failed the `kei-*` allowlist check. + InvalidAtom(String), /// Crate binary is missing from both `KEI_RUNTIME_BIN_DIR` and `PATH`. BinaryNotFound { crate_name: String }, /// Subprocess exited non-zero — propagate the atom's own exit code. @@ -44,6 +49,7 @@ impl std::fmt::Display for InvokeError { Self::InputParse(e) => write!(f, "input rejected: {e}"), Self::InputInvalid(e) => write!(f, "input rejected: {e}"), Self::MissingInputSchema(id) => write!(f, "atom `{id}` declares no input schema"), + Self::InvalidAtom(msg) => write!(f, "invalid atom crate_name: {msg}"), Self::BinaryNotFound { crate_name } => write!( f, "binary `{crate_name}` not found on PATH or KEI_RUNTIME_BIN_DIR" @@ -93,7 +99,36 @@ fn find_atom(root: &Path, atom_id: &str) -> Result { .ok_or_else(|| InvokeError::AtomNotFound(atom_id.to_string())) } +/// Validate `name` matches `^kei-[a-z][a-z0-9-]+$`; rejects path traversal and injection chars. +fn is_safe_crate_name(name: &str) -> bool { + if name.is_empty() || name.len() > 128 { + return false; + } + // Forbidden substrings — absolute path indicators, separators, injection chars. + for bad in &["/", "\\", "..", ":", "@", " "] { + if name.contains(bad) { + return false; + } + } + // Must match kei-[a-z][a-z0-9-]+ + let bytes = name.as_bytes(); + if !name.starts_with("kei-") || bytes.len() < 5 { + return false; + } + let after_prefix = &bytes[4..]; + if !after_prefix[0].is_ascii_lowercase() { + return false; + } + after_prefix[1..].iter().all(|&b| b.is_ascii_lowercase() || b.is_ascii_digit() || b == b'-') +} + fn exec_atom(meta: &AtomMeta, input_json: &str) -> Result { + if !is_safe_crate_name(&meta.crate_name) { + return Err(InvokeError::InvalidAtom(format!( + "crate_name '{}' fails kei-* allowlist", + meta.crate_name + ))); + } let bin = resolve_binary(&meta.crate_name) .ok_or_else(|| InvokeError::BinaryNotFound { crate_name: meta.crate_name.clone() })?; let mut child = Command::new(&bin) @@ -115,10 +150,23 @@ fn exec_atom(meta: &AtomMeta, input_json: &str) -> Result { handle_subprocess_output(meta, out) } +fn cap_bytes(data: Vec, label: &str) -> Vec { + if data.len() > OUTPUT_CAP { + let mut v = data; + v.truncate(OUTPUT_CAP); + eprintln!("[kei-runtime] {label} truncated at {OUTPUT_CAP} bytes"); + v + } else { + data + } +} + fn handle_subprocess_output( meta: &AtomMeta, - out: std::process::Output, + mut out: std::process::Output, ) -> Result { + out.stdout = cap_bytes(out.stdout, "stdout"); + out.stderr = cap_bytes(out.stderr, "stderr"); let code = out.status.code().unwrap_or(-1); if !out.status.success() { let stderr = String::from_utf8_lossy(&out.stderr).trim().to_string(); @@ -130,9 +178,7 @@ fn handle_subprocess_output( Ok(Output { atom: meta.full_id.clone(), result }) } -/// Resolve `` as an executable: -/// 1. `$KEI_RUNTIME_BIN_DIR/` if env is set and file exists -/// 2. Walk `$PATH`, return first `/` that exists +/// Resolve `` as binary: first `$KEI_RUNTIME_BIN_DIR/`, then `$PATH`. fn resolve_binary(crate_name: &str) -> Option { if let Ok(bin_dir) = std::env::var("KEI_RUNTIME_BIN_DIR") { let candidate = PathBuf::from(bin_dir).join(crate_name); diff --git a/_primitives/_rust/kei-runtime/src/main.rs b/_primitives/_rust/kei-runtime/src/main.rs index f748f21..2ae11af 100644 --- a/_primitives/_rust/kei-runtime/src/main.rs +++ b/_primitives/_rust/kei-runtime/src/main.rs @@ -114,17 +114,18 @@ fn run_invoke(root: PathBuf, atom_id: String, input_arg: String) -> ExitCode { /// Map typed invoke errors to exit codes per locked §Runtime schema. /// -/// - `AtomNotFound | InputParse | InputInvalid | MissingInputSchema` → 2 (atom error) -/// - `AtomFailed { code, .. }` → passthrough child exit code -/// - `SubprocessError | OutputParse` → 1 (IO / malformed output) -/// - `BinaryNotFound` → 127 (POSIX command-not-found) -/// - `NotImplemented` → 64 (legacy escape) +/// - `AtomNotFound|InputParse|InputInvalid|MissingInputSchema|InvalidAtom` → 2 (atom error) +/// - `AtomFailed { code, .. }` → passthrough child exit code +/// - `SubprocessError|OutputParse` → 1 (IO / malformed output) +/// - `BinaryNotFound` → 127 (POSIX command-not-found) +/// - `NotImplemented` → 64 (legacy escape) fn invoke_exit_code(err: &InvokeError) -> u8 { match err { InvokeError::AtomNotFound(_) | InvokeError::InputParse(_) | InvokeError::InputInvalid(_) - | InvokeError::MissingInputSchema(_) => 2, + | InvokeError::MissingInputSchema(_) + | InvokeError::InvalidAtom(_) => 2, InvokeError::AtomFailed { code, .. } => { let c = *code; if (0..=255).contains(&c) { c as u8 } else { 1 } diff --git a/_primitives/_rust/kei-runtime/tests/invoke_exit_codes_smoke.rs b/_primitives/_rust/kei-runtime/tests/invoke_exit_codes_smoke.rs index b38ccbf..d4e2f86 100644 --- a/_primitives/_rust/kei-runtime/tests/invoke_exit_codes_smoke.rs +++ b/_primitives/_rust/kei-runtime/tests/invoke_exit_codes_smoke.rs @@ -96,3 +96,41 @@ fn invoke_binary_not_found_exits_127() { assert!(stderr.contains("not found"), "expected 'not found' in stderr: {stderr}"); } + +/// An atom whose `crate_name` is not in the `kei-*` allowlist should exit 2 +/// (InvalidAtom is mapped to the same "atom rejected" exit code). +#[test] +fn invoke_unsafe_crate_name_exits_2() { + let tmp = tempfile::tempdir().unwrap(); + // Write a well-structured atom dir but with a crate_name that would be + // dangerous (e.g. "rm") — this must be rejected before any binary lookup. + let crate_name = "rm"; + let verb = "all"; + let atoms = tmp.path().join(crate_name).join("atoms"); + let schemas = atoms.join("schemas"); + std::fs::create_dir_all(&schemas).unwrap(); + let input_schema = r#"{"$schema":"http://json-schema.org/draft-07/schema#","type":"object"}"#; + let output_schema = r#"{"$schema":"http://json-schema.org/draft-07/schema#","type":"object"}"#; + std::fs::write(schemas.join("all-input.json"), input_schema).unwrap(); + std::fs::write(schemas.join("all-output.json"), output_schema).unwrap(); + let md = format!( + "---\natom: {crate_name}::{verb}\nkind: command\nversion: \"0.1.0\"\n\ + input:\n schema: schemas/all-input.json\n\ + output:\n schema: schemas/all-output.json\n\ + side_effects: []\nidempotent: true\nstability: stable\n---\n" + ); + std::fs::write(atoms.join(format!("{verb}.md")), md).unwrap(); + let out = std::process::Command::new(BIN) + .arg("invoke") + .arg(format!("{crate_name}::{verb}")) + .arg("--input").arg("{}") + .arg("--root").arg(tmp.path()) + .output() + .expect("spawn kei-runtime"); + assert_eq!(out.status.code(), Some(2), + "expected exit 2 for unsafe crate_name; stderr: {}", + String::from_utf8_lossy(&out.stderr)); + let stderr = String::from_utf8_lossy(&out.stderr); + assert!(stderr.contains("allowlist") || stderr.contains("invalid"), + "expected allowlist error in stderr: {stderr}"); +}