fix(security): RCE allowlist + WebSocket auth + SSH option-injection

Group D — hardening of three independent security primitives (post-audit 2026-05-02).

kei-runtime — atom invoke RCE allowlist:
- invoke.rs: is_safe_crate_name validator (regex ^kei-[a-z][a-z0-9-]*$);
             rejects /, \, .., :, @, spaces, absolute paths, empty, >128 chars.
             InvalidAtom error variant.
             stdout/stderr truncated to 16 MiB after capture (was unbounded).
- main.rs: InvalidAtom mapped to exit code 2.
- tests/invoke_exit_codes_smoke.rs: invoke_unsafe_crate_name_exits_2 added.
- Closes: any user able to write atoms/*.md with crate_name: "rm" or "sudo"
           triggered arbitrary command execution.

kei-graph-stream — WebSocket bearer + Origin:
- auth.rs (new, 142 LOC): token load + bearer extraction + Origin allowlist +
                            hand-rolled constant-time (XOR-fold) compare; 5 unit tests.
- ws.rs: ws_handler validates Origin + bearer before upgrade (403/401 on failure).
- main.rs: --public-bind-i-accept-the-leak flag required for non-loopback bind;
            else bail!() with explicit error.
- tests/smoke.rs: rewritten with Origin + bearer headers via connect_async_with_config.
- Closes: WebSocket /stream had zero auth, zero Origin check; browser CSWSH could
           subscribe to agent activity broadcast; KEI_GRAPH_STREAM_BIND env silently
           accepted any SocketAddr.

kei-compute-baremetal — SSH option injection (CVE-2023-51385 class):
- ssh.rs: is_safe_user + is_safe_host validators (alphanumeric + -_.; reject leading -;
           max 64 chars; no @, :, /, \\, space).
- ssh.rs: -- sentinel before user@host argv (OpenSSH 9.6+ stops flag parsing).
- ssh.rs: StrictHostKeyChecking=yes default; KEI_BAREMETAL_ACCEPT_NEW=1 for TOFU.
- error.rs: InvalidRegion variant.
- provider.rs: validators applied in target_for_spec + target_for_handle.
- Closes: spec.region "-oProxyCommand=evil" triggered local RCE before TCP connect.

Test results: 29 passed; 0 failed across all three crates. cargo check clean.

Findings: RCE allowlist (Wave-A) + WebSocket auth (Wave-B) + SSH injection (Wave-B)
were unique-per-retest discoveries. None present in original wave-1 audit.

Note: kei-compute-baremetal/src/provider.rs at 300 LOC (was 268; +32 from validators).
The >200 LOC violation is pre-existing; this fix's scope was security additions only. Follow-up:
split provider.rs into provider.rs (<200) + provider_tests.rs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Parfii-bot 2026-05-02 21:40:24 +08:00
parent 9aa29aca15
commit cb1090bef3
14 changed files with 444 additions and 52 deletions

View file

@ -1,8 +1,8 @@
[package] [package]
name = "kei-compute-baremetal" name = "kei-compute-baremetal"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition.workspace = true
rust-version = "1.75" rust-version.workspace = true
description = "ComputeProvider impl for user-owned bare-metal boxes — registers SSH connection, runs cloud-init equivalent, status-pings via SSH. No cloud API. Wave 2 atomar." description = "ComputeProvider impl for user-owned bare-metal boxes — registers SSH connection, runs cloud-init equivalent, status-pings via SSH. No cloud API. Wave 2 atomar."
authors = ["Denis Parfionovich <info@greendragon.info>"] authors = ["Denis Parfionovich <info@greendragon.info>"]
license = "Apache-2.0" license = "Apache-2.0"
@ -16,7 +16,7 @@ name = "kei-compute-baremetal"
path = "src/main.rs" path = "src/main.rs"
[dependencies] [dependencies]
async-trait = "0.1" async-trait = { workspace = true }
clap = { workspace = true } clap = { workspace = true }
serde = { workspace = true } serde = { workspace = true }
serde_json = { workspace = true } serde_json = { workspace = true }

View file

@ -22,6 +22,9 @@ pub enum Error {
#[error("invalid external_id (expected ssh://user@host[:port]): {0}")] #[error("invalid external_id (expected ssh://user@host[:port]): {0}")]
InvalidExternalId(String), InvalidExternalId(String),
#[error("invalid region — user or host failed safety check: {0}")]
InvalidRegion(String),
} }
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
@ -43,6 +46,9 @@ impl From<Error> for kei_runtime_core::Error {
Error::InvalidExternalId(s) => { Error::InvalidExternalId(s) => {
kei_runtime_core::Error::Provider(format!("baremetal invalid id: {s}")) kei_runtime_core::Error::Provider(format!("baremetal invalid id: {s}"))
} }
Error::InvalidRegion(s) => {
kei_runtime_core::Error::Provider(format!("baremetal invalid region: {s}"))
}
Error::Io(e) => kei_runtime_core::Error::Provider(format!("baremetal io: {e}")), Error::Io(e) => kei_runtime_core::Error::Provider(format!("baremetal io: {e}")),
} }
} }

View file

@ -11,7 +11,7 @@
//! * `cost_per_hour_microcents()` is always 0 (user-owned). //! * `cost_per_hour_microcents()` is always 0 (user-owned).
use crate::error::{Error as BmError, Result as BmResult}; use crate::error::{Error as BmError, Result as BmResult};
use crate::ssh::{ping, run_remote, SshTarget}; use crate::ssh::{is_safe_host, is_safe_user, ping, run_remote, SshTarget};
use kei_runtime_core::traits::compute::{ComputeProvider, VmHandle, VmSpec, VmStatus}; use kei_runtime_core::traits::compute::{ComputeProvider, VmHandle, VmSpec, VmStatus};
use kei_runtime_core::{Dna, DnaBuilder, HasDna}; use kei_runtime_core::{Dna, DnaBuilder, HasDna};
@ -63,6 +63,12 @@ impl BaremetalCompute {
), ),
None => (host_port.to_string(), None), None => (host_port.to_string(), None),
}; };
if !is_safe_user(user) {
return Err(BmError::InvalidRegion(format!("user '{user}' fails sanity check")));
}
if !is_safe_host(&host) {
return Err(BmError::InvalidRegion(format!("host '{host}' fails sanity check")));
}
let mut t = SshTarget::new(user, host); let mut t = SshTarget::new(user, host);
if let Some(p) = port { if let Some(p) = port {
t = t.with_port(p); t = t.with_port(p);
@ -92,6 +98,12 @@ impl BaremetalCompute {
), ),
None => (host_port.to_string(), None), None => (host_port.to_string(), None),
}; };
if !is_safe_user(user) {
return Err(BmError::InvalidRegion(format!("user '{user}' fails sanity check")));
}
if !is_safe_host(&host) {
return Err(BmError::InvalidRegion(format!("host '{host}' fails sanity check")));
}
let mut t = SshTarget::new(user, host); let mut t = SshTarget::new(user, host);
if let Some(p) = port { if let Some(p) = port {
t = t.with_port(p); t = t.with_port(p);
@ -265,4 +277,24 @@ mod tests {
assert_eq!(t.host, "box.example.com"); assert_eq!(t.host, "box.example.com");
assert_eq!(t.port, Some(2222)); assert_eq!(t.port, Some(2222));
} }
/// `target_for_spec` must return an error for every hostile `region` value below:
/// a dash-leading user, a dash-leading host, and a host with a non-numeric `:suffix`.
#[test]
fn target_rejects_injection_in_region() {
    let compute = BaremetalCompute::new(None, None).unwrap();
    let hostile_regions = ["-ProxyCommand=evil@host", "root@-evil-host", "root@host:name"];
    for region in hostile_regions {
        let spec = VmSpec {
            user_dna: user_dna(),
            region: region.to_string(),
            tier: "host-1c-1gb".into(),
            ssh_pubkey: String::new(),
            cloud_init: String::new(),
            labels: vec![],
        };
        let outcome = compute.target_for_spec(&spec);
        assert!(outcome.is_err(), "should reject: {region}");
    }
}
} }

View file

@ -11,6 +11,23 @@
use crate::error::{Error, Result}; use crate::error::{Error, Result};
use tokio::process::Command; use tokio::process::Command;
/// Returns `true` when `user` is acceptable as an SSH login name.
///
/// Accepted: 1–64 bytes drawn from ASCII letters, digits, `-`, `_` and `.`,
/// where the first byte is not `-` (so the value can never look like an option).
pub fn is_safe_user(user: &str) -> bool {
    let length_ok = (1..=64).contains(&user.len());
    let dash_led = user.starts_with('-');
    length_ok
        && !dash_led
        && user
            .bytes()
            .all(|b| matches!(b, b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'-' | b'_' | b'.'))
}
/// Returns `true` when `host` is acceptable as an SSH destination host.
///
/// Accepted: 1–64 bytes drawn from ASCII letters, digits, `-`, `_` and `.`,
/// where the first byte is not `-`. Dotted IPv4 addresses pass; anything
/// containing `:` (including IPv6 literals and `host:port`) is rejected.
pub fn is_safe_host(host: &str) -> bool {
    match host.as_bytes() {
        // Empty, or would be parsed as a command-line flag.
        [] | [b'-', ..] => false,
        bytes if bytes.len() > 64 => false,
        bytes => bytes
            .iter()
            .all(|b| b.is_ascii_alphanumeric() || b"-_.".contains(b)),
    }
}
/// SSH endpoint. `port` is optional; default is 22 when None. /// SSH endpoint. `port` is optional; default is 22 when None.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct SshTarget { pub struct SshTarget {
@ -51,20 +68,26 @@ impl SshTarget {
} }
fn build_cmd(&self, remote: &str) -> Command { fn build_cmd(&self, remote: &str) -> Command {
let strict = if std::env::var("KEI_BAREMETAL_ACCEPT_NEW").as_deref() == Ok("1") {
"accept-new"
} else {
"yes"
};
let mut cmd = Command::new(&self.binary); let mut cmd = Command::new(&self.binary);
cmd.arg("-o") cmd.arg("-o")
.arg("BatchMode=yes") .arg("BatchMode=yes")
.arg("-o") .arg("-o")
.arg("ConnectTimeout=5") .arg("ConnectTimeout=5")
.arg("-o") .arg("-o")
.arg("StrictHostKeyChecking=accept-new"); .arg(format!("StrictHostKeyChecking={strict}"));
if let Some(p) = self.port { if let Some(p) = self.port {
cmd.arg("-p").arg(p.to_string()); cmd.arg("-p").arg(p.to_string());
} }
if let Some(k) = &self.key_path { if let Some(k) = &self.key_path {
cmd.arg("-i").arg(k); cmd.arg("-i").arg(k);
} }
cmd.arg(format!("{}@{}", self.user, self.host)); // `--` stops flag parsing; guards against user/host that look like flags (CVE-2023-51385).
cmd.arg("--").arg(format!("{}@{}", self.user, self.host));
cmd.arg(remote); cmd.arg(remote);
cmd cmd
} }
@ -117,9 +140,6 @@ pub async fn run_remote(t: &SshTarget, cmd: &str) -> Result<String> {
mod tests { mod tests {
use super::*; use super::*;
/// Substitutes `echo` for `ssh` so the test never opens a socket.
/// `echo` ignores all SSH-flag args and prints the trailing argument
/// (the remote command string), so success exit code is what we check.
#[tokio::test] #[tokio::test]
async fn ping_succeeds_with_echo_binary() { async fn ping_succeeds_with_echo_binary() {
let mut t = SshTarget::new("root", "127.0.0.1"); let mut t = SshTarget::new("root", "127.0.0.1");
@ -131,11 +151,7 @@ mod tests {
async fn run_remote_returns_stdout_with_echo_binary() { async fn run_remote_returns_stdout_with_echo_binary() {
let mut t = SshTarget::new("root", "127.0.0.1"); let mut t = SshTarget::new("root", "127.0.0.1");
t.binary = "echo".into(); t.binary = "echo".into();
let out = run_remote(&t, "hello-from-remote") let out = run_remote(&t, "hello-from-remote").await.expect("echo exit 0");
.await
.expect("echo exit 0");
// `echo` reflects all its argv joined by spaces; the trailing
// remote-cmd is the last token, so it MUST appear in output.
assert!(out.contains("hello-from-remote"), "stdout was: {out:?}"); assert!(out.contains("hello-from-remote"), "stdout was: {out:?}");
} }
@ -150,4 +166,35 @@ mod tests {
let t = SshTarget::new("alice", "box.example.com"); let t = SshTarget::new("alice", "box.example.com");
assert_eq!(t.external_id(), "ssh://alice@box.example.com"); assert_eq!(t.external_id(), "ssh://alice@box.example.com");
} }
/// Ordinary login names pass the user validator.
#[test]
fn safe_user_accepts_valid() {
    for name in ["root", "alice-b", "user_123"] {
        assert!(is_safe_user(name), "rejected valid user: {name:?}");
    }
}
/// Flag-like, separator-bearing, empty and over-long names fail the user validator.
#[test]
fn safe_user_rejects_injection() {
    let too_long = "a".repeat(65);
    for name in ["-ProxyCommand=evil", "a@b", "a/b", "", too_long.as_str()] {
        assert!(!is_safe_user(name), "accepted unsafe user: {name:?}");
    }
}
/// A DNS name, a dotted IPv4 address and a bare hostname pass the host validator.
#[test]
fn safe_host_accepts_valid() {
    for host in ["box.example.com", "10.0.0.1", "my-server"] {
        assert!(is_safe_host(host), "rejected valid host: {host:?}");
    }
}
/// Dash-leading, space-bearing, colon-bearing and empty hosts fail the host validator.
#[test]
fn safe_host_rejects_injection() {
    for host in ["-evil", "host name", "host:22", ""] {
        assert!(!is_safe_host(host), "accepted unsafe host: {host:?}");
    }
}
} }

View file

@ -19,7 +19,7 @@ tokio = { workspace = true }
serde = { workspace = true } serde = { workspace = true }
serde_json = { workspace = true } serde_json = { workspace = true }
anyhow = { workspace = true } anyhow = { workspace = true }
clap = { version = "4", features = ["derive", "env"] } clap = { workspace = true, features = ["env"] }
[dev-dependencies] [dev-dependencies]
tokio-tungstenite = { workspace = true } tokio-tungstenite = { workspace = true }

View file

@ -0,0 +1,142 @@
//! Bearer token + Origin validation for WebSocket upgrades.
//!
//! Token is loaded from `~/.keisei/cortex.token` (same file as kei-cortex).
//! Origin allowlist: localhost and 127.0.0.1 on any port, plus the literal
//! string "null" (used by some browsers for file:// origins).
use std::path::PathBuf;
/// Reasons a WebSocket upgrade request can fail authentication.
#[derive(Debug)]
pub enum AuthError {
    /// The expected token could not be loaded; carries a human-readable cause.
    TokenLoad(String),
    /// No bearer token could be extracted from `Sec-WebSocket-Protocol`.
    BearerMissing,
    /// A bearer token was presented but does not match the expected one.
    BearerInvalid,
    /// The `Origin` header is absent or not in the local allowlist.
    OriginForbidden,
}

impl std::fmt::Display for AuthError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let fixed = match self {
            // Only this variant carries dynamic detail.
            Self::TokenLoad(cause) => return write!(f, "token load: {cause}"),
            Self::BearerMissing => "Sec-WebSocket-Protocol bearer token missing",
            Self::BearerInvalid => "bearer token mismatch",
            Self::OriginForbidden => "Origin not in allowlist",
        };
        f.write_str(fixed)
    }
}

// Lets callers box the error or propagate it with `?` into `Box<dyn Error>`-style results.
impl std::error::Error for AuthError {}
/// Load the expected bearer token from `$HOME/.keisei/cortex.token`.
///
/// Surrounding whitespace (including a trailing newline) is trimmed.
///
/// # Errors
/// Returns [`AuthError::TokenLoad`] when `HOME` is unset or empty, when the
/// file cannot be read, or when the file contains no token after trimming.
pub fn load_expected_token() -> Result<String, AuthError> {
    let path = token_path()?;
    let raw = std::fs::read_to_string(&path)
        .map_err(|e| AuthError::TokenLoad(format!("{}: {e}", path.display())))?;
    let token = raw.trim();
    if token.is_empty() {
        // An empty secret must never be treated as a valid credential.
        return Err(AuthError::TokenLoad(format!(
            "{}: token file is empty",
            path.display()
        )));
    }
    Ok(token.to_string())
}

/// Resolve the token file location under the current user's home directory.
///
/// Deliberately has no fallback directory: reading the auth secret from a
/// shared location such as `/tmp` would let any local user plant a token.
fn token_path() -> Result<PathBuf, AuthError> {
    match std::env::var_os("HOME") {
        Some(home) if !home.is_empty() => Ok(PathBuf::from(home).join(".keisei/cortex.token")),
        _ => Err(AuthError::TokenLoad(
            "HOME is not set; cannot locate .keisei/cortex.token".to_string(),
        )),
    }
}
/// Extract the bearer token from a `Sec-WebSocket-Protocol` header value.
///
/// Two shapes are recognised, checked in this order:
/// 1. any comma-separated segment of the form `bearer <token>`;
/// 2. the whole header of the form `bearer,<token>` — everything after the
///    first comma is the token.
///
/// The returned slice borrows from `protocol_header` and is trimmed.
///
/// # Errors
/// Returns [`AuthError::BearerMissing`] when the header is absent or matches
/// neither shape (including `bearer,` followed by nothing).
pub fn extract_bearer(protocol_header: Option<&str>) -> Result<&str, AuthError> {
    let hdr = protocol_header.ok_or(AuthError::BearerMissing)?;

    // Shape 1: `bearer <token>` in any segment.
    if let Some(tok) = hdr
        .split(',')
        .find_map(|segment| segment.trim().strip_prefix("bearer "))
    {
        return Ok(tok.trim());
    }

    // Shape 2: `bearer,<token>`.
    match hdr.split_once(',') {
        Some((scheme, tok)) if scheme.trim() == "bearer" && !tok.trim().is_empty() => {
            Ok(tok.trim())
        }
        _ => Err(AuthError::BearerMissing),
    }
}
/// Check the `Origin` header against the local allowlist.
///
/// Accepted values are the literal `null` and anything `is_local_origin`
/// approves (`http://localhost` / `http://127.0.0.1`, optionally with a port).
/// A missing header is rejected.
///
/// NOTE(review): browsers also serialize opaque origins (e.g. sandboxed
/// iframes) as `null`, not only `file://` pages — confirm that accepting
/// `null` is intended; the bearer check is the remaining barrier for those.
///
/// # Errors
/// Returns [`AuthError::OriginForbidden`] for an absent or non-allowlisted origin.
pub fn validate_origin(origin: Option<&str>) -> Result<(), AuthError> {
    match origin {
        Some(value) if value == "null" || is_local_origin(value) => Ok(()),
        _ => Err(AuthError::OriginForbidden),
    }
}
/// Returns `true` when `o` is `http://localhost` or `http://127.0.0.1`,
/// optionally followed by `:<port>`.
///
/// `<port>` must be a non-empty run of ASCII digits that fits in a `u16`.
/// Requiring a real port — rather than "anything after a colon" — rejects
/// look-alikes such as `http://localhost:8201@evil.com`.
fn is_local_origin(o: &str) -> bool {
    let Some(rest) = ["http://localhost", "http://127.0.0.1"]
        .iter()
        .find_map(|prefix| o.strip_prefix(*prefix))
    else {
        return false;
    };
    match rest.strip_prefix(':') {
        // No port part: only an exact match is local (`http://localhost.evil.com` is not).
        None => rest.is_empty(),
        // The digit check comes first because `parse::<u16>` alone would accept a leading `+`.
        Some(port) => {
            !port.is_empty()
                && port.bytes().all(|b| b.is_ascii_digit())
                && port.parse::<u16>().is_ok()
        }
    }
}
/// Compare two tokens, ignoring ASCII case, without an early exit on the first
/// differing byte.
///
/// * Tokens of different byte length are rejected immediately, so the length
///   of the expected token is not hidden.
/// * For equal lengths, every byte pair is XOR-ed and OR-folded into one
///   accumulator; the result is `true` only if the accumulator stays zero.
/// * ASCII letters are lower-cased per byte before comparing, so `"ABC"`
///   matches `"abc"`. No lower-cased copies of the tokens are allocated.
///
/// NOTE(review): nothing here stops the optimizer from restructuring the
/// loop; treat the timing property as best-effort.
pub fn tokens_match(expected: &str, got: &str) -> bool {
    if expected.len() != got.len() {
        return false;
    }
    let diff = expected
        .bytes()
        .zip(got.bytes())
        .fold(0u8, |acc, (a, b)| {
            acc | (a.to_ascii_lowercase() ^ b.to_ascii_lowercase())
        });
    diff == 0
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Loopback origins with a port, and the literal `null`, are allowed.
    #[test]
    fn local_origins_accepted() {
        for origin in ["http://localhost:8201", "http://127.0.0.1:8201", "null"] {
            assert!(validate_origin(Some(origin)).is_ok(), "rejected: {origin}");
        }
    }

    /// Foreign hosts, a non-`http` scheme, and a missing header are refused.
    #[test]
    fn remote_origins_rejected() {
        for origin in [Some("http://evil.com"), Some("https://localhost:8201"), None] {
            assert!(validate_origin(origin).is_err(), "accepted: {origin:?}");
        }
    }

    /// `bearer,<token>` yields the token.
    #[test]
    fn bearer_extracted() {
        let token = extract_bearer(Some("bearer,abc123")).unwrap();
        assert_eq!(token, "abc123");
    }

    /// A missing header or one without a bearer entry is an error.
    #[test]
    fn bearer_missing_returns_err() {
        for header in [None, Some("other")] {
            assert!(extract_bearer(header).is_err(), "accepted: {header:?}");
        }
    }

    /// Equal tokens match (ignoring ASCII case); different content or length does not.
    #[test]
    fn tokens_match_works() {
        let cases = [
            ("abc", "abc", true),
            ("ABC", "abc", true),
            ("abc", "xyz", false),
            ("abc", "ab", false),
        ];
        for (expected, got, outcome) in cases {
            assert_eq!(tokens_match(expected, got), outcome, "{expected:?} vs {got:?}");
        }
    }
}

View file

@ -1,3 +1,4 @@
pub mod auth;
pub mod state; pub mod state;
pub mod tail; pub mod tail;
pub mod ws; pub mod ws;

View file

@ -1,4 +1,4 @@
use anyhow::Result; use anyhow::{bail, Result};
use axum::{Router, routing::get}; use axum::{Router, routing::get};
use clap::Parser; use clap::Parser;
use std::net::SocketAddr; use std::net::SocketAddr;
@ -16,6 +16,12 @@ struct Cli {
#[arg(long, env = "KEI_EVENTS_FILE")] #[arg(long, env = "KEI_EVENTS_FILE")]
events_file: Option<PathBuf>, events_file: Option<PathBuf>,
/// Allow binding to a non-loopback address. Without this flag,
/// kei-graph-stream refuses to start on a non-loopback bind address
/// to prevent accidental exposure of the WebSocket endpoint.
#[arg(long)]
public_bind_i_accept_the_leak: bool,
} }
fn default_events_file() -> PathBuf { fn default_events_file() -> PathBuf {
@ -31,6 +37,15 @@ async fn main() -> Result<()> {
} }
let cli = Cli::parse(); let cli = Cli::parse();
if !cli.bind.ip().is_loopback() && !cli.public_bind_i_accept_the_leak {
bail!(
"kei-graph-stream: refusing to bind {}: non-loopback bind requires \
explicit --public-bind-i-accept-the-leak flag",
cli.bind
);
}
let events_file = cli.events_file.unwrap_or_else(default_events_file); let events_file = cli.events_file.unwrap_or_else(default_events_file);
if let Some(parent) = events_file.parent() { if let Some(parent) = events_file.parent() {

View file

@ -3,22 +3,47 @@ use axum::{
State, State,
ws::{Message, WebSocket, WebSocketUpgrade}, ws::{Message, WebSocket, WebSocketUpgrade},
}, },
response::Response, http::{HeaderMap, StatusCode, header},
response::{IntoResponse, Response},
}; };
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio::time::{Duration, interval}; use tokio::time::{Duration, interval};
use crate::auth::{extract_bearer, load_expected_token, tokens_match, validate_origin};
use crate::state::AliveState; use crate::state::AliveState;
pub type AppState = (Arc<broadcast::Sender<String>>, Arc<AliveState>); pub type AppState = (Arc<broadcast::Sender<String>>, Arc<AliveState>);
/// Axum extractor handler: upgrade HTTP → WebSocket. /// Axum handler: validates Origin + bearer before upgrading to WebSocket.
pub async fn ws_handler( pub async fn ws_handler(
ws: WebSocketUpgrade, ws: WebSocketUpgrade,
headers: HeaderMap,
State((tx, alive)): State<AppState>, State((tx, alive)): State<AppState>,
) -> Response { ) -> Response {
ws.on_upgrade(move |socket| handle_socket(socket, tx, alive)) let origin = headers.get(header::ORIGIN).and_then(|v| v.to_str().ok());
if let Err(e) = validate_origin(origin) {
eprintln!("[kei-graph-stream] ws origin rejected: {e}");
return (StatusCode::FORBIDDEN, "forbidden\n").into_response();
}
let proto = headers
.get("sec-websocket-protocol")
.and_then(|v| v.to_str().ok());
if let Err(e) = check_bearer(proto) {
eprintln!("[kei-graph-stream] ws auth rejected: {e}");
return (StatusCode::UNAUTHORIZED, "unauthorized\n").into_response();
}
ws.protocols(["bearer"])
.on_upgrade(move |socket| handle_socket(socket, tx, alive))
}
fn check_bearer(protocol: Option<&str>) -> Result<(), crate::auth::AuthError> {
let expected = load_expected_token()?;
let got = extract_bearer(protocol)?;
if !tokens_match(&expected, got) {
return Err(crate::auth::AuthError::BearerInvalid);
}
Ok(())
} }
async fn handle_socket( async fn handle_socket(

View file

@ -1,5 +1,8 @@
/// Integration smoke test: spins up a real kei-graph-stream server on a random port, /// Integration smoke test: spins up a real kei-graph-stream server on a random port,
/// appends events to a temp JSONL file, and verifies WS snapshot + event frames. /// appends events to a temp JSONL file, and verifies WS snapshot + event frames.
///
/// Auth: writes a known token to a temp file and sets HOME so `load_expected_token`
/// reads it. Connects with the correct `Origin` and `Sec-WebSocket-Protocol` headers.
use std::io::Write; use std::io::Write;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
@ -8,9 +11,26 @@ use std::time::Duration;
use serde_json::Value; use serde_json::Value;
use tempfile::NamedTempFile; use tempfile::NamedTempFile;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio_tungstenite::{connect_async, tungstenite::Message}; use tokio_tungstenite::{
connect_async_with_config,
tungstenite::{
client::IntoClientRequest,
http::header::{HeaderValue, ORIGIN},
Message,
},
};
use futures::StreamExt; use futures::StreamExt;
const TEST_TOKEN: &str = "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef";
/// Create `<home>/.keisei/cortex.token` containing `TEST_TOKEN`, then point
/// the `HOME` environment variable at `home`.
///
/// Panics if the directory or file cannot be written, or if `home` is not
/// valid UTF-8. Note that `HOME` is changed for the whole test process.
fn setup_token(home: &std::path::Path) {
    let keisei_dir = home.join(".keisei");
    std::fs::create_dir_all(&keisei_dir).unwrap();
    let token_file = keisei_dir.join("cortex.token");
    std::fs::write(token_file, TEST_TOKEN).unwrap();
    let home_str = home.to_str().unwrap();
    std::env::set_var("HOME", home_str);
}
async fn start_server(events_path: std::path::PathBuf) -> SocketAddr { async fn start_server(events_path: std::path::PathBuf) -> SocketAddr {
use axum::Router; use axum::Router;
use axum::routing::get; use axum::routing::get;
@ -32,12 +52,25 @@ async fn start_server(events_path: std::path::PathBuf) -> SocketAddr {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap(); let addr = listener.local_addr().unwrap();
// axum::serve returns IntoFuture; use `into_future()` to spawn.
use std::future::IntoFuture; use std::future::IntoFuture;
tokio::spawn(axum::serve(listener, app).into_future()); tokio::spawn(axum::serve(listener, app).into_future());
addr addr
} }
/// Build a client handshake request for `url` that carries an allowlisted
/// `Origin` (`http://localhost:3000`) and `Sec-WebSocket-Protocol: bearer,<TEST_TOKEN>`.
fn auth_request(url: &str) -> impl IntoClientRequest {
    let mut req = url.into_client_request().unwrap();
    let subprotocol = HeaderValue::from_str(&format!("bearer,{TEST_TOKEN}")).unwrap();
    let headers = req.headers_mut();
    headers.insert(ORIGIN, HeaderValue::from_static("http://localhost:3000"));
    headers.insert("sec-websocket-protocol", subprotocol);
    req
}
async fn recv_text( async fn recv_text(
stream: &mut (impl StreamExt< stream: &mut (impl StreamExt<
Item = Result<Message, tokio_tungstenite::tungstenite::Error>, Item = Result<Message, tokio_tungstenite::tungstenite::Error>,
@ -52,30 +85,30 @@ async fn recv_text(
#[tokio::test] #[tokio::test]
async fn smoke_snapshot_and_event() { async fn smoke_snapshot_and_event() {
let home_dir = tempfile::tempdir().unwrap();
setup_token(home_dir.path());
let mut tmp = NamedTempFile::new().unwrap(); let mut tmp = NamedTempFile::new().unwrap();
let path = std::path::PathBuf::from(tmp.path()); let path = std::path::PathBuf::from(tmp.path());
let addr = start_server(path.clone()).await; let addr = start_server(path.clone()).await;
// Health check. // Health check (no auth needed).
let body = reqwest::get(format!("http://{addr}/health")) let body = reqwest::get(format!("http://{addr}/health"))
.await .await.unwrap().text().await.unwrap();
.unwrap()
.text()
.await
.unwrap();
assert_eq!(body, "kei-graph-stream alive\n"); assert_eq!(body, "kei-graph-stream alive\n");
// Connect WS before any events — expect empty snapshot. // Connect WS before any events — expect empty snapshot.
let (mut ws1, _) = connect_async(format!("ws://{addr}/stream")).await.unwrap(); let (mut ws1, _) =
connect_async_with_config(auth_request(&format!("ws://{addr}/stream")), None, true)
.await
.unwrap();
let snap: Value = recv_text(&mut ws1).await; let snap: Value = recv_text(&mut ws1).await;
assert_eq!(snap["type"], "snapshot"); assert_eq!(snap["type"], "snapshot");
assert!(snap["alive"].as_array().unwrap().is_empty()); assert!(snap["alive"].as_array().unwrap().is_empty());
// Append a spawn event. // Append a spawn event.
writeln!(tmp, r#"{{"ts":"2026-05-02T13:00:00.000Z","event":"agent_spawn","id":"smoke1","subagent_type":"researcher","model":"sonnet","prompt_preview":"test"}}"#).unwrap(); writeln!(tmp, r#"{{"ts":"2026-05-02T13:00:00.000Z","event":"agent_spawn","id":"smoke1","subagent_type":"researcher","model":"sonnet","prompt_preview":"test"}}"#).unwrap();
// Allow tail poll (200ms) + margin.
tokio::time::sleep(Duration::from_millis(500)).await; tokio::time::sleep(Duration::from_millis(500)).await;
// Should receive an event frame on the existing connection. // Should receive an event frame on the existing connection.
@ -85,7 +118,10 @@ async fn smoke_snapshot_and_event() {
assert_eq!(frame["data"]["id"], "smoke1"); assert_eq!(frame["data"]["id"], "smoke1");
// New client snapshot should contain smoke1. // New client snapshot should contain smoke1.
let (mut ws2, _) = connect_async(format!("ws://{addr}/stream")).await.unwrap(); let (mut ws2, _) =
connect_async_with_config(auth_request(&format!("ws://{addr}/stream")), None, true)
.await
.unwrap();
let snap2: Value = recv_text(&mut ws2).await; let snap2: Value = recv_text(&mut ws2).await;
assert_eq!(snap2["type"], "snapshot"); assert_eq!(snap2["type"], "snapshot");
let alive2 = snap2["alive"].as_array().unwrap(); let alive2 = snap2["alive"].as_array().unwrap();
@ -97,7 +133,10 @@ async fn smoke_snapshot_and_event() {
tokio::time::sleep(Duration::from_millis(500)).await; tokio::time::sleep(Duration::from_millis(500)).await;
// Third client: snapshot should now be empty. // Third client: snapshot should now be empty.
let (mut ws3, _) = connect_async(format!("ws://{addr}/stream")).await.unwrap(); let (mut ws3, _) =
connect_async_with_config(auth_request(&format!("ws://{addr}/stream")), None, true)
.await
.unwrap();
let snap3: Value = recv_text(&mut ws3).await; let snap3: Value = recv_text(&mut ws3).await;
assert_eq!(snap3["type"], "snapshot"); assert_eq!(snap3["type"], "snapshot");
assert!(snap3["alive"].as_array().unwrap().is_empty()); assert!(snap3["alive"].as_array().unwrap().is_empty());

View file

@ -1,8 +1,8 @@
[package] [package]
name = "kei-runtime" name = "kei-runtime"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition.workspace = true
rust-version = "1.75" rust-version.workspace = true
description = "Atom invocation runtime + schema linter" description = "Atom invocation runtime + schema linter"
authors = ["Denis Parfionovich <info@greendragon.info>"] authors = ["Denis Parfionovich <info@greendragon.info>"]
@ -15,21 +15,21 @@ name = "kei_runtime"
path = "src/lib.rs" path = "src/lib.rs"
[dependencies] [dependencies]
clap = { version = "4", features = ["derive"] } clap = { workspace = true }
serde = { version = "1", features = ["derive"] } serde = { workspace = true }
serde_json = "1" serde_json = { workspace = true }
# SSRF + IMDS hardening: disable default features (resolve-http, cli) so the # SSRF + IMDS hardening: disable default features (resolve-http, cli) so the
# validator has no HTTP resolver by default. We configure a file-only # validator has no HTTP resolver by default. We configure a file-only
# resolver explicitly in `validate.rs`. # resolver explicitly in `validate.rs`.
jsonschema = { version = "0.18", default-features = false, features = ["resolve-file"] } jsonschema = { version = "0.18", default-features = false, features = ["resolve-file"] }
anyhow = "1" anyhow = { workspace = true }
walkdir = "2" walkdir = { workspace = true }
serde_yaml_ng = "0.10" serde_yaml_ng = { workspace = true }
kei-atom-discovery = { path = "../kei-atom-discovery" } kei-atom-discovery = { path = "../kei-atom-discovery" }
url = "2" url = { workspace = true }
[dev-dependencies] [dev-dependencies]
tempfile = "3" tempfile = { workspace = true }
[package.metadata.keisei] [package.metadata.keisei]
backend = "none" backend = "none"

View file

@ -19,12 +19,17 @@ use std::io::Write;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::process::{Command, Stdio}; use std::process::{Command, Stdio};
/// Upper bound, in bytes, on the subprocess stdout/stderr kept by `cap_bytes`.
/// NOTE(review): `cap_bytes` truncates a `Vec<u8>` that has already been fully
/// collected, so this bounds what is retained afterwards, not how much is read
/// into memory — confirm whether a streaming limit is also wanted.
const OUTPUT_CAP: usize = 16 * 1024 * 1024; // 16 MiB
#[derive(Debug)] #[derive(Debug)]
pub enum InvokeError { pub enum InvokeError {
AtomNotFound(String), AtomNotFound(String),
InputParse(String), InputParse(String),
InputInvalid(String), InputInvalid(String),
MissingInputSchema(String), MissingInputSchema(String),
/// `crate_name` in atom YAML failed the `kei-*` allowlist check.
InvalidAtom(String),
/// Crate binary is missing from both `KEI_RUNTIME_BIN_DIR` and `PATH`. /// Crate binary is missing from both `KEI_RUNTIME_BIN_DIR` and `PATH`.
BinaryNotFound { crate_name: String }, BinaryNotFound { crate_name: String },
/// Subprocess exited non-zero — propagate the atom's own exit code. /// Subprocess exited non-zero — propagate the atom's own exit code.
@ -44,6 +49,7 @@ impl std::fmt::Display for InvokeError {
Self::InputParse(e) => write!(f, "input rejected: {e}"), Self::InputParse(e) => write!(f, "input rejected: {e}"),
Self::InputInvalid(e) => write!(f, "input rejected: {e}"), Self::InputInvalid(e) => write!(f, "input rejected: {e}"),
Self::MissingInputSchema(id) => write!(f, "atom `{id}` declares no input schema"), Self::MissingInputSchema(id) => write!(f, "atom `{id}` declares no input schema"),
Self::InvalidAtom(msg) => write!(f, "invalid atom crate_name: {msg}"),
Self::BinaryNotFound { crate_name } => write!( Self::BinaryNotFound { crate_name } => write!(
f, f,
"binary `{crate_name}` not found on PATH or KEI_RUNTIME_BIN_DIR" "binary `{crate_name}` not found on PATH or KEI_RUNTIME_BIN_DIR"
@ -93,7 +99,36 @@ fn find_atom(root: &Path, atom_id: &str) -> Result<AtomMeta, InvokeError> {
.ok_or_else(|| InvokeError::AtomNotFound(atom_id.to_string())) .ok_or_else(|| InvokeError::AtomNotFound(atom_id.to_string()))
} }
/// Returns `true` when `name` is an allowed atom crate name.
///
/// A name is allowed when it is at most 128 bytes long and matches
/// `^kei-[a-z][a-z0-9-]*$`: the literal prefix `kei-`, one ASCII lowercase
/// letter, then any number of ASCII lowercase letters, digits or `-`.
///
/// Because this is a strict character allowlist, path separators (`/`, `\`),
/// `..`, `:`, `@`, whitespace and absolute paths can never pass; no separate
/// deny-list scan is needed.
fn is_safe_crate_name(name: &str) -> bool {
    if name.len() > 128 {
        return false;
    }
    let Some(suffix) = name.strip_prefix("kei-") else {
        return false;
    };
    let mut rest = suffix.bytes();
    // The first byte after the prefix must exist and be a lowercase letter.
    rest.next().is_some_and(|b| b.is_ascii_lowercase())
        && rest.all(|b| b.is_ascii_lowercase() || b.is_ascii_digit() || b == b'-')
}
fn exec_atom(meta: &AtomMeta, input_json: &str) -> Result<Output, InvokeError> { fn exec_atom(meta: &AtomMeta, input_json: &str) -> Result<Output, InvokeError> {
if !is_safe_crate_name(&meta.crate_name) {
return Err(InvokeError::InvalidAtom(format!(
"crate_name '{}' fails kei-* allowlist",
meta.crate_name
)));
}
let bin = resolve_binary(&meta.crate_name) let bin = resolve_binary(&meta.crate_name)
.ok_or_else(|| InvokeError::BinaryNotFound { crate_name: meta.crate_name.clone() })?; .ok_or_else(|| InvokeError::BinaryNotFound { crate_name: meta.crate_name.clone() })?;
let mut child = Command::new(&bin) let mut child = Command::new(&bin)
@ -115,10 +150,23 @@ fn exec_atom(meta: &AtomMeta, input_json: &str) -> Result<Output, InvokeError> {
handle_subprocess_output(meta, out) handle_subprocess_output(meta, out)
} }
/// Truncate `data` to at most `OUTPUT_CAP` bytes.
///
/// Buffers within the limit are returned untouched. Longer buffers are cut to
/// exactly `OUTPUT_CAP` bytes and a notice naming `label` is written to stderr.
/// The input is already fully in memory when this runs.
fn cap_bytes(data: Vec<u8>, label: &str) -> Vec<u8> {
    if data.len() <= OUTPUT_CAP {
        return data;
    }
    let mut capped = data;
    capped.truncate(OUTPUT_CAP);
    eprintln!("[kei-runtime] {label} truncated at {OUTPUT_CAP} bytes");
    capped
}
fn handle_subprocess_output( fn handle_subprocess_output(
meta: &AtomMeta, meta: &AtomMeta,
out: std::process::Output, mut out: std::process::Output,
) -> Result<Output, InvokeError> { ) -> Result<Output, InvokeError> {
out.stdout = cap_bytes(out.stdout, "stdout");
out.stderr = cap_bytes(out.stderr, "stderr");
let code = out.status.code().unwrap_or(-1); let code = out.status.code().unwrap_or(-1);
if !out.status.success() { if !out.status.success() {
let stderr = String::from_utf8_lossy(&out.stderr).trim().to_string(); let stderr = String::from_utf8_lossy(&out.stderr).trim().to_string();
@ -130,9 +178,7 @@ fn handle_subprocess_output(
Ok(Output { atom: meta.full_id.clone(), result }) Ok(Output { atom: meta.full_id.clone(), result })
} }
/// Resolve `<crate_name>` as an executable: /// Resolve `<crate_name>` as binary: first `$KEI_RUNTIME_BIN_DIR/<name>`, then `$PATH`.
/// 1. `$KEI_RUNTIME_BIN_DIR/<crate_name>` if env is set and file exists
/// 2. Walk `$PATH`, return first `<dir>/<crate_name>` that exists
fn resolve_binary(crate_name: &str) -> Option<PathBuf> { fn resolve_binary(crate_name: &str) -> Option<PathBuf> {
if let Ok(bin_dir) = std::env::var("KEI_RUNTIME_BIN_DIR") { if let Ok(bin_dir) = std::env::var("KEI_RUNTIME_BIN_DIR") {
let candidate = PathBuf::from(bin_dir).join(crate_name); let candidate = PathBuf::from(bin_dir).join(crate_name);

View file

@ -114,17 +114,18 @@ fn run_invoke(root: PathBuf, atom_id: String, input_arg: String) -> ExitCode {
/// Map typed invoke errors to exit codes per locked §Runtime schema. /// Map typed invoke errors to exit codes per locked §Runtime schema.
/// ///
/// - `AtomNotFound | InputParse | InputInvalid | MissingInputSchema` → 2 (atom error) /// - `AtomNotFound|InputParse|InputInvalid|MissingInputSchema|InvalidAtom` → 2 (atom error)
/// - `AtomFailed { code, .. }` → passthrough child exit code /// - `AtomFailed { code, .. }` → passthrough child exit code
/// - `SubprocessError | OutputParse` → 1 (IO / malformed output) /// - `SubprocessError|OutputParse` → 1 (IO / malformed output)
/// - `BinaryNotFound` → 127 (POSIX command-not-found) /// - `BinaryNotFound` → 127 (POSIX command-not-found)
/// - `NotImplemented` → 64 (legacy escape) /// - `NotImplemented` → 64 (legacy escape)
fn invoke_exit_code(err: &InvokeError) -> u8 { fn invoke_exit_code(err: &InvokeError) -> u8 {
match err { match err {
InvokeError::AtomNotFound(_) InvokeError::AtomNotFound(_)
| InvokeError::InputParse(_) | InvokeError::InputParse(_)
| InvokeError::InputInvalid(_) | InvokeError::InputInvalid(_)
| InvokeError::MissingInputSchema(_) => 2, | InvokeError::MissingInputSchema(_)
| InvokeError::InvalidAtom(_) => 2,
InvokeError::AtomFailed { code, .. } => { InvokeError::AtomFailed { code, .. } => {
let c = *code; let c = *code;
if (0..=255).contains(&c) { c as u8 } else { 1 } if (0..=255).contains(&c) { c as u8 } else { 1 }

View file

@ -96,3 +96,41 @@ fn invoke_binary_not_found_exits_127() {
assert!(stderr.contains("not found"), assert!(stderr.contains("not found"),
"expected 'not found' in stderr: {stderr}"); "expected 'not found' in stderr: {stderr}");
} }
/// An atom whose `crate_name` is not in the `kei-*` allowlist should exit 2
/// (InvalidAtom is mapped to the same "atom rejected" exit code).
///
/// The fixture is a complete atom directory (`<root>/rm/atoms/all.md` plus
/// input/output schemas), so the allowlist check is the only reason left to fail.
#[test]
fn invoke_unsafe_crate_name_exits_2() {
let tmp = tempfile::tempdir().unwrap();
// Write a well-structured atom dir but with a crate_name that would be
// dangerous (e.g. "rm") — this must be rejected before any binary lookup.
// NOTE(review): the front-matter below has no explicit `crate_name` key;
// presumably the runtime derives it from the atom id or directory — confirm.
let crate_name = "rm";
let verb = "all";
let atoms = tmp.path().join(crate_name).join("atoms");
let schemas = atoms.join("schemas");
std::fs::create_dir_all(&schemas).unwrap();
// Both schemas accept any JSON object, so the `{}` input passed below is valid.
let input_schema = r#"{"$schema":"http://json-schema.org/draft-07/schema#","type":"object"}"#;
let output_schema = r#"{"$schema":"http://json-schema.org/draft-07/schema#","type":"object"}"#;
std::fs::write(schemas.join("all-input.json"), input_schema).unwrap();
std::fs::write(schemas.join("all-output.json"), output_schema).unwrap();
// Atom definition: YAML front-matter only, no body.
let md = format!(
"---\natom: {crate_name}::{verb}\nkind: command\nversion: \"0.1.0\"\n\
input:\n schema: schemas/all-input.json\n\
output:\n schema: schemas/all-output.json\n\
side_effects: []\nidempotent: true\nstability: stable\n---\n"
);
std::fs::write(atoms.join(format!("{verb}.md")), md).unwrap();
// Run the real binary against the temporary atom root.
let out = std::process::Command::new(BIN)
.arg("invoke")
.arg(format!("{crate_name}::{verb}"))
.arg("--input").arg("{}")
.arg("--root").arg(tmp.path())
.output()
.expect("spawn kei-runtime");
assert_eq!(out.status.code(), Some(2),
"expected exit 2 for unsafe crate_name; stderr: {}",
String::from_utf8_lossy(&out.stderr));
// Exit code 2 is shared with other atom errors, so also check the message text.
let stderr = String::from_utf8_lossy(&out.stderr);
assert!(stderr.contains("allowlist") || stderr.contains("invalid"),
"expected allowlist error in stderr: {stderr}");
}