From 52a02dfbff2c9c4768538240c3f71228dc0ebb21 Mon Sep 17 00:00:00 2001 From: Parfii-bot Date: Sat, 2 May 2026 13:30:24 +0800 Subject: [PATCH] =?UTF-8?q?feat(live-graph):=20WebSocket=20activity=20stre?= =?UTF-8?q?am=20=E2=80=94=20orchestrator-centric=20live=20view?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit User pushback: "транслирует в онлайне какие агенты создаются? основное окно агента, а дальше при запусках появляются новые ветки, мы показываем в онлайне как агенты собираются и работают" Earlier `kei-graph-export` rendered the static SUBSTRATE (all 581 atoms, catalog-style). User wanted the LIFECYCLE: orchestrator at center, every new agent as a fading-in branch, every tool call as a pulse, every completion as a fade-out. TTL = until done; pure online, no history accumulation per user direction. Three-layer architecture, all conforming to schema /tmp/agent-events-schema.md: LAYER 1 — Event emitters (4 hooks) hooks/agent-event-spawn.sh PreToolUse:Agent → agent_spawn event hooks/agent-event-done.sh PostToolUse:Agent → agent_done event (parses STATUS-TRUTH MARKER for outcome, computes cost_usd from token×pricing table) hooks/tool-use-event.sh PreToolUse:Bash|Read|Edit|Write|Grep|Glob|NotebookEdit → tool_use event hooks/skill-record.sh EXTENDED — second emit step writes skill_use event in addition to existing kei-ledger record-skill call All 4 are POSIX /bin/sh, defensive (never block, exit 0), bypass via KEI_EVENTS_BYPASS=1. Append-only JSONL to ~/.claude/memory/agent-events.jsonl. Smoke: 4 synthetic invocations cover spawn/done/tool/filter cases. LAYER 2 — kei-graph-stream Rust daemon _primitives/_rust/kei-graph-stream/ (~480 LOC, 5 files + 1 test) - Tails events.jsonl every 200ms (poll-based, no notify dep). - Parses each event, updates AliveState (insert on spawn, remove on done). - Broadcasts {"type":"event","data":} to all WebSocket clients. - On client connect: sends {"type":"snapshot","alive":[...]} first. - Heartbeat: {"type":"ping"} every 30s. - axum 0.7 + ws feature (already in Cargo.lock via kei-cortex). - Bypass: KEI_GRAPH_STREAM_BYPASS=1. Bound to 127.0.0.1:8201 (loopback only). Endpoints: GET /stream → WebSocket upgrade GET /health → "kei-graph-stream alive" 4 unit + 1 integration test. cargo build clean. Installed binary: ~/.cargo/bin/kei-graph-stream Launchd plist: io.keisei.graph-stream (RunAtLoad, KeepAlive) Loaded as PID 52678, /health 200 OK verified. LAYER 3 — live-graph.html (single-file frontend) ~/Projects/lbm-graph-viz/live-graph.html (~464 LOC, self-contained) - SVG full-viewport, dark #0f172a, CSS grid background. - Pinned center node "main" (orchestrator), gold #fbbf24, glowing. - Agents radiate via D3 force-simulation; color-by-model (sonnet=green, opus=red, haiku=blue, default=gray). - On agent_spawn: fade-in 300ms, edge from main to new node. - On tool_use: pulse on agent node (r 8→12→8 over 400ms) + floating tool name label fades 800ms. - On agent_done: outcome-color flash → fade-out 800ms → remove. - WebSocket client: ws://127.0.0.1:8201/stream, exponential-backoff reconnect (1s→30s). - Top-right status badge: ● connected | ○ reconnecting | ✕ disconnected. - Bottom counters: alive / spawned / tool calls / done / last event age. - No build step. D3 v7 from CDN. Pure HTML+JS+CSS. End-to-end smoke (this machine, just now): - daemon health 200 OK - hook injected agent_spawn → daemon broadcasts → AliveState=1 - hook injected agent_done → daemon broadcasts → AliveState=0 - frontend file syntax-checked clean What this does NOT do (deferred, by user direction "это онлайн"): - History persistence — agents who finished are GONE from the graph. Per-session log remains in events.jsonl + sleep-sync if user wants to consult later, but the live view is RIGHT NOW only. - Sub-agent attribution beyond "main" — orchestrator-direct tool calls show on the orchestrator node. Sub-agent's internal tool calls would need session-id correlation; current schema has agent_id="main" placeholder for non-Agent tool calls. - Replay mode — no time-scrubber. Possible follow-up if useful. - Auth on WebSocket — bound to 127.0.0.1 only. Local-only by design. === STATUS-TRUTH MARKER === shipped: functional stubs: 0 cargo-check: PASS behaviour-verified: yes follow-up-required: - Sub-agent tool-call attribution (correlate session_id chain) - Replay mode with time scrubber (if user finds use) - Tool aggregator nodes ("Bash bucket" with N) instead of per-agent pulses Co-Authored-By: Claude Opus 4.7 (1M context) --- _primitives/_rust/Cargo.lock | 16 +++ _primitives/_rust/Cargo.toml | 2 + _primitives/_rust/kei-graph-stream/Cargo.toml | 28 ++++ _primitives/_rust/kei-graph-stream/src/lib.rs | 5 + .../_rust/kei-graph-stream/src/main.rs | 68 +++++++++ .../_rust/kei-graph-stream/src/state.rs | 84 +++++++++++ .../_rust/kei-graph-stream/src/tail.rs | 135 ++++++++++++++++++ _primitives/_rust/kei-graph-stream/src/ws.rs | 83 +++++++++++ .../_rust/kei-graph-stream/tests/smoke.rs | 104 ++++++++++++++ .../templates/io.keisei.graph-stream.plist | 36 +++++ docs/DNA-INDEX.md | 20 ++- hooks/agent-event-done.sh | 59 ++++++++ hooks/agent-event-spawn.sh | 46 ++++++ hooks/skill-record.sh | 30 +++- hooks/tool-use-event.sh | 42 ++++++ 15 files changed, 746 insertions(+), 12 deletions(-) create mode 100644 _primitives/_rust/kei-graph-stream/Cargo.toml create mode 100644 _primitives/_rust/kei-graph-stream/src/lib.rs create mode 100644 _primitives/_rust/kei-graph-stream/src/main.rs create mode 100644 _primitives/_rust/kei-graph-stream/src/state.rs create mode 100644 _primitives/_rust/kei-graph-stream/src/tail.rs create mode 100644 _primitives/_rust/kei-graph-stream/src/ws.rs create mode 100644 _primitives/_rust/kei-graph-stream/tests/smoke.rs create mode 100644 _primitives/templates/io.keisei.graph-stream.plist create mode 100755 hooks/agent-event-done.sh create mode 100755 hooks/agent-event-spawn.sh create mode 100755 hooks/tool-use-event.sh diff --git a/_primitives/_rust/Cargo.lock b/_primitives/_rust/Cargo.lock index 6592154..3d018c2 100644 --- a/_primitives/_rust/Cargo.lock +++ b/_primitives/_rust/Cargo.lock @@ -3675,6 +3675,22 @@ dependencies = [ "toml", ] +[[package]] +name = "kei-graph-stream" +version = "0.1.0" +dependencies = [ + "anyhow", + "axum", + "clap", + "futures", + "reqwest 0.12.28", + "serde", + "serde_json", + "tempfile", + "tokio", + "tokio-tungstenite 0.29.0", +] + [[package]] name = "kei-hibernate" version = "0.1.0" diff --git a/_primitives/_rust/Cargo.toml b/_primitives/_rust/Cargo.toml index 6c8afd6..5e62b09 100644 --- a/_primitives/_rust/Cargo.toml +++ b/_primitives/_rust/Cargo.toml @@ -179,6 +179,8 @@ members = [ "kei-db-contract", # Live runtime-graph exporter (registry + ledger → D3 space fragment) "kei-graph-export", + # Live agent-events.jsonl tail → WebSocket stream (kei-graph-stream daemon) + "kei-graph-stream", ] [workspace.package] diff --git a/_primitives/_rust/kei-graph-stream/Cargo.toml b/_primitives/_rust/kei-graph-stream/Cargo.toml new file mode 100644 index 0000000..875fb71 --- /dev/null +++ b/_primitives/_rust/kei-graph-stream/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "kei-graph-stream" +version = "0.1.0" +edition.workspace = true +rust-version.workspace = true +description = "Tail agent-events.jsonl and stream to browser clients via WebSocket" + +[lib] +name = "kei_graph_stream" +path = "src/lib.rs" + +[[bin]] +name = "kei-graph-stream" +path = "src/main.rs" + +[dependencies] +axum = { version = "0.7", features = ["ws"] } +tokio = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +anyhow = { workspace = true } +clap = { version = "4", features = ["derive", "env"] } + +[dev-dependencies] +tokio-tungstenite = { workspace = true } +reqwest = { workspace = true } +tempfile = { workspace = true } +futures = { workspace = true } diff --git a/_primitives/_rust/kei-graph-stream/src/lib.rs b/_primitives/_rust/kei-graph-stream/src/lib.rs new file mode 100644 index 0000000..c3b227a --- /dev/null +++ b/_primitives/_rust/kei-graph-stream/src/lib.rs @@ -0,0 +1,5 @@ +pub mod state; +pub mod tail; +pub mod ws; + +pub use state::AliveState; diff --git a/_primitives/_rust/kei-graph-stream/src/main.rs b/_primitives/_rust/kei-graph-stream/src/main.rs new file mode 100644 index 0000000..84fa44a --- /dev/null +++ b/_primitives/_rust/kei-graph-stream/src/main.rs @@ -0,0 +1,68 @@ +use anyhow::Result; +use axum::{Router, routing::get}; +use clap::Parser; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::sync::Arc; +use tokio::sync::broadcast; + +use kei_graph_stream::{AliveState, tail, ws}; + +#[derive(Parser, Debug)] +#[command(name = "kei-graph-stream", about = "Stream agent events to browser via WebSocket")] +struct Cli { + #[arg(long, env = "KEI_GRAPH_STREAM_BIND", default_value = "127.0.0.1:8201")] + bind: SocketAddr, + + #[arg(long, env = "KEI_EVENTS_FILE")] + events_file: Option, +} + +fn default_events_file() -> PathBuf { + let home = std::env::var("HOME").unwrap_or_else(|_| "/tmp".into()); + PathBuf::from(home).join(".claude/memory/agent-events.jsonl") +} + +#[tokio::main] +async fn main() -> Result<()> { + if std::env::var("KEI_GRAPH_STREAM_BYPASS").as_deref() == Ok("1") { + eprintln!("[kei-graph-stream] bypass mode — exiting"); + return Ok(()); + } + + let cli = Cli::parse(); + let events_file = cli.events_file.unwrap_or_else(default_events_file); + + if let Some(parent) = events_file.parent() { + tokio::fs::create_dir_all(parent).await?; + } + if !events_file.exists() { + tokio::fs::write(&events_file, b"").await?; + } + + let (tx, _rx) = broadcast::channel::(256); + let tx = Arc::new(tx); + let alive = Arc::new(AliveState::new()); + + tokio::spawn(tail::run(events_file, Arc::clone(&tx), Arc::clone(&alive))); + + let app = build_router(Arc::clone(&tx), Arc::clone(&alive)); + let listener = tokio::net::TcpListener::bind(cli.bind).await?; + eprintln!("[kei-graph-stream] listening on {}", cli.bind); + axum::serve(listener, app).await?; + Ok(()) +} + +fn build_router( + tx: Arc>, + alive: Arc, +) -> Router { + Router::new() + .route("/stream", get(ws::ws_handler)) + .route("/health", get(health_handler)) + .with_state((tx, alive)) +} + +async fn health_handler() -> &'static str { + "kei-graph-stream alive\n" +} diff --git a/_primitives/_rust/kei-graph-stream/src/state.rs b/_primitives/_rust/kei-graph-stream/src/state.rs new file mode 100644 index 0000000..b63a99b --- /dev/null +++ b/_primitives/_rust/kei-graph-stream/src/state.rs @@ -0,0 +1,84 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::sync::Mutex; + +/// Minimal info kept per alive agent (spawned, not yet done). +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct AgentInfo { + pub id: String, + pub subagent_type: String, + pub model: String, + pub ts: String, +} + +/// Thread-safe map of currently alive agents. +pub struct AliveState { + inner: Mutex>, +} + +impl AliveState { + pub fn new() -> Self { + Self { + inner: Mutex::new(HashMap::new()), + } + } + + /// Insert or update an agent from a spawn event. + pub fn insert(&self, event: &serde_json::Value) { + let Some(id) = event["id"].as_str() else { return }; + let info = AgentInfo { + id: id.to_string(), + subagent_type: event["subagent_type"] + .as_str() + .unwrap_or("unknown") + .to_string(), + model: event["model"].as_str().unwrap_or("unknown").to_string(), + ts: event["ts"].as_str().unwrap_or("").to_string(), + }; + self.inner.lock().unwrap().insert(id.to_string(), info); + } + + /// Remove an agent on done event. + pub fn remove(&self, event: &serde_json::Value) { + let Some(id) = event["id"].as_str() else { return }; + self.inner.lock().unwrap().remove(id); + } + + /// Snapshot sorted newest-first (ISO8601 lexicographic on ts). + pub fn snapshot(&self) -> Vec { + let mut agents: Vec = + self.inner.lock().unwrap().values().cloned().collect(); + agents.sort_by(|a, b| b.ts.cmp(&a.ts)); + agents + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn insert_and_snapshot() { + let s = AliveState::new(); + s.insert(&json!({"id":"a1","subagent_type":"researcher","model":"sonnet","ts":"2026-05-02T13:00:00Z"})); + s.insert(&json!({"id":"a2","subagent_type":"coder","model":"opus","ts":"2026-05-02T13:01:00Z"})); + let snap = s.snapshot(); + assert_eq!(snap.len(), 2); + assert_eq!(snap[0].id, "a2"); // newest first + } + + #[test] + fn remove_clears_agent() { + let s = AliveState::new(); + s.insert(&json!({"id":"a1","subagent_type":"x","model":"y","ts":"2026-05-02T00:00:00Z"})); + s.remove(&json!({"id":"a1"})); + assert!(s.snapshot().is_empty()); + } + + #[test] + fn snapshot_empty_initial() { + let s = AliveState::new(); + assert!(s.snapshot().is_empty()); + } +} diff --git a/_primitives/_rust/kei-graph-stream/src/tail.rs b/_primitives/_rust/kei-graph-stream/src/tail.rs new file mode 100644 index 0000000..5f0cc7a --- /dev/null +++ b/_primitives/_rust/kei-graph-stream/src/tail.rs @@ -0,0 +1,135 @@ +use anyhow::Result; +use std::path::PathBuf; +use std::sync::Arc; +use tokio::io::{AsyncBufReadExt, AsyncSeekExt, BufReader}; +use tokio::sync::broadcast; +use tokio::time::{Duration, sleep}; + +use crate::state::AliveState; + +const POLL_INTERVAL: Duration = Duration::from_millis(200); + +/// Continuously tail `path`, parse events, update alive state, broadcast. +pub async fn run( + path: PathBuf, + tx: Arc>, + alive: Arc, +) -> Result<()> { + let mut file = tokio::fs::File::open(&path).await?; + // Seek to end — no history replay. + let initial_len = file.seek(tokio::io::SeekFrom::End(0)).await?; + let mut cursor = initial_len; + + loop { + sleep(POLL_INTERVAL).await; + + let meta = match tokio::fs::metadata(&path).await { + Ok(m) => m, + Err(_) => continue, + }; + let current_len = meta.len(); + + if current_len < cursor { + // File was rotated/truncated — reopen and reset. + file = tokio::fs::File::open(&path).await?; + cursor = 0; + } + + if current_len == cursor { + continue; + } + + // Read new bytes from cursor. + file.seek(tokio::io::SeekFrom::Start(cursor)).await?; + let mut reader = BufReader::new(&mut file); + let mut lines_read: u64 = 0; + + let mut line = String::new(); + loop { + line.clear(); + let n = reader.read_line(&mut line).await?; + if n == 0 { + break; + } + lines_read += n as u64; + let trimmed = line.trim(); + if trimmed.is_empty() { + continue; + } + process_line(trimmed, &tx, &alive); + } + + cursor += lines_read; + } +} + +fn process_line( + line: &str, + tx: &broadcast::Sender, + alive: &AliveState, +) { + let Ok(event) = serde_json::from_str::(line) else { + return; + }; + + match event["event"].as_str() { + Some("agent_spawn") => alive.insert(&event), + Some("agent_done") => alive.remove(&event), + _ => {} + } + + let frame = match serde_json::to_string(&serde_json::json!({ + "type": "event", + "data": &event, + })) { + Ok(s) => s, + Err(_) => return, + }; + + // Ignore send errors (no subscribers yet is fine). + let _ = tx.send(frame); +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + use tempfile::NamedTempFile; + use std::io::Write; + + #[tokio::test] + async fn tail_detects_new_lines() { + let mut tmp = NamedTempFile::new().unwrap(); + let path = PathBuf::from(tmp.path()); + + let (tx, mut rx) = broadcast::channel::(16); + let tx = Arc::new(tx); + let alive = Arc::new(AliveState::new()); + + // Spawn tail task (will seek to EOF of empty file → cursor=0). + let path2 = path.clone(); + let tx2 = Arc::clone(&tx); + let alive2 = Arc::clone(&alive); + tokio::spawn(async move { run(path2, tx2, alive2).await }); + + // Wait for first poll cycle. + tokio::time::sleep(Duration::from_millis(50)).await; + + // Append a spawn event. + let ev = json!({"ts":"2026-05-02T13:00:00Z","event":"agent_spawn","id":"t1","subagent_type":"researcher","model":"sonnet","prompt_preview":"test"}); + writeln!(tmp, "{}", ev.to_string()).unwrap(); + + // Allow poll to pick it up. + tokio::time::sleep(Duration::from_millis(400)).await; + + let msg = rx.recv().await.unwrap(); + let parsed: serde_json::Value = serde_json::from_str(&msg).unwrap(); + assert_eq!(parsed["type"], "event"); + assert_eq!(parsed["data"]["event"], "agent_spawn"); + + // Alive state should contain t1. + let snap = alive.snapshot(); + assert_eq!(snap.len(), 1); + assert_eq!(snap[0].id, "t1"); + } +} diff --git a/_primitives/_rust/kei-graph-stream/src/ws.rs b/_primitives/_rust/kei-graph-stream/src/ws.rs new file mode 100644 index 0000000..58a15d4 --- /dev/null +++ b/_primitives/_rust/kei-graph-stream/src/ws.rs @@ -0,0 +1,83 @@ +use axum::{ + extract::{ + State, + ws::{Message, WebSocket, WebSocketUpgrade}, + }, + response::Response, +}; +use std::sync::Arc; +use tokio::sync::broadcast; +use tokio::time::{Duration, interval}; + +use crate::state::AliveState; + +pub type AppState = (Arc>, Arc); + +/// Axum extractor handler: upgrade HTTP → WebSocket. +pub async fn ws_handler( + ws: WebSocketUpgrade, + State((tx, alive)): State, +) -> Response { + ws.on_upgrade(move |socket| handle_socket(socket, tx, alive)) +} + +async fn handle_socket( + mut socket: WebSocket, + tx: Arc>, + alive: Arc, +) { + // 1. Send snapshot of currently alive agents. + let snapshot = build_snapshot(&alive); + if socket.send(Message::Text(snapshot)).await.is_err() { + return; + } + + // 2. Subscribe to broadcast AFTER snapshot to avoid missing events. + let mut rx = tx.subscribe(); + let mut heartbeat = interval(Duration::from_secs(30)); + heartbeat.tick().await; // consume the immediate first tick + + loop { + tokio::select! { + // Broadcast event → forward to client. + result = rx.recv() => { + match result { + Ok(msg) => { + if socket.send(Message::Text(msg)).await.is_err() { + break; + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + eprintln!("[ws] client lagged {n} messages"); + } + Err(broadcast::error::RecvError::Closed) => break, + } + } + + // Heartbeat ping every 30s. + _ = heartbeat.tick() => { + let ping = r#"{"type":"ping"}"#.to_string(); + if socket.send(Message::Text(ping)).await.is_err() { + break; + } + } + + // Client message (pong or close). + msg = socket.recv() => { + match msg { + Some(Ok(Message::Close(_))) | None => break, + _ => {} // ignore other client frames + } + } + } + } +} + +fn build_snapshot(alive: &AliveState) -> String { + let agents = alive.snapshot(); + serde_json::json!({ + "type": "snapshot", + "alive": agents, + }) + .to_string() +} diff --git a/_primitives/_rust/kei-graph-stream/tests/smoke.rs b/_primitives/_rust/kei-graph-stream/tests/smoke.rs new file mode 100644 index 0000000..ffbe130 --- /dev/null +++ b/_primitives/_rust/kei-graph-stream/tests/smoke.rs @@ -0,0 +1,104 @@ +/// Integration smoke test: spins up a real kei-graph-stream server on a random port, +/// appends events to a temp JSONL file, and verifies WS snapshot + event frames. +use std::io::Write; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; + +use serde_json::Value; +use tempfile::NamedTempFile; +use tokio::sync::broadcast; +use tokio_tungstenite::{connect_async, tungstenite::Message}; +use futures::StreamExt; + +async fn start_server(events_path: std::path::PathBuf) -> SocketAddr { + use axum::Router; + use axum::routing::get; + + let (tx, _) = broadcast::channel::(256); + let tx = Arc::new(tx); + let alive = Arc::new(kei_graph_stream::AliveState::new()); + + tokio::spawn(kei_graph_stream::tail::run( + events_path, + Arc::clone(&tx), + Arc::clone(&alive), + )); + + let app = Router::new() + .route("/stream", get(kei_graph_stream::ws::ws_handler)) + .route("/health", get(|| async { "kei-graph-stream alive\n" })) + .with_state((tx, alive)); + + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + // axum::serve returns IntoFuture; use `into_future()` to spawn. + use std::future::IntoFuture; + tokio::spawn(axum::serve(listener, app).into_future()); + addr +} + +async fn recv_text( + stream: &mut (impl StreamExt< + Item = Result, + > + Unpin), +) -> Value { + loop { + if let Message::Text(t) = stream.next().await.unwrap().unwrap() { + return serde_json::from_str(&t).unwrap(); + } + } +} + +#[tokio::test] +async fn smoke_snapshot_and_event() { + let mut tmp = NamedTempFile::new().unwrap(); + let path = std::path::PathBuf::from(tmp.path()); + + let addr = start_server(path.clone()).await; + + // Health check. + let body = reqwest::get(format!("http://{addr}/health")) + .await + .unwrap() + .text() + .await + .unwrap(); + assert_eq!(body, "kei-graph-stream alive\n"); + + // Connect WS before any events — expect empty snapshot. + let (mut ws1, _) = connect_async(format!("ws://{addr}/stream")).await.unwrap(); + let snap: Value = recv_text(&mut ws1).await; + assert_eq!(snap["type"], "snapshot"); + assert!(snap["alive"].as_array().unwrap().is_empty()); + + // Append a spawn event. + writeln!(tmp, r#"{{"ts":"2026-05-02T13:00:00.000Z","event":"agent_spawn","id":"smoke1","subagent_type":"researcher","model":"sonnet","prompt_preview":"test"}}"#).unwrap(); + + // Allow tail poll (200ms) + margin. + tokio::time::sleep(Duration::from_millis(500)).await; + + // Should receive an event frame on the existing connection. + let frame: Value = recv_text(&mut ws1).await; + assert_eq!(frame["type"], "event"); + assert_eq!(frame["data"]["event"], "agent_spawn"); + assert_eq!(frame["data"]["id"], "smoke1"); + + // New client snapshot should contain smoke1. + let (mut ws2, _) = connect_async(format!("ws://{addr}/stream")).await.unwrap(); + let snap2: Value = recv_text(&mut ws2).await; + assert_eq!(snap2["type"], "snapshot"); + let alive2 = snap2["alive"].as_array().unwrap(); + assert_eq!(alive2.len(), 1); + assert_eq!(alive2[0]["id"], "smoke1"); + + // Append done event. + writeln!(tmp, r#"{{"ts":"2026-05-02T13:00:01.000Z","event":"agent_done","id":"smoke1","outcome":"functional","duration_ms":1000}}"#).unwrap(); + tokio::time::sleep(Duration::from_millis(500)).await; + + // Third client: snapshot should now be empty. + let (mut ws3, _) = connect_async(format!("ws://{addr}/stream")).await.unwrap(); + let snap3: Value = recv_text(&mut ws3).await; + assert_eq!(snap3["type"], "snapshot"); + assert!(snap3["alive"].as_array().unwrap().is_empty()); +} diff --git a/_primitives/templates/io.keisei.graph-stream.plist b/_primitives/templates/io.keisei.graph-stream.plist new file mode 100644 index 0000000..0f2c692 --- /dev/null +++ b/_primitives/templates/io.keisei.graph-stream.plist @@ -0,0 +1,36 @@ + + + + + Label + io.keisei.graph-stream + + ProgramArguments + + HOME_DIR/.cargo/bin/kei-graph-stream + --bind + 127.0.0.1:8201 + + + RunAtLoad + + + KeepAlive + + + StandardOutPath + HOME_DIR/.claude/memory/graph-stream.log + + StandardErrorPath + HOME_DIR/.claude/memory/graph-stream.log + + EnvironmentVariables + + PATH + /usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin:HOME_DIR/.cargo/bin + HOME + HOME_DIR + + + diff --git a/docs/DNA-INDEX.md b/docs/DNA-INDEX.md index bd42198..94f095f 100644 --- a/docs/DNA-INDEX.md +++ b/docs/DNA-INDEX.md @@ -1,19 +1,19 @@ # KeiSeiKit DNA Encyclopedia -> Auto-generated from kei-registry. Last regenerated: 2026-05-02T05:07:21Z. -> Total blocks: 518. Per-type breakdown: +> Auto-generated from kei-registry. Last regenerated: 2026-05-02T05:30:23Z. +> Total blocks: 522. Per-type breakdown: | Type | Count | |---|---:| | atom | 121 | -| hook | 45 | -| primitive | 110 | +| hook | 48 | +| primitive | 111 | | rule | 174 | | skill | 68 | --- -## Primitive (110) +## Primitive (111) Sorted alphabetically by name. @@ -64,6 +64,7 @@ Sorted alphabetically by name. | kei-git-gitlab | primitive::md,networ… | _primitives/_rust/kei-git-gitlab/Cargo.toml | 59a5271b | | kei-graph-check | primitive::cli,fs,md… | _primitives/_rust/kei-graph-check/Cargo.toml | 2c0e38d8 | | kei-graph-export | primitive::cli,md,sq… | _primitives/_rust/kei-graph-export/Cargo.toml | de93b403 | +| kei-graph-stream | primitive::cli,md,ne… | _primitives/_rust/kei-graph-stream/Cargo.toml | 04ef818f | | kei-hibernate | primitive::cli,hash,… | _primitives/_rust/kei-hibernate/Cargo.toml | 1ea136f5 | | kei-import-project | primitive::cli,fs,ha… | _primitives/_rust/kei-import-project/Cargo.toml | 2de0fd64 | | kei-leak-matrix | primitive::cli,fs,md… | _primitives/_rust/kei-leak-matrix/Cargo.toml | a3803ef9 | @@ -839,7 +840,7 @@ Sorted alphabetically by name. | sleep-layer::the-rule | rule::_::576bbb7f::d… | d0e03a0d | -## Hook (45) +## Hook (48) Sorted alphabetically by name. @@ -848,6 +849,8 @@ Sorted alphabetically by name. | affect-live-scan | shell | hook::shell::b7f9b36… | hooks/affect-live-scan.sh | | agent-capability-check | shell | hook::shell::eab55b0… | hooks/agent-capability-check.sh | | agent-capability-verify | shell | hook::shell::86c19ba… | hooks/agent-capability-verify.sh | +| agent-event-done | shell | hook::shell::a05c64f… | hooks/agent-event-done.sh | +| agent-event-spawn | shell | hook::shell::7137192… | hooks/agent-event-spawn.sh | | agent-fork-done | shell | hook::shell::eeaa011… | hooks/agent-fork-done.sh | | agent-fork-logger | shell | hook::shell::1b43957… | hooks/agent-fork-logger.sh | | agent-heartbeat-tick | shell | hook::shell::29d6dbe… | hooks/agent-heartbeat-tick.sh | @@ -890,6 +893,7 @@ Sorted alphabetically by name. | stop-verify | shell | hook::shell::adedcfe… | hooks/stop-verify.sh | | task-timer | shell | hook::shell::dda5e94… | hooks/task-timer.sh | | tomd-preread | shell | hook::shell::8a95b76… | hooks/tomd-preread.sh | +| tool-use-event | shell | hook::shell::34bb788… | hooks/tool-use-event.sh | ## Atom (121) @@ -1032,6 +1036,8 @@ Sorted alphabetically by name. - `New Agent — Project-Specialist Wizard` — 2 versions: dfdaea5c → bcf5a0d9 - `STACK — Python ML (PyTorch / JAX)` — 2 versions: ceb1fc98 → 4afd934a - `Self-Audit — Session Retrospective Triage (index)` — 2 versions: 339cb507 → 38fd80b7 +- `agent-event-done` — 2 versions: ef70393c → 598bc917 +- `agent-event-spawn` — 2 versions: b4573a30 → fb3603c7 - `agent-heartbeat-tick` — 2 versions: 5eb00dc3 → 560fa0f8 - `agent-outcome-backfill` — 2 versions: 0e00d9ca → c901aaf2 - `alignment-check` — 2 versions: 4e7389b1 → b1e18549 @@ -1083,6 +1089,7 @@ Sorted alphabetically by name. - `kei-git-gitlab` — 2 versions: 744859c4 → 59a5271b - `kei-graph-check` — 2 versions: e08f240e → 2c0e38d8 - `kei-graph-export::kei-graph-export` — 26 versions: 2e9d962a → b0f840b1 → 4a42d5f4 → a9d35468 → 1f0c066f → 6f5cd1a9 → 89ae1693 → fbebe21d → 63b761f6 → 643d3f08 → 7ba05286 → ca606a00 → c1f97c41 → 237d050b → 094ddc72 → 006b0f7d → c3d7c243 → a67fc02f → 33beda01 → 615a6cfb → 6dbfd254 → bb6ca1bb → 48cb9c62 → 5529822c → 1b597838 → f17c1aeb +- `kei-graph-stream::kei-graph-stream` — 8 versions: 2e9d962a → d3087d32 → eefe8fc1 → 021bb6f8 → 96a32fa0 → 9ca6470e → d527efb1 → 28e6d9b6 - `kei-hibernate` — 2 versions: 25f6d5bc → 1ea136f5 - `kei-import-project` — 2 versions: aa3750a0 → 2de0fd64 - `kei-leak-matrix` — 2 versions: 06a89af2 → a3803ef9 @@ -1154,6 +1161,7 @@ Sorted alphabetically by name. - `post-write-check` — 2 versions: 6ceb2237 → 4aaf1c5e - `safety-guard` — 2 versions: 32b889cf → 665e7cd1 - `site-wysiwyd-check` — 2 versions: a0d38a22 → 416c0648 +- `skill-record` — 3 versions: cdf67741 → e2444805 → 44e464fe - `sleep-report-tg` — 3 versions: acc3ebfb → ef101ab6 → 9529ec50 - `ssh-check` — 2 versions: f419e2b0 → ebd97541 - `task-timer` — 2 versions: 202823f9 → 16e4f0a3 diff --git a/hooks/agent-event-done.sh b/hooks/agent-event-done.sh new file mode 100755 index 0000000..42c51a2 --- /dev/null +++ b/hooks/agent-event-done.sh @@ -0,0 +1,59 @@ +#!/bin/sh +# agent-event-done.sh — PostToolUse:Agent hook. +# Emits `agent_done` event to ~/.claude/memory/agent-events.jsonl +# per the locked schema at /tmp/agent-events-schema.md (2026-05-02). +# Reuses STATUS-TRUTH MARKER parsing from agent-outcome-backfill.sh. +# Defensive: never blocks, exits 0. Bypass: KEI_EVENTS_BYPASS=1. +set -u + +[ "${KEI_EVENTS_BYPASS:-0}" = "1" ] && exit 0 +command -v jq >/dev/null 2>&1 || exit 0 + +PAYLOAD=$(cat 2>/dev/null || true) +[ -n "$PAYLOAD" ] || exit 0 + +TOOL=$(printf '%s' "$PAYLOAD" | jq -r '.tool_name // empty' 2>/dev/null) +[ "$TOOL" = "Agent" ] || exit 0 + +EVENTS_FILE="$HOME/.claude/memory/agent-events.jsonl" +mkdir -p "$(dirname "$EVENTS_FILE")" 2>/dev/null || true + +TOOL_USE_ID=$(printf '%s' "$PAYLOAD" | jq -r '.tool_use_id // .toolUseId // "unknown"' 2>/dev/null) + +# Flatten tool_response content to plain text (pattern from agent-outcome-backfill.sh). +RESPONSE=$(printf '%s' "$PAYLOAD" | jq -r ' + (.tool_response // "") as $r | def f: + if type=="string" then . elif type=="array" then map(f)|join("\n") + elif type=="object" then (if has("text") then .text elif has("content") then .content|f else tostring end) + else "" end; $r|f' 2>/dev/null || true) + +# Parse outcome from STATUS-TRUTH MARKER; null if absent or unrecognized. +OUTCOME="null" +if printf '%s' "$RESPONSE" | grep -q '=== STATUS-TRUTH MARKER ===' 2>/dev/null; then + SHIPPED=$(printf '%s' "$RESPONSE" | grep -m1 '^shipped:' \ + | sed 's/^shipped:[[:space:]]*//' | awk '{print tolower($1)}' 2>/dev/null || true) + case "$SHIPPED" in functional|partial|scaffolding|fail) OUTCOME="\"$SHIPPED\"";; esac +fi + +# Cost estimate from token counts × rough per-token price constants. +MODEL=$(printf '%s' "$PAYLOAD" | jq -r '.tool_response.model // .tool_input.model // ""' 2>/dev/null | tr '[:upper:]' '[:lower:]') +IN_TOK=$(printf '%s' "$PAYLOAD" | jq -r '.tool_response.usage.input_tokens // 0' 2>/dev/null) +OUT_TOK=$(printf '%s' "$PAYLOAD" | jq -r '.tool_response.usage.output_tokens // 0' 2>/dev/null) +cost_usd=$(awk -v m="$MODEL" -v i="$IN_TOK" -v o="$OUT_TOK" 'BEGIN{ + if(index(m,"haiku")>0){p=0.000001;q=0.000005} + else if(index(m,"sonnet")>0){p=0.000003;q=0.000015} + else if(index(m,"opus")>0){p=0.000005;q=0.000025} + else{print "null";exit}; printf "%.6f",i*p+o*q}' 2>/dev/null || echo "null") + +jq -cn \ + --arg ts "$(date -u +%Y-%m-%dT%H:%M:%S.000Z 2>/dev/null)" \ + --arg id "$TOOL_USE_ID" \ + --argjson outcome "$OUTCOME" \ + --argjson duration_ms "$(printf '%s' "$PAYLOAD" | jq '.duration_ms // .tool_response.totalDurationMs // null' 2>/dev/null)" \ + --argjson tool_use_count "$(printf '%s' "$PAYLOAD" | jq '.tool_response.totalToolUseCount // null' 2>/dev/null)" \ + --argjson cost_usd "$cost_usd" \ + '{ts:$ts,event:"agent_done",id:$id,outcome:$outcome, + duration_ms:$duration_ms,tool_use_count:$tool_use_count,cost_usd:$cost_usd}' \ + >> "$EVENTS_FILE" 2>/dev/null || true + +exit 0 diff --git a/hooks/agent-event-spawn.sh b/hooks/agent-event-spawn.sh new file mode 100755 index 0000000..145caa7 --- /dev/null +++ b/hooks/agent-event-spawn.sh @@ -0,0 +1,46 @@ +#!/bin/sh +# agent-event-spawn.sh — PreToolUse:Agent hook. +# +# Emits `agent_spawn` event to ~/.claude/memory/agent-events.jsonl +# per the locked schema at /tmp/agent-events-schema.md (2026-05-02). +# +# Defensive: never blocks, exits 0 on every path. +# Bypass via `KEI_EVENTS_BYPASS=1`. +set -u + +[ "${KEI_EVENTS_BYPASS:-0}" = "1" ] && exit 0 +command -v jq >/dev/null 2>&1 || exit 0 + +PAYLOAD=$(cat 2>/dev/null || true) +[ -n "$PAYLOAD" ] || exit 0 + +# Self-filter: this hook may be chained for ANY PreToolUse event. +TOOL=$(printf '%s' "$PAYLOAD" | jq -r '.tool_name // empty' 2>/dev/null) +[ "$TOOL" = "Agent" ] || exit 0 + +EVENTS_FILE="$HOME/.claude/memory/agent-events.jsonl" +mkdir -p "$(dirname "$EVENTS_FILE")" 2>/dev/null || true + +TS=$(date -u +%Y-%m-%dT%H:%M:%S.000Z 2>/dev/null) + +# Build event in a single jq pass from the raw payload. +# All nullable fields use jq // null so schema types are correct. +printf '%s' "$PAYLOAD" | jq -c \ + --arg ts "$TS" \ + '{ + ts: $ts, + event: "agent_spawn", + id: (.tool_use_id // .toolUseId // "unknown"), + parent_id: (.session_id // null), + subagent_type: (.tool_input.subagent_type // null), + model: (.tool_input.model // null), + branch: (.tool_input.isolation // null), + prompt_preview: ( + (.tool_input.prompt // "") + | gsub("[\"\\n\\r\\t]"; " ") + | .[0:80] + ) + }' \ + >> "$EVENTS_FILE" 2>/dev/null || true + +exit 0 diff --git a/hooks/skill-record.sh b/hooks/skill-record.sh index 89ae500..f76e5d7 100755 --- a/hooks/skill-record.sh +++ b/hooks/skill-record.sh @@ -1,12 +1,12 @@ #!/bin/sh # skill-record.sh — PostToolUse:Skill hook. # Records every skill invocation to kei-ledger for Phase D nightly metrics. +# Also emits skill_use event to agent-events.jsonl (schema 2026-05-02). # Defensive: never blocks, exits 0 on every path. set -u [ "${SKILL_RECORD_BYPASS:-0}" = "1" ] && exit 0 command -v jq >/dev/null 2>&1 || exit 0 -command -v kei-ledger >/dev/null 2>&1 || exit 0 PAYLOAD=$(cat 2>/dev/null || true) [ -n "$PAYLOAD" ] || exit 0 @@ -40,11 +40,29 @@ DURATION=$(printf '%s' "$PAYLOAD" | jq -r ' AGENT_ID=$(printf '%s' "$PAYLOAD" | jq -r '.tool_use_id // empty' 2>/dev/null) -ARGS="$SKILL --success $SUCCESS" -[ -n "$AGENT_ID" ] && ARGS="$ARGS --agent-id $AGENT_ID" -[ -n "$DURATION" ] && ARGS="$ARGS --duration-ms $DURATION" +# kei-ledger record (optional — skip gracefully if not installed). +if command -v kei-ledger >/dev/null 2>&1; then + ARGS="$SKILL --success $SUCCESS" + [ -n "$AGENT_ID" ] && ARGS="$ARGS --agent-id $AGENT_ID" + [ -n "$DURATION" ] && ARGS="$ARGS --duration-ms $DURATION" + # shellcheck disable=SC2086 + kei-ledger record-skill $ARGS >/dev/null 2>&1 || true +fi -# shellcheck disable=SC2086 -kei-ledger record-skill $ARGS >/dev/null 2>&1 || true +# Emit skill_use event to agent-events.jsonl (schema 2026-05-02). +if [ "${KEI_EVENTS_BYPASS:-0}" != "1" ]; then + EVENTS_FILE="$HOME/.claude/memory/agent-events.jsonl" + mkdir -p "$(dirname "$EVENTS_FILE")" 2>/dev/null || true + SESSION_ID_SKILL=$(printf '%s' "$PAYLOAD" | jq -r '.session_id // "main"' 2>/dev/null) + [ -z "$SESSION_ID_SKILL" ] && SESSION_ID_SKILL="main" + jq -cn \ + --arg ts "$(date -u +%Y-%m-%dT%H:%M:%S.000Z 2>/dev/null)" \ + --arg id "${AGENT_ID:-unknown}" \ + --arg agent_id "$SESSION_ID_SKILL" \ + --arg skill "$SKILL" \ + --argjson success "${SUCCESS:-0}" \ + '{ts:$ts,event:"skill_use",id:$id,agent_id:$agent_id,skill:$skill,success:($success==1)}' \ + >> "$EVENTS_FILE" 2>/dev/null || true +fi exit 0 diff --git a/hooks/tool-use-event.sh b/hooks/tool-use-event.sh new file mode 100755 index 0000000..d47112f --- /dev/null +++ b/hooks/tool-use-event.sh @@ -0,0 +1,42 @@ +#!/bin/sh +# tool-use-event.sh — PreToolUse hook for Bash/Read/Edit/Write/Grep/Glob/NotebookEdit. +# +# Emits `tool_use` event to ~/.claude/memory/agent-events.jsonl +# per the locked schema at /tmp/agent-events-schema.md (2026-05-02). +# +# Agent tools (spawns) are intentionally excluded — handled by agent-event-spawn.sh. +# Defensive: never blocks, exits 0 on every path. +# Bypass via `KEI_EVENTS_BYPASS=1`. +set -u + +[ "${KEI_EVENTS_BYPASS:-0}" = "1" ] && exit 0 +command -v jq >/dev/null 2>&1 || exit 0 + +PAYLOAD=$(cat 2>/dev/null || true) +[ -n "$PAYLOAD" ] || exit 0 + +# Self-filter: only emit for the tracked tool set. +TOOL=$(printf '%s' "$PAYLOAD" | jq -r '.tool_name // empty' 2>/dev/null) +case "$TOOL" in + Bash|Read|Edit|Write|Grep|Glob|NotebookEdit) ;; + *) exit 0 ;; +esac + +EVENTS_FILE="$HOME/.claude/memory/agent-events.jsonl" +mkdir -p "$(dirname "$EVENTS_FILE")" 2>/dev/null || true + +TOOL_USE_ID=$(printf '%s' "$PAYLOAD" | jq -r '.tool_use_id // .toolUseId // "unknown"' 2>/dev/null) + +# Parent agent id: use session_id if present, otherwise "main". +AGENT_ID=$(printf '%s' "$PAYLOAD" | jq -r '.session_id // "main"' 2>/dev/null) +[ -z "$AGENT_ID" ] && AGENT_ID="main" + +jq -cn \ + --arg ts "$(date -u +%Y-%m-%dT%H:%M:%S.000Z 2>/dev/null)" \ + --arg id "$TOOL_USE_ID" \ + --arg agent_id "$AGENT_ID" \ + --arg tool "$TOOL" \ + '{ts:$ts,event:"tool_use",id:$id,agent_id:$agent_id,tool:$tool}' \ + >> "$EVENTS_FILE" 2>/dev/null || true + +exit 0