From c1247fef007bd1697fb74d65425a583c9a4b43d6 Mon Sep 17 00:00:00 2001 From: Parfii-bot Date: Tue, 12 May 2026 15:51:24 +0800 Subject: [PATCH] =?UTF-8?q?feat(kei-buddy):=20wire=20kei-chat-store=20?= =?UTF-8?q?=E2=80=94=20log=20every=20user/bot=20message=20with=20FTS5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After-Ready conversation was going to /dev/null. With this change every inbound Telegram text + every bot response is persisted to a SQLite + FTS5 archive via the existing kei-chat-store primitive (no new crate). Each Telegram chat_id maps 1:1 to a kei-chat-store session (project="kei-buddy", title="tg-<chat_id>", model="telegram"). Cache prevents per-message session lookups. New file: * src/chat_log.rs (198 LOC) — ChatLog adapter wrapping kei_chat_store::Store + a chat_id→session_id Mutex<HashMap<i64, String>> cache. API: from_path / from_memory / ensure_session / log_user / log_bot / search(query, chat_id?, limit). Errors map to BuddyError::Memory and never propagate from on_event — chat-log failure is logged but does not block the conversation. Modified: * Cargo.toml — kei-chat-store path dep + chrono workspace dep added. * src/lib.rs — pub mod chat_log + re-export ChatLog. * src/serve.rs — BuddyContext gains Arc<ChatLog>; process_text calls log_user before handle_step + log_bot after send_message; ServeConfig gains chat_log_db_path. * src/bin/kei-buddy.rs — KEI_BUDDY_CHAT_LOG_PATH env (default ./kei-buddy-chat.db); migrate subcommand applies the chat-store schema alongside buddy_state schema. Tests (3 new in src/chat_log.rs, all pass): * log_user_creates_session_and_message * log_bot_uses_same_session_as_log_user * different_chats_get_different_sessions Verify-before-commit: * cargo check -p kei-buddy (default): PASS * cargo check -p kei-buddy --features extractor-openai: PASS * cargo test -p kei-buddy --lib: 23 passed / 0 failed (was 20 before this commit; 3 new ChatLog tests) NOT deployed — user is in active conversation with the live bot. 
Will roll forward when user signals readiness. --- _primitives/_rust/kei-buddy/Cargo.toml | 2 + .../_rust/kei-buddy/src/bin/kei-buddy.rs | 9 +- _primitives/_rust/kei-buddy/src/chat_log.rs | 198 ++++++++++++++++++ _primitives/_rust/kei-buddy/src/lib.rs | 2 + _primitives/_rust/kei-buddy/src/serve.rs | 26 ++- 5 files changed, 228 insertions(+), 9 deletions(-) create mode 100644 _primitives/_rust/kei-buddy/src/chat_log.rs diff --git a/_primitives/_rust/kei-buddy/Cargo.toml b/_primitives/_rust/kei-buddy/Cargo.toml index c0a8a7e..8434ced 100644 --- a/_primitives/_rust/kei-buddy/Cargo.toml +++ b/_primitives/_rust/kei-buddy/Cargo.toml @@ -27,6 +27,8 @@ rusqlite = { workspace = true } reqwest = { workspace = true } anyhow = { workspace = true } kei-memory-sqlite = { path = "../kei-memory-sqlite" } +kei-chat-store = { path = "../kei-chat-store" } +chrono = { workspace = true } # serve feature deps axum = { version = "0.7", features = ["json", "http1", "tokio"], optional = true } diff --git a/_primitives/_rust/kei-buddy/src/bin/kei-buddy.rs b/_primitives/_rust/kei-buddy/src/bin/kei-buddy.rs index b97cb54..5127bbe 100644 --- a/_primitives/_rust/kei-buddy/src/bin/kei-buddy.rs +++ b/_primitives/_rust/kei-buddy/src/bin/kei-buddy.rs @@ -56,6 +56,7 @@ async fn cmd_serve() -> anyhow::Result<()> { .ok() .or_else(|| std::env::var("OPENAI_API_KEY").ok()), llm_model: std::env::var("KEI_BUDDY_LLM_MODEL").ok(), + chat_log_db_path: chat_log_path_from_env(), }; run_serve(cfg).await } @@ -84,8 +85,10 @@ async fn cmd_serve() -> anyhow::Result<()> { fn cmd_migrate() -> anyhow::Result<()> { let path = db_path_from_env(); let _store = kei_buddy::store::SqliteBuddyStore::from_path(&path)?; + let chat_log_path = chat_log_path_from_env(); + let _ = kei_buddy::ChatLog::from_path(&chat_log_path)?; init_log(); - tracing::info!(path = %path, "schema applied"); + tracing::info!(path = %path, chat_log_path = %chat_log_path, "schema applied"); Ok(()) } @@ -142,3 +145,7 @@ fn port_from_env() -> u16 { fn 
db_path_from_env() -> String { std::env::var("KEI_BUDDY_DB_PATH").unwrap_or_else(|_| "./kei-buddy.db".into()) } + +fn chat_log_path_from_env() -> String { + std::env::var("KEI_BUDDY_CHAT_LOG_PATH").unwrap_or_else(|_| "./kei-buddy-chat.db".into()) +} diff --git a/_primitives/_rust/kei-buddy/src/chat_log.rs b/_primitives/_rust/kei-buddy/src/chat_log.rs new file mode 100644 index 0000000..7baf6b3 --- /dev/null +++ b/_primitives/_rust/kei-buddy/src/chat_log.rs @@ -0,0 +1,198 @@ +// SPDX-License-Identifier: Apache-2.0 +//! `ChatLog` — maps Telegram chat_id to kei-chat-store sessions. +//! +//! Constructor Pattern: one responsibility — bridge between Telegram chat_id +//! (i64) and kei-chat-store session (String UUID). All rusqlite calls are +//! dispatched via `tokio::task::spawn_blocking`. +//! +//! `rusqlite::Connection` is not `Sync`, so `Store` is not `Sync`. We wrap +//! `Store` in `Mutex` to obtain `Send + Sync` on `Arc>`. +//! Each blocking task locks the mutex for the duration of the DB call. + +use std::collections::HashMap; +use std::path::Path; +use std::sync::{Arc, Mutex}; + +use chrono::Utc; +use kei_chat_store::{ + search::search as cs_search, + sessions::{save_message, start_session, ChatMessage}, + Store, +}; +use rusqlite::OptionalExtension; + +use crate::error::BuddyError; + +/// Thin wrapper over `kei-chat-store` keyed by Telegram chat_id. +pub struct ChatLog { + store: Arc>, + /// chat_id → session_id; populated lazily. + sessions: Mutex>, +} + +impl ChatLog { + /// Open (or create) a file-backed store at `path`. + pub fn from_path(path: impl AsRef) -> Result { + let store = Store::open(path.as_ref()).map_err(|e| BuddyError::Memory(e.to_string()))?; + Ok(Self { + store: Arc::new(Mutex::new(store)), + sessions: Mutex::new(HashMap::new()), + }) + } + + /// Open an in-memory store (for tests). 
+ pub fn from_memory() -> Result { + let store = Store::open_memory().map_err(|e| BuddyError::Memory(e.to_string()))?; + Ok(Self { + store: Arc::new(Mutex::new(store)), + sessions: Mutex::new(HashMap::new()), + }) + } + + /// Return the session_id for `chat_id`, creating one if absent. + pub async fn ensure_session(&self, chat_id: i64) -> Result { + // Fast path: session already cached. + { + let guard = self.sessions.lock().expect("sessions mutex poisoned"); + if let Some(id) = guard.get(&chat_id) { + return Ok(id.clone()); + } + } + // Slow path: query or create in blocking thread. + let store = Arc::clone(&self.store); + let title = format!("tg-{chat_id}"); + let session_id = tokio::task::spawn_blocking(move || { + let locked = store.lock().expect("store mutex poisoned"); + find_or_create_session(&locked, &title) + }) + .await + .map_err(|e| BuddyError::Memory(format!("spawn_blocking join: {e}")))? + .map_err(|e| BuddyError::Memory(e.to_string()))?; + + let mut guard = self.sessions.lock().expect("sessions mutex poisoned"); + guard.insert(chat_id, session_id.clone()); + Ok(session_id) + } + + /// Persist a user-side message. + pub async fn log_user(&self, chat_id: i64, content: &str) -> Result<(), BuddyError> { + self.log_role(chat_id, "user", content).await + } + + /// Persist a bot-side response. + pub async fn log_bot(&self, chat_id: i64, content: &str) -> Result<(), BuddyError> { + self.log_role(chat_id, "assistant", content).await + } + + /// Full-text search; optionally filter by chat_id. 
+ pub async fn search( + &self, + query: &str, + chat_id: Option, + limit: i64, + ) -> Result, BuddyError> { + let filter_session = match chat_id { + Some(cid) => Some(self.ensure_session(cid).await?), + None => None, + }; + let store = Arc::clone(&self.store); + let q = query.to_string(); + let msgs = tokio::task::spawn_blocking(move || { + let locked = store.lock().expect("store mutex poisoned"); + cs_search(&locked, &q, limit).map_err(|e| BuddyError::Memory(e.to_string())) + }) + .await + .map_err(|e| BuddyError::Memory(format!("spawn_blocking join: {e}")))? + .map_err(|e: BuddyError| e)?; + + match filter_session { + None => Ok(msgs), + Some(sid) => Ok(msgs.into_iter().filter(|m| m.session_id == sid).collect()), + } + } + + // ── Private helpers ───────────────────────────────────────────────────── + + async fn log_role( + &self, + chat_id: i64, + role: &str, + content: &str, + ) -> Result<(), BuddyError> { + let session_id = self.ensure_session(chat_id).await?; + let msg = ChatMessage { + id: 0, + session_id, + role: role.to_string(), + content: content.to_string(), + tokens_in: 0, + tokens_out: 0, + cost: 0.0, + created_at: Utc::now().timestamp(), + }; + let store = Arc::clone(&self.store); + tokio::task::spawn_blocking(move || { + let locked = store.lock().expect("store mutex poisoned"); + save_message(&locked, &msg).map_err(|e| BuddyError::Memory(e.to_string())) + }) + .await + .map_err(|e| BuddyError::Memory(format!("spawn_blocking join: {e}")))? + .map(|_| ()) + } +} + +/// Query the DB for an existing session; create if absent. 
+fn find_or_create_session(store: &Store, title: &str) -> anyhow::Result { + let existing: Option = store + .conn() + .query_row( + "SELECT id FROM chat_sessions WHERE title = ?1 LIMIT 1", + rusqlite::params![title], + |row| row.get(0), + ) + .optional()?; + + if let Some(id) = existing { + return Ok(id); + } + start_session(store, "kei-buddy", title, "telegram") +} + +// ── Tests ──────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn log_user_creates_session_and_message() { + let log = ChatLog::from_memory().unwrap(); + log.log_user(42, "hi there").await.unwrap(); + let results = log.search("hi there", Some(42), 10).await.unwrap(); + assert_eq!(results.len(), 1); + assert_eq!(results[0].content, "hi there"); + assert_eq!(results[0].role, "user"); + } + + #[tokio::test] + async fn log_bot_uses_same_session_as_log_user() { + let log = ChatLog::from_memory().unwrap(); + log.log_user(42, "hello").await.unwrap(); + log.log_bot(42, "world").await.unwrap(); + let results = log.search("world", Some(42), 10).await.unwrap(); + assert_eq!(results.len(), 1); + assert_eq!(results[0].role, "assistant"); + let user_results = log.search("hello", Some(42), 10).await.unwrap(); + assert_eq!(results[0].session_id, user_results[0].session_id); + } + + #[tokio::test] + async fn different_chats_get_different_sessions() { + let log = ChatLog::from_memory().unwrap(); + log.log_user(1, "alpha").await.unwrap(); + log.log_user(2, "beta").await.unwrap(); + let sid1 = log.ensure_session(1).await.unwrap(); + let sid2 = log.ensure_session(2).await.unwrap(); + assert_ne!(sid1, sid2); + } +} diff --git a/_primitives/_rust/kei-buddy/src/lib.rs b/_primitives/_rust/kei-buddy/src/lib.rs index 9f7be8b..54db0e2 100644 --- a/_primitives/_rust/kei-buddy/src/lib.rs +++ b/_primitives/_rust/kei-buddy/src/lib.rs @@ -10,6 +10,7 @@ //! * `schema` — buddy-specific SQLite DDL //! 
* `store` — `BuddyStore` trait + `SqliteBuddyStore` impl +pub mod chat_log; pub mod error; pub mod extractor; pub mod machine; @@ -26,6 +27,7 @@ pub mod serve; #[cfg(feature = "serve")] pub mod serve_telegram; +pub use chat_log::ChatLog; pub use error::BuddyError; pub use extractor::LlmExtractor; pub use machine::handle_step; diff --git a/_primitives/_rust/kei-buddy/src/serve.rs b/_primitives/_rust/kei-buddy/src/serve.rs index b32a9bd..74acaee 100644 --- a/_primitives/_rust/kei-buddy/src/serve.rs +++ b/_primitives/_rust/kei-buddy/src/serve.rs @@ -14,6 +14,7 @@ use tracing::{error, warn}; use kei_telegram_webhook::{WebhookContext, WebhookEvent}; use crate::{ + chat_log::ChatLog, error::BuddyError, extractor::LlmExtractor, machine::handle_step, @@ -29,20 +30,17 @@ pub struct ServeConfig { pub db_path: String, pub bot_token: String, pub webhook_secret: String, - /// If `Some`, only these chat_ids are processed; others are warn-logged + ignored. - /// `None` (or empty) means accept all chat_ids. + /// Whitelist; `None` or empty = accept all chat_ids. pub allowed_chat_ids: Option>, - /// Optional OpenAI-compatible LLM proxy. If set together with `llm_api_key`, - /// `run_serve` instantiates `OpenAiExtractor`; otherwise falls back to - /// `MockExtractor` with a warning. + /// LLM proxy URL + key; if both set, OpenAiExtractor is used, else MockExtractor. pub llm_proxy_url: Option, pub llm_api_key: Option, pub llm_model: Option, + /// Path to the SQLite file used by `ChatLog`. Default: `./kei-buddy-chat.db`. + pub chat_log_db_path: String, } -/// Axum state — implements `WebhookContext` for the webhook handler. -/// -/// `Arc` provides cheap `Clone` without requiring `E: Clone`. +/// Axum state — implements `WebhookContext`. `Arc` allows cheap `Clone`. pub struct BuddyContext { pub secret: String, pub bot_token: String, @@ -51,6 +49,8 @@ pub struct BuddyContext { pub http: reqwest::Client, /// Whitelist of chat_ids; `None` or empty = accept all. 
pub allowed_chat_ids: Arc>>, + /// Persistent log of all Telegram messages (user + bot). + pub chat_log: Arc, } impl Clone for BuddyContext { @@ -62,6 +62,7 @@ impl Clone for BuddyContext { extractor: Arc::clone(&self.extractor), http: self.http.clone(), allowed_chat_ids: Arc::clone(&self.allowed_chat_ids), + chat_log: Arc::clone(&self.chat_log), } } } @@ -103,6 +104,9 @@ impl BuddyContext { } async fn process_text(&self, chat_id: i64, text: &str) -> Result<(), BuddyError> { + if let Err(e) = self.chat_log.log_user(chat_id, text).await { + error!(chat_id, error = %e, "chat_log failure"); + } let state = self .store .load_state(chat_id) @@ -117,6 +121,9 @@ impl BuddyContext { self.store.save_state(chat_id, &output.next_state).await?; self.apply_persona_patch(chat_id, output.persona_patch).await?; send_message(&self.bot_token, chat_id, &output.response_text, &self.http).await?; + if let Err(e) = self.chat_log.log_bot(chat_id, &output.response_text).await { + error!(chat_id, error = %e, "chat_log failure"); + } Ok(()) } @@ -163,6 +170,7 @@ pub async fn run_serve(cfg: ServeConfig) -> anyhow::Result<()> { let store = Arc::new(SqliteBuddyStore::from_path(&cfg.db_path)?); let allowed_chat_ids = Arc::new(cfg.allowed_chat_ids); let http = reqwest::Client::new(); + let chat_log = Arc::new(ChatLog::from_path(&cfg.chat_log_db_path)?); #[cfg(feature = "extractor-openai")] { @@ -181,6 +189,7 @@ pub async fn run_serve(cfg: ServeConfig) -> anyhow::Result<()> { extractor, http, allowed_chat_ids, + chat_log, }).await; } } @@ -194,6 +203,7 @@ pub async fn run_serve(cfg: ServeConfig) -> anyhow::Result<()> { extractor, http, allowed_chat_ids, + chat_log, }).await }