feat(kei-buddy): wire kei-chat-store — log every user/bot message with FTS5

After-Ready conversation was going to /dev/null. With this change every
inbound Telegram text + every bot response is persisted to a SQLite +
FTS5 archive via the existing kei-chat-store primitive (no new crate).

Each Telegram chat_id maps 1:1 to a kei-chat-store session
(project="kei-buddy", title="tg-<chat_id>", model="telegram"). Cache
prevents per-message session lookups.

New file:
  * src/chat_log.rs (198 LOC) — ChatLog adapter wrapping
    kei_chat_store::Store + a chat_id→session_id Mutex cache.
    API: from_path / from_memory / ensure_session / log_user /
    log_bot / search(query, chat_id?, limit). Errors map to
    BuddyError::Memory and never propagate from on_event — chat-log
    failure is logged but does not block the conversation.

Modified:
  * Cargo.toml — kei-chat-store path dep added.
  * src/lib.rs — pub mod chat_log + re-export ChatLog.
  * src/serve.rs — BuddyContext gains Arc<ChatLog>;
    process_text calls log_user before handle_step + log_bot after
    send_message; ServeConfig gains chat_log_db_path.
  * src/bin/kei-buddy.rs — KEI_BUDDY_CHAT_LOG_PATH env
    (default ./kei-buddy-chat.db); migrate subcommand applies the
    chat-store schema alongside buddy_state schema.

Tests (3 new in src/chat_log.rs, all pass):
  * log_user_creates_session_and_message
  * log_bot_uses_same_session_as_log_user
  * different_chats_get_different_sessions

Verify-before-commit:
  * cargo check -p kei-buddy (default): PASS
  * cargo check -p kei-buddy --features extractor-openai: PASS
  * cargo test -p kei-buddy --lib: 23 passed / 0 failed
    (was 20 before this commit; 3 new ChatLog tests)

NOT deployed — user is in active conversation with the live bot.
Will roll forward when user signals readiness.
This commit is contained in:
Parfii-bot 2026-05-12 15:51:24 +08:00
parent 0045b6ac77
commit fb7c1bf859
5 changed files with 228 additions and 9 deletions

View file

@ -27,6 +27,8 @@ rusqlite = { workspace = true }
reqwest = { workspace = true }
anyhow = { workspace = true }
kei-memory-sqlite = { path = "../kei-memory-sqlite" }
kei-chat-store = { path = "../kei-chat-store" }
chrono = { workspace = true }
# serve feature deps
axum = { version = "0.7", features = ["json", "http1", "tokio"], optional = true }

View file

