KeiSeiKit-1.0/_primitives/_rust/kei-graph-stream/tests/smoke.rs
Parfii-bot 52a02dfbff feat(live-graph): WebSocket activity stream — orchestrator-centric live view
User pushback: "транслирует в онлайне какие агенты создаются? основное
окно агента, а дальше при запусках появляются новые ветки, мы показываем
в онлайне как агенты собираются и работают"

Earlier `kei-graph-export` rendered the static SUBSTRATE (all 581 atoms,
catalog-style). User wanted the LIFECYCLE: orchestrator at center, every
new agent as a fading-in branch, every tool call as a pulse, every
completion as a fade-out. TTL = until done; pure online, no history
accumulation per user direction.

Three-layer architecture, all conforming to schema /tmp/agent-events-schema.md:

LAYER 1 — Event emitters (4 hooks)
  hooks/agent-event-spawn.sh   PreToolUse:Agent  → agent_spawn event
  hooks/agent-event-done.sh    PostToolUse:Agent → agent_done event
                               (parses STATUS-TRUTH MARKER for outcome,
                                computes cost_usd from token×pricing table)
  hooks/tool-use-event.sh      PreToolUse:Bash|Read|Edit|Write|Grep|Glob|NotebookEdit
                               → tool_use event
  hooks/skill-record.sh        EXTENDED — second emit step writes skill_use
                               event in addition to existing kei-ledger
                               record-skill call

  All 4 are POSIX /bin/sh, defensive (never block, exit 0), bypass via
  KEI_EVENTS_BYPASS=1. Append-only JSONL to
  ~/.claude/memory/agent-events.jsonl.

  Smoke: 4 synthetic invocations cover spawn/done/tool/filter cases.

LAYER 2 — kei-graph-stream Rust daemon
  _primitives/_rust/kei-graph-stream/  (~480 LOC, 5 files + 1 test)

  - Tails events.jsonl every 200ms (poll-based, no notify dep).
  - Parses each event, updates AliveState (insert on spawn, remove on done).
  - Broadcasts {"type":"event","data":<event>} to all WebSocket clients.
  - On client connect: sends {"type":"snapshot","alive":[...]} first.
  - Heartbeat: {"type":"ping"} every 30s.
  - axum 0.7 + ws feature (already in Cargo.lock via kei-cortex).
  - Bypass: KEI_GRAPH_STREAM_BYPASS=1.

  Bound to 127.0.0.1:8201 (loopback only). Endpoints:
    GET /stream  → WebSocket upgrade
    GET /health  → "kei-graph-stream alive"

  4 unit + 1 integration test. cargo build clean.

  Installed binary: ~/.cargo/bin/kei-graph-stream
  Launchd plist: io.keisei.graph-stream (RunAtLoad, KeepAlive)
  Loaded as PID 52678, /health 200 OK verified.

LAYER 3 — live-graph.html (single-file frontend)
  ~/Projects/lbm-graph-viz/live-graph.html  (~464 LOC, self-contained)

  - SVG full-viewport, dark #0f172a, CSS grid background.
  - Pinned center node "main" (orchestrator), gold #fbbf24, glowing.
  - Agents radiate via D3 force-simulation; color-by-model
    (sonnet=green, opus=red, haiku=blue, default=gray).
  - On agent_spawn: fade-in 300ms, edge from main to new node.
  - On tool_use: pulse on agent node (r 8→12→8 over 400ms) +
    floating tool name label fades 800ms.
  - On agent_done: outcome-color flash → fade-out 800ms → remove.
  - WebSocket client: ws://127.0.0.1:8201/stream, exponential-backoff
    reconnect (1s→30s).
  - Top-right status badge: ● connected | ○ reconnecting | ✕ disconnected.
  - Bottom counters: alive / spawned / tool calls / done / last event age.
  - No build step. D3 v7 from CDN. Pure HTML+JS+CSS.

End-to-end smoke (this machine, just now):
  - daemon health 200 OK
  - hook injected agent_spawn → daemon broadcasts → AliveState=1
  - hook injected agent_done  → daemon broadcasts → AliveState=0
  - frontend file syntax-checked clean

What this does NOT do (deferred, by user direction "это онлайн"):
  - History persistence — agents who finished are GONE from the graph.
    Per-session log remains in events.jsonl + sleep-sync if user wants
    to consult later, but the live view is RIGHT NOW only.
  - Sub-agent attribution beyond "main" — orchestrator-direct tool calls
    show on the orchestrator node. Sub-agent's internal tool calls would
    need session-id correlation; current schema has agent_id="main"
    placeholder for non-Agent tool calls.
  - Replay mode — no time-scrubber. Possible follow-up if useful.
  - Auth on WebSocket — bound to 127.0.0.1 only. Local-only by design.

