KeiSeiKit-1.0/_primitives/_rust/kei-graph-stream/src/ws.rs
Parfii-bot 913d62c280 fix(security): RCE allowlist + WebSocket auth + SSH option-injection
Group D — hardening of three independent security primitives (post-audit 2026-05-02).

kei-runtime — atom invoke RCE allowlist:
- invoke.rs: is_safe_crate_name validator (regex ^kei-[a-z][a-z0-9-]+$);
             rejects /, \\, .., :, absolute paths, empty, >128 chars.
             InvalidAtom error variant.
             stdout/stderr capped at 16 MiB (was unbounded).
- main.rs: InvalidAtom mapped to exit code 2.
- tests/invoke_exit_codes_smoke.rs: invoke_unsafe_crate_name_exits_2 added.
- Closes: any user able to write atoms/*.md with crate_name: "rm" or "sudo"
           triggered arbitrary command execution.
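
The allowlist rule above can be sketched without the regex crate. This is a minimal illustration under stated assumptions, not the code in invoke.rs (which is not shown here): the function body is hypothetical and only mirrors the documented pattern ^kei-[a-z][a-z0-9-]+$ and the 128-character cap.

```rust
/// Hypothetical std-only equivalent of the documented check
/// `^kei-[a-z][a-z0-9-]+$` with a 128-character cap.
/// The real validator in invoke.rs may be implemented differently.
pub fn is_safe_crate_name(name: &str) -> bool {
    const MAX_LEN: usize = 128; // cap taken from the commit message

    if name.is_empty() || name.len() > MAX_LEN {
        return false;
    }
    // The name must start with the literal prefix "kei-".
    let Some(rest) = name.strip_prefix("kei-") else {
        return false;
    };
    let mut chars = rest.chars();
    // First character after the prefix: one lowercase ASCII letter.
    match chars.next() {
        Some(c) if c.is_ascii_lowercase() => {}
        _ => return false,
    }
    // Remainder: one or more of [a-z0-9-] (the regex uses `+`, so it
    // must not be empty).
    let tail = chars.as_str();
    !tail.is_empty()
        && tail
            .chars()
            .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-')
}
```

Because the character class admits only lowercase letters, digits and hyphens, separators such as /, \, : and the sequence .. are rejected without being listed individually, and names like "rm" or "sudo" fail on the missing prefix.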

kei-graph-stream — WebSocket bearer + Origin:
- auth.rs (new, 142 LOC): token load + bearer extraction + Origin allowlist +
                            ConstantTimeEq compare; 8 unit tests.
- ws.rs: ws_handler validates Origin + bearer before upgrade (403/401 on failure).
- main.rs: --public-bind-i-accept-the-leak flag required for non-loopback bind;
            else bail!() with explicit error.
- tests/smoke.rs: rewritten with Origin + bearer headers via connect_async_with_config.
- Closes: WebSocket /stream had zero auth, zero Origin check; browser CSWSH could
           subscribe to agent activity broadcast; KEI_GRAPH_STREAM_BIND env silently
           accepted any SocketAddr.
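
The three checks that auth.rs is described as performing could look roughly like the sketch below. auth.rs itself is not shown, so everything here is an assumption: the `_sketch` function names are hypothetical, the wire format `Sec-WebSocket-Protocol: bearer, <token>` is inferred from ws.rs reading that header and answering with the "bearer" protocol, and a missing Origin is assumed to be rejected. The hand-written XOR loop stands in for the ConstantTimeEq comparison named in the commit message; unlike a vetted crate, it carries no guarantee that the compiler keeps it constant-time.

```rust
/// Assumed header format: `bearer, <token>`. Returns the token, or None
/// if the header is missing, malformed, or carries extra entries.
pub fn extract_bearer_sketch(protocol: Option<&str>) -> Option<&str> {
    let mut parts = protocol?.split(',').map(str::trim);
    // First entry must be the literal subprotocol name.
    if parts.next()? != "bearer" {
        return None;
    }
    // Second entry is the token; it must be non-empty and be the last entry.
    let token = parts.next()?;
    if token.is_empty() || parts.next().is_some() {
        return None;
    }
    Some(token)
}

/// Compares two tokens without returning early on the first mismatching
/// byte. Length is compared first, so only the length can leak.
pub fn tokens_match_sketch(expected: &str, got: &str) -> bool {
    let (a, b) = (expected.as_bytes(), got.as_bytes());
    if a.len() != b.len() {
        return false;
    }
    let mut diff = 0u8;
    for (x, y) in a.iter().zip(b.iter()) {
        diff |= x ^ y; // accumulate differences over every byte
    }
    diff == 0
}

/// Exact-match Origin allowlist; a missing Origin header is rejected
/// (an assumption about the real policy).
pub fn origin_allowed_sketch(origin: Option<&str>, allowlist: &[&str]) -> bool {
    match origin {
        Some(o) => allowlist.iter().any(|allowed| *allowed == o),
        None => false,
    }
}
```

Checking Origin before the bearer token matches the order in ws_handler, where an Origin failure yields 403 and a token failure yields 401.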

kei-compute-baremetal — SSH option injection (CVE-2023-51385 class):
- ssh.rs: is_safe_user + is_safe_host validators (alphanumeric + -_.; reject leading -;
           max 64 chars; no @, :, /, \\, space).
- ssh.rs: -- sentinel before user@host argv (OpenSSH 9.6+ stops flag parsing).
- ssh.rs: StrictHostKeyChecking=yes default; KEI_BAREMETAL_ACCEPT_NEW=1 for TOFU.
- error.rs: InvalidRegion variant.
- provider.rs: validators applied in target_for_spec + target_for_handle.
- Closes: spec.region "-oProxyCommand=evil" triggered local RCE before TCP connect.
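
As an illustration of the two defenses listed above (input validation plus the -- sentinel), here is a minimal sketch. It is not the code in ssh.rs: `is_safe_token` and `build_ssh_argv` are hypothetical names, a single validator is used for both user and host, and mapping KEI_BAREMETAL_ACCEPT_NEW=1 to StrictHostKeyChecking=accept-new is an assumption about how TOFU is enabled.

```rust
/// Hypothetical validator following the documented rules: ASCII
/// alphanumerics plus '-', '_', '.'; no leading '-'; 1..=64 characters.
fn is_safe_token(s: &str) -> bool {
    !s.is_empty()
        && s.len() <= 64
        && !s.starts_with('-')
        && s.chars()
            .all(|c| c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '.'))
}

/// Builds an ssh argv with options first, then `--`, then `user@host`.
/// Returns None when either part fails validation.
/// `accept_new` models KEI_BAREMETAL_ACCEPT_NEW=1 (assumed mapping).
pub fn build_ssh_argv(user: &str, host: &str, accept_new: bool) -> Option<Vec<String>> {
    if !is_safe_token(user) || !is_safe_token(host) {
        return None;
    }
    let policy = if accept_new { "accept-new" } else { "yes" };
    Some(vec![
        "ssh".to_string(),
        "-o".to_string(),
        format!("StrictHostKeyChecking={policy}"),
        // Everything after "--" is treated as a positional argument,
        // so the destination can never be parsed as an option.
        "--".to_string(),
        format!("{user}@{host}"),
    ])
}
```

With this shape, a region value such as "-oProxyCommand=evil" is rejected by the validator before any process is spawned, and the sentinel remains as a second layer if a validator is ever loosened.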

Test results: 29 passed; 0 failed across all three crates. cargo check clean.

Findings: RCE allowlist (Wave-A) + WebSocket auth (Wave-B) + SSH injection (Wave-B)
were unique-per-retest discoveries. None present in original wave-1 audit.

Note: kei-compute-baremetal/src/provider.rs is now 300 LOC (was 268; +32 from validators).
It already exceeded the 200 LOC limit before this change, and this fix was scoped to
security additions only. Follow-up: split provider.rs into provider.rs (<200) + provider_tests.rs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 21:40:24 +08:00


use axum::{
    extract::{
        State,
        ws::{Message, WebSocket, WebSocketUpgrade},
    },
    http::{HeaderMap, StatusCode, header},
    response::{IntoResponse, Response},
};
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::time::{Duration, interval};

use crate::auth::{extract_bearer, load_expected_token, tokens_match, validate_origin};
use crate::state::AliveState;

pub type AppState = (Arc<broadcast::Sender<String>>, Arc<AliveState>);

/// Axum handler: validates Origin + bearer before upgrading to WebSocket.
pub async fn ws_handler(
    ws: WebSocketUpgrade,
    headers: HeaderMap,
    State((tx, alive)): State<AppState>,
) -> Response {
    let origin = headers.get(header::ORIGIN).and_then(|v| v.to_str().ok());
    if let Err(e) = validate_origin(origin) {
        eprintln!("[kei-graph-stream] ws origin rejected: {e}");
        return (StatusCode::FORBIDDEN, "forbidden\n").into_response();
    }
    let proto = headers
        .get("sec-websocket-protocol")
        .and_then(|v| v.to_str().ok());
    if let Err(e) = check_bearer(proto) {
        eprintln!("[kei-graph-stream] ws auth rejected: {e}");
        return (StatusCode::UNAUTHORIZED, "unauthorized\n").into_response();
    }
    ws.protocols(["bearer"])
        .on_upgrade(move |socket| handle_socket(socket, tx, alive))
}

fn check_bearer(protocol: Option<&str>) -> Result<(), crate::auth::AuthError> {
    let expected = load_expected_token()?;
    let got = extract_bearer(protocol)?;
    if !tokens_match(&expected, got) {
        return Err(crate::auth::AuthError::BearerInvalid);
    }
    Ok(())
}

async fn handle_socket(
    mut socket: WebSocket,
    tx: Arc<broadcast::Sender<String>>,
    alive: Arc<AliveState>,
) {
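    // 1. Subscribe BEFORE building the snapshot: a broadcast receiver only sees
    //    messages sent after it subscribes, so subscribing later would drop any
    //    event emitted while the snapshot is built and sent. The client may
    //    receive an event already reflected in the snapshot and should treat
    //    events as idempotent.
    let mut rx = tx.subscribe();
    // 2. Send snapshot of currently alive agents.
    let snapshot = build_snapshot(&alive);
    if socket.send(Message::Text(snapshot)).await.is_err() {
        return;
    }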
    let mut heartbeat = interval(Duration::from_secs(30));
    heartbeat.tick().await; // consume the immediate first tick
    loop {
        tokio::select! {
            // Broadcast event → forward to client.
            result = rx.recv() => {
                match result {
                    Ok(msg) => {
                        if socket.send(Message::Text(msg)).await.is_err() {
                            break;
                        }
                    }
                    Err(broadcast::error::RecvError::Lagged(n)) => {
                        eprintln!("[ws] client lagged {n} messages");
                    }
                    Err(broadcast::error::RecvError::Closed) => break,
                }
            }
            // Heartbeat ping every 30s.
            _ = heartbeat.tick() => {
                let ping = r#"{"type":"ping"}"#.to_string();
                if socket.send(Message::Text(ping)).await.is_err() {
                    break;
                }
            }
            // Client message (pong or close).
            msg = socket.recv() => {
                match msg {
                    Some(Ok(Message::Close(_))) | None => break,
                    _ => {} // ignore other client frames
                }
            }
        }
    }
}

fn build_snapshot(alive: &AliveState) -> String {
    let agents = alive.snapshot();
    serde_json::json!({
        "type": "snapshot",
        "alive": agents,
    })
    .to_string()
}