Group D — three independent security primitives hardening (post-audit 2026-05-02).
kei-runtime — atom invoke RCE allowlist:
- invoke.rs: is_safe_crate_name validator (regex ^kei-[a-z][a-z0-9-]+$);
rejects /, \\, .., :, absolute paths, empty, >128 chars.
InvalidAtom error variant.
stdout/stderr capped at 16 MiB (was unbounded).
- main.rs: InvalidAtom mapped to exit code 2.
- tests/invoke_exit_codes_smoke.rs: invoke_unsafe_crate_name_exits_2 added.
- Closes: any user able to write atoms/*.md with crate_name: "rm" or "sudo"
triggered arbitrary command execution.
kei-graph-stream — WebSocket bearer + Origin:
- auth.rs (new, 142 LOC): token load + bearer extraction + Origin allowlist +
ConstantTimeEq compare; 8 unit tests.
- ws.rs: ws_handler validates Origin + bearer before upgrade (403/401 on failure).
- main.rs: --public-bind-i-accept-the-leak flag required for non-loopback bind;
else bail!() with explicit error.
- tests/smoke.rs: rewritten with Origin + bearer headers via connect_async_with_config.
- Closes: WebSocket /stream had zero auth, zero Origin check; browser CSWSH could
subscribe to agent activity broadcast; KEI_GRAPH_STREAM_BIND env silently
accepted any SocketAddr.
kei-compute-baremetal — SSH option injection (CVE-2023-51385 class):
- ssh.rs: is_safe_user + is_safe_host validators (alphanumeric + -_.; reject leading -;
max 64 chars; no @, :, /, \\, space).
- ssh.rs: -- sentinel before user@host argv (OpenSSH 9.6+ stops flag parsing).
- ssh.rs: StrictHostKeyChecking=yes default; KEI_BAREMETAL_ACCEPT_NEW=1 for TOFU.
- error.rs: InvalidRegion variant.
- provider.rs: validators applied in target_for_spec + target_for_handle.
- Closes: spec.region "-oProxyCommand=evil" triggered local RCE before TCP connect.
Test results: 29 passed; 0 failed across all three crates. cargo check clean.
Findings: RCE allowlist (Wave-A) + WebSocket auth (Wave-B) + SSH injection (Wave-B)
were unique-per-retest discoveries. None present in original wave-1 audit.
Note: kei-compute-baremetal/src/provider.rs at 300 LOC (was 268; +32 from validators).
Pre-existing >200 LOC violation, fix scope was security-additions only. Follow-up:
split provider.rs into provider.rs (<200) + provider_tests.rs.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
143 lines
5 KiB
Rust
143 lines
5 KiB
Rust
/// Integration smoke test: spins up a real kei-graph-stream server on a random port,
|
|
/// appends events to a temp JSONL file, and verifies WS snapshot + event frames.
|
|
///
|
|
/// Auth: writes a known token to a temp file and sets HOME so `load_expected_token`
|
|
/// reads it. Connects with the correct `Origin` and `Sec-WebSocket-Protocol` headers.
|
|
use std::io::Write;
|
|
use std::net::SocketAddr;
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
|
|
use serde_json::Value;
|
|
use tempfile::NamedTempFile;
|
|
use tokio::sync::broadcast;
|
|
use tokio_tungstenite::{
|
|
connect_async_with_config,
|
|
tungstenite::{
|
|
client::IntoClientRequest,
|
|
http::header::{HeaderValue, ORIGIN},
|
|
Message,
|
|
},
|
|
};
|
|
use futures::StreamExt;
|
|
|
|
const TEST_TOKEN: &str = "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef";
|
|
|
|
/// Write the test token to `<home>/.keisei/cortex.token` and set HOME.
|
|
fn setup_token(home: &std::path::Path) {
|
|
let dir = home.join(".keisei");
|
|
std::fs::create_dir_all(&dir).unwrap();
|
|
std::fs::write(dir.join("cortex.token"), TEST_TOKEN).unwrap();
|
|
std::env::set_var("HOME", home.to_str().unwrap());
|
|
}
|
|
|
|
async fn start_server(events_path: std::path::PathBuf) -> SocketAddr {
|
|
use axum::Router;
|
|
use axum::routing::get;
|
|
|
|
let (tx, _) = broadcast::channel::<String>(256);
|
|
let tx = Arc::new(tx);
|
|
let alive = Arc::new(kei_graph_stream::AliveState::new());
|
|
|
|
tokio::spawn(kei_graph_stream::tail::run(
|
|
events_path,
|
|
Arc::clone(&tx),
|
|
Arc::clone(&alive),
|
|
));
|
|
|
|
let app = Router::new()
|
|
.route("/stream", get(kei_graph_stream::ws::ws_handler))
|
|
.route("/health", get(|| async { "kei-graph-stream alive\n" }))
|
|
.with_state((tx, alive));
|
|
|
|
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
|
|
let addr = listener.local_addr().unwrap();
|
|
use std::future::IntoFuture;
|
|
tokio::spawn(axum::serve(listener, app).into_future());
|
|
addr
|
|
}
|
|
|
|
/// Build an authenticated WS request.
|
|
fn auth_request(url: &str) -> impl IntoClientRequest {
|
|
let mut req = url.into_client_request().unwrap();
|
|
req.headers_mut().insert(
|
|
ORIGIN,
|
|
HeaderValue::from_static("http://localhost:3000"),
|
|
);
|
|
req.headers_mut().insert(
|
|
"sec-websocket-protocol",
|
|
HeaderValue::from_str(&format!("bearer,{TEST_TOKEN}")).unwrap(),
|
|
);
|
|
req
|
|
}
|
|
|
|
async fn recv_text(
|
|
stream: &mut (impl StreamExt<
|
|
Item = Result<Message, tokio_tungstenite::tungstenite::Error>,
|
|
> + Unpin),
|
|
) -> Value {
|
|
loop {
|
|
if let Message::Text(t) = stream.next().await.unwrap().unwrap() {
|
|
return serde_json::from_str(&t).unwrap();
|
|
}
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn smoke_snapshot_and_event() {
|
|
let home_dir = tempfile::tempdir().unwrap();
|
|
setup_token(home_dir.path());
|
|
|
|
let mut tmp = NamedTempFile::new().unwrap();
|
|
let path = std::path::PathBuf::from(tmp.path());
|
|
|
|
let addr = start_server(path.clone()).await;
|
|
|
|
// Health check (no auth needed).
|
|
let body = reqwest::get(format!("http://{addr}/health"))
|
|
.await.unwrap().text().await.unwrap();
|
|
assert_eq!(body, "kei-graph-stream alive\n");
|
|
|
|
// Connect WS before any events — expect empty snapshot.
|
|
let (mut ws1, _) =
|
|
connect_async_with_config(auth_request(&format!("ws://{addr}/stream")), None, true)
|
|
.await
|
|
.unwrap();
|
|
let snap: Value = recv_text(&mut ws1).await;
|
|
assert_eq!(snap["type"], "snapshot");
|
|
assert!(snap["alive"].as_array().unwrap().is_empty());
|
|
|
|
// Append a spawn event.
|
|
writeln!(tmp, r#"{{"ts":"2026-05-02T13:00:00.000Z","event":"agent_spawn","id":"smoke1","subagent_type":"researcher","model":"sonnet","prompt_preview":"test"}}"#).unwrap();
|
|
tokio::time::sleep(Duration::from_millis(500)).await;
|
|
|
|
// Should receive an event frame on the existing connection.
|
|
let frame: Value = recv_text(&mut ws1).await;
|
|
assert_eq!(frame["type"], "event");
|
|
assert_eq!(frame["data"]["event"], "agent_spawn");
|
|
assert_eq!(frame["data"]["id"], "smoke1");
|
|
|
|
// New client snapshot should contain smoke1.
|
|
let (mut ws2, _) =
|
|
connect_async_with_config(auth_request(&format!("ws://{addr}/stream")), None, true)
|
|
.await
|
|
.unwrap();
|
|
let snap2: Value = recv_text(&mut ws2).await;
|
|
assert_eq!(snap2["type"], "snapshot");
|
|
let alive2 = snap2["alive"].as_array().unwrap();
|
|
assert_eq!(alive2.len(), 1);
|
|
assert_eq!(alive2[0]["id"], "smoke1");
|
|
|
|
// Append done event.
|
|
writeln!(tmp, r#"{{"ts":"2026-05-02T13:00:01.000Z","event":"agent_done","id":"smoke1","outcome":"functional","duration_ms":1000}}"#).unwrap();
|
|
tokio::time::sleep(Duration::from_millis(500)).await;
|
|
|
|
// Third client: snapshot should now be empty.
|
|
let (mut ws3, _) =
|
|
connect_async_with_config(auth_request(&format!("ws://{addr}/stream")), None, true)
|
|
.await
|
|
.unwrap();
|
|
let snap3: Value = recv_text(&mut ws3).await;
|
|
assert_eq!(snap3["type"], "snapshot");
|
|
assert!(snap3["alive"].as_array().unwrap().is_empty());
|
|
}
|