=== STATUS-TRUTH MARKER ===
shipped: functional
stubs: 0
cargo-check: PASS
behaviour-verified: yes
follow-up-required:
  - Sub-agent tool-call attribution (correlate session_id chain)
  - Replay mode with time scrubber (if user finds use)
  - Tool aggregator nodes ("Bash bucket" with N) instead of per-agent pulses

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 13:30:24 +08:00

104 lines
3.7 KiB
Rust

/// Integration smoke test: spins up a real kei-graph-stream server on a random port,
/// appends events to a temp JSONL file, and verifies WS snapshot + event frames.
use std::io::Write;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use serde_json::Value;
use tempfile::NamedTempFile;
use tokio::sync::broadcast;
use tokio_tungstenite::{connect_async, tungstenite::Message};
use futures::StreamExt;
async fn start_server(events_path: std::path::PathBuf) -> SocketAddr {
use axum::Router;
use axum::routing::get;
let (tx, _) = broadcast::channel::<String>(256);
let tx = Arc::new(tx);
let alive = Arc::new(kei_graph_stream::AliveState::new());
tokio::spawn(kei_graph_stream::tail::run(
events_path,
Arc::clone(&tx),
Arc::clone(&alive),
));
let app = Router::new()
.route("/stream", get(kei_graph_stream::ws::ws_handler))
.route("/health", get(|| async { "kei-graph-stream alive\n" }))
.with_state((tx, alive));
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
// axum::serve returns IntoFuture; use `into_future()` to spawn.
use std::future::IntoFuture;
tokio::spawn(axum::serve(listener, app).into_future());
addr
}
async fn recv_text(
stream: &mut (impl StreamExt<
Item = Result<Message, tokio_tungstenite::tungstenite::Error>,
> + Unpin),
) -> Value {
loop {
if let Message::Text(t) = stream.next().await.unwrap().unwrap() {
return serde_json::from_str(&t).unwrap();
}
}
}
#[tokio::test]
async fn smoke_snapshot_and_event() {
let mut tmp = NamedTempFile::new().unwrap();
let path = std::path::PathBuf::from(tmp.path());
let addr = start_server(path.clone()).await;
// Health check.
let body = reqwest::get(format!("http://{addr}/health"))
.await
.unwrap()
.text()
.await
.unwrap();
assert_eq!(body, "kei-graph-stream alive\n");
// Connect WS before any events — expect empty snapshot.
let (mut ws1, _) = connect_async(format!("ws://{addr}/stream")).await.unwrap();
let snap: Value = recv_text(&mut ws1).await;
assert_eq!(snap["type"], "snapshot");
assert!(snap["alive"].as_array().unwrap().is_empty());
// Append a spawn event.
writeln!(tmp, r#"{{"ts":"2026-05-02T13:00:00.000Z","event":"agent_spawn","id":"smoke1","subagent_type":"researcher","model":"sonnet","prompt_preview":"test"}}"#).unwrap();
// Allow tail poll (200ms) + margin.
tokio::time::sleep(Duration::from_millis(500)).await;
// Should receive an event frame on the existing connection.
let frame: Value = recv_text(&mut ws1).await;
assert_eq!(frame["type"], "event");
assert_eq!(frame["data"]["event"], "agent_spawn");
assert_eq!(frame["data"]["id"], "smoke1");
// New client snapshot should contain smoke1.
let (mut ws2, _) = connect_async(format!("ws://{addr}/stream")).await.unwrap();
let snap2: Value = recv_text(&mut ws2).await;
assert_eq!(snap2["type"], "snapshot");
let alive2 = snap2["alive"].as_array().unwrap();
assert_eq!(alive2.len(), 1);
assert_eq!(alive2[0]["id"], "smoke1");
// Append done event.
writeln!(tmp, r#"{{"ts":"2026-05-02T13:00:01.000Z","event":"agent_done","id":"smoke1","outcome":"functional","duration_ms":1000}}"#).unwrap();
tokio::time::sleep(Duration::from_millis(500)).await;
// Third client: snapshot should now be empty.
let (mut ws3, _) = connect_async(format!("ws://{addr}/stream")).await.unwrap();
let snap3: Value = recv_text(&mut ws3).await;
assert_eq!(snap3["type"], "snapshot");
assert!(snap3["alive"].as_array().unwrap().is_empty());
}