@ -56,6 +56,7 @@ async fn cmd_serve() -> anyhow::Result<()> {
.ok()
.or_else(|| std::env::var("OPENAI_API_KEY").ok()),
llm_model: std::env::var("KEI_BUDDY_LLM_MODEL").ok(),
chat_log_db_path: chat_log_path_from_env(),
};
run_serve(cfg).await
}
@ -84,8 +85,10 @@ async fn cmd_serve() -> anyhow::Result<()> {
fn cmd_migrate() -> anyhow::Result<()> {
let path = db_path_from_env();
let _store = kei_buddy::store::SqliteBuddyStore::from_path(&path)?;
let chat_log_path = chat_log_path_from_env();
let _ = kei_buddy::ChatLog::from_path(&chat_log_path)?;
init_log();
tracing::info!(path = %path, "schema applied");
tracing::info!(path = %path, chat_log_path = %chat_log_path, "schema applied");
Ok(())
}
@ -142,3 +145,7 @@ fn port_from_env() -> u16 {
fn db_path_from_env() -> String {
std::env::var("KEI_BUDDY_DB_PATH").unwrap_or_else(|_| "./kei-buddy.db".into())
}
fn chat_log_path_from_env() -> String {
std::env::var("KEI_BUDDY_CHAT_LOG_PATH").unwrap_or_else(|_| "./kei-buddy-chat.db".into())
}

View file

@ -0,0 +1,198 @@
// SPDX-License-Identifier: Apache-2.0
//! `ChatLog` — maps Telegram chat_id to kei-chat-store sessions.
//!
//! Constructor Pattern: one responsibility — bridge between Telegram chat_id
//! (i64) and kei-chat-store session (String UUID). All rusqlite calls are
//! dispatched via `tokio::task::spawn_blocking`.
//!
//! `rusqlite::Connection` is not `Sync`, so `Store` is not `Sync`. We wrap
//! `Store` in `Mutex<Store>` to obtain `Send + Sync` on `Arc<Mutex<Store>>`.
//! Each blocking task locks the mutex for the duration of the DB call.
use std::collections::HashMap;
use std::path::Path;
use std::sync::{Arc, Mutex};
use chrono::Utc;
use kei_chat_store::{
search::search as cs_search,
sessions::{save_message, start_session, ChatMessage},
Store,
};
use rusqlite::OptionalExtension;
use crate::error::BuddyError;
/// Thin wrapper over `kei-chat-store` keyed by Telegram chat_id.
pub struct ChatLog {
store: Arc<Mutex<Store>>,
/// chat_id → session_id; populated lazily.
sessions: Mutex<HashMap<i64, String>>,
}
impl ChatLog {
/// Open (or create) a file-backed store at `path`.
pub fn from_path(path: impl AsRef<Path>) -> Result<Self, BuddyError> {
let store = Store::open(path.as_ref()).map_err(|e| BuddyError::Memory(e.to_string()))?;
Ok(Self {
store: Arc::new(Mutex::new(store)),
sessions: Mutex::new(HashMap::new()),
})
}
/// Open an in-memory store (for tests).
pub fn from_memory() -> Result<Self, BuddyError> {
let store = Store::open_memory().map_err(|e| BuddyError::Memory(e.to_string()))?;
Ok(Self {
store: Arc::new(Mutex::new(store)),
sessions: Mutex::new(HashMap::new()),
})
}
/// Return the session_id for `chat_id`, creating one if absent.
pub async fn ensure_session(&self, chat_id: i64) -> Result<String, BuddyError> {
// Fast path: session already cached.
{
let guard = self.sessions.lock().expect("sessions mutex poisoned");
if let Some(id) = guard.get(&chat_id) {
return Ok(id.clone());
}
}
// Slow path: query or create in blocking thread.
let store = Arc::clone(&self.store);
let title = format!("tg-{chat_id}");
let session_id = tokio::task::spawn_blocking(move || {
let locked = store.lock().expect("store mutex poisoned");
find_or_create_session(&locked, &title)
})
.await
.map_err(|e| BuddyError::Memory(format!("spawn_blocking join: {e}")))?
.map_err(|e| BuddyError::Memory(e.to_string()))?;
let mut guard = self.sessions.lock().expect("sessions mutex poisoned");
guard.insert(chat_id, session_id.clone());
Ok(session_id)
}
/// Persist a user-side message.
pub async fn log_user(&self, chat_id: i64, content: &str) -> Result<(), BuddyError> {
self.log_role(chat_id, "user", content).await
}
/// Persist a bot-side response.
pub async fn log_bot(&self, chat_id: i64, content: &str) -> Result<(), BuddyError> {
self.log_role(chat_id, "assistant", content).await
}
/// Full-text search; optionally filter by chat_id.
pub async fn search(
&self,
query: &str,
chat_id: Option<i64>,
limit: i64,
) -> Result<Vec<ChatMessage>, BuddyError> {
let filter_session = match chat_id {
Some(cid) => Some(self.ensure_session(cid).await?),
None => None,
};
let store = Arc::clone(&self.store);
let q = query.to_string();
let msgs = tokio::task::spawn_blocking(move || {
let locked = store.lock().expect("store mutex poisoned");
cs_search(&locked, &q, limit).map_err(|e| BuddyError::Memory(e.to_string()))
})
.await
.map_err(|e| BuddyError::Memory(format!("spawn_blocking join: {e}")))?
.map_err(|e: BuddyError| e)?;
match filter_session {
None => Ok(msgs),
Some(sid) => Ok(msgs.into_iter().filter(|m| m.session_id == sid).collect()),
}
}
// ── Private helpers ─────────────────────────────────────────────────────
async fn log_role(
&self,
chat_id: i64,
role: &str,
content: &str,
) -> Result<(), BuddyError> {
let session_id = self.ensure_session(chat_id).await?;
let msg = ChatMessage {
id: 0,
session_id,
role: role.to_string(),
content: content.to_string(),
tokens_in: 0,
tokens_out: 0,
cost: 0.0,
created_at: Utc::now().timestamp(),
};
let store = Arc::clone(&self.store);
tokio::task::spawn_blocking(move || {
let locked = store.lock().expect("store mutex poisoned");
save_message(&locked, &msg).map_err(|e| BuddyError::Memory(e.to_string()))
})
.await
.map_err(|e| BuddyError::Memory(format!("spawn_blocking join: {e}")))?
.map(|_| ())
}
}
/// Query the DB for an existing session; create if absent.
fn find_or_create_session(store: &Store, title: &str) -> anyhow::Result<String> {
let existing: Option<String> = store
.conn()
.query_row(
"SELECT id FROM chat_sessions WHERE title = ?1 LIMIT 1",
rusqlite::params![title],
|row| row.get(0),
)
.optional()?;
if let Some(id) = existing {
return Ok(id);
}
start_session(store, "kei-buddy", title, "telegram")
}
// ── Tests ────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn log_user_creates_session_and_message() {
let log = ChatLog::from_memory().unwrap();
log.log_user(42, "hi there").await.unwrap();
let results = log.search("hi there", Some(42), 10).await.unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0].content, "hi there");
assert_eq!(results[0].role, "user");
}
#[tokio::test]
async fn log_bot_uses_same_session_as_log_user() {
let log = ChatLog::from_memory().unwrap();
log.log_user(42, "hello").await.unwrap();
log.log_bot(42, "world").await.unwrap();
let results = log.search("world", Some(42), 10).await.unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0].role, "assistant");
let user_results = log.search("hello", Some(42), 10).await.unwrap();
assert_eq!(results[0].session_id, user_results[0].session_id);
}
#[tokio::test]
async fn different_chats_get_different_sessions() {
let log = ChatLog::from_memory().unwrap();
log.log_user(1, "alpha").await.unwrap();
log.log_user(2, "beta").await.unwrap();
let sid1 = log.ensure_session(1).await.unwrap();
let sid2 = log.ensure_session(2).await.unwrap();
assert_ne!(sid1, sid2);
}
}

