diff --git a/_primitives/_rust/Cargo.lock b/_primitives/_rust/Cargo.lock index 40fad4b..5fe3574 100644 --- a/_primitives/_rust/Cargo.lock +++ b/_primitives/_rust/Cargo.lock @@ -656,6 +656,12 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.11.1" @@ -912,6 +918,17 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "cron" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5877d3fbf742507b66bc2a1945106bd30dd8504019d596901ddd012a4dd01740" +dependencies = [ + "chrono", + "once_cell", + "winnow 0.6.26", +] + [[package]] name = "crossbeam-queue" version = "0.3.12" @@ -1182,6 +1199,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "futures-channel" version = "0.3.32" @@ -1307,7 +1333,7 @@ version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b903b73e45dc0c6c596f2d37eccece7c1c8bb6e4407b001096387c63d0d93724" dependencies = [ - "bitflags", + "bitflags 2.11.1", "libc", "libgit2-sys", "log", @@ -1770,6 +1796,26 @@ dependencies = [ "serde_core", ] +[[package]] +name = "inotify" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd5b3eaf1a28b758ac0faa5a4254e8ab2705605496f1b1f3fbbc3988ad73d199" +dependencies = [ + "bitflags 2.11.1", 
+ "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "ipnet" version = "2.12.0" @@ -2014,6 +2060,14 @@ dependencies = [ "tempfile", ] +[[package]] +name = "kei-diff" +version = "0.1.0" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "kei-entity-store" version = "0.1.0" @@ -2189,6 +2243,22 @@ dependencies = [ "walkdir", ] +[[package]] +name = "kei-scheduler" +version = "0.1.0" +dependencies = [ + "anyhow", + "chrono", + "clap", + "cron", + "kei-entity-store", + "rusqlite", + "serde", + "serde_json", + "tempfile", + "thiserror 1.0.69", +] + [[package]] name = "kei-search-core" version = "0.1.0" @@ -2261,6 +2331,17 @@ dependencies = [ "tempfile", ] +[[package]] +name = "kei-watch" +version = "0.1.0" +dependencies = [ + "clap", + "notify", + "serde", + "serde_json", + "tempfile", +] + [[package]] name = "keisei" version = "0.1.0" @@ -2276,6 +2357,26 @@ dependencies = [ "toml", ] +[[package]] +name = "kqueue" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac30106d7dce88daf4a3fcb4879ea939476d5074a9b7ddd0fb97fa4bed5596a" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -2321,7 +2422,7 @@ version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e02f3bb43d335493c96bf3fd3a321600bf6bd07ed34bc64118e9293bdffea46c" dependencies = [ - "bitflags", + "bitflags 2.11.1", "libc", "plain", "redox_syscall 0.7.4", @@ -2437,6 +2538,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1" dependencies = [ "libc", + "log", "wasi", "windows-sys 0.61.2", ] @@ -2480,6 +2582,33 @@ dependencies = [ "memchr", ] +[[package]] +name = "notify" +version = "8.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d3d07927151ff8575b7087f245456e549fea62edf0ec4e565a5ee50c8402bc3" +dependencies = [ + "bitflags 2.11.1", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio", + "notify-types", + "walkdir", + "windows-sys 0.60.2", +] + +[[package]] +name = "notify-types" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42b8cfee0e339a0337359f3c88165702ac6e600dc01c0cc9579a92d62b08477a" +dependencies = [ + "bitflags 2.11.1", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -2716,7 +2845,7 @@ version = "0.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60769b8b31b2a9f263dae2776c37b1b28ae246943cf719eb6946a1db05128a61" dependencies = [ - "bitflags", + "bitflags 2.11.1", "crc32fast", "fdeflate", "flate2", @@ -2858,7 +2987,7 @@ version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" dependencies = [ - "bitflags", + "bitflags 2.11.1", ] [[package]] @@ -2867,7 +2996,7 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f450ad9c3b1da563fb6948a8e0fb0fb9269711c9c73d9ea1de5058c79c8d643a" dependencies = [ - "bitflags", + "bitflags 2.11.1", ] [[package]] @@ -2945,7 +3074,7 @@ version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b838eba278d213a8beaf485bd313fd580ca4505a00d5871caeb1457c55322cae" dependencies = [ - "bitflags", + "bitflags 2.11.1", "fallible-iterator", "fallible-streaming-iterator", "hashlink", @@ -2968,7 
+3097,7 @@ version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" dependencies = [ - "bitflags", + "bitflags 2.11.1", "errno", "libc", "linux-raw-sys", @@ -3105,7 +3234,7 @@ version = "3.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7f4bc775c73d9a02cde8bf7b2ec4c9d12743edf609006c7facc23998404cd1d" dependencies = [ - "bitflags", + "bitflags 2.11.1", "core-foundation", "core-foundation-sys", "libc", @@ -3475,7 +3604,7 @@ checksum = "5afe4c38a9b417b6a9a5eeffe7235d0a106716495536e7727d1c7f4b1ff3eba6" dependencies = [ "atoi", "base64 0.22.1", - "bitflags", + "bitflags 2.11.1", "byteorder", "bytes", "crc", @@ -3517,7 +3646,7 @@ checksum = "b1dbb157e65f10dbe01f729339c06d239120221c9ad9fa0ba8408c4cc18ecf21" dependencies = [ "atoi", "base64 0.22.1", - "bitflags", + "bitflags 2.11.1", "byteorder", "crc", "dotenvy", @@ -3867,7 +3996,7 @@ dependencies = [ "serde_spanned", "toml_datetime", "toml_write", - "winnow", + "winnow 0.7.15", ] [[package]] @@ -4220,7 +4349,7 @@ version = "0.244.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" dependencies = [ - "bitflags", + "bitflags 2.11.1", "hashbrown 0.15.5", "indexmap", "semver", @@ -4328,6 +4457,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" +dependencies = [ + "windows-targets 0.53.5", +] + [[package]] name = "windows-sys" version = "0.61.2" @@ -4361,13 +4499,30 @@ dependencies = [ "windows_aarch64_gnullvm 0.52.6", "windows_aarch64_msvc 0.52.6", "windows_i686_gnu 0.52.6", - "windows_i686_gnullvm", + "windows_i686_gnullvm 0.52.6", "windows_i686_msvc 0.52.6", "windows_x86_64_gnu 0.52.6", 
"windows_x86_64_gnullvm 0.52.6", "windows_x86_64_msvc 0.52.6", ] +[[package]] +name = "windows-targets" +version = "0.53.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" +dependencies = [ + "windows-link", + "windows_aarch64_gnullvm 0.53.1", + "windows_aarch64_msvc 0.53.1", + "windows_i686_gnu 0.53.1", + "windows_i686_gnullvm 0.53.1", + "windows_i686_msvc 0.53.1", + "windows_x86_64_gnu 0.53.1", + "windows_x86_64_gnullvm 0.53.1", + "windows_x86_64_msvc 0.53.1", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" @@ -4380,6 +4535,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" @@ -4392,6 +4553,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" +[[package]] +name = "windows_aarch64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" + [[package]] name = "windows_i686_gnu" version = "0.48.5" @@ -4404,12 +4571,24 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" +[[package]] +name = "windows_i686_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3" + [[package]] name = "windows_i686_gnullvm" version = "0.52.6" 
source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" +[[package]] +name = "windows_i686_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" + [[package]] name = "windows_i686_msvc" version = "0.48.5" @@ -4422,6 +4601,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" +[[package]] +name = "windows_i686_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" @@ -4434,6 +4619,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" +[[package]] +name = "windows_x86_64_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" @@ -4446,6 +4637,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" @@ -4458,6 +4655,21 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = 
"windows_x86_64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" + +[[package]] +name = "winnow" +version = "0.6.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e90edd2ac1aa278a5c4599b1d89cf03074b610800f866d4026dc199d7929a28" +dependencies = [ + "memchr", +] + [[package]] name = "winnow" version = "0.7.15" @@ -4531,7 +4743,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" dependencies = [ "anyhow", - "bitflags", + "bitflags 2.11.1", "indexmap", "log", "serde", diff --git a/_primitives/_rust/Cargo.toml b/_primitives/_rust/Cargo.toml index 0eac20d..65d698b 100644 --- a/_primitives/_rust/Cargo.toml +++ b/_primitives/_rust/Cargo.toml @@ -51,6 +51,12 @@ members = [ "kei-spawn", # agent substrate v1 — reconstruct spawn from DNA (ledger row + task.toml + recompose) "kei-replay", + # v0.29 Wave 13 — structural JSON diff primitive (RFC 6902 subset add/remove/replace) + "kei-diff", + # v0.29 Wave 13 — durable task scheduler (cron / at / interval) metadata primitive + "kei-scheduler", + # v0.29 Wave 13 — filesystem watcher primitive (thin notify wrapper, sync API) + "kei-watch", ] [workspace.package] diff --git a/_primitives/_rust/kei-diff/Cargo.toml b/_primitives/_rust/kei-diff/Cargo.toml new file mode 100644 index 0000000..5fe2c6e --- /dev/null +++ b/_primitives/_rust/kei-diff/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "kei-diff" +version = "0.1.0" +edition = "2021" +rust-version = "1.75" +description = "Structural JSON diff (RFC 6902 subset: add/remove/replace). Pure computation primitive for drift detection in kei-replay and invalidation in kei-cache." 
+ +[lib] +name = "kei_diff" +path = "src/lib.rs" + +[[bin]] +name = "kei-diff" +path = "src/main.rs" + +[dependencies] +serde = { workspace = true } +serde_json = { workspace = true } diff --git a/_primitives/_rust/kei-diff/src/apply.rs b/_primitives/_rust/kei-diff/src/apply.rs new file mode 100644 index 0000000..7936192 --- /dev/null +++ b/_primitives/_rust/kei-diff/src/apply.rs @@ -0,0 +1,169 @@ +//! Apply an RFC 6902 patch (add/remove/replace subset) to a JSON document. +//! +//! Root-path `""` replace swaps the entire document. Array `add` with +//! index == len (or `-`) appends; in-range index inserts and shifts. +//! Array `remove` deletes and shifts. Object ops insert/delete/replace keys. + +use crate::apply_error::ApplyError; +use crate::op::{Op, Patch}; +use crate::path::parse_pointer; +use serde_json::Value; + +/// Apply `patch` to `root` and return a new `Value`. `root` is cloned; +/// the original is untouched. Operations are applied in order. +pub fn apply(root: &Value, patch: &Patch) -> Result { + let mut doc = root.clone(); + for op in &patch.0 { + apply_one(&mut doc, op)?; + } + Ok(doc) +} + +fn apply_one(doc: &mut Value, op: &Op) -> Result<(), ApplyError> { + match op { + Op::Add { path, value } => apply_add(doc, path, value.clone()), + Op::Remove { path } => apply_remove(doc, path).map(|_| ()), + Op::Replace { path, value } => apply_replace(doc, path, value.clone()), + } +} + +fn apply_add(doc: &mut Value, path: &str, value: Value) -> Result<(), ApplyError> { + let segs = parse_pointer(path).ok_or_else(|| ApplyError::InvalidPointer(path.into()))?; + if segs.is_empty() { + return Err(ApplyError::CannotAddToRoot); + } + let (parent_segs, last) = segs.split_at(segs.len() - 1); + let parent = navigate_mut(doc, parent_segs, path)?; + insert_into(parent, &last[0], value, path) +} + +fn apply_remove(doc: &mut Value, path: &str) -> Result { + let segs = parse_pointer(path).ok_or_else(|| ApplyError::InvalidPointer(path.into()))?; + if segs.is_empty() { + 
return Err(ApplyError::CannotRemoveRoot); + } + let (parent_segs, last) = segs.split_at(segs.len() - 1); + let parent = navigate_mut(doc, parent_segs, path)?; + remove_from(parent, &last[0], path) +} + +fn apply_replace(doc: &mut Value, path: &str, value: Value) -> Result<(), ApplyError> { + let segs = parse_pointer(path).ok_or_else(|| ApplyError::InvalidPointer(path.into()))?; + if segs.is_empty() { + *doc = value; + return Ok(()); + } + let (parent_segs, last) = segs.split_at(segs.len() - 1); + let parent = navigate_mut(doc, parent_segs, path)?; + replace_in(parent, &last[0], value, path) +} + +fn navigate_mut<'a>( + mut cur: &'a mut Value, + segs: &[String], + full_path: &str, +) -> Result<&'a mut Value, ApplyError> { + for seg in segs { + cur = step_into(cur, seg, full_path)?; + } + Ok(cur) +} + +fn step_into<'a>( + cur: &'a mut Value, + seg: &str, + full_path: &str, +) -> Result<&'a mut Value, ApplyError> { + match cur { + Value::Object(map) => map + .get_mut(seg) + .ok_or_else(|| ApplyError::MissingParent(full_path.into())), + Value::Array(arr) => { + let idx = parse_index(seg, full_path)?; + arr.get_mut(idx) + .ok_or_else(|| ApplyError::MissingParent(full_path.into())) + } + _ => Err(ApplyError::TypeMismatch { + path: full_path.into(), + expected: "object or array", + }), + } +} + +fn insert_into(parent: &mut Value, key: &str, value: Value, full: &str) -> Result<(), ApplyError> { + match parent { + Value::Object(map) => { + map.insert(key.to_string(), value); + Ok(()) + } + Value::Array(arr) => { + let idx = parse_array_insert_index(key, arr.len(), full)?; + arr.insert(idx, value); + Ok(()) + } + _ => Err(ApplyError::TypeMismatch { path: full.into(), expected: "object or array" }), + } +} + +fn remove_from(parent: &mut Value, key: &str, full: &str) -> Result { + match parent { + Value::Object(map) => map + .remove(key) + .ok_or_else(|| ApplyError::MissingTarget(full.into())), + Value::Array(arr) => { + let idx = parse_index(key, full)?; + if idx >= arr.len() 
{ + return Err(ApplyError::IndexOutOfBounds { + path: full.into(), + index: idx, + len: arr.len(), + }); + } + Ok(arr.remove(idx)) + } + _ => Err(ApplyError::TypeMismatch { path: full.into(), expected: "object or array" }), + } +} + +fn replace_in(parent: &mut Value, key: &str, value: Value, full: &str) -> Result<(), ApplyError> { + match parent { + Value::Object(map) => { + if !map.contains_key(key) { + return Err(ApplyError::MissingTarget(full.into())); + } + map.insert(key.to_string(), value); + Ok(()) + } + Value::Array(arr) => { + let idx = parse_index(key, full)?; + if idx >= arr.len() { + return Err(ApplyError::IndexOutOfBounds { + path: full.into(), + index: idx, + len: arr.len(), + }); + } + arr[idx] = value; + Ok(()) + } + _ => Err(ApplyError::TypeMismatch { path: full.into(), expected: "object or array" }), + } +} + +fn parse_index(seg: &str, full: &str) -> Result { + seg.parse::() + .map_err(|_| ApplyError::InvalidPointer(full.into())) +} + +fn parse_array_insert_index(seg: &str, len: usize, full: &str) -> Result { + if seg == "-" { + return Ok(len); + } + let idx = seg + .parse::() + .map_err(|_| ApplyError::InvalidPointer(full.into()))?; + if idx > len { + return Err(ApplyError::IndexOutOfBounds { path: full.into(), index: idx, len }); + } + Ok(idx) +} diff --git a/_primitives/_rust/kei-diff/src/apply_error.rs b/_primitives/_rust/kei-diff/src/apply_error.rs new file mode 100644 index 0000000..7936d60 --- /dev/null +++ b/_primitives/_rust/kei-diff/src/apply_error.rs @@ -0,0 +1,37 @@ +//! `ApplyError` — structured failure reasons for `apply()`. +//! +//! Kept in its own module so `apply.rs` stays focused on the algorithm +//! and each file stays within Constructor Pattern limits. 
+ +use std::fmt; + +#[derive(Debug, Clone, PartialEq)] +pub enum ApplyError { + InvalidPointer(String), + MissingParent(String), + MissingTarget(String), + IndexOutOfBounds { path: String, index: usize, len: usize }, + TypeMismatch { path: String, expected: &'static str }, + CannotAddToRoot, + CannotRemoveRoot, +} + +impl fmt::Display for ApplyError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::InvalidPointer(p) => write!(f, "invalid JSON pointer: {p:?}"), + Self::MissingParent(p) => write!(f, "missing parent at {p:?}"), + Self::MissingTarget(p) => write!(f, "missing target at {p:?}"), + Self::IndexOutOfBounds { path, index, len } => { + write!(f, "index {index} out of bounds (len {len}) at {path:?}") + } + Self::TypeMismatch { path, expected } => { + write!(f, "type mismatch at {path:?}: expected {expected}") + } + Self::CannotAddToRoot => write!(f, "cannot 'add' to root (use 'replace')"), + Self::CannotRemoveRoot => write!(f, "cannot 'remove' root"), + } + } +} + +impl std::error::Error for ApplyError {} diff --git a/_primitives/_rust/kei-diff/src/diff.rs b/_primitives/_rust/kei-diff/src/diff.rs new file mode 100644 index 0000000..723eaa1 --- /dev/null +++ b/_primitives/_rust/kei-diff/src/diff.rs @@ -0,0 +1,106 @@ +//! Structural JSON diff. +//! +//! Algorithm: +//! * Both objects → recurse per-key across the union (add/remove/recurse). +//! * Both arrays → index-based (recurse on overlap; add-tail or remove-tail +//! for length delta). NOT LCS — simpler, idempotent enough for drift +//! detection, and cheap (O(n)). +//! * Otherwise, if values differ → `replace`. +//! * Equal values → no-op. +//! +//! Rationale for skipping LCS: the consumer (kei-replay drift check) cares +//! about "does anything differ" and "at which logical coordinate", not +//! minimum-edit-distance. Index-based gives stable paths; LCS would produce +//! a smaller patch on shuffled arrays but with ambiguous paths. 
+ +use crate::op::{Op, Patch}; +use crate::path::PathBuf; +use serde_json::Value; + +/// Compute an RFC 6902 subset patch that transforms `old` into `new`. +/// Invariant: `apply(old, diff(old, new)) == new`. +pub fn diff(old: &Value, new: &Value) -> Patch { + let mut patch = Patch::new(); + let mut path = PathBuf::new(); + diff_recurse(old, new, &mut path, &mut patch); + patch +} + +fn diff_recurse(old: &Value, new: &Value, path: &mut PathBuf, patch: &mut Patch) { + if old == new { + return; + } + match (old, new) { + (Value::Object(a), Value::Object(b)) => diff_objects(a, b, path, patch), + (Value::Array(a), Value::Array(b)) => diff_arrays(a, b, path, patch), + _ => patch.push(Op::Replace { + path: path.as_string(), + value: new.clone(), + }), + } +} + +fn diff_objects( + a: &serde_json::Map, + b: &serde_json::Map, + path: &mut PathBuf, + patch: &mut Patch, +) { + // Removals: keys in `a` but not `b`. Emit in stable key order for determinism. + for key in a.keys() { + if !b.contains_key(key) { + path.push_key(key); + patch.push(Op::Remove { path: path.as_string() }); + path.pop(); + } + } + // Additions + recursion: iterate `b` in its key order. + for (key, b_val) in b { + path.push_key(key); + match a.get(key) { + None => patch.push(Op::Add { + path: path.as_string(), + value: b_val.clone(), + }), + Some(a_val) => diff_recurse(a_val, b_val, path, patch), + } + path.pop(); + } +} + +fn diff_arrays(a: &[Value], b: &[Value], path: &mut PathBuf, patch: &mut Patch) { + let common = a.len().min(b.len()); + // Recurse on overlapping prefix. + for i in 0..common { + path.push_index(i); + diff_recurse(&a[i], &b[i], path, patch); + path.pop(); + } + if a.len() > b.len() { + emit_array_truncate(a.len(), b.len(), path, patch); + } else if b.len() > a.len() { + emit_array_append(b, a.len(), path, patch); + } +} + +// Remove trailing indices highest-first so surviving indices don't shift. 
+fn emit_array_truncate(old_len: usize, new_len: usize, path: &mut PathBuf, patch: &mut Patch) { + for i in (new_len..old_len).rev() { + path.push_index(i); + patch.push(Op::Remove { path: path.as_string() }); + path.pop(); + } +} + +// Append new tail. Emit in ascending order so each add references the +// just-created length as the next insertion point. +fn emit_array_append(b: &[Value], old_len: usize, path: &mut PathBuf, patch: &mut Patch) { + for i in old_len..b.len() { + path.push_index(i); + patch.push(Op::Add { + path: path.as_string(), + value: b[i].clone(), + }); + path.pop(); + } +} diff --git a/_primitives/_rust/kei-diff/src/lib.rs b/_primitives/_rust/kei-diff/src/lib.rs new file mode 100644 index 0000000..5200063 --- /dev/null +++ b/_primitives/_rust/kei-diff/src/lib.rs @@ -0,0 +1,22 @@ +//! kei-diff — structural JSON diff (RFC 6902 subset: add/remove/replace). +//! +//! ## Design +//! * Emits ONLY `add`, `remove`, `replace`. No `copy`/`move`/`test`. +//! * Arrays diffed by index (not LCS) — matches drift-detection semantics. +//! * Paths are RFC 6901 JSON Pointers (`~` → `~0`, `/` → `~1`). +//! * Correctness invariant: `apply(old, diff(old, new)) == new`. +//! +//! Consumed by `kei-replay` (drift detection between DNA-scoped agent runs) +//! and `kei-cache` (invalidation signals). Pure compute, zero sibling deps. + +mod apply; +mod apply_error; +mod diff; +mod op; +mod path; + +pub use apply::apply; +pub use apply_error::ApplyError; +pub use diff::diff; +pub use op::{Op, Patch}; +pub use path::PathBuf as PointerBuf; diff --git a/_primitives/_rust/kei-diff/src/main.rs b/_primitives/_rust/kei-diff/src/main.rs new file mode 100644 index 0000000..401f2ee --- /dev/null +++ b/_primitives/_rust/kei-diff/src/main.rs @@ -0,0 +1,80 @@ +//! kei-diff CLI. +//! +//! Usage: +//! kei-diff diff --old --new # prints RFC 6902 patch +//! kei-diff apply --base --patch # prints result document +//! +//! 
No external arg-parser dep — this is a two-verb tool with fixed flag sets, +//! hand-rolling keeps the crate zero-dep beyond serde/serde_json. + +use std::process::ExitCode; + +fn main() -> ExitCode { + let args: Vec = std::env::args().skip(1).collect(); + match run(&args) { + Ok(out) => { + println!("{out}"); + ExitCode::SUCCESS + } + Err(e) => { + eprintln!("kei-diff: {e}"); + ExitCode::FAILURE + } + } +} + +fn run(args: &[String]) -> Result { + match args.first().map(String::as_str) { + Some("diff") => cmd_diff(&args[1..]), + Some("apply") => cmd_apply(&args[1..]), + Some("help") | Some("--help") | Some("-h") | None => Ok(usage()), + Some(other) => Err(format!("unknown subcommand {other:?}\n{}", usage())), + } +} + +fn cmd_diff(args: &[String]) -> Result { + let old_path = flag(args, "--old")?; + let new_path = flag(args, "--new")?; + let old = read_json(&old_path)?; + let new = read_json(&new_path)?; + let patch = kei_diff::diff(&old, &new); + serde_json::to_string_pretty(&patch).map_err(|e| e.to_string()) +} + +fn cmd_apply(args: &[String]) -> Result { + let base_path = flag(args, "--base")?; + let patch_path = flag(args, "--patch")?; + let base = read_json(&base_path)?; + let patch_json = std::fs::read_to_string(&patch_path) + .map_err(|e| format!("read {patch_path}: {e}"))?; + let patch: kei_diff::Patch = serde_json::from_str(&patch_json) + .map_err(|e| format!("parse patch: {e}"))?; + let out = kei_diff::apply(&base, &patch).map_err(|e| e.to_string())?; + serde_json::to_string_pretty(&out).map_err(|e| e.to_string()) +} + +fn flag(args: &[String], name: &str) -> Result { + let mut iter = args.iter(); + while let Some(a) = iter.next() { + if a == name { + return iter + .next() + .cloned() + .ok_or_else(|| format!("flag {name} requires a value")); + } + } + Err(format!("missing required flag {name}")) +} + +fn read_json(path: &str) -> Result { + let txt = std::fs::read_to_string(path).map_err(|e| format!("read {path}: {e}"))?; + 
serde_json::from_str(&txt).map_err(|e| format!("parse {path}: {e}")) +} + +fn usage() -> String { + "kei-diff — structural JSON diff (RFC 6902 add/remove/replace)\n\n\ + USAGE:\n \ + kei-diff diff --old --new \n \ + kei-diff apply --base --patch \n" + .to_string() +} diff --git a/_primitives/_rust/kei-diff/src/op.rs b/_primitives/_rust/kei-diff/src/op.rs new file mode 100644 index 0000000..be0e555 --- /dev/null +++ b/_primitives/_rust/kei-diff/src/op.rs @@ -0,0 +1,134 @@ +//! Patch operation types + RFC 6902 JSON serialization. +//! +//! We emit only the minimal trio (`add`, `remove`, `replace`). Custom Serialize +//! keeps the wire format stable and self-documenting (no need for serde tag +//! gymnastics). + +use serde::de::{self, MapAccess, Visitor}; +use serde::ser::{SerializeMap, SerializeSeq}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use serde_json::Value; +use std::fmt; + +/// A single RFC 6902 patch operation (subset). +#[derive(Debug, Clone, PartialEq)] +pub enum Op { + Add { path: String, value: Value }, + Remove { path: String }, + Replace { path: String, value: Value }, +} + +/// An ordered list of `Op`s. Serializes as a JSON array per RFC 6902. 
+#[derive(Debug, Clone, Default, PartialEq)] +pub struct Patch(pub Vec); + +impl Patch { + pub fn new() -> Self { + Self(Vec::new()) + } + + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + + pub fn push(&mut self, op: Op) { + self.0.push(op); + } + + pub fn len(&self) -> usize { + self.0.len() + } +} + +impl Serialize for Op { + fn serialize(&self, s: S) -> Result { + match self { + Op::Add { path, value } => { + let mut m = s.serialize_map(Some(3))?; + m.serialize_entry("op", "add")?; + m.serialize_entry("path", path)?; + m.serialize_entry("value", value)?; + m.end() + } + Op::Remove { path } => { + let mut m = s.serialize_map(Some(2))?; + m.serialize_entry("op", "remove")?; + m.serialize_entry("path", path)?; + m.end() + } + Op::Replace { path, value } => { + let mut m = s.serialize_map(Some(3))?; + m.serialize_entry("op", "replace")?; + m.serialize_entry("path", path)?; + m.serialize_entry("value", value)?; + m.end() + } + } + } +} + +impl Serialize for Patch { + fn serialize(&self, s: S) -> Result { + let mut seq = s.serialize_seq(Some(self.0.len()))?; + for op in &self.0 { + seq.serialize_element(op)?; + } + seq.end() + } +} + +impl<'de> Deserialize<'de> for Op { + fn deserialize>(d: D) -> Result { + d.deserialize_map(OpVisitor) + } +} + +impl<'de> Deserialize<'de> for Patch { + fn deserialize>(d: D) -> Result { + Vec::::deserialize(d).map(Patch) + } +} + +struct OpVisitor; + +impl<'de> Visitor<'de> for OpVisitor { + type Value = Op; + + fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str("an RFC 6902 JSON Patch operation object") + } + + fn visit_map>(self, mut map: M) -> Result { + let mut op: Option = None; + let mut path: Option = None; + let mut value: Option = None; + while let Some(key) = map.next_key::()? 
{ + match key.as_str() { + "op" => op = Some(map.next_value()?), + "path" => path = Some(map.next_value()?), + "value" => value = Some(map.next_value()?), + _ => { + let _: serde::de::IgnoredAny = map.next_value()?; + } + } + } + let op = op.ok_or_else(|| de::Error::missing_field("op"))?; + let path = path.ok_or_else(|| de::Error::missing_field("path"))?; + build_op::(&op, path, value) + } +} + +fn build_op(op: &str, path: String, value: Option) -> Result { + match op { + "add" => Ok(Op::Add { + path, + value: value.ok_or_else(|| E::missing_field("value"))?, + }), + "remove" => Ok(Op::Remove { path }), + "replace" => Ok(Op::Replace { + path, + value: value.ok_or_else(|| E::missing_field("value"))?, + }), + other => Err(E::unknown_variant(other, &["add", "remove", "replace"])), + } +} diff --git a/_primitives/_rust/kei-diff/src/path.rs b/_primitives/_rust/kei-diff/src/path.rs new file mode 100644 index 0000000..8f622bc --- /dev/null +++ b/_primitives/_rust/kei-diff/src/path.rs @@ -0,0 +1,105 @@ +//! RFC 6901 JSON Pointer path builder. +//! +//! Root is `""`. Segments join with `/`. Inside a segment, `~` encodes as +//! `~0` and `/` encodes as `~1`. Order matters: `~` must be escaped first +//! when encoding, and `~1` must be decoded before `~0`. + +/// Incremental pointer builder. Use `push`/`pop` during recursive traversal; +/// `as_str` yields the current RFC 6901 pointer. +#[derive(Debug, Default, Clone)] +pub struct PathBuf { + segments: Vec, // already-encoded segments (no leading '/') +} + +impl PathBuf { + pub fn new() -> Self { + Self { segments: Vec::new() } + } + + /// Push an object key. Performs RFC 6901 escaping. + pub fn push_key(&mut self, key: &str) { + self.segments.push(encode_segment(key)); + } + + /// Push an array index. Always emitted as decimal digits. + pub fn push_index(&mut self, idx: usize) { + self.segments.push(idx.to_string()); + } + + pub fn pop(&mut self) { + self.segments.pop(); + } + + /// Current pointer as a String. 
Empty string if at root. + pub fn as_string(&self) -> String { + if self.segments.is_empty() { + return String::new(); + } + let mut out = String::with_capacity(self.segments.iter().map(|s| s.len() + 1).sum()); + for seg in &self.segments { + out.push('/'); + out.push_str(seg); + } + out + } +} + +fn encode_segment(raw: &str) -> String { + // ~ must be escaped BEFORE / so we don't double-encode. + raw.replace('~', "~0").replace('/', "~1") +} + +/// Parse an RFC 6901 pointer into decoded segments. `""` → `[]`. +/// Returns `None` if pointer is malformed (e.g. doesn't start with `/` +/// and is non-empty). +pub fn parse_pointer(ptr: &str) -> Option> { + if ptr.is_empty() { + return Some(Vec::new()); + } + if !ptr.starts_with('/') { + return None; + } + let segs = ptr[1..] + .split('/') + .map(decode_segment) + .collect::>(); + Some(segs) +} + +fn decode_segment(raw: &str) -> String { + // ~1 must be decoded BEFORE ~0 per RFC 6901. + raw.replace("~1", "/").replace("~0", "~") +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn root_is_empty_string() { + let p = PathBuf::new(); + assert_eq!(p.as_string(), ""); + } + + #[test] + fn escape_tilde_and_slash() { + let mut p = PathBuf::new(); + p.push_key("a/b"); + p.push_key("c~d"); + assert_eq!(p.as_string(), "/a~1b/c~0d"); + } + + #[test] + fn roundtrip_encode_decode() { + let mut p = PathBuf::new(); + p.push_key("weird~/key"); + let s = p.as_string(); + let decoded = parse_pointer(&s).unwrap(); + assert_eq!(decoded, vec!["weird~/key".to_string()]); + } + + #[test] + fn parse_rejects_malformed() { + assert!(parse_pointer("no-leading-slash").is_none()); + } +} diff --git a/_primitives/_rust/kei-diff/tests/round_trip.rs b/_primitives/_rust/kei-diff/tests/round_trip.rs new file mode 100644 index 0000000..b85274d --- /dev/null +++ b/_primitives/_rust/kei-diff/tests/round_trip.rs @@ -0,0 +1,278 @@ +//! Integration tests for kei-diff. +//! +//! Core property: `apply(old, diff(old, new)) == new` for every fixture. 
+//! Plus edge cases on pointer escaping, array edits, apply errors, and +//! the RFC 6902 wire format. + +use kei_diff::{apply, diff, ApplyError, Op, Patch}; +use serde_json::{json, Value}; + +fn rt(old: Value, new: Value) { + let patch = diff(&old, &new); + let applied = apply(&old, &patch).expect("apply failed"); + assert_eq!( + applied, new, + "round-trip failed\n old = {old}\n new = {new}\n patch = {}", + serde_json::to_string(&patch).unwrap() + ); +} + +#[test] +fn equal_values_produce_empty_patch() { + let patch = diff(&json!({"a": 1}), &json!({"a": 1})); + assert!(patch.is_empty()); + rt(json!({"a": 1}), json!({"a": 1})); +} + +#[test] +fn scalar_replace() { + rt(json!(1), json!(2)); + rt(json!("a"), json!("b")); + rt(json!(true), json!(false)); +} + +#[test] +fn type_change_emits_replace() { + let old = json!("hello"); + let new = json!(42); + let patch = diff(&old, &new); + assert_eq!(patch.len(), 1); + assert!(matches!(patch.0[0], Op::Replace { .. })); + rt(old, new); +} + +#[test] +fn object_add_key() { + let old = json!({"a": 1}); + let new = json!({"a": 1, "b": 2}); + let patch = diff(&old, &new); + assert_eq!(patch.len(), 1); + assert!(matches!(&patch.0[0], Op::Add { path, .. } if path == "/b")); + rt(old, new); +} + +#[test] +fn object_remove_key() { + let old = json!({"a": 1, "b": 2}); + let new = json!({"a": 1}); + let patch = diff(&old, &new); + assert_eq!(patch.len(), 1); + assert!(matches!(&patch.0[0], Op::Remove { path } if path == "/b")); + rt(old, new); +} + +#[test] +fn object_replace_value() { + let old = json!({"a": 1}); + let new = json!({"a": 2}); + let patch = diff(&old, &new); + assert_eq!(patch.len(), 1); + assert!(matches!(&patch.0[0], Op::Replace { path, .. 
} if path == "/a")); + rt(old, new); +} + +#[test] +fn nested_object_replace() { + let old = json!({"a": {"b": {"c": 1}}}); + let new = json!({"a": {"b": {"c": 2}}}); + let patch = diff(&old, &new); + assert_eq!(patch.len(), 1); + assert!(matches!(&patch.0[0], Op::Replace { path, .. } if path == "/a/b/c")); + rt(old, new); +} + +#[test] +fn array_append() { + let old = json!([1, 2]); + let new = json!([1, 2, 3, 4]); + let patch = diff(&old, &new); + assert_eq!(patch.len(), 2); + assert!(matches!(&patch.0[0], Op::Add { path, .. } if path == "/2")); + assert!(matches!(&patch.0[1], Op::Add { path, .. } if path == "/3")); + rt(old, new); +} + +#[test] +fn array_truncate() { + let old = json!([1, 2, 3, 4]); + let new = json!([1, 2]); + let patch = diff(&old, &new); + // Expect removals highest-first (/3 then /2) so indices stay valid. + assert_eq!(patch.len(), 2); + assert!(matches!(&patch.0[0], Op::Remove { path } if path == "/3")); + assert!(matches!(&patch.0[1], Op::Remove { path } if path == "/2")); + rt(old, new); +} + +#[test] +fn array_element_replace() { + let old = json!([1, 2, 3]); + let new = json!([1, 99, 3]); + let patch = diff(&old, &new); + assert_eq!(patch.len(), 1); + assert!(matches!(&patch.0[0], Op::Replace { path, .. 
} if path == "/1")); + rt(old, new); +} + +#[test] +fn nested_array_inside_object() { + let old = json!({"xs": [1, 2, 3], "y": "z"}); + let new = json!({"xs": [1, 7, 3, 4], "y": "z"}); + rt(old, new); +} + +#[test] +fn deeply_nested_mixed() { + let old = json!({ + "meta": {"ts": 100, "tags": ["a", "b"]}, + "items": [{"id": 1}, {"id": 2}], + }); + let new = json!({ + "meta": {"ts": 200, "tags": ["a", "b", "c"]}, + "items": [{"id": 1}, {"id": 3}, {"id": 4}], + "extra": true, + }); + rt(old, new); +} + +#[test] +fn pointer_escapes_slash_and_tilde() { + let old = json!({"a/b": 1, "c~d": 2}); + let new = json!({"a/b": 9, "c~d": 2}); + let patch = diff(&old, &new); + assert_eq!(patch.len(), 1); + let expected_path = "/a~1b"; + match &patch.0[0] { + Op::Replace { path, .. } => assert_eq!(path, expected_path), + other => panic!("expected Replace, got {other:?}"), + } + rt(old, new); + + let old2 = json!({"c~d": 1}); + let new2 = json!({"c~d": 2}); + let p2 = diff(&old2, &new2); + match &p2.0[0] { + Op::Replace { path, .. 
} => assert_eq!(path, "/c~0d"), + other => panic!("expected Replace, got {other:?}"), + } + rt(old2, new2); +} + +#[test] +fn apply_missing_path_errors() { + let doc = json!({"a": 1}); + let patch = Patch(vec![Op::Replace { + path: "/nope".into(), + value: json!(9), + }]); + let err = apply(&doc, &patch).unwrap_err(); + assert!(matches!(err, ApplyError::MissingTarget(_))); +} + +#[test] +fn apply_remove_missing_errors() { + let doc = json!({"a": 1}); + let patch = Patch(vec![Op::Remove { path: "/ghost".into() }]); + assert!(matches!( + apply(&doc, &patch).unwrap_err(), + ApplyError::MissingTarget(_) + )); +} + +#[test] +fn apply_replace_root() { + let doc = json!({"a": 1}); + let patch = Patch(vec![Op::Replace { + path: "".into(), + value: json!([1, 2, 3]), + }]); + let out = apply(&doc, &patch).unwrap(); + assert_eq!(out, json!([1, 2, 3])); +} + +#[test] +fn empty_patch_is_identity() { + let doc = json!({"a": [1, 2, {"b": true}]}); + let out = apply(&doc, &Patch::new()).unwrap(); + assert_eq!(out, doc); +} + +#[test] +fn apply_add_on_root_errors() { + let doc = json!({"a": 1}); + let patch = Patch(vec![Op::Add { + path: "".into(), + value: json!(2), + }]); + assert!(matches!( + apply(&doc, &patch).unwrap_err(), + ApplyError::CannotAddToRoot + )); +} + +#[test] +fn apply_remove_root_errors() { + let doc = json!([1, 2]); + let patch = Patch(vec![Op::Remove { path: "".into() }]); + assert!(matches!( + apply(&doc, &patch).unwrap_err(), + ApplyError::CannotRemoveRoot + )); +} + +#[test] +fn wire_format_matches_rfc_6902() { + let patch = Patch(vec![ + Op::Add { + path: "/x".into(), + value: json!(1), + }, + Op::Remove { path: "/y".into() }, + Op::Replace { + path: "/z".into(), + value: json!("hi"), + }, + ]); + let txt = serde_json::to_string(&patch).unwrap(); + let parsed: Value = serde_json::from_str(&txt).unwrap(); + assert_eq!( + parsed, + json!([ + {"op": "add", "path": "/x", "value": 1}, + {"op": "remove", "path": "/y"}, + {"op": "replace", "path": "/z", "value": 
"hi"}, + ]) + ); +} + +#[test] +fn patch_roundtrip_through_serde() { + let p1 = Patch(vec![ + Op::Add { + path: "/a".into(), + value: json!({"nested": [1, 2]}), + }, + Op::Remove { path: "/b/0".into() }, + ]); + let txt = serde_json::to_string(&p1).unwrap(); + let p2: Patch = serde_json::from_str(&txt).unwrap(); + assert_eq!(p1, p2); +} + +#[test] +fn array_of_objects_element_replace() { + let old = json!([{"id": 1, "v": "a"}, {"id": 2, "v": "b"}]); + let new = json!([{"id": 1, "v": "a"}, {"id": 2, "v": "z"}]); + let patch = diff(&old, &new); + // Should be a single deep replace at /1/v + assert_eq!(patch.len(), 1); + assert!(matches!(&patch.0[0], Op::Replace { path, .. } if path == "/1/v")); + rt(old, new); +} + +#[test] +fn null_to_value_is_replace() { + rt(json!(null), json!({"x": 1})); + rt(json!({"x": null}), json!({"x": 1})); + rt(json!({"x": 1}), json!({"x": null})); +} diff --git a/_primitives/_rust/kei-scheduler/Cargo.toml b/_primitives/_rust/kei-scheduler/Cargo.toml new file mode 100644 index 0000000..a04dbbb --- /dev/null +++ b/_primitives/_rust/kei-scheduler/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "kei-scheduler" +version = "0.1.0" +edition = "2021" +rust-version = "1.75" +description = "Durable task scheduler (cron / at / interval) — metadata primitive." 
+ +[package.metadata.keisei] +backend = "sqlite" +db_env = "KEI_SCHEDULER_DB" +db_default = "~/.claude/scheduler/scheduler.sqlite" +schema_version = 1 + +[[bin]] +name = "kei-scheduler" +path = "src/main.rs" + +[lib] +name = "kei_scheduler" +path = "src/lib.rs" + +[dependencies] +kei-entity-store = { path = "../kei-entity-store" } +rusqlite = { version = "0.31", features = ["bundled"] } +clap = { version = "4", features = ["derive"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +anyhow = "1" +thiserror = "1" +chrono = { version = "0.4", default-features = false, features = ["clock"] } +cron = "0.15" + +[dev-dependencies] +tempfile = "3" diff --git a/_primitives/_rust/kei-scheduler/src/error.rs b/_primitives/_rust/kei-scheduler/src/error.rs new file mode 100644 index 0000000..582076a --- /dev/null +++ b/_primitives/_rust/kei-scheduler/src/error.rs @@ -0,0 +1,51 @@ +//! Typed errors for kei-scheduler. `Error` is the public wrapper; +//! `ParseError` surfaces trigger-spec parse failures separately so +//! callers (and tests) can discriminate without string-matching. + +use thiserror::Error; + +/// Trigger-spec parse failures. Pure function — no DB contact. +#[derive(Debug, Error, PartialEq, Eq)] +pub enum ParseError { + #[error("unknown trigger_kind '{0}' — expected cron / at / interval")] + UnknownKind(String), + #[error("invalid cron expression '{0}': {1}")] + InvalidCron(String, String), + #[error("invalid ISO-8601 datetime '{0}' — expected YYYY-MM-DDTHH:MM:SSZ")] + InvalidIsoDatetime(String), + #[error("invalid interval '{0}' — expected positive integer seconds")] + InvalidInterval(String), +} + +/// Public scheduler error. Wraps rusqlite + anyhow + ParseError. 
+#[derive(Debug, Error)] +pub enum Error { + #[error(transparent)] + Parse(#[from] ParseError), + #[error("sqlite: {0}")] + Sqlite(#[from] rusqlite::Error), + #[error("task not found: id={0}")] + NotFound(i64), + #[error("task name already exists: '{0}'")] + NameExists(String), + #[error("io: {0}")] + Io(#[from] std::io::Error), + #[error(transparent)] + Other(#[from] anyhow::Error), +} + +impl Error { + /// Inspect a rusqlite error and reclassify `UNIQUE constraint + /// failed: scheduler_tasks.name` into a typed `NameExists`. Other + /// SQLite errors pass through unchanged. + pub fn from_insert(err: rusqlite::Error, name: &str) -> Self { + let msg = err.to_string(); + if msg.contains("UNIQUE constraint failed") + && msg.contains("scheduler_tasks.name") + { + Self::NameExists(name.to_string()) + } else { + Self::Sqlite(err) + } + } +} diff --git a/_primitives/_rust/kei-scheduler/src/lib.rs b/_primitives/_rust/kei-scheduler/src/lib.rs new file mode 100644 index 0000000..2e91ef8 --- /dev/null +++ b/_primitives/_rust/kei-scheduler/src/lib.rs @@ -0,0 +1,51 @@ +//! kei-scheduler — durable task scheduler primitive (cron / at / +//! interval triggers). Metadata store only; execution is the caller's +//! responsibility. `kei-pipe` or a cron-wrapper agent pumps +//! `list_due` → invoke → `mark_run` on an external cadence. +//! +//! Shape mirrors the sibling kei-task / kei-chat-store pattern: the +//! `kei-entity-store` engine owns DDL + migrations, and this crate adds +//! the scheduler-specific SQL helpers (`schedule`, `cancel`, `list_due`, +//! `mark_run`) on top of its `Store` shim. +//! +//! Public API surface (all I/O is synchronous rusqlite; no runtime): +//! - [`open`] / [`open_memory`] — build a `Store` with the scheduler schema. +//! - [`schedule`] — insert a new task + pre-compute `next_run_at`. +//! - [`cancel`] — set status=cancelled, clear `next_run_at`. +//! - [`list_due`] — rows where `next_run_at <= now` AND status is +//! pending/scheduled. +//! 
- [`mark_run`] — stamp last_run / last_exit_code / advance schedule. +//! - [`compute_next`] — pure function, no DB. + +pub mod error; +pub mod query; +pub mod run; +pub mod schedule; +pub mod schema; +pub mod store; +pub mod task; +pub mod trigger; + +pub use error::{Error, ParseError}; +pub use query::{get_by_name, get_task, list_due}; +pub use run::mark_run; +pub use schedule::{cancel, schedule}; +pub use schema::{ALL_SCHEMAS, SCHEDULER_SCHEMA}; +pub use store::Store; +pub use task::{status as task_status, Task}; +pub use trigger::{compute_next, validate_kind, AT, CRON, INTERVAL}; + +use std::path::Path; + +/// Convenience constructor — opens the scheduler DB at `path`, creating +/// parent dirs + running migrations. Wraps `Store::open` so callers who +/// only need the raw Store don't import the submodule. +pub fn open(path: &Path) -> anyhow::Result { + Store::open(path) +} + +/// In-memory scheduler — used by unit tests and by callers who want a +/// throwaway queue (e.g. a dry-run planner). +pub fn open_memory() -> anyhow::Result { + Store::open_memory() +} diff --git a/_primitives/_rust/kei-scheduler/src/main.rs b/_primitives/_rust/kei-scheduler/src/main.rs new file mode 100644 index 0000000..c0de392 --- /dev/null +++ b/_primitives/_rust/kei-scheduler/src/main.rs @@ -0,0 +1,141 @@ +//! kei-scheduler CLI — schedule / cancel / list-due / mark-run / tick. +//! +//! Exit-code contract: +//! - 0 — success +//! - 1 — IO / storage / usage +//! 
- 2 — validation (bad trigger kind / spec / unknown id) + +use chrono::Utc; +use clap::{Parser, Subcommand}; +use kei_scheduler::{ + cancel, get_task, list_due, mark_run, schedule, Error, Store, +}; +use std::path::PathBuf; +use std::process::ExitCode; + +struct CliError { + code: u8, + msg: String, +} + +impl CliError { + fn io(msg: impl Into) -> Self { Self { code: 1, msg: msg.into() } } + fn validation(msg: impl Into) -> Self { Self { code: 2, msg: msg.into() } } +} + +impl From for CliError { + fn from(e: anyhow::Error) -> Self { Self::io(format!("{e:#}")) } +} + +impl From for CliError { + fn from(e: Error) -> Self { + match &e { + Error::Parse(_) | Error::NotFound(_) | Error::NameExists(_) => + Self::validation(format!("{e}")), + _ => Self::io(format!("{e}")), + } + } +} + +#[derive(Parser)] +#[command(name = "kei-scheduler", version, about = "Durable task scheduler (cron/at/interval)")] +struct Cli { + #[arg(long)] + db: Option, + #[command(subcommand)] + cmd: Cmd, +} + +#[derive(Subcommand)] +enum Cmd { + /// Insert a new scheduled task. + Schedule { + #[arg(long)] name: String, + #[arg(long)] kind: String, + #[arg(long)] spec: String, + #[arg(long)] cmd: String, + }, + /// Cancel a task by id. + Cancel { #[arg(long)] id: i64 }, + /// Print due tasks as a JSON array (reads `now = Utc::now`). + ListDue, + /// Record a run's exit code and advance next_run_at. + MarkRun { + #[arg(long)] id: i64, + #[arg(long)] exit: i64, + }, + /// Convenience: `list-due` for the current wall clock. + Tick, + /// Print one task as JSON. 
+ Get { #[arg(long)] id: i64 }, +} + +fn db_path(cli_db: Option) -> PathBuf { + if let Some(p) = cli_db { return p; } + if let Ok(e) = std::env::var("KEI_SCHEDULER_DB") { return PathBuf::from(e); } + let home = std::env::var("HOME").unwrap_or_else(|_| ".".into()); + PathBuf::from(home).join(".claude/scheduler/scheduler.sqlite") +} + +fn run() -> Result<(), CliError> { + let cli = Cli::parse(); + let store = Store::open(&db_path(cli.db))?; + dispatch(&store, cli.cmd) +} + +fn dispatch(store: &Store, cmd: Cmd) -> Result<(), CliError> { + match cmd { + Cmd::Schedule { name, kind, spec, cmd } => + cmd_schedule(store, &name, &kind, &spec, &cmd), + Cmd::Cancel { id } => { cancel(store.conn(), id)?; println!("cancelled {id}"); Ok(()) } + Cmd::ListDue | Cmd::Tick => cmd_list_due(store), + Cmd::MarkRun { id, exit } => { + mark_run(store.conn(), id, exit, Utc::now().timestamp())?; + println!("marked run {id} exit={exit}"); + Ok(()) + } + Cmd::Get { id } => cmd_get(store, id), + } +} + +fn cmd_schedule( + store: &Store, + name: &str, + kind: &str, + spec: &str, + cmd: &str, +) -> Result<(), CliError> { + let id = schedule(store.conn(), name, kind, spec, cmd)?; + println!("{id}"); + Ok(()) +} + +fn cmd_list_due(store: &Store) -> Result<(), CliError> { + let now = Utc::now().timestamp(); + let rows = list_due(store.conn(), now)?; + let json = serde_json::to_string_pretty(&rows).map_err(|e| CliError::io(e.to_string()))?; + println!("{json}"); + Ok(()) +} + +fn cmd_get(store: &Store, id: i64) -> Result<(), CliError> { + match get_task(store.conn(), id)? 
{ + Some(t) => { + let json = serde_json::to_string_pretty(&t) + .map_err(|e| CliError::io(e.to_string()))?; + println!("{json}"); + Ok(()) + } + None => Err(CliError::validation(format!("task not found: id={id}"))), + } +} + +fn main() -> ExitCode { + match run() { + Ok(()) => ExitCode::SUCCESS, + Err(CliError { code, msg }) => { + eprintln!("{msg}"); + ExitCode::from(code) + } + } +} diff --git a/_primitives/_rust/kei-scheduler/src/query.rs b/_primitives/_rust/kei-scheduler/src/query.rs new file mode 100644 index 0000000..e133b2f --- /dev/null +++ b/_primitives/_rust/kei-scheduler/src/query.rs @@ -0,0 +1,62 @@ +//! Read-side queries: `list_due` and `get_task`. +//! +//! `list_due` is the hot path driven by external tickers (`kei-pipe`, +//! cron-wrapper agents). It selects rows whose `next_run_at <= now` AND +//! `status IN (pending, scheduled)` AND `next_run_at IS NOT NULL`. The +//! NOT-NULL filter excludes cancelled + one-shot-completed rows. + +use crate::error::Error; +use crate::task::{status, Task, SELECT_COLS}; +use rusqlite::{params, Connection}; + +/// Fetch all rows whose `next_run_at <= now` and status makes them +/// eligible to run. Ordered by `next_run_at ASC` so the earliest-due +/// task surfaces first. +pub fn list_due(conn: &Connection, now: i64) -> Result, Error> { + let sql = format!( + "SELECT {cols} FROM scheduler_tasks \ + WHERE next_run_at IS NOT NULL \ + AND next_run_at <= ?1 \ + AND status IN (?2, ?3) \ + ORDER BY next_run_at ASC, id ASC", + cols = SELECT_COLS, + ); + let mut stmt = conn.prepare(&sql)?; + let rows = stmt + .query_map(params![now, status::PENDING, status::SCHEDULED], Task::from_row)?; + let mut out = Vec::new(); + for r in rows { + out.push(r?); + } + Ok(out) +} + +/// Fetch a single task by id. `Ok(None)` if no such row. 
+pub fn get_task(conn: &Connection, id: i64) -> Result, Error> { + let sql = format!( + "SELECT {cols} FROM scheduler_tasks WHERE id = ?1", + cols = SELECT_COLS, + ); + let mut stmt = conn.prepare(&sql)?; + let mut rows = stmt.query_map(params![id], Task::from_row)?; + match rows.next() { + Some(r) => Ok(Some(r?)), + None => Ok(None), + } +} + +/// Fetch a single task by unique name; `Ok(None)` if no such row. Lets +/// callers resolve a name to an id (the CLI's `cancel` only accepts `--id`); +/// kept thin so the query-layer responsibilities stay in one module. +pub fn get_by_name(conn: &Connection, name: &str) -> Result, Error> { + let sql = format!( + "SELECT {cols} FROM scheduler_tasks WHERE name = ?1", + cols = SELECT_COLS, + ); + let mut stmt = conn.prepare(&sql)?; + let mut rows = stmt.query_map(params![name], Task::from_row)?; + match rows.next() { + Some(r) => Ok(Some(r?)), + None => Ok(None), + } +} diff --git a/_primitives/_rust/kei-scheduler/src/run.rs b/_primitives/_rust/kei-scheduler/src/run.rs new file mode 100644 index 0000000..d00a292 --- /dev/null +++ b/_primitives/_rust/kei-scheduler/src/run.rs @@ -0,0 +1,86 @@ +//! `mark_run` — record completion of a triggered execution. +//! +//! Caller supplies `now` explicitly so tests are deterministic. The new +//! `next_run_at` is re-computed from `now` using the task's stored +//! trigger_kind / trigger_spec: +//! - `interval` → `now + secs` (never terminal). +//! - `cron` → next cron occurrence after `now` (falls back to +//! terminal `done` if no future occurrence exists). +//! - `at` → one-shot; status → `done` (`failed` on non-zero exit), next_run_at → NULL. +//! +//! Status transitions: cancelled rows are immutable (function returns +//! `Error::NotFound` to keep the surface minimal — the caller should +//! not be marking runs on cancelled tasks). + +use crate::error::Error; +use crate::query::get_task; +use crate::task::status; +use crate::trigger::{compute_next, AT}; +use rusqlite::{params, Connection}; + +/// Record a run outcome and advance the schedule.
+/// +/// Returns `Ok(())` on success, `Error::NotFound` if `id` doesn't +/// exist or refers to a cancelled task, `Error::Parse` if the stored +/// trigger spec is no longer parseable (should not happen if the row +/// was created via `schedule`). +pub fn mark_run( + conn: &Connection, + id: i64, + exit_code: i64, + now: i64, +) -> Result<(), Error> { + let task = match get_task(conn, id)? { + Some(t) if t.status == status::CANCELLED => return Err(Error::NotFound(id)), + Some(t) => t, + None => return Err(Error::NotFound(id)), + }; + let (next, next_status) = advance(&task.trigger_kind, &task.trigger_spec, exit_code, now)?; + write_run(conn, id, exit_code, now, next, next_status) +} + +/// Compute the next `(next_run_at, status)` pair given the trigger + +/// run outcome. Exit-code 0 on `at` → `done`; non-zero → `failed`. +/// `cron`/`interval` ignore exit code when scheduling next fire. +fn advance( + kind: &str, + spec: &str, + exit_code: i64, + now: i64, +) -> Result<(Option, &'static str), Error> { + if kind == AT { + let s = if exit_code == 0 { status::DONE } else { status::FAILED }; + return Ok((None, s)); + } + let next = compute_next(kind, spec, now)?; + let status_next = match next { + Some(_) => status::SCHEDULED, + // Cron schedule with no future occurrence — treat as terminal. 
+ None => status::DONE, + }; + Ok((next, status_next)) +} + +fn write_run( + conn: &Connection, + id: i64, + exit_code: i64, + now: i64, + next_run_at: Option, + next_status: &str, +) -> Result<(), Error> { + let rows = conn.execute( + "UPDATE scheduler_tasks SET \ + last_run_at = ?1, \ + last_exit_code = ?2, \ + next_run_at = ?3, \ + status = ?4, \ + updated_at = ?5 \ + WHERE id = ?6", + params![now, exit_code, next_run_at, next_status, now, id], + )?; + if rows == 0 { + return Err(Error::NotFound(id)); + } + Ok(()) +} diff --git a/_primitives/_rust/kei-scheduler/src/schedule.rs b/_primitives/_rust/kei-scheduler/src/schedule.rs new file mode 100644 index 0000000..49ebf18 --- /dev/null +++ b/_primitives/_rust/kei-scheduler/src/schedule.rs @@ -0,0 +1,70 @@ +//! `schedule` + `cancel` operations. INSERT / UPDATE on the +//! `scheduler_tasks` table with trigger validation and initial +//! `next_run_at` computed from `compute_next`. + +use crate::error::Error; +use crate::task::status; +use crate::trigger::{compute_next, validate_kind}; +use chrono::Utc; +use rusqlite::{params, Connection}; + +/// Insert a new task row. Validates the trigger spec and pre-computes +/// `next_run_at` from `now = Utc::now().timestamp()`. +/// +/// Returns the new row's `id`. 
Errors: +/// - `Error::Parse(...)` — invalid kind / spec +/// - `Error::NameExists(name)` — name UNIQUE violation +/// - `Error::Sqlite(...)` — other DB failures +pub fn schedule( + conn: &Connection, + name: &str, + trigger_kind: &str, + trigger_spec: &str, + command: &str, +) -> Result { + let kind = validate_kind(trigger_kind)?; + let now = Utc::now().timestamp(); + let next = compute_next(kind, trigger_spec, now)?; + insert_row(conn, name, kind, trigger_spec, command, next, now) +} + +fn insert_row( + conn: &Connection, + name: &str, + kind: &str, + spec: &str, + command: &str, + next_run_at: Option, + now: i64, +) -> Result { + let sql = "INSERT INTO scheduler_tasks \ + (name, trigger_kind, trigger_spec, command, status, \ + last_run_at, next_run_at, last_exit_code, created_at, updated_at) \ + VALUES (?1, ?2, ?3, ?4, ?5, NULL, ?6, NULL, ?7, ?7)"; + let status = status::PENDING; + let result = conn.execute( + sql, + params![name, kind, spec, command, status, next_run_at, now], + ); + match result { + Ok(_) => Ok(conn.last_insert_rowid()), + Err(e) => Err(Error::from_insert(e, name)), + } +} + +/// Mark a task cancelled. Clears `next_run_at` so it cannot match +/// `list_due` again even if somebody re-activates the row manually. +/// Idempotent: cancelling an already-cancelled task is a no-op. +/// Missing id → `Error::NotFound`. +pub fn cancel(conn: &Connection, id: i64) -> Result<(), Error> { + let now = Utc::now().timestamp(); + let rows = conn.execute( + "UPDATE scheduler_tasks SET status = ?1, next_run_at = NULL, updated_at = ?2 \ + WHERE id = ?3", + params![status::CANCELLED, now, id], + )?; + if rows == 0 { + return Err(Error::NotFound(id)); + } + Ok(()) +} diff --git a/_primitives/_rust/kei-scheduler/src/schema.rs b/_primitives/_rust/kei-scheduler/src/schema.rs new file mode 100644 index 0000000..a9ce471 --- /dev/null +++ b/_primitives/_rust/kei-scheduler/src/schema.rs @@ -0,0 +1,52 @@ +//! kei-scheduler EntitySchema — declarative spec consumed by +//! 
`kei_entity_store::Store`. +//! +//! Schema matches the sibling pattern (kei-task, kei-chat-store): one +//! primary table with standard CRUD fields plus scheduler-specific +//! trigger + run-tracking columns. The `name` UNIQUE constraint rides +//! the engine's `custom_migrations` slot because `FieldDef` doesn't +//! expose a UNIQUE flag — a unique index on the column provides the +//! same semantics. + +use kei_entity_store::schema::{EdgeKeyKind, EntitySchema, FieldDef}; + +static FIELDS: &[FieldDef] = &[ + FieldDef::pk("id"), + FieldDef::text_nn("name"), + FieldDef::text_nn("trigger_kind"), + FieldDef::text_nn("trigger_spec"), + FieldDef::text_nn("command"), + FieldDef::text_default("status", "pending"), + FieldDef::integer("last_run_at"), + FieldDef::integer("next_run_at"), + FieldDef::integer("last_exit_code"), + FieldDef::created_at(), + FieldDef::updated_at(), +]; + +const DDL_SECONDARY: &str = r#" + CREATE UNIQUE INDEX IF NOT EXISTS idx_scheduler_tasks_name + ON scheduler_tasks(name); + CREATE INDEX IF NOT EXISTS idx_scheduler_tasks_status + ON scheduler_tasks(status); + CREATE INDEX IF NOT EXISTS idx_scheduler_tasks_next_run + ON scheduler_tasks(next_run_at); +"#; + +pub static SCHEDULER_SCHEMA: EntitySchema = EntitySchema { + name: "scheduler_task", + table: "scheduler_tasks", + fields: FIELDS, + // Generic verbs are disabled — scheduler-specific ops live in + // `schedule.rs` / `query.rs` / `run.rs`. The engine still owns DDL. + enabled_verbs: &[], + fts_columns: None, + edge_table: None, + edge_key_kind: EdgeKeyKind::IntegerPair, + archived_field: None, + custom_migrations: &[DDL_SECONDARY], +}; + +/// Full schema list passed to the engine when opening a Store. Declared +/// here so the shim in `store.rs` stays a one-liner. 
+pub static ALL_SCHEMAS: &[&EntitySchema] = &[&SCHEDULER_SCHEMA]; diff --git a/_primitives/_rust/kei-scheduler/src/store.rs b/_primitives/_rust/kei-scheduler/src/store.rs new file mode 100644 index 0000000..e325455 --- /dev/null +++ b/_primitives/_rust/kei-scheduler/src/store.rs @@ -0,0 +1,32 @@ +//! Scheduler store — thin shim over `kei_entity_store::Store`. +//! +//! Mirrors the kei-chat-store pattern: the engine owns DDL + migration +//! transactions, and this crate adds scheduler-specific SQL helpers +//! (`schedule`, `cancel`, `list_due`, `mark_run`) that live in sibling +//! modules. + +use crate::schema::ALL_SCHEMAS; +use anyhow::Result; +use kei_entity_store::Store as EntityStore; +use rusqlite::Connection; +use std::path::Path; + +pub struct Store { + inner: EntityStore, +} + +impl Store { + pub fn open(path: &Path) -> Result { + let inner = EntityStore::open(path, ALL_SCHEMAS)?; + Ok(Self { inner }) + } + + pub fn open_memory() -> Result { + let inner = EntityStore::open_memory(ALL_SCHEMAS)?; + Ok(Self { inner }) + } + + pub fn conn(&self) -> &Connection { + self.inner.conn() + } +} diff --git a/_primitives/_rust/kei-scheduler/src/task.rs b/_primitives/_rust/kei-scheduler/src/task.rs new file mode 100644 index 0000000..1dd88ae --- /dev/null +++ b/_primitives/_rust/kei-scheduler/src/task.rs @@ -0,0 +1,61 @@ +//! `Task` — in-memory snapshot of a `scheduler_tasks` row. +//! +//! Serializable for the CLI (`list-due` prints JSON). Status is a plain +//! String so callers can introduce new sentinels without a type bump. + +use rusqlite::Row; +use serde::{Deserialize, Serialize}; + +/// Canonical task status sentinels. Schema default is `pending`; +/// lifecycle: `pending` → `scheduled` (optional staging) → `running` → +/// `done` / `failed`. `cancelled` is terminal and set by `cancel()`. 
+pub mod status { + pub const PENDING: &str = "pending"; + pub const SCHEDULED: &str = "scheduled"; + pub const RUNNING: &str = "running"; + pub const DONE: &str = "done"; + pub const FAILED: &str = "failed"; + pub const CANCELLED: &str = "cancelled"; +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct Task { + pub id: i64, + pub name: String, + pub trigger_kind: String, + pub trigger_spec: String, + pub command: String, + pub status: String, + pub last_run_at: Option, + pub next_run_at: Option, + pub last_exit_code: Option, + pub created_at: i64, + pub updated_at: i64, +} + +impl Task { + /// Column order MUST match `SELECT_COLS` (defined below in this file). + /// The nullable timestamps + exit code are read into `Option` so + /// that SQL `NULL` maps to `None`; rusqlite rejects `NULL` with a + /// type error when the target is a bare integer. + pub fn from_row(r: &Row) -> rusqlite::Result { + Ok(Self { + id: r.get(0)?, + name: r.get(1)?, + trigger_kind: r.get(2)?, + trigger_spec: r.get(3)?, + command: r.get(4)?, + status: r.get(5)?, + last_run_at: r.get(6)?, + next_run_at: r.get(7)?, + last_exit_code: r.get(8)?, + created_at: r.get(9)?, + updated_at: r.get(10)?, + }) + } +} + +/// SELECT column list used by the queries in `query.rs`. Exported so +/// callers building custom queries stay in sync with `Task::from_row`. +pub const SELECT_COLS: &str = "id, name, trigger_kind, trigger_spec, command, status, \ + last_run_at, next_run_at, last_exit_code, created_at, updated_at"; diff --git a/_primitives/_rust/kei-scheduler/src/trigger.rs b/_primitives/_rust/kei-scheduler/src/trigger.rs new file mode 100644 index 0000000..3ba253e --- /dev/null +++ b/_primitives/_rust/kei-scheduler/src/trigger.rs @@ -0,0 +1,105 @@ +//! Trigger parsing + `compute_next` core. +//! +//! Three trigger kinds: +//! - `cron` — 5-field or 6-field cron expression. 5-field inputs get a +//! leading `"0 "` for seconds (standard cron semantics: trigger at +//! the start of the minute). +//!
- `at` — one-shot ISO-8601 datetime with trailing `Z` (UTC). +//! - `interval` — repeat every N seconds (positive integer). +//! +//! `compute_next(kind, spec, from)` is pure — no DB, no clock read. +//! Callers pass `from` explicitly so tests are deterministic. + +use crate::error::ParseError; +use chrono::{DateTime, TimeZone, Utc}; +use cron::Schedule; +use std::str::FromStr; + +/// Canonical trigger-kind names. Schema stores these as TEXT; this +/// helper keeps the spelling in one place and rejects anything else at +/// the boundary. +pub const CRON: &str = "cron"; +pub const AT: &str = "at"; +pub const INTERVAL: &str = "interval"; + +/// Validate a trigger kind string. Returned as `&'static str` so callers +/// can store the canonical form without allocating. +pub fn validate_kind(kind: &str) -> Result<&'static str, ParseError> { + match kind { + CRON => Ok(CRON), + AT => Ok(AT), + INTERVAL => Ok(INTERVAL), + other => Err(ParseError::UnknownKind(other.to_string())), + } +} + +/// Compute the next fire time (unix seconds, UTC) for a trigger given a +/// reference `from` timestamp. Returns `Ok(None)` when no future fire +/// exists (e.g. `at` timestamp already in the past, or cron schedule +/// with no upcoming occurrence in `chrono`'s representable range). +pub fn compute_next( + kind: &str, + spec: &str, + from: i64, +) -> Result, ParseError> { + match validate_kind(kind)? 
{ + CRON => next_cron(spec, from), + AT => next_at(spec, from), + INTERVAL => next_interval(spec, from), + _ => unreachable!("validate_kind rejects other variants"), + } +} + +fn next_cron(spec: &str, from: i64) -> Result, ParseError> { + let canon = canonicalize_cron(spec); + let sched = Schedule::from_str(&canon) + .map_err(|e| ParseError::InvalidCron(spec.to_string(), e.to_string()))?; + let from_dt = ts_to_utc(from) + .ok_or_else(|| ParseError::InvalidCron(spec.to_string(), "ref-ts overflow".into()))?; + Ok(sched.after(&from_dt).next().map(|dt| dt.timestamp())) +} + +/// Accept both classic 5-field (`min hour dom mon dow`) and the +/// cron-crate's 6-field form (`sec min hour dom mon dow`). 7-field +/// expressions (with year) pass through untouched. +fn canonicalize_cron(spec: &str) -> String { + let fields = spec.split_whitespace().count(); + if fields == 5 { + format!("0 {}", spec) + } else { + spec.to_string() + } +} + +fn next_at(spec: &str, from: i64) -> Result, ParseError> { + let ts = parse_iso_z(spec) + .ok_or_else(|| ParseError::InvalidIsoDatetime(spec.to_string()))?; + if ts > from { + Ok(Some(ts)) + } else { + Ok(None) + } +} + +fn next_interval(spec: &str, from: i64) -> Result, ParseError> { + let secs: i64 = spec + .parse::() + .map_err(|_| ParseError::InvalidInterval(spec.to_string()))? + .try_into() + .map_err(|_| ParseError::InvalidInterval(spec.to_string()))?; + if secs == 0 { + return Err(ParseError::InvalidInterval(spec.to_string())); + } + Ok(Some(from.saturating_add(secs))) +} + +/// Parse an RFC 3339 datetime such as `YYYY-MM-DDTHH:MM:SSZ` to unix seconds. +/// `DateTime::parse_from_rfc3339` also accepts fractional seconds and numeric offsets; the result is normalised to UTC. 
+fn parse_iso_z(spec: &str) -> Option { + let dt = DateTime::parse_from_rfc3339(spec).ok()?; + Some(dt.with_timezone(&Utc).timestamp()) +} + +fn ts_to_utc(ts: i64) -> Option> { + Utc.timestamp_opt(ts, 0).single() +} diff --git a/_primitives/_rust/kei-scheduler/tests/scheduler_integration.rs b/_primitives/_rust/kei-scheduler/tests/scheduler_integration.rs new file mode 100644 index 0000000..b5409ec --- /dev/null +++ b/_primitives/_rust/kei-scheduler/tests/scheduler_integration.rs @@ -0,0 +1,189 @@ +//! Integration tests for kei-scheduler. Uses `Store::open_memory` so +//! each test owns a throwaway DB and a deterministic wall clock +//! (`now` passed explicitly where the API allows). +//! +//! `schedule()` + `cancel()` internally read `Utc::now()` once; that's +//! fine because we check relative ordering (`next_run_at` compared to +//! a post-call `Utc::now()` lower bound), not absolute values. + +use chrono::Utc; +use kei_scheduler::{ + cancel, compute_next, get_task, list_due, mark_run, open_memory, schedule, + task_status, ParseError, Store, AT, CRON, INTERVAL, +}; + +fn store() -> Store { + open_memory().expect("in-memory store opens clean") +} + +#[test] +fn cron_schedule_sets_future_next_run() { + let s = store(); + let before = Utc::now().timestamp(); + let id = schedule(s.conn(), "cron1", CRON, "*/5 * * * *", "echo").unwrap(); + let t = get_task(s.conn(), id).unwrap().unwrap(); + assert_eq!(t.name, "cron1"); + assert_eq!(t.trigger_kind, "cron"); + assert_eq!(t.status, task_status::PENDING); + let next = t.next_run_at.expect("cron trigger must produce a next_run_at"); + assert!(next >= before, "next_run_at {next} must be >= launch time {before}"); +} + +#[test] +fn at_schedule_with_future_ts_matches_iso_parse() { + let s = store(); + // 2030-01-01T00:00:00Z → unix 1893456000 (verified via + // chrono::DateTime::parse_from_rfc3339 at test time). 
+ let id = schedule(s.conn(), "at1", AT, "2030-01-01T00:00:00Z", "echo").unwrap(); + let t = get_task(s.conn(), id).unwrap().unwrap(); + assert_eq!(t.next_run_at, Some(1_893_456_000)); + assert_eq!(t.trigger_kind, "at"); +} + +#[test] +fn interval_schedule_sets_now_plus_secs() { + let s = store(); + let before = Utc::now().timestamp(); + let id = schedule(s.conn(), "int1", INTERVAL, "3600", "echo").unwrap(); + let t = get_task(s.conn(), id).unwrap().unwrap(); + let next = t.next_run_at.expect("interval trigger must set next_run_at"); + let after = Utc::now().timestamp(); + assert!(next >= before + 3600); + assert!(next <= after + 3600); +} + +#[test] +fn cancel_sets_status_and_clears_next_run() { + let s = store(); + let id = schedule(s.conn(), "tcan", INTERVAL, "60", "echo").unwrap(); + cancel(s.conn(), id).unwrap(); + let t = get_task(s.conn(), id).unwrap().unwrap(); + assert_eq!(t.status, task_status::CANCELLED); + assert_eq!(t.next_run_at, None); +} + +#[test] +fn list_due_returns_eligible_pending_tasks() { + let s = store(); + // An interval with spec=60 produces next_run ≈ now+60. Query with + // now+120 to make sure it's due. 
+ schedule(s.conn(), "due1", INTERVAL, "60", "echo").unwrap(); + let query_ts = Utc::now().timestamp() + 120; + let due = list_due(s.conn(), query_ts).unwrap(); + assert_eq!(due.len(), 1); + assert_eq!(due[0].name, "due1"); + assert_eq!(due[0].status, task_status::PENDING); +} + +#[test] +fn list_due_excludes_cancelled_tasks() { + let s = store(); + let id = schedule(s.conn(), "cx", INTERVAL, "60", "echo").unwrap(); + cancel(s.conn(), id).unwrap(); + let due = list_due(s.conn(), Utc::now().timestamp() + 100_000).unwrap(); + assert!(due.is_empty(), "cancelled tasks must not appear in list_due"); +} + +#[test] +fn mark_run_on_interval_advances_next_run() { + let s = store(); + let id = schedule(s.conn(), "mrint", INTERVAL, "3600", "echo").unwrap(); + let now = 2_000_000_000; + mark_run(s.conn(), id, 0, now).unwrap(); + let t = get_task(s.conn(), id).unwrap().unwrap(); + assert_eq!(t.last_run_at, Some(now)); + assert_eq!(t.last_exit_code, Some(0)); + assert_eq!(t.next_run_at, Some(now + 3600)); + assert_eq!(t.status, task_status::SCHEDULED); +} + +#[test] +fn mark_run_on_at_completes_task() { + let s = store(); + let id = schedule(s.conn(), "mrat", AT, "2030-01-01T00:00:00Z", "echo").unwrap(); + mark_run(s.conn(), id, 0, 1_893_456_005).unwrap(); + let t = get_task(s.conn(), id).unwrap().unwrap(); + assert_eq!(t.status, task_status::DONE); + assert_eq!(t.next_run_at, None); + assert_eq!(t.last_exit_code, Some(0)); +} + +#[test] +fn mark_run_on_cron_recomputes_next() { + let s = store(); + let id = schedule(s.conn(), "mrcron", CRON, "*/5 * * * *", "echo").unwrap(); + let now: i64 = 2_000_000_000; + mark_run(s.conn(), id, 0, now).unwrap(); + let t = get_task(s.conn(), id).unwrap().unwrap(); + let next = t.next_run_at.expect("cron mark_run must recompute next_run_at"); + // `*/5 * * * *` = every 5 minutes at second-0; next must be within + // 300 seconds of `now` and strictly greater. 
+ assert!(next > now); + assert!(next <= now + 300, "next {next} must be within 5m of now {now}"); + assert_eq!(t.status, task_status::SCHEDULED); +} + +#[test] +fn mark_run_with_nonzero_exit_at_sets_failed() { + let s = store(); + let id = schedule(s.conn(), "failat", AT, "2030-06-15T12:00:00Z", "echo").unwrap(); + mark_run(s.conn(), id, 17, 1_910_000_000).unwrap(); + let t = get_task(s.conn(), id).unwrap().unwrap(); + assert_eq!(t.status, task_status::FAILED); + assert_eq!(t.last_exit_code, Some(17)); + assert_eq!(t.next_run_at, None); +} + +#[test] +fn invalid_cron_returns_parse_error() { + let err = compute_next(CRON, "not a cron expression", 0).unwrap_err(); + assert!( + matches!(err, ParseError::InvalidCron(_, _)), + "expected InvalidCron, got {err:?}" + ); +} + +#[test] +fn invalid_iso_datetime_returns_parse_error() { + let err = compute_next(AT, "not-a-date", 0).unwrap_err(); + assert!( + matches!(err, ParseError::InvalidIsoDatetime(_)), + "expected InvalidIsoDatetime, got {err:?}" + ); +} + +#[test] +fn malformed_trigger_kind_is_rejected() { + let s = store(); + let err = schedule(s.conn(), "bad", "weekly", "whatever", "echo") + .expect_err("unknown kind must fail"); + assert!( + matches!(err, kei_scheduler::Error::Parse(ParseError::UnknownKind(_))), + "expected UnknownKind, got {err:?}" + ); +} + +#[test] +fn duplicate_name_is_rejected_typed() { + let s = store(); + schedule(s.conn(), "dup", INTERVAL, "60", "echo").unwrap(); + let err = schedule(s.conn(), "dup", INTERVAL, "120", "echo") + .expect_err("unique-name collision must fail"); + assert!( + matches!(err, kei_scheduler::Error::NameExists(ref n) if n == "dup"), + "expected NameExists(dup), got {err:?}" + ); +} + +#[test] +fn zero_interval_is_rejected() { + let err = compute_next(INTERVAL, "0", 100).unwrap_err(); + assert!(matches!(err, ParseError::InvalidInterval(_))); +} + +#[test] +fn at_in_the_past_returns_none() { + // 2020-01-01 with `from = 2030-era` → no future fire. 
+ let next = compute_next(AT, "2020-01-01T00:00:00Z", 1_893_456_000).unwrap(); + assert_eq!(next, None); +} diff --git a/_primitives/_rust/kei-watch/Cargo.toml b/_primitives/_rust/kei-watch/Cargo.toml new file mode 100644 index 0000000..8d2c18a --- /dev/null +++ b/_primitives/_rust/kei-watch/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "kei-watch" +version = "0.1.0" +edition = "2021" +rust-version = "1.77" +description = "Filesystem watcher primitive — thin canonical wrapper around notify" + +[[bin]] +name = "kei-watch" +path = "src/main.rs" + +[lib] +name = "kei_watch" +path = "src/lib.rs" + +[dependencies] +notify = "8" +serde = { version = "1", features = ["derive"] } +serde_json = "1" +clap = { version = "4", features = ["derive"] } + +[dev-dependencies] +tempfile = "3" diff --git a/_primitives/_rust/kei-watch/src/debounce.rs b/_primitives/_rust/kei-watch/src/debounce.rs new file mode 100644 index 0000000..557a8ee --- /dev/null +++ b/_primitives/_rust/kei-watch/src/debounce.rs @@ -0,0 +1,100 @@ +//! Coarse debounce: collapse duplicate `(path, kind)` events fired +//! within [`DEBOUNCE_WINDOW`] of the previous one. +//! +//! Intent: swallow FS-level bursts (editor-write double-fire, compiler +//! rewrite patterns). NOT a replacement for notify-debouncer-full — we +//! don't do event reordering or close/write correlation, just per-key +//! rate-limiting. + +use crate::event::{Event, EventKind}; +use std::collections::HashMap; +use std::path::PathBuf; +use std::time::{Duration, Instant}; + +/// Collapse window for duplicate `(path, kind)` pairs. +pub const DEBOUNCE_WINDOW: Duration = Duration::from_millis(50); + +/// Per-key last-seen state. Small enough to live in a `HashMap` — pruned +/// opportunistically when entries exceed [`PRUNE_THRESHOLD`] (keeps +/// long-running watchers from growing unboundedly). 
+pub struct Debouncer { + last_seen: HashMap<(PathBuf, EventKind), Instant>, +} + +const PRUNE_THRESHOLD: usize = 1024; + +impl Debouncer { + pub fn new() -> Self { + Self { last_seen: HashMap::new() } + } + + /// Should this event pass through? + /// + /// Returns `true` on first occurrence of `(path, kind)` or if the + /// last occurrence was ≥ `DEBOUNCE_WINDOW` ago. Updates internal + /// state regardless of outcome. + pub fn accept(&mut self, ev: &Event) -> bool { + let key = (ev.path.clone(), ev.kind); + let now = Instant::now(); + let decision = match self.last_seen.get(&key) { + Some(&prev) if now.duration_since(prev) < DEBOUNCE_WINDOW => false, + _ => true, + }; + self.last_seen.insert(key, now); + if self.last_seen.len() > PRUNE_THRESHOLD { + self.prune(now); + } + decision + } + + /// Drop entries older than 10× the debounce window. Called + /// opportunistically when the map grows large. + fn prune(&mut self, now: Instant) { + let cutoff = DEBOUNCE_WINDOW * 10; + self.last_seen.retain(|_, &mut t| now.duration_since(t) < cutoff); + } +} + +impl Default for Debouncer { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::thread::sleep; + + fn ev(kind: EventKind, path: &str) -> Event { + Event::new(kind, PathBuf::from(path), None) + } + + #[test] + fn first_event_passes() { + let mut d = Debouncer::new(); + assert!(d.accept(&ev(EventKind::Modified, "/a"))); + } + + #[test] + fn rapid_duplicate_is_dropped() { + let mut d = Debouncer::new(); + assert!(d.accept(&ev(EventKind::Modified, "/a"))); + assert!(!d.accept(&ev(EventKind::Modified, "/a"))); + } + + #[test] + fn different_kind_is_not_debounced() { + let mut d = Debouncer::new(); + assert!(d.accept(&ev(EventKind::Modified, "/a"))); + assert!(d.accept(&ev(EventKind::Created, "/a"))); + } + + #[test] + fn after_window_event_passes_again() { + let mut d = Debouncer::new(); + assert!(d.accept(&ev(EventKind::Modified, "/a"))); + sleep(DEBOUNCE_WINDOW + 
Duration::from_millis(20)); + assert!(d.accept(&ev(EventKind::Modified, "/a"))); + } +} diff --git a/_primitives/_rust/kei-watch/src/error.rs b/_primitives/_rust/kei-watch/src/error.rs new file mode 100644 index 0000000..0f227c4 --- /dev/null +++ b/_primitives/_rust/kei-watch/src/error.rs @@ -0,0 +1,89 @@ +//! Error type for the kei-watch primitive. +//! +//! Wraps `notify` + `io` errors behind a stable surface so downstream +//! consumers don't bind to notify's error hierarchy. + +use std::fmt; +use std::path::PathBuf; + +/// Failure modes for [`crate::Watcher`] operations. +#[derive(Debug)] +pub enum WatchError { + /// Underlying OS I/O failure. + Io(std::io::Error), + /// notify backend failed to start or watch. + NotifyBackend(String), + /// Path given to `watch` does not exist on disk. + PathNotFound(PathBuf), + /// `unwatch` called on a path that was never registered. + WatchNotFound(PathBuf), +} + +impl fmt::Display for WatchError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + WatchError::Io(e) => write!(f, "io: {e}"), + WatchError::NotifyBackend(s) => write!(f, "notify backend: {s}"), + WatchError::PathNotFound(p) => write!(f, "path not found: {}", p.display()), + WatchError::WatchNotFound(p) => write!(f, "watch not found: {}", p.display()), + } + } +} + +impl std::error::Error for WatchError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + WatchError::Io(e) => Some(e), + _ => None, + } + } +} + +impl From for WatchError { + fn from(e: std::io::Error) -> Self { + WatchError::Io(e) + } +} + +/// Convert a `notify::Error` into `WatchError`, preserving the path hint +/// for path-related errors where possible. 
+pub fn from_notify(err: notify::Error) -> WatchError { + use notify::ErrorKind as NK; + let first_path = err.paths.first().cloned(); + match err.kind { + NK::PathNotFound => { + WatchError::PathNotFound(first_path.unwrap_or_default()) + } + NK::WatchNotFound => { + WatchError::WatchNotFound(first_path.unwrap_or_default()) + } + NK::Io(ioe) => WatchError::Io(ioe), + NK::Generic(s) => WatchError::NotifyBackend(s), + NK::InvalidConfig(_) => { + WatchError::NotifyBackend("invalid config".into()) + } + NK::MaxFilesWatch => { + WatchError::NotifyBackend("OS watch limit reached".into()) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn display_has_prefix() { + let e = WatchError::PathNotFound(PathBuf::from("/nope")); + assert!(format!("{e}").starts_with("path not found")); + } + + #[test] + fn notify_path_not_found_maps() { + let ne = notify::Error::path_not_found().add_path(PathBuf::from("/x")); + match from_notify(ne) { + WatchError::PathNotFound(p) => assert_eq!(p, PathBuf::from("/x")), + other => panic!("expected PathNotFound, got {other:?}"), + } + } +} diff --git a/_primitives/_rust/kei-watch/src/event.rs b/_primitives/_rust/kei-watch/src/event.rs new file mode 100644 index 0000000..411fee8 --- /dev/null +++ b/_primitives/_rust/kei-watch/src/event.rs @@ -0,0 +1,85 @@ +//! Canonical event type emitted by [`crate::Watcher`]. +//! +//! Decoupled from `notify::Event` so downstream consumers don't bind to +//! notify's evolving hierarchy. Only four kinds are emitted: +//! Created / Modified / Deleted / Renamed. + +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; + +/// Coarse event classification. All notify sub-kinds fold into these four. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum EventKind { + Created, + Modified, + Deleted, + Renamed, +} + +impl EventKind { + /// Short capitalised tag, e.g. `"Created"` (matches CLI JSON `kind` field). 
+ pub fn as_str(&self) -> &'static str { + match self { + EventKind::Created => "Created", + EventKind::Modified => "Modified", + EventKind::Deleted => "Deleted", + EventKind::Renamed => "Renamed", + } + } +} + +/// Filesystem event emitted by the watcher. +/// +/// `from_path` is `Some(..)` only for `Renamed` events where both endpoints +/// are known at emission time (backend-dependent — see +/// `map::from_notify` for the folding rules). +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct Event { + pub kind: EventKind, + pub path: PathBuf, + #[serde(skip_serializing_if = "Option::is_none", default)] + pub from_path: Option, + /// Unix seconds since epoch. + pub timestamp: i64, +} + +impl Event { + /// Construct a new event; timestamp is captured here. + pub fn new(kind: EventKind, path: PathBuf, from_path: Option) -> Self { + Self { + kind, + path, + from_path, + timestamp: unix_now(), + } + } +} + +fn unix_now() -> i64 { + use std::time::{SystemTime, UNIX_EPOCH}; + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs() as i64) + .unwrap_or(0) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn kind_as_str_is_stable() { + assert_eq!(EventKind::Created.as_str(), "Created"); + assert_eq!(EventKind::Modified.as_str(), "Modified"); + assert_eq!(EventKind::Deleted.as_str(), "Deleted"); + assert_eq!(EventKind::Renamed.as_str(), "Renamed"); + } + + #[test] + fn event_constructs_with_timestamp() { + let ev = Event::new(EventKind::Created, PathBuf::from("/tmp/x"), None); + assert!(ev.timestamp > 0); + assert_eq!(ev.kind, EventKind::Created); + assert!(ev.from_path.is_none()); + } +} diff --git a/_primitives/_rust/kei-watch/src/lib.rs b/_primitives/_rust/kei-watch/src/lib.rs new file mode 100644 index 0000000..44a03a1 --- /dev/null +++ b/_primitives/_rust/kei-watch/src/lib.rs @@ -0,0 +1,50 @@ +//! kei-watch — filesystem watcher primitive. +//! +//! Thin, synchronous wrapper around the [`notify`] crate. 
Emits a stable +//! canonical event format so downstream consumers (kei-pipe hot-reload, +//! kei-replay drift detection, dev-loop cache invalidation) don't bind +//! to notify's evolving [`notify::EventKind`] hierarchy. +//! +//! # Surface +//! +//! | Type | Role | +//! |------|------| +//! | [`Watcher`] | owns notify backend + pump thread | +//! | [`Event`] | canonical event ({kind, path, from_path, timestamp}) | +//! | [`EventKind`] | `Created` / `Modified` / `Deleted` / `Renamed` | +//! | [`WatchError`] | failure modes (Io / NotifyBackend / PathNotFound / WatchNotFound) | +//! +//! # Example +//! ```no_run +//! use kei_watch::{Watcher, EventKind}; +//! use std::{path::Path, time::Duration}; +//! +//! let mut w = Watcher::new().unwrap(); +//! w.watch(Path::new("."), true).unwrap(); +//! while let Some(ev) = w.next_event(Duration::from_secs(1)) { +//! if ev.kind == EventKind::Modified { +//! println!("{}", ev.path.display()); +//! } +//! } +//! ``` +//! +//! # Platform notes +//! +//! Rename semantics differ by backend. On macOS/Windows, +//! `RenameMode::Both` is emitted with both endpoints and we populate +//! `from_path`. On Linux (inotify), rename fires as two events +//! (`RenameMode::From` then `RenameMode::To`) correlated by tracker; +//! kei-watch emits each as a separate `Renamed` with `from_path=None`. +//! Downstream code that needs strict from→to pairing should fall back +//! to notify-debouncer-full. + +pub mod debounce; +pub mod error; +pub mod event; +pub mod map; +pub mod pump; +pub mod watcher; + +pub use error::WatchError; +pub use event::{Event, EventKind}; +pub use watcher::Watcher; diff --git a/_primitives/_rust/kei-watch/src/main.rs b/_primitives/_rust/kei-watch/src/main.rs new file mode 100644 index 0000000..1ec948f --- /dev/null +++ b/_primitives/_rust/kei-watch/src/main.rs @@ -0,0 +1,136 @@ +//! kei-watch CLI — streams canonical FS events as JSON Lines. +//! +//! Usage: +//! ```text +//! 
kei-watch watch --path <PATH> [--recursive] [--timeout-ms <TIMEOUT_MS>] +//! ``` +//! +//! Each event is one JSON object per line, flushed per event: +//! `{"kind":"Modified","path":"/abs/path","from":null,"ts":1712345678}`. +//! +//! Exits after `--timeout-ms` of no activity. Without the flag, runs +//! until killed (Ctrl-C). + +use clap::{Parser, Subcommand}; +use kei_watch::{Event, Watcher}; +use std::io::Write; +use std::path::PathBuf; +use std::process::ExitCode; +use std::time::Duration; + +#[derive(Parser)] +#[command(name = "kei-watch", version, about = "Filesystem watcher primitive")] +struct Cli { + #[command(subcommand)] + cmd: Cmd, +} + +#[derive(Subcommand)] +enum Cmd { + /// Watch a path and emit JSON-line events to stdout. + Watch { + /// Path to watch (file or directory). + #[arg(long)] + path: PathBuf, + /// Recurse into subdirectories. + #[arg(long)] + recursive: bool, + /// Exit after this many ms without activity. Omit → run forever. + #[arg(long)] + timeout_ms: Option, + }, +} + +fn err(msg: &str) -> ExitCode { + eprintln!("kei-watch: {msg}"); + ExitCode::from(1) +} + +fn event_to_json_line(ev: &Event) -> String { + // Compact, stable shape — not using serde_json::to_string on Event + // because we want `from` (short) rather than `from_path` (long). 
+ let from = match &ev.from_path { + Some(p) => serde_json::Value::String(p.to_string_lossy().into_owned()), + None => serde_json::Value::Null, + }; + let obj = serde_json::json!({ + "kind": ev.kind.as_str(), + "path": ev.path.to_string_lossy(), + "from": from, + "ts": ev.timestamp, + }); + obj.to_string() +} + +fn emit_event(ev: &Event) -> std::io::Result<()> { + let line = event_to_json_line(ev); + let stdout = std::io::stdout(); + let mut lock = stdout.lock(); + writeln!(lock, "{line}")?; + lock.flush() +} + +fn run_watch(path: PathBuf, recursive: bool, timeout_ms: Option) -> ExitCode { + let mut watcher = match Watcher::new() { + Ok(w) => w, + Err(e) => return err(&format!("new: {e}")), + }; + if let Err(e) = watcher.watch(&path, recursive) { + return err(&format!("watch {}: {e}", path.display())); + } + let step = Duration::from_millis(500); + let limit = timeout_ms.unwrap_or(u64::MAX); + let mut idle_ms: u64 = 0; + loop { + match watcher.next_event(step) { + Some(ev) => { + if emit_event(&ev).is_err() { + return ExitCode::SUCCESS; + } + idle_ms = 0; + } + None => { + idle_ms = idle_ms.saturating_add(step.as_millis() as u64); + if idle_ms >= limit { + return ExitCode::SUCCESS; + } + } + } + } +} + +fn main() -> ExitCode { + let cli = Cli::parse(); + match cli.cmd { + Cmd::Watch { path, recursive, timeout_ms } => run_watch(path, recursive, timeout_ms), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use kei_watch::EventKind; + use std::path::PathBuf; + + #[test] + fn json_line_has_required_fields() { + let ev = Event::new(EventKind::Modified, PathBuf::from("/x"), None); + let line = event_to_json_line(&ev); + assert!(line.contains("\"kind\":\"Modified\"")); + assert!(line.contains("\"path\":\"/x\"")); + assert!(line.contains("\"from\":null")); + assert!(line.contains("\"ts\":")); + } + + #[test] + fn json_line_includes_from_when_renamed() { + let ev = Event::new( + EventKind::Renamed, + PathBuf::from("/b"), + Some(PathBuf::from("/a")), + ); + let line = 
event_to_json_line(&ev); + assert!(line.contains("\"from\":\"/a\"")); + assert!(line.contains("\"path\":\"/b\"")); + } +} diff --git a/_primitives/_rust/kei-watch/src/map.rs b/_primitives/_rust/kei-watch/src/map.rs new file mode 100644 index 0000000..da2db05 --- /dev/null +++ b/_primitives/_rust/kei-watch/src/map.rs @@ -0,0 +1,123 @@ +//! Mapping: `notify::Event` → zero or more canonical [`Event`]. +//! +//! Folding rules: +//! - `Create(*)` → `EventKind::Created` +//! - `Modify(Data*)` / `Modify(Any)` / `Modify(Other)` → `EventKind::Modified` +//! - `Remove(*)` → `EventKind::Deleted` +//! - `Modify(Name(*))` → `EventKind::Renamed` (from_path populated if both +//! endpoints present in `paths`; else None) +//! - `Access(*)` / `Modify(Metadata(*))` / `Other` / `Any` → SKIP +//! +//! Rationale: Access events fire constantly on macOS fsevents and are +//! rarely what a hot-reload / drift-detection consumer wants. Metadata +//! changes (mtime-only touch) are likewise noise. + +use crate::event::{Event, EventKind}; +use notify::event::{EventKind as NK, ModifyKind, RenameMode}; + +/// Convert one `notify::Event` into 0..N canonical [`Event`]s. +/// +/// Returns `Vec` because a single notify event may carry multiple paths +/// (primarily for `Rename::Both`, which we still emit as a single event +/// with `from_path` populated — but we fold multi-path Create/Remove +/// sensibly too, one emitted event per path). +pub fn from_notify(ev: ¬ify::Event) -> Vec { + match ev.kind { + NK::Create(_) => fan_out(EventKind::Created, ev), + NK::Remove(_) => fan_out(EventKind::Deleted, ev), + NK::Modify(ModifyKind::Name(rm)) => rename(rm, ev), + NK::Modify(ModifyKind::Data(_)) + | NK::Modify(ModifyKind::Any) + | NK::Modify(ModifyKind::Other) => fan_out(EventKind::Modified, ev), + // Skip: Access, Modify(Metadata(*)), Other, Any. + _ => Vec::new(), + } +} + +/// Emit one canonical event per path in `ev.paths`. 
+fn fan_out(kind: EventKind, ev: ¬ify::Event) -> Vec { + ev.paths + .iter() + .map(|p| Event::new(kind, p.clone(), None)) + .collect() +} + +/// Rename mapping. `RenameMode::Both` carries `[from, to]` in paths; +/// other modes may carry only a single path (backend-dependent — see +/// crate-level docs). Callers receive partial information on those. +fn rename(mode: RenameMode, ev: ¬ify::Event) -> Vec { + match mode { + RenameMode::Both if ev.paths.len() >= 2 => { + let from = ev.paths[0].clone(); + let to = ev.paths[1].clone(); + vec![Event::new(EventKind::Renamed, to, Some(from))] + } + // RenameMode::To: path is the destination; no `from` known here. + // RenameMode::From: path is the origin; this event effectively + // says "this path moved away" — we surface it as Renamed with + // only `path` populated (no destination known yet). + // RenameMode::Any / Other: same — emit whatever path we have. + _ => ev + .paths + .iter() + .map(|p| Event::new(EventKind::Renamed, p.clone(), None)) + .collect(), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use notify::event::{AccessKind, CreateKind, MetadataKind, RemoveKind}; + use std::path::PathBuf; + + fn nev(kind: NK, paths: Vec) -> notify::Event { + let mut e = notify::Event::new(kind); + e.paths = paths; + e + } + + #[test] + fn create_maps_to_created() { + let e = nev(NK::Create(CreateKind::File), vec!["/a".into()]); + let out = from_notify(&e); + assert_eq!(out.len(), 1); + assert_eq!(out[0].kind, EventKind::Created); + } + + #[test] + fn access_is_skipped() { + let e = nev(NK::Access(AccessKind::Read), vec!["/a".into()]); + assert!(from_notify(&e).is_empty()); + } + + #[test] + fn metadata_is_skipped() { + let e = nev( + NK::Modify(ModifyKind::Metadata(MetadataKind::AccessTime)), + vec!["/a".into()], + ); + assert!(from_notify(&e).is_empty()); + } + + #[test] + fn rename_both_populates_from_path() { + let e = nev( + NK::Modify(ModifyKind::Name(RenameMode::Both)), + vec!["/a".into(), "/b".into()], + ); + let 
out = from_notify(&e); + assert_eq!(out.len(), 1); + assert_eq!(out[0].kind, EventKind::Renamed); + assert_eq!(out[0].path, PathBuf::from("/b")); + assert_eq!(out[0].from_path, Some(PathBuf::from("/a"))); + } + + #[test] + fn remove_maps_to_deleted() { + let e = nev(NK::Remove(RemoveKind::File), vec!["/a".into()]); + let out = from_notify(&e); + assert_eq!(out.len(), 1); + assert_eq!(out[0].kind, EventKind::Deleted); + } +} diff --git a/_primitives/_rust/kei-watch/src/pump.rs b/_primitives/_rust/kei-watch/src/pump.rs new file mode 100644 index 0000000..fb2036e --- /dev/null +++ b/_primitives/_rust/kei-watch/src/pump.rs @@ -0,0 +1,72 @@ +//! Single-threaded pump: reads `Result` from notify's +//! channel, maps + debounces, pushes canonical [`Event`] to the output +//! channel consumed by `next_event` / `drain`. +//! +//! Exactly one thread is spawned per [`crate::Watcher`] instance. The +//! thread exits cleanly when notify's sender is dropped (closing the +//! input channel), which happens when the `notify::RecommendedWatcher` +//! is dropped inside [`crate::Watcher::drop`]. + +use crate::debounce::Debouncer; +use crate::event::Event; +use crate::map; +use std::sync::mpsc::{Receiver, Sender}; +use std::thread::{self, JoinHandle}; + +/// Spawn the pump thread. +/// +/// `notify_rx` — source end of notify's event channel. +/// `out_tx` — destination channel; each accepted canonical event is +/// forwarded here. +/// Returns the thread handle so the watcher can join on drop. 
+pub fn spawn( + notify_rx: Receiver>, + out_tx: Sender, +) -> JoinHandle<()> { + thread::Builder::new() + .name("kei-watch-pump".into()) + .spawn(move || run(notify_rx, out_tx)) + .expect("spawn kei-watch pump thread") +} + +fn run(notify_rx: Receiver>, out_tx: Sender) { + let mut deb = Debouncer::new(); + while let Ok(res) = notify_rx.recv() { + let Ok(notify_ev) = res else { continue }; + for canon in map::from_notify(¬ify_ev) { + if !deb.accept(&canon) { + continue; + } + // out_tx dropped (watcher released) → exit cleanly. + if out_tx.send(canon).is_err() { + return; + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::event::EventKind; + use notify::event::{CreateKind, EventKind as NK}; + use std::path::PathBuf; + use std::sync::mpsc; + use std::time::Duration; + + #[test] + fn pump_forwards_canonical_event() { + let (n_tx, n_rx) = mpsc::channel::>(); + let (o_tx, o_rx) = mpsc::channel::(); + let h = spawn(n_rx, o_tx); + + let mut e = notify::Event::new(NK::Create(CreateKind::File)); + e.paths = vec![PathBuf::from("/tmp/x")]; + n_tx.send(Ok(e)).unwrap(); + drop(n_tx); + + let ev = o_rx.recv_timeout(Duration::from_millis(500)).unwrap(); + assert_eq!(ev.kind, EventKind::Created); + h.join().unwrap(); + } +} diff --git a/_primitives/_rust/kei-watch/src/watcher.rs b/_primitives/_rust/kei-watch/src/watcher.rs new file mode 100644 index 0000000..58b100d --- /dev/null +++ b/_primitives/_rust/kei-watch/src/watcher.rs @@ -0,0 +1,114 @@ +//! Public [`Watcher`] type — owns the notify backend and the pump thread. +//! +//! Layout invariants: +//! - exactly one pump thread per watcher +//! - pump thread exits when `notify::Watcher` is dropped (closes +//! notify's sender, which closes the pump's `recv`) +//! - `Drop` explicitly drops the notify watcher first, then joins the +//! 
pump — preventing zombie threads + +use crate::error::{from_notify, WatchError}; +use crate::event::Event; +use crate::pump; +use notify::{RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher}; +use std::path::Path; +use std::sync::mpsc::{self, Receiver}; +use std::thread::JoinHandle; +use std::time::Duration; + +/// Filesystem watcher. Synchronous API only; see crate docs. +pub struct Watcher { + inner: Option, + out_rx: Receiver, + pump_handle: Option>, +} + +impl Watcher { + /// Construct a new watcher. Spawns one background thread for the + /// event pump and initialises the notify backend with its own + /// internal thread(s). + pub fn new() -> Result { + let (n_tx, n_rx) = mpsc::channel::>(); + let (o_tx, o_rx) = mpsc::channel::(); + let inner = notify::recommended_watcher(n_tx).map_err(from_notify)?; + let pump_handle = pump::spawn(n_rx, o_tx); + Ok(Self { + inner: Some(inner), + out_rx: o_rx, + pump_handle: Some(pump_handle), + }) + } + + /// Begin watching `path`. When `recursive=true`, all descendants + /// are watched too; otherwise only the path itself (and its + /// immediate children if a directory). + pub fn watch(&mut self, path: &Path, recursive: bool) -> Result<(), WatchError> { + let mode = if recursive { + RecursiveMode::Recursive + } else { + RecursiveMode::NonRecursive + }; + let inner = self.inner.as_mut().expect("watcher initialised"); + inner.watch(path, mode).map_err(from_notify) + } + + /// Stop watching `path`. + pub fn unwatch(&mut self, path: &Path) -> Result<(), WatchError> { + let inner = self.inner.as_mut().expect("watcher initialised"); + inner.unwatch(path).map_err(from_notify) + } + + /// Block until either an event arrives or `timeout` elapses. + /// Returns `None` on timeout or channel closure. + pub fn next_event(&self, timeout: Duration) -> Option { + self.out_rx.recv_timeout(timeout).ok() + } + + /// Non-blocking: drain all currently queued events. 
+    pub fn drain(&self) -> Vec<Event> {
+        let mut out = Vec::new();
+        while let Ok(ev) = self.out_rx.try_recv() {
+            out.push(ev);
+        }
+        out
+    }
+}
+
+impl Drop for Watcher {
+    fn drop(&mut self) {
+        // Dropping the notify watcher closes the pump's input channel;
+        // the pump loop exits, and we can join its thread cleanly.
+        drop(self.inner.take());
+        if let Some(h) = self.pump_handle.take() {
+            let _ = h.join();
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use tempfile::tempdir;
+
+    #[test]
+    fn new_and_drop_is_clean() {
+        let w = Watcher::new().unwrap();
+        drop(w);
+    }
+
+    #[test]
+    fn watch_missing_path_is_error() {
+        let mut w = Watcher::new().unwrap();
+        let r = w.watch(Path::new("/definitely/does/not/exist/kei-watch"), false);
+        assert!(r.is_err());
+    }
+
+    #[test]
+    fn drain_on_idle_is_empty() {
+        let d = tempdir().unwrap();
+        let mut w = Watcher::new().unwrap();
+        w.watch(d.path(), false).unwrap();
+        // No activity → drain returns empty.
+        assert!(w.drain().is_empty());
+    }
+}
diff --git a/_primitives/_rust/kei-watch/tests/common/mod.rs b/_primitives/_rust/kei-watch/tests/common/mod.rs
new file mode 100644
index 0000000..13f487a
--- /dev/null
+++ b/_primitives/_rust/kei-watch/tests/common/mod.rs
@@ -0,0 +1,54 @@
+//! Shared helpers for the integration tests.
+//!
+//! `tests/` files are separate crates; common code lives under
+//! `tests/common/mod.rs` per cargo convention (not a top-level
+//! `tests/common.rs`, which would itself be a test binary).
+
+use kei_watch::Watcher;
+use std::fs;
+use std::path::{Path, PathBuf};
+use std::time::{Duration, Instant};
+
+pub const EVENT_TIMEOUT: Duration = Duration::from_secs(3);
+
+/// Pull events until one matches `pred` or the global timeout elapses.
+pub fn wait_for<F: Fn(&kei_watch::Event) -> bool>(
+    w: &Watcher,
+    pred: F,
+) -> Option<kei_watch::Event> {
+    let deadline = Instant::now() + EVENT_TIMEOUT;
+    while Instant::now() < deadline {
+        let remaining = deadline.saturating_duration_since(Instant::now());
+        let step = std::cmp::min(remaining, Duration::from_millis(200));
+        if let Some(ev) = w.next_event(step) {
+            if pred(&ev) {
+                return Some(ev);
+            }
+        }
+    }
+    None
+}
+
+/// macOS reports paths under `/private/var/...`; tempdirs live at `/var/...`.
+/// Canonicalise both sides before compare. When the target file has been
+/// deleted, `canonicalize` fails — fall back to symlink-free parent match.
+pub fn same_path(a: &Path, b: &Path) -> bool {
+    if a == b {
+        return true;
+    }
+    let ca = fs::canonicalize(a).ok();
+    let cb = fs::canonicalize(b).ok();
+    match (ca, cb) {
+        (Some(x), Some(y)) => x == y,
+        _ => canonicalize_parent(a) == canonicalize_parent(b),
+    }
+}
+
+fn canonicalize_parent(p: &Path) -> PathBuf {
+    let parent = p.parent().and_then(|q| fs::canonicalize(q).ok());
+    let file = p.file_name().map(|s| s.to_owned());
+    match (parent, file) {
+        (Some(par), Some(f)) => par.join(f),
+        _ => p.to_path_buf(),
+    }
+}
diff --git a/_primitives/_rust/kei-watch/tests/watch_flow.rs b/_primitives/_rust/kei-watch/tests/watch_flow.rs
new file mode 100644
index 0000000..b9a1a88
--- /dev/null
+++ b/_primitives/_rust/kei-watch/tests/watch_flow.rs
@@ -0,0 +1,91 @@
+//! Flow-control integration tests: rename semantics, `drain` behaviour,
+//! `next_event` timeout, `unwatch`.
+
+mod common;
+
+use common::{same_path, wait_for};
+use kei_watch::{EventKind, Watcher};
+use std::fs;
+use std::thread::sleep;
+use std::time::{Duration, Instant};
+use tempfile::tempdir;
+
+#[test]
+fn rename_file_emits_renamed() {
+    // Platform-flexible: macOS fsevents emits Modify(Name(Both)) with
+    // both paths; Linux inotify emits Modify(Name(From)) +
+    // Modify(Name(To)) as two events. The test accepts any Renamed
+    // referencing either endpoint.
+    let d = tempdir().expect("tempdir");
+    let from = d.path().join("src.txt");
+    let to = d.path().join("dst.txt");
+    fs::write(&from, b"x").expect("seed");
+
+    let mut w = Watcher::new().expect("new");
+    w.watch(d.path(), true).expect("watch");
+    sleep(Duration::from_millis(150));
+
+    fs::rename(&from, &to).expect("rename");
+
+    let got = wait_for(&w, |e| {
+        e.kind == EventKind::Renamed
+            && (same_path(&e.path, &from)
+                || same_path(&e.path, &to)
+                || e.from_path
+                    .as_ref()
+                    .is_some_and(|p| same_path(p, &from)))
+    });
+    assert!(got.is_some(), "expected any Renamed event referencing src/dst");
+}
+
+#[test]
+fn drain_is_non_blocking() {
+    let d = tempdir().expect("tempdir");
+    let mut w = Watcher::new().expect("new");
+    w.watch(d.path(), true).expect("watch");
+    let start = Instant::now();
+    let out = w.drain();
+    assert!(start.elapsed() < Duration::from_millis(100));
+    assert!(out.is_empty());
+}
+
+#[test]
+fn next_event_times_out_on_idle() {
+    let d = tempdir().expect("tempdir");
+    let mut w = Watcher::new().expect("new");
+    w.watch(d.path(), true).expect("watch");
+    sleep(Duration::from_millis(100));
+    let _ = w.drain();
+    let start = Instant::now();
+    let ev = w.next_event(Duration::from_millis(200));
+    let elapsed = start.elapsed();
+    assert!(ev.is_none(), "expected None on idle, got {ev:?}");
+    assert!(
+        elapsed >= Duration::from_millis(150),
+        "next_event returned too fast: {elapsed:?}"
+    );
+}
+
+#[test]
+fn unwatch_stops_events() {
+    let d = tempdir().expect("tempdir");
+    let mut w = Watcher::new().expect("new");
+    w.watch(d.path(), true).expect("watch");
+    sleep(Duration::from_millis(100));
+    w.unwatch(d.path()).expect("unwatch");
+    sleep(Duration::from_millis(100));
+    let _ = w.drain();
+
+    fs::write(d.path().join("ghost.txt"), b"boo").unwrap();
+    sleep(Duration::from_millis(400));
+
+    let after: Vec<_> = w
+        .drain()
+        .into_iter()
+        .filter(|e| same_path(&e.path, &d.path().join("ghost.txt")))
+        .collect();
+    assert!(
+        after.is_empty(),
+        "expected zero events after unwatch, got {after:?}"
+    );
+}
diff --git a/_primitives/_rust/kei-watch/tests/watch_smoke.rs b/_primitives/_rust/kei-watch/tests/watch_smoke.rs
new file mode 100644
index 0000000..f2a88c6
--- /dev/null
+++ b/_primitives/_rust/kei-watch/tests/watch_smoke.rs
@@ -0,0 +1,116 @@
+//! Smoke-level integration tests: construction, error paths,
+//! create / modify / delete events, debounce behaviour.
+//!
+//! Rename + flow-control tests live in `tests/watch_flow.rs`.
+
+mod common;
+
+use common::{same_path, wait_for};
+use kei_watch::{EventKind, Watcher};
+use std::fs;
+use std::io::Write;
+use std::path::Path;
+use std::thread::sleep;
+use std::time::{Duration, Instant};
+use tempfile::tempdir;
+
+#[test]
+fn new_and_drop_does_not_panic() {
+    let w = Watcher::new().expect("new");
+    drop(w);
+}
+
+#[test]
+fn watch_nonexistent_path_returns_error() {
+    let mut w = Watcher::new().expect("new");
+    let bogus = Path::new("/definitely/does/not/exist/kei-watch-test-xxx");
+    assert!(w.watch(bogus, false).is_err());
+}
+
+#[test]
+fn create_file_emits_created() {
+    let d = tempdir().expect("tempdir");
+    let mut w = Watcher::new().expect("new");
+    w.watch(d.path(), true).expect("watch");
+    sleep(Duration::from_millis(100));
+
+    let f = d.path().join("new.txt");
+    fs::write(&f, b"hello").expect("write");
+
+    let got = wait_for(&w, |e| {
+        e.kind == EventKind::Created && same_path(&e.path, &f)
+    });
+    assert!(got.is_some(), "expected Created for {}", f.display());
+}
+
+#[test]
+fn modify_file_emits_modified() {
+    let d = tempdir().expect("tempdir");
+    let f = d.path().join("m.txt");
+    fs::write(&f, b"v1").expect("seed");
+
+    let mut w = Watcher::new().expect("new");
+    w.watch(d.path(), true).expect("watch");
+    sleep(Duration::from_millis(150));
+
+    let mut fh = fs::OpenOptions::new().append(true).open(&f).unwrap();
+    fh.write_all(b"v2").unwrap();
+    fh.flush().unwrap();
+    drop(fh);
+
+    let got = wait_for(&w, |e| {
+        e.kind == EventKind::Modified && same_path(&e.path, &f)
+    });
+    assert!(got.is_some(), "expected Modified for {}", f.display());
+}
+
+#[test]
+fn delete_file_emits_deleted() {
+    let d = tempdir().expect("tempdir");
+    let f = d.path().join("del.txt");
+    fs::write(&f, b"doomed").expect("seed");
+    // Let the seed-create event flush before the watcher starts.
+    sleep(Duration::from_millis(100));
+
+    let mut w = Watcher::new().expect("new");
+    w.watch(d.path(), true).expect("watch");
+    sleep(Duration::from_millis(200));
+    let _ = w.drain();
+
+    fs::remove_file(&f).expect("remove");
+
+    let got = wait_for(&w, |e| {
+        e.kind == EventKind::Deleted && same_path(&e.path, &f)
+    });
+    assert!(got.is_some(), "expected Deleted for {}", f.display());
+}
+
+#[test]
+fn rapid_modifies_are_debounced() {
+    let d = tempdir().expect("tempdir");
+    let f = d.path().join("burst.txt");
+    fs::write(&f, b"seed").expect("seed");
+
+    let mut w = Watcher::new().expect("new");
+    w.watch(d.path(), true).expect("watch");
+    sleep(Duration::from_millis(150));
+
+    let start = Instant::now();
+    for i in 0..5 {
+        fs::write(&f, format!("v{i}")).unwrap();
+    }
+    assert!(start.elapsed() < Duration::from_millis(50));
+
+    sleep(Duration::from_millis(300));
+    let drained: Vec<_> = w
+        .drain()
+        .into_iter()
+        .filter(|e| e.kind == EventKind::Modified && same_path(&e.path, &f))
+        .collect();
+    assert!(
+        drained.len() <= 2,
+        "expected ≤2 Modified events after debounce, got {}: {:?}",
+        drained.len(),
+        drained
+    );
+}