feat(kei-buddy): wire kei-chat-store — log every user/bot message with FTS5
Post-Ready conversation was not being persisted anywhere (effectively going to /dev/null).
With this change, every inbound Telegram text and every bot response is persisted to a
SQLite + FTS5 archive via the existing kei-chat-store primitive (no new crate).
Each Telegram chat_id maps 1:1 to a kei-chat-store session
(project="kei-buddy", title="tg-<chat_id>", model="telegram"). An in-memory
chat_id→session_id cache avoids a session lookup on every message.
New file:
* src/chat_log.rs (198 LOC) — ChatLog adapter wrapping
kei_chat_store::Store + a chat_id→session_id Mutex cache.
API: from_path / from_memory / ensure_session / log_user /
log_bot / search(query, chat_id?, limit). Errors map to
BuddyError::Memory and never propagate from on_event — chat-log
failure is logged but does not block the conversation.
Modified:
* Cargo.toml — kei-chat-store path dep added.
* src/lib.rs — pub mod chat_log + re-export ChatLog.
* src/serve.rs — BuddyContext gains Arc<ChatLog>;
process_text calls log_user before handle_step + log_bot after
send_message; ServeConfig gains chat_log_db_path.
* src/bin/kei-buddy.rs — KEI_BUDDY_CHAT_LOG_PATH env
(default ./kei-buddy-chat.db); migrate subcommand applies the
chat-store schema alongside buddy_state schema.
Tests (3 new in src/chat_log.rs, all pass):
* log_user_creates_session_and_message
* log_bot_uses_same_session_as_log_user
* different_chats_get_different_sessions
Verify-before-commit:
* cargo check -p kei-buddy (default): PASS
* cargo check -p kei-buddy --features extractor-openai: PASS
* cargo test -p kei-buddy --lib: 23 passed / 0 failed
(was 20 before this commit; 3 new ChatLog tests)
NOT deployed — user is in active conversation with the live bot.
Will roll forward when user signals readiness.
This commit is contained in:
parent
44502507a2
commit
c1247fef00
5 changed files with 228 additions and 9 deletions
|
|
@ -27,6 +27,8 @@ rusqlite = { workspace = true }
|
|||
reqwest = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
kei-memory-sqlite = { path = "../kei-memory-sqlite" }
|
||||
kei-chat-store = { path = "../kei-chat-store" }
|
||||
chrono = { workspace = true }
|
||||
|
||||
# serve feature deps
|
||||
axum = { version = "0.7", features = ["json", "http1", "tokio"], optional = true }
|
||||
|
|
|
|||
|
|
@ -56,6 +56,7 @@ async fn cmd_serve() -> anyhow::Result<()> {
|
|||
.ok()
|
||||
.or_else(|| std::env::var("OPENAI_API_KEY").ok()),
|
||||
llm_model: std::env::var("KEI_BUDDY_LLM_MODEL").ok(),
|
||||
chat_log_db_path: chat_log_path_from_env(),
|
||||
};
|
||||
run_serve(cfg).await
|
||||
}
|
||||
|
|
@ -84,8 +85,10 @@ async fn cmd_serve() -> anyhow::Result<()> {
|
|||
fn cmd_migrate() -> anyhow::Result<()> {
|
||||
let path = db_path_from_env();
|
||||
let _store = kei_buddy::store::SqliteBuddyStore::from_path(&path)?;
|
||||
let chat_log_path = chat_log_path_from_env();
|
||||
let _ = kei_buddy::ChatLog::from_path(&chat_log_path)?;
|
||||
init_log();
|
||||
tracing::info!(path = %path, "schema applied");
|
||||
tracing::info!(path = %path, chat_log_path = %chat_log_path, "schema applied");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
@ -142,3 +145,7 @@ fn port_from_env() -> u16 {
|
|||
/// Path of the buddy state database.
///
/// Reads `KEI_BUDDY_DB_PATH`; when the variable is unset (or not valid Unicode)
/// the default `./kei-buddy.db` is returned.
fn db_path_from_env() -> String {
    match std::env::var("KEI_BUDDY_DB_PATH") {
        Ok(configured) => configured,
        Err(_) => String::from("./kei-buddy.db"),
    }
}
|
||||
|
||||
/// Path of the chat-log (kei-chat-store) database.
///
/// Reads `KEI_BUDDY_CHAT_LOG_PATH`; when the variable is unset (or not valid
/// Unicode) the default `./kei-buddy-chat.db` is returned.
fn chat_log_path_from_env() -> String {
    match std::env::var("KEI_BUDDY_CHAT_LOG_PATH") {
        Ok(configured) => configured,
        Err(_) => String::from("./kei-buddy-chat.db"),
    }
}
|
||||
|
|
|
|||
198
_primitives/_rust/kei-buddy/src/chat_log.rs
Normal file
198
_primitives/_rust/kei-buddy/src/chat_log.rs
Normal file
|
|
@ -0,0 +1,198 @@
|
|||
// SPDX-License-Identifier: Apache-2.0
|
||||
//! `ChatLog` — maps Telegram chat_id to kei-chat-store sessions.
|
||||
//!
|
||||
//! Constructor Pattern: one responsibility — bridge between Telegram chat_id
|
||||
//! (i64) and kei-chat-store session (String UUID). All rusqlite calls are
|
||||
//! dispatched via `tokio::task::spawn_blocking`.
|
||||
//!
|
||||
//! `rusqlite::Connection` is not `Sync`, so `Store` is not `Sync`. We wrap
|
||||
//! `Store` in `Mutex<Store>` to obtain `Send + Sync` on `Arc<Mutex<Store>>`.
|
||||
//! Each blocking task locks the mutex for the duration of the DB call.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use chrono::Utc;
|
||||
use kei_chat_store::{
|
||||
search::search as cs_search,
|
||||
sessions::{save_message, start_session, ChatMessage},
|
||||
Store,
|
||||
};
|
||||
use rusqlite::OptionalExtension;
|
||||
|
||||
use crate::error::BuddyError;
|
||||
|
||||
/// Thin wrapper over `kei-chat-store` keyed by Telegram chat_id.
|
||||
pub struct ChatLog {
|
||||
store: Arc<Mutex<Store>>,
|
||||
/// chat_id → session_id; populated lazily.
|
||||
sessions: Mutex<HashMap<i64, String>>,
|
||||
}
|
||||
|
||||
impl ChatLog {
|
||||
/// Open (or create) a file-backed store at `path`.
|
||||
pub fn from_path(path: impl AsRef<Path>) -> Result<Self, BuddyError> {
|
||||
let store = Store::open(path.as_ref()).map_err(|e| BuddyError::Memory(e.to_string()))?;
|
||||
Ok(Self {
|
||||
store: Arc::new(Mutex::new(store)),
|
||||
sessions: Mutex::new(HashMap::new()),
|
||||
})
|
||||
}
|
||||
|
||||
/// Open an in-memory store (for tests).
|
||||
pub fn from_memory() -> Result<Self, BuddyError> {
|
||||
let store = Store::open_memory().map_err(|e| BuddyError::Memory(e.to_string()))?;
|
||||
Ok(Self {
|
||||
store: Arc::new(Mutex::new(store)),
|
||||
sessions: Mutex::new(HashMap::new()),
|
||||
})
|
||||
}
|
||||
|
||||
/// Return the session_id for `chat_id`, creating one if absent.
|
||||
pub async fn ensure_session(&self, chat_id: i64) -> Result<String, BuddyError> {
|
||||
// Fast path: session already cached.
|
||||
{
|
||||
let guard = self.sessions.lock().expect("sessions mutex poisoned");
|
||||
if let Some(id) = guard.get(&chat_id) {
|
||||
return Ok(id.clone());
|
||||
}
|
||||
}
|
||||
// Slow path: query or create in blocking thread.
|
||||
let store = Arc::clone(&self.store);
|
||||
let title = format!("tg-{chat_id}");
|
||||
let session_id = tokio::task::spawn_blocking(move || {
|
||||
let locked = store.lock().expect("store mutex poisoned");
|
||||
find_or_create_session(&locked, &title)
|
||||
})
|
||||
.await
|
||||
.map_err(|e| BuddyError::Memory(format!("spawn_blocking join: {e}")))?
|
||||
.map_err(|e| BuddyError::Memory(e.to_string()))?;
|
||||
|
||||
let mut guard = self.sessions.lock().expect("sessions mutex poisoned");
|
||||
guard.insert(chat_id, session_id.clone());
|
||||
Ok(session_id)
|
||||
}
|
||||
|
||||
/// Persist a user-side message.
|
||||
pub async fn log_user(&self, chat_id: i64, content: &str) -> Result<(), BuddyError> {
|
||||
self.log_role(chat_id, "user", content).await
|
||||
}
|
||||
|
||||
/// Persist a bot-side response.
|
||||
pub async fn log_bot(&self, chat_id: i64, content: &str) -> Result<(), BuddyError> {
|
||||
self.log_role(chat_id, "assistant", content).await
|
||||
}
|
||||
|
||||
/// Full-text search; optionally filter by chat_id.
|
||||
pub async fn search(
|
||||
&self,
|
||||
query: &str,
|
||||
chat_id: Option<i64>,
|
||||
limit: i64,
|
||||
) -> Result<Vec<ChatMessage>, BuddyError> {
|
||||
let filter_session = match chat_id {
|
||||
Some(cid) => Some(self.ensure_session(cid).await?),
|
||||
None => None,
|
||||
};
|
||||
let store = Arc::clone(&self.store);
|
||||
let q = query.to_string();
|
||||
let msgs = tokio::task::spawn_blocking(move || {
|
||||
let locked = store.lock().expect("store mutex poisoned");
|
||||
cs_search(&locked, &q, limit).map_err(|e| BuddyError::Memory(e.to_string()))
|
||||
})
|
||||
.await
|
||||
.map_err(|e| BuddyError::Memory(format!("spawn_blocking join: {e}")))?
|
||||
.map_err(|e: BuddyError| e)?;
|
||||
|
||||
match filter_session {
|
||||
None => Ok(msgs),
|
||||
Some(sid) => Ok(msgs.into_iter().filter(|m| m.session_id == sid).collect()),
|
||||
}
|
||||
}
|
||||
|
||||
// ── Private helpers ─────────────────────────────────────────────────────
|
||||
|
||||
async fn log_role(
|
||||
&self,
|
||||
chat_id: i64,
|
||||
role: &str,
|
||||
content: &str,
|
||||
) -> Result<(), BuddyError> {
|
||||
let session_id = self.ensure_session(chat_id).await?;
|
||||
let msg = ChatMessage {
|
||||
id: 0,
|
||||
session_id,
|
||||
role: role.to_string(),
|
||||
content: content.to_string(),
|
||||
tokens_in: 0,
|
||||
tokens_out: 0,
|
||||
cost: 0.0,
|
||||
created_at: Utc::now().timestamp(),
|
||||
};
|
||||
let store = Arc::clone(&self.store);
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let locked = store.lock().expect("store mutex poisoned");
|
||||
save_message(&locked, &msg).map_err(|e| BuddyError::Memory(e.to_string()))
|
||||
})
|
||||
.await
|
||||
.map_err(|e| BuddyError::Memory(format!("spawn_blocking join: {e}")))?
|
||||
.map(|_| ())
|
||||
}
|
||||
}
|
||||
|
||||
/// Query the DB for an existing session; create if absent.
|
||||
fn find_or_create_session(store: &Store, title: &str) -> anyhow::Result<String> {
|
||||
let existing: Option<String> = store
|
||||
.conn()
|
||||
.query_row(
|
||||
"SELECT id FROM chat_sessions WHERE title = ?1 LIMIT 1",
|
||||
rusqlite::params![title],
|
||||
|row| row.get(0),
|
||||
)
|
||||
.optional()?;
|
||||
|
||||
if let Some(id) = existing {
|
||||
return Ok(id);
|
||||
}
|
||||
start_session(store, "kei-buddy", title, "telegram")
|
||||
}
|
||||
|
||||
// ── Tests ────────────────────────────────────────────────────────────────────
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// Logging a user message must create the session and make the message searchable.
    #[tokio::test]
    async fn log_user_creates_session_and_message() {
        let log = ChatLog::from_memory().unwrap();
        log.log_user(42, "hi there").await.unwrap();

        let hits = log.search("hi there", Some(42), 10).await.unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].content, "hi there");
        assert_eq!(hits[0].role, "user");
    }

    /// User and bot messages of one chat must land in the same session.
    #[tokio::test]
    async fn log_bot_uses_same_session_as_log_user() {
        let log = ChatLog::from_memory().unwrap();
        log.log_user(42, "hello").await.unwrap();
        log.log_bot(42, "world").await.unwrap();

        let bot_hits = log.search("world", Some(42), 10).await.unwrap();
        assert_eq!(bot_hits.len(), 1);
        assert_eq!(bot_hits[0].role, "assistant");

        let user_hits = log.search("hello", Some(42), 10).await.unwrap();
        // Assert the length first so a missing row fails with a clear message
        // instead of an index-out-of-bounds panic.
        assert_eq!(user_hits.len(), 1, "user message must be searchable");
        assert_eq!(user_hits[0].role, "user");
        assert_eq!(bot_hits[0].session_id, user_hits[0].session_id);
    }

    /// Distinct chats must map to distinct sessions, and a filtered search
    /// must not return another chat's messages.
    #[tokio::test]
    async fn different_chats_get_different_sessions() {
        let log = ChatLog::from_memory().unwrap();
        log.log_user(1, "alpha").await.unwrap();
        log.log_user(2, "beta").await.unwrap();

        let sid1 = log.ensure_session(1).await.unwrap();
        let sid2 = log.ensure_session(2).await.unwrap();
        assert_ne!(sid1, sid2);

        assert!(log.search("alpha", Some(2), 10).await.unwrap().is_empty());
        assert!(log.search("beta", Some(1), 10).await.unwrap().is_empty());
    }
}
|
||||
|
|
@ -10,6 +10,7 @@
|
|||
//! * `schema` — buddy-specific SQLite DDL
|
||||
//! * `store` — `BuddyStore` trait + `SqliteBuddyStore` impl
|
||||
|
||||
pub mod chat_log;
|
||||
pub mod error;
|
||||
pub mod extractor;
|
||||
pub mod machine;
|
||||
|
|
@ -26,6 +27,7 @@ pub mod serve;
|
|||
#[cfg(feature = "serve")]
|
||||
pub mod serve_telegram;
|
||||
|
||||
pub use chat_log::ChatLog;
|
||||
pub use error::BuddyError;
|
||||
pub use extractor::LlmExtractor;
|
||||
pub use machine::handle_step;
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ use tracing::{error, warn};
|
|||
use kei_telegram_webhook::{WebhookContext, WebhookEvent};
|
||||
|
||||
use crate::{
|
||||
chat_log::ChatLog,
|
||||
error::BuddyError,
|
||||
extractor::LlmExtractor,
|
||||
machine::handle_step,
|
||||
|
|
@ -29,20 +30,17 @@ pub struct ServeConfig {
|
|||
pub db_path: String,
|
||||
pub bot_token: String,
|
||||
pub webhook_secret: String,
|
||||
/// If `Some`, only these chat_ids are processed; others are warn-logged + ignored.
|
||||
/// `None` (or empty) means accept all chat_ids.
|
||||
/// Whitelist; `None` or empty = accept all chat_ids.
|
||||
pub allowed_chat_ids: Option<Vec<i64>>,
|
||||
/// Optional OpenAI-compatible LLM proxy. If set together with `llm_api_key`,
|
||||
/// `run_serve` instantiates `OpenAiExtractor`; otherwise falls back to
|
||||
/// `MockExtractor` with a warning.
|
||||
/// LLM proxy URL + key; if both set, OpenAiExtractor is used, else MockExtractor.
|
||||
pub llm_proxy_url: Option<String>,
|
||||
pub llm_api_key: Option<String>,
|
||||
pub llm_model: Option<String>,
|
||||
/// Path to the SQLite file used by `ChatLog`. Default: `./kei-buddy-chat.db`.
|
||||
pub chat_log_db_path: String,
|
||||
}
|
||||
|
||||
/// Axum state — implements `WebhookContext` for the webhook handler.
|
||||
///
|
||||
/// `Arc<E>` provides cheap `Clone` without requiring `E: Clone`.
|
||||
/// Axum state — implements `WebhookContext`. `Arc<E>` allows cheap `Clone`.
|
||||
pub struct BuddyContext<E: LlmExtractor + Send + Sync + 'static> {
|
||||
pub secret: String,
|
||||
pub bot_token: String,
|
||||
|
|
@ -51,6 +49,8 @@ pub struct BuddyContext<E: LlmExtractor + Send + Sync + 'static> {
|
|||
pub http: reqwest::Client,
|
||||
/// Whitelist of chat_ids; `None` or empty = accept all.
|
||||
pub allowed_chat_ids: Arc<Option<Vec<i64>>>,
|
||||
/// Persistent log of all Telegram messages (user + bot).
|
||||
pub chat_log: Arc<ChatLog>,
|
||||
}
|
||||
|
||||
impl<E: LlmExtractor + Send + Sync + 'static> Clone for BuddyContext<E> {
|
||||
|
|
@ -62,6 +62,7 @@ impl<E: LlmExtractor + Send + Sync + 'static> Clone for BuddyContext<E> {
|
|||
extractor: Arc::clone(&self.extractor),
|
||||
http: self.http.clone(),
|
||||
allowed_chat_ids: Arc::clone(&self.allowed_chat_ids),
|
||||
chat_log: Arc::clone(&self.chat_log),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -103,6 +104,9 @@ impl<E: LlmExtractor + Send + Sync + 'static> BuddyContext<E> {
|
|||
}
|
||||
|
||||
async fn process_text(&self, chat_id: i64, text: &str) -> Result<(), BuddyError> {
|
||||
if let Err(e) = self.chat_log.log_user(chat_id, text).await {
|
||||
error!(chat_id, error = %e, "chat_log failure");
|
||||
}
|
||||
let state = self
|
||||
.store
|
||||
.load_state(chat_id)
|
||||
|
|
@ -117,6 +121,9 @@ impl<E: LlmExtractor + Send + Sync + 'static> BuddyContext<E> {
|
|||
self.store.save_state(chat_id, &output.next_state).await?;
|
||||
self.apply_persona_patch(chat_id, output.persona_patch).await?;
|
||||
send_message(&self.bot_token, chat_id, &output.response_text, &self.http).await?;
|
||||
if let Err(e) = self.chat_log.log_bot(chat_id, &output.response_text).await {
|
||||
error!(chat_id, error = %e, "chat_log failure");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
@ -163,6 +170,7 @@ pub async fn run_serve(cfg: ServeConfig) -> anyhow::Result<()> {
|
|||
let store = Arc::new(SqliteBuddyStore::from_path(&cfg.db_path)?);
|
||||
let allowed_chat_ids = Arc::new(cfg.allowed_chat_ids);
|
||||
let http = reqwest::Client::new();
|
||||
let chat_log = Arc::new(ChatLog::from_path(&cfg.chat_log_db_path)?);
|
||||
|
||||
#[cfg(feature = "extractor-openai")]
|
||||
{
|
||||
|
|
@ -181,6 +189,7 @@ pub async fn run_serve(cfg: ServeConfig) -> anyhow::Result<()> {
|
|||
extractor,
|
||||
http,
|
||||
allowed_chat_ids,
|
||||
chat_log,
|
||||
}).await;
|
||||
}
|
||||
}
|
||||
|
|
@ -194,6 +203,7 @@ pub async fn run_serve(cfg: ServeConfig) -> anyhow::Result<()> {
|
|||
extractor,
|
||||
http,
|
||||
allowed_chat_ids,
|
||||
chat_log,
|
||||
}).await
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue