diff --git a/_primitives/_rust/kei-buddy/src/conversational.rs b/_primitives/_rust/kei-buddy/src/conversational.rs
new file mode 100644
index 0000000..17fc600
--- /dev/null
+++ b/_primitives/_rust/kei-buddy/src/conversational.rs
@@ -0,0 +1,160 @@
+// SPDX-License-Identifier: Apache-2.0
+//! LLM-driven conversational onboarding step (replaces rigid FSM after Intro/AskLanguage).
+//! Constructor Pattern: one responsibility — one LLM call, one structured response.
+
+use serde_json::{json, Value};
+
+use crate::{
+    error::BuddyError,
+    extractor::LlmExtractor,
+    retrieval::RetrievalContext,
+    state::OnboardState,
+    strings::Lang,
+    transition::StepOutput,
+};
+
+/// Prompt skeleton for one onboarding turn.
+/// `{lang}`, `{persona}`, `{history}`, `{atoms}` and `{text}` are filled in by `build_prompt` with
+/// plain `str::replace` (not `format!`), so the JSON example below uses single, unescaped braces.
+const PROMPT_TEMPLATE: &str = concat!(
+    "You are KeiBuddy, a personal-assistant chatbot doing first-meeting onboarding.\n",
+    "Be a warm, natural conversationalist. Reply in the user's language ({lang}).\n",
+    "Don't act like a survey form — extract info from natural conversation.\n\n",
+    "USER PROFILE (so far — fields with values are KNOWN, don't re-ask):\n{persona}\n\n",
+    "RECENT CONVERSATION (latest first; \"u:\" user, \"b:\" you):\n{history}\n\n",
+    "RELATED KNOWLEDGE (relevant atoms from user's graph):\n{atoms}\n\n",
+    "USER JUST SAID:\n\"{text}\"\n\n",
+    "YOUR JOB: Update persona fields you newly learned. Decide what to ask next.\n\n",
+    "Output ONLY this JSON object, no prose, no markdown fences:\n",
+    "{\"slot_updates\":{\"name\":\"\",\"tone\":\"\",\"interests\":[],",
+    "\"hobbies\":[],\"schedule\":{},\"language\":\"\"},",
+    "\"response_text\":\"\",",
+    "\"done\":false,\"focus\":\"name|tone|interests|hobbies|topics|schedule|free|done\"}"
+);
+
+/// Drive one turn of free-form onboarding with a single LLM call.
+pub async fn conversational_step<E: LlmExtractor>(
+    state: &OnboardState,
+    persona: &Value,
+    context: &RetrievalContext,
+    user_text: &str,
+    extractor: &E,
+    lang: Lang,
+) -> Result<StepOutput, BuddyError> {
+    let prompt = build_prompt(persona, context, user_text, lang);
+    let raw = extractor.extract(&prompt, "").await?;
+    Ok(parse_llm_response(&raw, state).unwrap_or_else(|| fallback_output(state, lang)))
+}
+
+/// Render `PROMPT_TEMPLATE` via plain `str::replace`; `{text}` goes last so user input is never re-expanded.
+fn build_prompt(persona: &Value, ctx: &RetrievalContext, text: &str, lang: Lang) -> String {
+    let ps = serde_json::to_string_pretty(persona).unwrap_or_default();
+    let hs = if ctx.history.is_empty() { "(no prior messages)".into() } else { ctx.history.join("\n") };
+    let atoms = if ctx.atoms.is_empty() { "(none)".into() } else { ctx.atoms.join("\n") };
+    PROMPT_TEMPLATE
+        .replace("{lang}", lang.code())
+        .replace("{persona}", &ps)
+        .replace("{history}", &hs)
+        .replace("{atoms}", &atoms)
+        .replace("{text}", text)
+}
+
+/// Parse the LLM JSON into a `StepOutput`; `None` unless it is an object with non-blank reply text.
+/// `Ready` is kept terminal and a non-object `slot_updates` is dropped instead of being merged.
+fn parse_llm_response(raw: &Value, current: &OnboardState) -> Option<StepOutput> {
+    let obj = raw.as_object()?;
+    let reply = obj.get("response_text")?.as_str().filter(|t| !t.trim().is_empty())?.to_owned();
+    let done = matches!(current, OnboardState::Ready) || obj.get("done") == Some(&Value::Bool(true));
+    let focus = obj.get("focus").and_then(|v| v.as_str()).unwrap_or("free");
+    let patch = obj.get("slot_updates").filter(|v| v.is_object()).cloned().unwrap_or_else(|| json!({}));
+    let next_state = if done { OnboardState::Ready } else { focus_to_state(focus, current) };
+    Some(StepOutput { next_state, response_text: reply, persona_patch: patch })
+}
+
+/// Map the LLM's `focus` hint to the next state; unknown values (incl. "free") keep `current`.
+fn focus_to_state(focus: &str, current: &OnboardState) -> OnboardState {
+    match focus {
+        "name" => OnboardState::AskName,
+        "tone" => OnboardState::AskTone,
+        "interests" => OnboardState::AskInterests,
+        "hobbies" => OnboardState::AskHobbies,
+        "topics" => OnboardState::TopicSpecifics,
+        "schedule" => OnboardState::AskSchedule,
+        "done" => OnboardState::Ready,
+        _ => current.clone(),
+    }
+}
+
+/// Canned "please rephrase" reply that keeps the current state and patches nothing.
+fn fallback_output(state: &OnboardState, lang: Lang) -> StepOutput {
+    let text = match lang {
+        Lang::Ru => "Хм, дай-ка подумаю — можешь перефразировать?",
+        Lang::En => "Hmm, let me think — could you rephrase?",
+    };
+    StepOutput { next_state: state.clone(), response_text: text.to_owned(), persona_patch: json!({}) }
+}
+
+// ── Tests ─────────────────────────────────────────────────────────────────────
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::extractor::MockExtractor;
+    use serde_json::json;
+
+    fn empty_ctx() -> RetrievalContext {
+        RetrievalContext { history: vec![], atoms: vec![] }
+    }
+
+    fn rt() -> tokio::runtime::Runtime {
+        tokio::runtime::Runtime::new().unwrap()
+    }
+
+    #[test]
+    fn parses_well_formed_llm_response() {
+        rt().block_on(async {
+            let mock = MockExtractor::new(json!({
+                "slot_updates": { "name": "Alice" },
+                "response_text": "Nice to meet you! What are your interests?",
+                "done": false,
+                "focus": "interests"
+            }));
+            let out = conversational_step(
+                &OnboardState::AskName, &json!({}), &empty_ctx(),
+                "I'm Alice", &mock, Lang::En,
+            ).await.unwrap();
+            assert_eq!(out.next_state, OnboardState::AskInterests);
+            assert!(!out.response_text.is_empty());
+            assert_eq!(out.persona_patch["name"].as_str(), Some("Alice"));
+        });
+    }
+
+    #[test]
+    fn done_true_transitions_to_ready() {
+        rt().block_on(async {
+            let mock = MockExtractor::new(json!({
+                "slot_updates": {},
+                "response_text": "All set!",
+                "done": true,
+                "focus": "done"
+            }));
+            let out = conversational_step(
+                &OnboardState::AskSchedule, &json!({}), &empty_ctx(),
+                "mornings at 9", &mock, Lang::En,
+            ).await.unwrap();
+            assert_eq!(out.next_state, OnboardState::Ready);
+        });
+    }
+
+    #[test]
+    fn invalid_json_falls_back_gracefully() {
+        rt().block_on(async {
+            let mock = MockExtractor::new(json!("not a json object at all"));
+            let out = conversational_step(
+                &OnboardState::AskInterests, &json!({}), &empty_ctx(),
+                "hello", &mock, Lang::En,
+            ).await.unwrap();
+            assert_eq!(out.next_state, OnboardState::AskInterests);
+            assert!(!out.response_text.is_empty());
+        });
+    }
+}
diff --git a/_primitives/_rust/kei-buddy/src/lib.rs b/_primitives/_rust/kei-buddy/src/lib.rs
index 73de424..e583aab 100644
--- a/_primitives/_rust/kei-buddy/src/lib.rs
+++ b/_primitives/_rust/kei-buddy/src/lib.rs
@@ -15,12 +15,14 @@ pub(crate) mod command_exec;
 pub mod commands;
 pub mod contacts;
 pub mod contacts_sync;