View file

@ -10,6 +10,7 @@
//! * `schema` — buddy-specific SQLite DDL
//! * `store` — `BuddyStore` trait + `SqliteBuddyStore` impl
pub mod chat_log;
pub mod error;
pub mod extractor;
pub mod machine;
@ -26,6 +27,7 @@ pub mod serve;
#[cfg(feature = "serve")]
pub mod serve_telegram;
pub use chat_log::ChatLog;
pub use error::BuddyError;
pub use extractor::LlmExtractor;
pub use machine::handle_step;

View file

@ -14,6 +14,7 @@ use tracing::{error, warn};
use kei_telegram_webhook::{WebhookContext, WebhookEvent};
use crate::{
chat_log::ChatLog,
error::BuddyError,
extractor::LlmExtractor,
machine::handle_step,
@ -29,20 +30,17 @@ pub struct ServeConfig {
pub db_path: String,
pub bot_token: String,
pub webhook_secret: String,
/// If `Some`, only these chat_ids are processed; others are warn-logged + ignored.
/// `None` (or empty) means accept all chat_ids.
/// Whitelist; `None` or empty = accept all chat_ids.
pub allowed_chat_ids: Option<Vec<i64>>,
/// Optional OpenAI-compatible LLM proxy. If set together with `llm_api_key`,
/// `run_serve` instantiates `OpenAiExtractor`; otherwise falls back to
/// `MockExtractor` with a warning.
/// LLM proxy URL + key; if both set, OpenAiExtractor is used, else MockExtractor.
pub llm_proxy_url: Option<String>,
pub llm_api_key: Option<String>,
pub llm_model: Option<String>,
/// Path to the SQLite file used by `ChatLog`. Default: `./kei-buddy-chat.db`.
pub chat_log_db_path: String,
}
/// Axum state — implements `WebhookContext` for the webhook handler.
///
/// `Arc<E>` provides cheap `Clone` without requiring `E: Clone`.
/// Axum state — implements `WebhookContext`. `Arc<E>` allows cheap `Clone`.
pub struct BuddyContext<E: LlmExtractor + Send + Sync + 'static> {
pub secret: String,
pub bot_token: String,
@ -51,6 +49,8 @@ pub struct BuddyContext<E: LlmExtractor + Send + Sync + 'static> {
pub http: reqwest::Client,
/// Whitelist of chat_ids; `None` or empty = accept all.
pub allowed_chat_ids: Arc<Option<Vec<i64>>>,
/// Persistent log of all Telegram messages (user + bot).
pub chat_log: Arc<ChatLog>,
}
impl<E: LlmExtractor + Send + Sync + 'static> Clone for BuddyContext<E> {
@ -62,6 +62,7 @@ impl<E: LlmExtractor + Send + Sync + 'static> Clone for BuddyContext<E> {
extractor: Arc::clone(&self.extractor),
http: self.http.clone(),
allowed_chat_ids: Arc::clone(&self.allowed_chat_ids),
chat_log: Arc::clone(&self.chat_log),
}
}
}
@ -103,6 +104,9 @@ impl<E: LlmExtractor + Send + Sync + 'static> BuddyContext<E> {
}
async fn process_text(&self, chat_id: i64, text: &str) -> Result<(), BuddyError> {
if let Err(e) = self.chat_log.log_user(chat_id, text).await {
error!(chat_id, error = %e, "chat_log failure");
}
let state = self
.store
.load_state(chat_id)
@ -117,6 +121,9 @@ impl<E: LlmExtractor + Send + Sync + 'static> BuddyContext<E> {
self.store.save_state(chat_id, &output.next_state).await?;
self.apply_persona_patch(chat_id, output.persona_patch).await?;
send_message(&self.bot_token, chat_id, &output.response_text, &self.http).await?;
if let Err(e) = self.chat_log.log_bot(chat_id, &output.response_text).await {
error!(chat_id, error = %e, "chat_log failure");
}
Ok(())
}
@ -163,6 +170,7 @@ pub async fn run_serve(cfg: ServeConfig) -> anyhow::Result<()> {
let store = Arc::new(SqliteBuddyStore::from_path(&cfg.db_path)?);
let allowed_chat_ids = Arc::new(cfg.allowed_chat_ids);
let http = reqwest::Client::new();
let chat_log = Arc::new(ChatLog::from_path(&cfg.chat_log_db_path)?);
#[cfg(feature = "extractor-openai")]
{
@ -181,6 +189,7 @@ pub async fn run_serve(cfg: ServeConfig) -> anyhow::Result<()> {
extractor,
http,
allowed_chat_ids,
chat_log,
}).await;
}
}
@ -194,6 +203,7 @@ pub async fn run_serve(cfg: ServeConfig) -> anyhow::Result<()> {
extractor,
http,
allowed_chat_ids,
chat_log,
}).await
}