diff --git a/_primitives/_rust/kei-buddy/Cargo.toml b/_primitives/_rust/kei-buddy/Cargo.toml index 8434ced..992fdf2 100644 --- a/_primitives/_rust/kei-buddy/Cargo.toml +++ b/_primitives/_rust/kei-buddy/Cargo.toml @@ -28,6 +28,8 @@ reqwest = { workspace = true } anyhow = { workspace = true } kei-memory-sqlite = { path = "../kei-memory-sqlite" } kei-chat-store = { path = "../kei-chat-store" } +kei-social-store = { path = "../kei-social-store" } +kei-sage = { path = "../kei-sage" } chrono = { workspace = true } # serve feature deps diff --git a/_primitives/_rust/kei-buddy/src/contacts.rs b/_primitives/_rust/kei-buddy/src/contacts.rs new file mode 100644 index 0000000..01da74c --- /dev/null +++ b/_primitives/_rust/kei-buddy/src/contacts.rs @@ -0,0 +1,200 @@ +// SPDX-License-Identifier: Apache-2.0 +//! `Contacts` — async address-book adapter over `kei-social-store`. +//! Arc> + spawn_blocking pattern (same as chat_log.rs). +//! Never `await` while holding the std::sync::Mutex guard. + +use std::collections::HashSet; +use std::path::Path; +use std::sync::{Arc, Mutex}; + +use kei_social_store::{ + graph::Pair, + interactions::{self, Interaction}, + people::{self, Person}, + search::search_people, + Store, +}; + +use crate::error::BuddyError; + +/// Async contact-book backed by `kei-social-store`. +pub struct Contacts { + store: Arc>, +} + +impl Contacts { + /// Open a file-backed contact store at `path`. + pub fn from_path(path: impl AsRef) -> Result { + let store = Store::open(path.as_ref()) + .map_err(|e| BuddyError::Memory(format!("{e}")))?; + Ok(Self { store: Arc::new(Mutex::new(store)) }) + } + + /// Open an in-memory store (useful for tests). + pub fn from_memory() -> Result { + let store = Store::open_memory() + .map_err(|e| BuddyError::Memory(format!("{e}")))?; + Ok(Self { store: Arc::new(Mutex::new(store)) }) + } + + /// Add a contact; returns the new row id. + pub async fn add_contact(&self, p: Person) -> Result { + let store = Arc::clone(&self.store); + tokio::task::spawn_blocking(move || { + let locked = store.lock().expect("store mutex poisoned"); + people::add_person(&locked, &p).map_err(|e| BuddyError::Memory(format!("{e}"))) + }) + .await + .map_err(|e| BuddyError::Memory(format!("spawn_blocking join: {e}")))? + } + + /// Retrieve a contact by id; `None` if not found. + pub async fn get_contact(&self, id: i64) -> Result, BuddyError> { + let store = Arc::clone(&self.store); + tokio::task::spawn_blocking(move || { + let locked = store.lock().expect("store mutex poisoned"); + people::get_person(&locked, id).map_err(|e| BuddyError::Memory(format!("{e}"))) + }) + .await + .map_err(|e| BuddyError::Memory(format!("spawn_blocking join: {e}")))? + } + + /// Full-text search over contacts. + pub async fn search_contacts(&self, q: &str, limit: i64) -> Result, BuddyError> { + let store = Arc::clone(&self.store); + let q = q.to_string(); + tokio::task::spawn_blocking(move || { + let locked = store.lock().expect("store mutex poisoned"); + search_people(&locked, &q, limit).map_err(|e| BuddyError::Memory(format!("{e}"))) + }) + .await + .map_err(|e| BuddyError::Memory(format!("spawn_blocking join: {e}")))? + } + + /// Log a meeting between `person_id` (source) and `target_id` on `channel`. + /// Returns the new interaction row id. + pub async fn log_meet( + &self, + person_id: i64, + target_id: i64, + channel: &str, + note: &str, + ) -> Result { + let store = Arc::clone(&self.store); + let interaction = Interaction { + id: 0, + person_id, + target_id, + interaction_type: "meet".to_string(), + channel: channel.to_string(), + content: note.to_string(), + timestamp: 0, + }; + tokio::task::spawn_blocking(move || { + let locked = store.lock().expect("store mutex poisoned"); + interactions::log_interaction(&locked, &interaction) + .map_err(|e| BuddyError::Memory(format!("{e}"))) + }) + .await + .map_err(|e| BuddyError::Memory(format!("spawn_blocking join: {e}")))? + } + + /// List all interactions where `person_id` is the source. + pub async fn interactions_for(&self, person_id: i64) -> Result, BuddyError> { + let store = Arc::clone(&self.store); + tokio::task::spawn_blocking(move || { + let locked = store.lock().expect("store mutex poisoned"); + interactions::interactions_for(&locked, person_id) + .map_err(|e| BuddyError::Memory(format!("{e}"))) + }) + .await + .map_err(|e| BuddyError::Memory(format!("spawn_blocking join: {e}")))? + } + + /// Returns the full relationship graph as `Vec`. + pub async fn relationship_graph(&self) -> Result, BuddyError> { + let store = Arc::clone(&self.store); + tokio::task::spawn_blocking(move || { + let locked = store.lock().expect("store mutex poisoned"); + kei_social_store::graph::relationship_graph(&locked) + .map_err(|e| BuddyError::Memory(format!("{e}"))) + }) + .await + .map_err(|e| BuddyError::Memory(format!("spawn_blocking join: {e}")))? + } + + /// People who interacted with BOTH `person_a` AND `person_b` + /// (appear as `target_id` in pairs for both sources). + pub async fn common_connections( + &self, + person_a: i64, + person_b: i64, + ) -> Result, BuddyError> { + let pairs = self.relationship_graph().await?; + let targets_a: HashSet = pairs + .iter() + .filter(|p| p.person_id == person_a) + .map(|p| p.target_id) + .collect(); + let targets_b: HashSet = pairs + .iter() + .filter(|p| p.person_id == person_b) + .map(|p| p.target_id) + .collect(); + Ok(targets_a.intersection(&targets_b).copied().collect()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use kei_social_store::Person; + + fn alice() -> Person { Person { name: "Denis".to_string(), email: "d@test.com".to_string(), ..Default::default() } } + fn bob() -> Person { Person { name: "Bob".to_string(), email: "b@test.com".to_string(), ..Default::default() } } + fn charlie() -> Person { Person { name: "Charlie".to_string(), email: "c@test.com".to_string(), ..Default::default() } } + + #[tokio::test] + async fn add_and_get_contact_roundtrip() { + let contacts = Contacts::from_memory().unwrap(); + let id = contacts.add_contact(alice()).await.unwrap(); + assert!(id > 0); + let found = contacts.get_contact(id).await.unwrap(); + assert!(found.is_some()); + assert_eq!(found.unwrap().name, "Denis"); + } + + #[tokio::test] + async fn search_contacts_finds_by_name() { + let contacts = Contacts::from_memory().unwrap(); + contacts.add_contact(alice()).await.unwrap(); + contacts.add_contact(bob()).await.unwrap(); + let results = contacts.search_contacts("Denis", 10).await.unwrap(); + assert!(!results.is_empty()); + assert!(results.iter().any(|p| p.name == "Denis")); + } + + #[tokio::test] + async fn log_meet_and_list_interactions() { + let contacts = Contacts::from_memory().unwrap(); + let a = contacts.add_contact(alice()).await.unwrap(); + let b = contacts.add_contact(bob()).await.unwrap(); + let iid = contacts.log_meet(a, b, "telegram", "hi").await.unwrap(); + assert!(iid > 0); + let list = contacts.interactions_for(a).await.unwrap(); + assert!(!list.is_empty()); + assert_eq!(list[0].channel, "telegram"); + } + + #[tokio::test] + async fn common_connections_finds_shared_target() { + let contacts = Contacts::from_memory().unwrap(); + let a = contacts.add_contact(alice()).await.unwrap(); + let b = contacts.add_contact(bob()).await.unwrap(); + let c = contacts.add_contact(charlie()).await.unwrap(); + contacts.log_meet(a, c, "telegram", "met charlie").await.unwrap(); + contacts.log_meet(b, c, "telegram", "also met charlie").await.unwrap(); + let common = contacts.common_connections(a, b).await.unwrap(); + assert!(common.contains(&c)); + } +} diff --git a/_primitives/_rust/kei-buddy/src/lib.rs b/_primitives/_rust/kei-buddy/src/lib.rs index 54db0e2..34ad663 100644 --- a/_primitives/_rust/kei-buddy/src/lib.rs +++ b/_primitives/_rust/kei-buddy/src/lib.rs @@ -11,6 +11,7 @@ //! * `store` — `BuddyStore` trait + `SqliteBuddyStore` impl pub mod chat_log; +pub mod contacts; pub mod error; pub mod extractor; pub mod machine; @@ -20,6 +21,7 @@ pub mod schema; pub mod state; pub mod store; pub(crate) mod store_ops; +pub mod topics; pub mod transition; #[cfg(feature = "serve")] @@ -28,9 +30,11 @@ pub mod serve; pub mod serve_telegram; pub use chat_log::ChatLog; +pub use contacts::Contacts; pub use error::BuddyError; pub use extractor::LlmExtractor; pub use machine::handle_step; pub use state::OnboardState; pub use store::{BuddyStore, SqliteBuddyStore}; +pub use topics::Topics; pub use transition::StepOutput; diff --git a/_primitives/_rust/kei-buddy/src/topics.rs b/_primitives/_rust/kei-buddy/src/topics.rs new file mode 100644 index 0000000..9605443 --- /dev/null +++ b/_primitives/_rust/kei-buddy/src/topics.rs @@ -0,0 +1,200 @@ +// SPDX-License-Identifier: Apache-2.0 +//! `Topics` — async adapter storing topics + digests in kei-sage. +//! Constructor Pattern: one responsibility — bridge kei-buddy to the +//! kei-sage knowledge vault. All rusqlite calls via `spawn_blocking`. + +use std::path::Path; +use std::sync::{Arc, Mutex}; + +use kei_sage::{ + edges::{add_edge, list_outgoing}, + search::fts_search, + store::Store, + Unit, +}; +use rusqlite::params; + +use crate::error::BuddyError; +/// Wraps kei-sage `Store` with buddy-domain topic/digest API. +pub struct Topics { + store: Arc>, +} + +impl Topics { + pub fn from_path(path: impl AsRef) -> Result { + let store = Store::open(path.as_ref()) + .map_err(|e| BuddyError::Memory(format!("{e}")))?; + Ok(Self { store: Arc::new(Mutex::new(store)) }) + } + + pub fn from_memory() -> Result { + let store = Store::open_memory() + .map_err(|e| BuddyError::Memory(format!("{e}")))?; + Ok(Self { store: Arc::new(Mutex::new(store)) }) + } + + /// Add a topic; idempotent by `source_path`. Returns unit id. + pub async fn add_topic( + &self, chat_id: i64, slug: &str, title: &str, content: &str, + ) -> Result { + let src = format!("kei-buddy/chat-{chat_id}/topic/{slug}"); + let unit = make_unit("buddy_topic", title, content, "", &src); + let store = Arc::clone(&self.store); + spawn(move || find_or_add(&store.lock().expect("poisoned"), &unit)).await + } + + /// Add a digest note linked to a topic. Returns digest unit id. + pub async fn add_digest( + &self, chat_id: i64, topic_slug: &str, timestamp: i64, content: &str, + ) -> Result { + let topic_path = format!("kei-buddy/chat-{chat_id}/topic/{topic_slug}"); + let dst = format!("kei-buddy/chat-{chat_id}/digest/{timestamp}"); + let unit = make_unit("buddy_digest", &format!("digest-{timestamp}"), content, "E3", &dst); + let store = Arc::clone(&self.store); + spawn(move || { + let locked = store.lock().expect("poisoned"); + let id = find_or_add(&locked, &unit)?; + add_edge(&locked, &topic_path, &dst, "digest_for", 1.0) + .map_err(|e| BuddyError::Memory(format!("{e}")))?; + Ok(id) + }) + .await + } + + /// Full-text search across all kei-buddy units. + pub async fn search(&self, q: &str, limit: i64) -> Result, BuddyError> { + let store = Arc::clone(&self.store); + let q = q.to_string(); + spawn(move || { + fts_search(&store.lock().expect("poisoned"), &q, limit) + .map_err(|e| BuddyError::Memory(format!("{e}"))) + }) + .await + } + + /// List digest units linked from a topic via "digest_for" edges. + pub async fn digests_for(&self, chat_id: i64, slug: &str) -> Result, BuddyError> { + let topic_path = format!("kei-buddy/chat-{chat_id}/topic/{slug}"); + let store = Arc::clone(&self.store); + spawn(move || { + let locked = store.lock().expect("poisoned"); + let edges = list_outgoing(&locked, &topic_path) + .map_err(|e| BuddyError::Memory(format!("{e}")))?; + let mut out = Vec::new(); + for e in edges.into_iter().filter(|e| e.edge_type == "digest_for") { + if let Some(u) = unit_by_path(&locked, &e.dst_path) + .map_err(|e| BuddyError::Memory(format!("{e}")))? + { + out.push(u); + } + } + Ok(out) + }) + .await + } + + /// List all topic units for a chat via raw SELECT. + pub async fn list_topics(&self, chat_id: i64) -> Result, BuddyError> { + let prefix = format!("kei-buddy/chat-{chat_id}/topic/%"); + let store = Arc::clone(&self.store); + spawn(move || { + topics_by_prefix(&store.lock().expect("poisoned"), &prefix) + .map_err(|e| BuddyError::Memory(format!("{e}"))) + }) + .await + } +} + +// ── Helpers ─────────────────────────────────────────────────────────────────── +fn make_unit(unit_type: &str, title: &str, content: &str, grade: &str, path: &str) -> Unit { + Unit { + id: 0, unit_type: unit_type.to_string(), title: title.to_string(), + content: content.to_string(), evidence_grade: grade.to_string(), + source_path: path.to_string(), vault_path: path.to_string(), + category: "kei-buddy".to_string(), created_at: 0, updated_at: 0, + } +} + +fn find_or_add(store: &Store, unit: &Unit) -> Result { + let existing: Option = store.conn() + .query_row("SELECT id FROM knowledge_units WHERE source_path=?1 LIMIT 1", + params![unit.source_path], |r| r.get(0)) + .ok(); + if let Some(id) = existing { return Ok(id); } + store.add_unit(unit).map_err(|e| BuddyError::Memory(format!("{e}"))) +} + +fn unit_by_path(store: &Store, src: &str) -> anyhow::Result> { + let mut stmt = store.conn().prepare( + "SELECT id,unit_type,title,content,evidence_grade,source_path,vault_path, + category,created_at,updated_at FROM knowledge_units WHERE source_path=?1 LIMIT 1")?; + let mut rows = stmt.query(params![src])?; + if let Some(r) = rows.next()? { return Ok(Some(row_to_unit(r)?)); } + Ok(None) +} + +fn topics_by_prefix(store: &Store, prefix: &str) -> anyhow::Result> { + let mut stmt = store.conn().prepare( + "SELECT id,unit_type,title,content,evidence_grade,source_path,vault_path, + category,created_at,updated_at FROM knowledge_units + WHERE category='kei-buddy' AND unit_type='buddy_topic' AND source_path LIKE ?1")?; + let rows = stmt.query_map(params![prefix], row_to_unit)?; + let mut out = Vec::new(); + for r in rows { out.push(r?); } + Ok(out) +} + +fn row_to_unit(r: &rusqlite::Row) -> rusqlite::Result { + Ok(Unit { id: r.get(0)?, unit_type: r.get(1)?, title: r.get(2)?, + content: r.get(3)?, evidence_grade: r.get(4)?, source_path: r.get(5)?, + vault_path: r.get(6)?, category: r.get(7)?, created_at: r.get(8)?, updated_at: r.get(9)? }) +} + +/// Thin wrapper: run closure in `spawn_blocking`, flatten join error. +async fn spawn(f: F) -> Result +where + F: FnOnce() -> Result + Send + 'static, + T: Send + 'static, +{ + tokio::task::spawn_blocking(f) + .await + .map_err(|e| BuddyError::Memory(format!("spawn_blocking join: {e}")))? +} + +// ── Tests ───────────────────────────────────────────────────────────────────── +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn add_topic_then_search_finds_it() { + let t = Topics::from_memory().unwrap(); + t.add_topic(42, "ml", "ML Concepts", "talk about ml").await.unwrap(); + let res = t.search("ml", 10).await.unwrap(); + assert!(!res.is_empty()); + } + + #[tokio::test] + async fn add_topic_is_idempotent() { + let t = Topics::from_memory().unwrap(); + t.add_topic(42, "ml", "ML Concepts", "first").await.unwrap(); + t.add_topic(42, "ml", "ML Concepts", "second").await.unwrap(); + assert_eq!(t.list_topics(42).await.unwrap().len(), 1); + } + + #[tokio::test] + async fn add_digest_creates_edge_and_dest() { + let t = Topics::from_memory().unwrap(); + t.add_topic(42, "ml", "ML Concepts", "about ml").await.unwrap(); + t.add_digest(42, "ml", 1_000_000, "digest content").await.unwrap(); + assert_eq!(t.digests_for(42, "ml").await.unwrap().len(), 1); + } + + #[tokio::test] + async fn list_topics_scopes_per_chat() { + let t = Topics::from_memory().unwrap(); + t.add_topic(1, "rust", "Rust", "rust stuff").await.unwrap(); + t.add_topic(2, "go", "Go", "go stuff").await.unwrap(); + assert_eq!(t.list_topics(1).await.unwrap().len(), 1); + } +}