+pub mod conversational;
 pub mod error;
 pub mod extractor;
 pub mod machine;
 pub(crate) mod machine_helpers;
 pub(crate) mod machine_lang;
 pub mod persona_merge;
+pub mod retrieval;
 pub mod schema;
 pub mod state;
 pub mod store;
diff --git a/_primitives/_rust/kei-buddy/src/retrieval.rs b/_primitives/_rust/kei-buddy/src/retrieval.rs
new file mode 100644
index 0000000..2ce7cde
--- /dev/null
+++ b/_primitives/_rust/kei-buddy/src/retrieval.rs
@@ -0,0 +1,101 @@
+// SPDX-License-Identifier: Apache-2.0
+//! Retrieval context builder — history from ChatLog + atoms from Topics.
+//! Constructor Pattern: one responsibility — gather context for conversational_step.
+
+use std::sync::Arc;
+
+use tracing::warn;
+
+use crate::{chat_log::ChatLog, topics::Topics};
+
+/// Pre-assembled context for `conversational_step`.
+pub struct RetrievalContext {
+    /// Last N messages formatted as "u: ..." / "b: ..." (latest first).
+    pub history: Vec<String>,
+    /// Top-K kei-sage Units formatted as "[topic] title: content[..200]".
+    pub atoms: Vec<String>,
+}
+
+/// Gather conversation history and relevant atoms for the current turn.
+///
+/// All errors are swallowed and logged at `warn` — the caller must always
+/// receive a usable (possibly empty) context.
+pub async fn retrieve_context(
+    chat_log: &Arc<ChatLog>,
+    topics: &Arc<Topics>,
+    chat_id: i64,
+    query: &str,
+    history_n: usize,
+    atoms_k: i64,
+) -> RetrievalContext {
+    let history = fetch_history(chat_log, chat_id, query, history_n).await;
+    let atoms = fetch_atoms(topics, query, atoms_k).await;
+    RetrievalContext { history, atoms }
+}
+
+/// Fetch up to `n` (at least 1) chat-log hits for `query` in this chat, rendered as "u: ..." / "b: ...".
+/// NOTE(review): this calls `search(query, ..)`, not a recency listing — confirm it yields "latest first".
+async fn fetch_history(
+    chat_log: &Arc<ChatLog>,
+    chat_id: i64,
+    query: &str,
+    n: usize,
+) -> Vec<String> {
+    let limit = n.clamp(1, i64::MAX as usize) as i64; // clamp first so the cast cannot wrap negative
+    match chat_log.search(query, Some(chat_id), limit).await {
+        Ok(msgs) => msgs
+            .iter()
+            .map(|m| format!("{}: {}", if m.role == "user" { "u" } else { "b" }, m.content))
+            .collect(),
+        Err(e) => {
+            warn!(chat_id, error = %e, "retrieve_context: history fetch failed");
+            vec![]
+        }
+    }
+}
+
+/// Search `topics` for `query` (passing `k` through) and render each unit as "[type] title: first 200 chars".
+async fn fetch_atoms(topics: &Arc<Topics>, query: &str, k: i64) -> Vec<String> {
+    match topics.search(query, k).await {
+        Ok(units) => units
+            .iter()
+            .map(|u| {
+                let snippet: String = u.content.chars().take(200).collect();
+                format!("[{}] {}: {}", u.unit_type, u.title, snippet)
+            })
+            .collect(),
+        Err(e) => {
+            warn!(error = %e, "retrieve_context: atom search failed");
+            vec![]
+        }
+    }
+}
+
+// ── Tests ─────────────────────────────────────────────────────────────────────
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[tokio::test]
+    async fn retrieve_returns_empty_on_empty_stores() {
+        let log = Arc::new(ChatLog::from_memory().unwrap());
+        let topics = Arc::new(Topics::from_memory().unwrap());
+        let ctx = retrieve_context(&log, &topics, 42, "anything", 10, 5).await;
+        assert!(ctx.history.is_empty(), "expected empty history");
+        assert!(ctx.atoms.is_empty(), "expected empty atoms");
+    }
+
+    #[tokio::test]
+    async fn retrieve_finds_seeded_data() {
+        let log = Arc::new(ChatLog::from_memory().unwrap());
+        let topics = Arc::new(Topics::from_memory().unwrap());
+        // Seed chat log
+        log.log_user(99, "rust programming").await.unwrap();
+        log.log_bot(99, "great choice").await.unwrap();
+        // Seed topics
+        topics.add_topic(99, "rust", "Rust language", "rust systems programming").await.unwrap();
+        let ctx = retrieve_context(&log, &topics, 99, "rust", 10, 5).await;
+        assert!(!ctx.history.is_empty(), "expected history entries after seeding");
+        assert!(!ctx.atoms.is_empty(), "expected atom entries after seeding");
+    }
+}
diff --git a/_primitives/_rust/kei-buddy/src/serve.rs b/_primitives/_rust/kei-buddy/src/serve.rs
index a159661..ada2839 100644
--- a/_primitives/_rust/kei-buddy/src/serve.rs
+++ b/_primitives/_rust/kei-buddy/src/serve.rs
@@ -15,13 +15,16 @@ use crate::{
     chat_log::ChatLog,
     commands::{execute_command, parse_command, CommandStores},
     contacts::Contacts,
+    conversational::conversational_step,
     error::BuddyError,
     extractor::LlmExtractor,
     machine::handle_step,
     persona_merge::deep_merge,
+    retrieval::retrieve_context,
     serve_telegram::send_message,
     state::OnboardState,
     store::{BuddyStore, SqliteBuddyStore},
+    strings::Lang,
     topic_classify::classify_and_store_topic,
     topics::Topics,
     voice::VoiceHandler,
@@ -87,10 +90,16 @@ impl WebhookContext for BuddyContext
     async fn on_event(&self, event: WebhookEvent) {
         match event {
             WebhookEvent::Text { chat_id, text, .. } => {
-                self.handle_text(chat_id, text).await;
+                // NOTE(review): the task is detached — two messages from one chat may now be
+                // handled concurrently; confirm state load/save tolerates that interleaving.
+                let me = self.clone();
+                tokio::spawn(async move { me.handle_text(chat_id, text).await });
             }
             WebhookEvent::Voice { chat_id, file_id, mime_type, .. } => {
-                self.handle_voice(chat_id, file_id, mime_type).await;
+                let me = self.clone();
+                tokio::spawn(async move {
+                    me.handle_voice(chat_id, file_id, mime_type).await
+                });
             }
             other => {
                 warn!(event = ?other, "ignoring unhandled webhook event");
@@ -162,7 +171,13 @@ impl BuddyContext {
         let state = self.store.load_state(chat_id).await?.unwrap_or(OnboardState::Intro);
         let was_ready = state == OnboardState::Ready;
         let persona = self.store.load_persona(chat_id).await?.unwrap_or_else(|| json!({}));
-        let output = handle_step(&state, text, &persona, self.extractor.as_ref()).await?;
+        let output = if matches!(state, OnboardState::Intro | OnboardState::AskLanguage) {
+            handle_step(&state, text, &persona, self.extractor.as_ref()).await?
+        } else {
+            let lang = Lang::from_persona(&persona);
+            let ctx = retrieve_context(&self.chat_log, &self.topics, chat_id, text, 10, 5).await;
+            conversational_step(&state, &persona, &ctx, text, self.extractor.as_ref(), lang).await?
+        };
         self.store.save_state(chat_id, &output.next_state).await?;
         self.apply_persona_patch(chat_id, output.persona_patch).await?;
         if was_ready || output.next_state == OnboardState::Ready {