feat(wave13): kei-diff + kei-scheduler + kei-watch primitives

3 new primitives, registered in workspace members (39 crates total):

- kei-diff (9 files, 27 tests): Structural JSON diff RFC 6902 subset
  (add/remove/replace). Pure computation. Index-based array diff
  (not LCS) matches drift-detection semantics. Round-trip property
  verified on 15+ fixtures. Zero sibling deps — pure utility.

- kei-scheduler (12 files, 16 tests): Durable task scheduler (cron /
  at / interval) primitive. Engine-native (SCHEDULER_SCHEMA on
  kei-entity-store). Name-unique via custom migration. compute_next
  pure fn + CLI tick for external executor.

- kei-watch (12 files, 30 tests): Filesystem watcher thin wrapper
  around notify 8.x. Sync API (no tokio). 50ms debounce. Cross-platform
  rename handling (macOS Modify(Name(Both)) vs Linux From/To pair).

All crate-local [workspace] tables removed. Registered in
_primitives/_rust/Cargo.toml. cargo check --workspace clean.

Constructor Pattern: all source files <=200 LOC, all functions <=30 LOC.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Parfii-bot 2026-04-23 15:51:42 +08:00
parent 770402ac7a
commit d521e7d89a
35 changed files with 3168 additions and 14 deletions

View file

@ -656,6 +656,12 @@ version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb"
[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitflags"
version = "2.11.1"
@ -912,6 +918,17 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "cron"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5877d3fbf742507b66bc2a1945106bd30dd8504019d596901ddd012a4dd01740"
dependencies = [
"chrono",
"once_cell",
"winnow 0.6.26",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.12"
@ -1182,6 +1199,15 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "fsevent-sys"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2"
dependencies = [
"libc",
]
[[package]]
name = "futures-channel"
version = "0.3.32"
@ -1307,7 +1333,7 @@ version = "0.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b903b73e45dc0c6c596f2d37eccece7c1c8bb6e4407b001096387c63d0d93724"
dependencies = [
"bitflags",
"bitflags 2.11.1",
"libc",
"libgit2-sys",
"log",
@ -1770,6 +1796,26 @@ dependencies = [
"serde_core",
]
[[package]]
name = "inotify"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd5b3eaf1a28b758ac0faa5a4254e8ab2705605496f1b1f3fbbc3988ad73d199"
dependencies = [
"bitflags 2.11.1",
"inotify-sys",
"libc",
]
[[package]]
name = "inotify-sys"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
dependencies = [
"libc",
]
[[package]]
name = "ipnet"
version = "2.12.0"
@ -2014,6 +2060,14 @@ dependencies = [
"tempfile",
]
[[package]]
name = "kei-diff"
version = "0.1.0"
dependencies = [
"serde",
"serde_json",
]
[[package]]
name = "kei-entity-store"
version = "0.1.0"
@ -2189,6 +2243,22 @@ dependencies = [
"walkdir",
]
[[package]]
name = "kei-scheduler"
version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"clap",
"cron",
"kei-entity-store",
"rusqlite",
"serde",
"serde_json",
"tempfile",
"thiserror 1.0.69",
]
[[package]]
name = "kei-search-core"
version = "0.1.0"
@ -2261,6 +2331,17 @@ dependencies = [
"tempfile",
]
[[package]]
name = "kei-watch"
version = "0.1.0"
dependencies = [
"clap",
"notify",
"serde",
"serde_json",
"tempfile",
]
[[package]]
name = "keisei"
version = "0.1.0"
@ -2276,6 +2357,26 @@ dependencies = [
"toml",
]
[[package]]
name = "kqueue"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac30106d7dce88daf4a3fcb4879ea939476d5074a9b7ddd0fb97fa4bed5596a"
dependencies = [
"kqueue-sys",
"libc",
]
[[package]]
name = "kqueue-sys"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b"
dependencies = [
"bitflags 1.3.2",
"libc",
]
[[package]]
name = "lazy_static"
version = "1.5.0"
@ -2321,7 +2422,7 @@ version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e02f3bb43d335493c96bf3fd3a321600bf6bd07ed34bc64118e9293bdffea46c"
dependencies = [
"bitflags",
"bitflags 2.11.1",
"libc",
"plain",
"redox_syscall 0.7.4",
@ -2437,6 +2538,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1"
dependencies = [
"libc",
"log",
"wasi",
"windows-sys 0.61.2",
]
@ -2480,6 +2582,33 @@ dependencies = [
"memchr",
]
[[package]]
name = "notify"
version = "8.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d3d07927151ff8575b7087f245456e549fea62edf0ec4e565a5ee50c8402bc3"
dependencies = [
"bitflags 2.11.1",
"fsevent-sys",
"inotify",
"kqueue",
"libc",
"log",
"mio",
"notify-types",
"walkdir",
"windows-sys 0.60.2",
]
[[package]]
name = "notify-types"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42b8cfee0e339a0337359f3c88165702ac6e600dc01c0cc9579a92d62b08477a"
dependencies = [
"bitflags 2.11.1",
]
[[package]]
name = "nu-ansi-term"
version = "0.50.3"
@ -2716,7 +2845,7 @@ version = "0.18.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60769b8b31b2a9f263dae2776c37b1b28ae246943cf719eb6946a1db05128a61"
dependencies = [
"bitflags",
"bitflags 2.11.1",
"crc32fast",
"fdeflate",
"flate2",
@ -2858,7 +2987,7 @@ version = "0.5.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d"
dependencies = [
"bitflags",
"bitflags 2.11.1",
]
[[package]]
@ -2867,7 +2996,7 @@ version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f450ad9c3b1da563fb6948a8e0fb0fb9269711c9c73d9ea1de5058c79c8d643a"
dependencies = [
"bitflags",
"bitflags 2.11.1",
]
[[package]]
@ -2945,7 +3074,7 @@ version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b838eba278d213a8beaf485bd313fd580ca4505a00d5871caeb1457c55322cae"
dependencies = [
"bitflags",
"bitflags 2.11.1",
"fallible-iterator",
"fallible-streaming-iterator",
"hashlink",
@ -2968,7 +3097,7 @@ version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190"
dependencies = [
"bitflags",
"bitflags 2.11.1",
"errno",
"libc",
"linux-raw-sys",
@ -3105,7 +3234,7 @@ version = "3.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7f4bc775c73d9a02cde8bf7b2ec4c9d12743edf609006c7facc23998404cd1d"
dependencies = [
"bitflags",
"bitflags 2.11.1",
"core-foundation",
"core-foundation-sys",
"libc",
@ -3475,7 +3604,7 @@ checksum = "5afe4c38a9b417b6a9a5eeffe7235d0a106716495536e7727d1c7f4b1ff3eba6"
dependencies = [
"atoi",
"base64 0.22.1",
"bitflags",
"bitflags 2.11.1",
"byteorder",
"bytes",
"crc",
@ -3517,7 +3646,7 @@ checksum = "b1dbb157e65f10dbe01f729339c06d239120221c9ad9fa0ba8408c4cc18ecf21"
dependencies = [
"atoi",
"base64 0.22.1",
"bitflags",
"bitflags 2.11.1",
"byteorder",
"crc",
"dotenvy",
@ -3867,7 +3996,7 @@ dependencies = [
"serde_spanned",
"toml_datetime",
"toml_write",
"winnow",
"winnow 0.7.15",
]
[[package]]
@ -4220,7 +4349,7 @@ version = "0.244.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe"
dependencies = [
"bitflags",
"bitflags 2.11.1",
"hashbrown 0.15.5",
"indexmap",
"semver",
@ -4328,6 +4457,15 @@ dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "windows-sys"
version = "0.60.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb"
dependencies = [
"windows-targets 0.53.5",
]
[[package]]
name = "windows-sys"
version = "0.61.2"
@ -4361,13 +4499,30 @@ dependencies = [
"windows_aarch64_gnullvm 0.52.6",
"windows_aarch64_msvc 0.52.6",
"windows_i686_gnu 0.52.6",
"windows_i686_gnullvm",
"windows_i686_gnullvm 0.52.6",
"windows_i686_msvc 0.52.6",
"windows_x86_64_gnu 0.52.6",
"windows_x86_64_gnullvm 0.52.6",
"windows_x86_64_msvc 0.52.6",
]
[[package]]
name = "windows-targets"
version = "0.53.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3"
dependencies = [
"windows-link",
"windows_aarch64_gnullvm 0.53.1",
"windows_aarch64_msvc 0.53.1",
"windows_i686_gnu 0.53.1",
"windows_i686_gnullvm 0.53.1",
"windows_i686_msvc 0.53.1",
"windows_x86_64_gnu 0.53.1",
"windows_x86_64_gnullvm 0.53.1",
"windows_x86_64_msvc 0.53.1",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.48.5"
@ -4380,6 +4535,12 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3"
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53"
[[package]]
name = "windows_aarch64_msvc"
version = "0.48.5"
@ -4392,6 +4553,12 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469"
[[package]]
name = "windows_aarch64_msvc"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006"
[[package]]
name = "windows_i686_gnu"
version = "0.48.5"
@ -4404,12 +4571,24 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b"
[[package]]
name = "windows_i686_gnu"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3"
[[package]]
name = "windows_i686_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66"
[[package]]
name = "windows_i686_gnullvm"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c"
[[package]]
name = "windows_i686_msvc"
version = "0.48.5"
@ -4422,6 +4601,12 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66"
[[package]]
name = "windows_i686_msvc"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2"
[[package]]
name = "windows_x86_64_gnu"
version = "0.48.5"
@ -4434,6 +4619,12 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78"
[[package]]
name = "windows_x86_64_gnu"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.48.5"
@ -4446,6 +4637,12 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1"
[[package]]
name = "windows_x86_64_msvc"
version = "0.48.5"
@ -4458,6 +4655,21 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[[package]]
name = "windows_x86_64_msvc"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650"
[[package]]
name = "winnow"
version = "0.6.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e90edd2ac1aa278a5c4599b1d89cf03074b610800f866d4026dc199d7929a28"
dependencies = [
"memchr",
]
[[package]]
name = "winnow"
version = "0.7.15"
@ -4531,7 +4743,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2"
dependencies = [
"anyhow",
"bitflags",
"bitflags 2.11.1",
"indexmap",
"log",
"serde",

View file

@ -51,6 +51,12 @@ members = [
"kei-spawn",
# agent substrate v1 — reconstruct spawn from DNA (ledger row + task.toml + recompose)
"kei-replay",
# v0.29 Wave 13 — structural JSON diff primitive (RFC 6902 subset add/remove/replace)
"kei-diff",
# v0.29 Wave 13 — durable task scheduler (cron / at / interval) metadata primitive
"kei-scheduler",
# v0.29 Wave 13 — filesystem watcher primitive (thin notify wrapper, sync API)
"kei-watch",
]
[workspace.package]

View file

@ -0,0 +1,18 @@
[package]
name = "kei-diff"
version = "0.1.0"
edition = "2021"
rust-version = "1.75"
description = "Structural JSON diff (RFC 6902 subset: add/remove/replace). Pure computation primitive for drift detection in kei-replay and invalidation in kei-cache."
[lib]
name = "kei_diff"
path = "src/lib.rs"
[[bin]]
name = "kei-diff"
path = "src/main.rs"
[dependencies]
serde = { workspace = true }
serde_json = { workspace = true }

View file

@ -0,0 +1,169 @@
//! Apply an RFC 6902 patch (add/remove/replace subset) to a JSON document.
//!
//! Root-path `""` replace swaps the entire document. Array `add` with
//! index == len (or `-`) appends; in-range index inserts and shifts.
//! Array `remove` deletes and shifts. Object ops insert/delete/replace keys.
use crate::apply_error::ApplyError;
use crate::op::{Op, Patch};
use crate::path::parse_pointer;
use serde_json::Value;
/// Apply `patch` to `root` and return a new `Value`. `root` is cloned;
/// the original is untouched. Operations are applied in order.
pub fn apply(root: &Value, patch: &Patch) -> Result<Value, ApplyError> {
let mut doc = root.clone();
for op in &patch.0 {
apply_one(&mut doc, op)?;
}
Ok(doc)
}
fn apply_one(doc: &mut Value, op: &Op) -> Result<(), ApplyError> {
match op {
Op::Add { path, value } => apply_add(doc, path, value.clone()),
Op::Remove { path } => apply_remove(doc, path).map(|_| ()),
Op::Replace { path, value } => apply_replace(doc, path, value.clone()),
}
}
fn apply_add(doc: &mut Value, path: &str, value: Value) -> Result<(), ApplyError> {
let segs = parse_pointer(path).ok_or_else(|| ApplyError::InvalidPointer(path.into()))?;
if segs.is_empty() {
return Err(ApplyError::CannotAddToRoot);
}
let (parent_segs, last) = segs.split_at(segs.len() - 1);
let parent = navigate_mut(doc, parent_segs, path)?;
insert_into(parent, &last[0], value, path)
}
fn apply_remove(doc: &mut Value, path: &str) -> Result<Value, ApplyError> {
let segs = parse_pointer(path).ok_or_else(|| ApplyError::InvalidPointer(path.into()))?;
if segs.is_empty() {
return Err(ApplyError::CannotRemoveRoot);
}
let (parent_segs, last) = segs.split_at(segs.len() - 1);
let parent = navigate_mut(doc, parent_segs, path)?;
remove_from(parent, &last[0], path)
}
fn apply_replace(doc: &mut Value, path: &str, value: Value) -> Result<(), ApplyError> {
let segs = parse_pointer(path).ok_or_else(|| ApplyError::InvalidPointer(path.into()))?;
if segs.is_empty() {
*doc = value;
return Ok(());
}
let (parent_segs, last) = segs.split_at(segs.len() - 1);
let parent = navigate_mut(doc, parent_segs, path)?;
replace_in(parent, &last[0], value, path)
}
fn navigate_mut<'a>(
mut cur: &'a mut Value,
segs: &[String],
full_path: &str,
) -> Result<&'a mut Value, ApplyError> {
for seg in segs {
cur = step_into(cur, seg, full_path)?;
}
Ok(cur)
}
fn step_into<'a>(
cur: &'a mut Value,
seg: &str,
full_path: &str,
) -> Result<&'a mut Value, ApplyError> {
match cur {
Value::Object(map) => map
.get_mut(seg)
.ok_or_else(|| ApplyError::MissingParent(full_path.into())),
Value::Array(arr) => {
let idx = parse_index(seg, full_path)?;
arr.get_mut(idx)
.ok_or_else(|| ApplyError::MissingParent(full_path.into()))
}
_ => Err(ApplyError::TypeMismatch {
path: full_path.into(),
expected: "object or array",
}),
}
}
fn insert_into(parent: &mut Value, key: &str, value: Value, full: &str) -> Result<(), ApplyError> {
match parent {
Value::Object(map) => {
map.insert(key.to_string(), value);
Ok(())
}
Value::Array(arr) => {
let idx = parse_array_insert_index(key, arr.len(), full)?;
arr.insert(idx, value);
Ok(())
}
_ => Err(ApplyError::TypeMismatch { path: full.into(), expected: "object or array" }),
}
}
fn remove_from(parent: &mut Value, key: &str, full: &str) -> Result<Value, ApplyError> {
match parent {
Value::Object(map) => map
.remove(key)
.ok_or_else(|| ApplyError::MissingTarget(full.into())),
Value::Array(arr) => {
let idx = parse_index(key, full)?;
if idx >= arr.len() {
return Err(ApplyError::IndexOutOfBounds {
path: full.into(),
index: idx,
len: arr.len(),
});
}
Ok(arr.remove(idx))
}
_ => Err(ApplyError::TypeMismatch { path: full.into(), expected: "object or array" }),
}
}
fn replace_in(parent: &mut Value, key: &str, value: Value, full: &str) -> Result<(), ApplyError> {
match parent {
Value::Object(map) => {
if !map.contains_key(key) {
return Err(ApplyError::MissingTarget(full.into()));
}
map.insert(key.to_string(), value);
Ok(())
}
Value::Array(arr) => {
let idx = parse_index(key, full)?;
if idx >= arr.len() {
return Err(ApplyError::IndexOutOfBounds {
path: full.into(),
index: idx,
len: arr.len(),
});
}
arr[idx] = value;
Ok(())
}
_ => Err(ApplyError::TypeMismatch { path: full.into(), expected: "object or array" }),
}
}
fn parse_index(seg: &str, full: &str) -> Result<usize, ApplyError> {
seg.parse::<usize>()
.map_err(|_| ApplyError::InvalidPointer(full.into()))
}
fn parse_array_insert_index(seg: &str, len: usize, full: &str) -> Result<usize, ApplyError> {
if seg == "-" {
return Ok(len);
}
let idx = seg
.parse::<usize>()
.map_err(|_| ApplyError::InvalidPointer(full.into()))?;
if idx > len {
return Err(ApplyError::IndexOutOfBounds { path: full.into(), index: idx, len });
}
Ok(idx)
}

View file

@ -0,0 +1,37 @@
//! `ApplyError` — structured failure reasons for `apply()`.
//!
//! Kept in its own module so `apply.rs` stays focused on the algorithm
//! and each file stays within Constructor Pattern limits.
use std::fmt;
#[derive(Debug, Clone, PartialEq)]
pub enum ApplyError {
InvalidPointer(String),
MissingParent(String),
MissingTarget(String),
IndexOutOfBounds { path: String, index: usize, len: usize },
TypeMismatch { path: String, expected: &'static str },
CannotAddToRoot,
CannotRemoveRoot,
}
impl fmt::Display for ApplyError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::InvalidPointer(p) => write!(f, "invalid JSON pointer: {p:?}"),
Self::MissingParent(p) => write!(f, "missing parent at {p:?}"),
Self::MissingTarget(p) => write!(f, "missing target at {p:?}"),
Self::IndexOutOfBounds { path, index, len } => {
write!(f, "index {index} out of bounds (len {len}) at {path:?}")
}
Self::TypeMismatch { path, expected } => {
write!(f, "type mismatch at {path:?}: expected {expected}")
}
Self::CannotAddToRoot => write!(f, "cannot 'add' to root (use 'replace')"),
Self::CannotRemoveRoot => write!(f, "cannot 'remove' root"),
}
}
}
impl std::error::Error for ApplyError {}

View file

@ -0,0 +1,106 @@
//! Structural JSON diff.
//!
//! Algorithm:
//! * Both objects → recurse per-key across the union (add/remove/recurse).
//! * Both arrays → index-based (recurse on overlap; add-tail or remove-tail
//! for length delta). NOT LCS — simpler, idempotent enough for drift
//! detection, and cheap (O(n)).
//! * Otherwise, if values differ → `replace`.
//! * Equal values → no-op.
//!
//! Rationale for skipping LCS: the consumer (kei-replay drift check) cares
//! about "does anything differ" and "at which logical coordinate", not
//! minimum-edit-distance. Index-based gives stable paths; LCS would produce
//! a smaller patch on shuffled arrays but with ambiguous paths.
use crate::op::{Op, Patch};
use crate::path::PathBuf;
use serde_json::Value;
/// Compute an RFC 6902 subset patch that transforms `old` into `new`.
/// Invariant: `apply(old, diff(old, new)) == new`.
pub fn diff(old: &Value, new: &Value) -> Patch {
let mut patch = Patch::new();
let mut path = PathBuf::new();
diff_recurse(old, new, &mut path, &mut patch);
patch
}
fn diff_recurse(old: &Value, new: &Value, path: &mut PathBuf, patch: &mut Patch) {
if old == new {
return;
}
match (old, new) {
(Value::Object(a), Value::Object(b)) => diff_objects(a, b, path, patch),
(Value::Array(a), Value::Array(b)) => diff_arrays(a, b, path, patch),
_ => patch.push(Op::Replace {
path: path.as_string(),
value: new.clone(),
}),
}
}
fn diff_objects(
a: &serde_json::Map<String, Value>,
b: &serde_json::Map<String, Value>,
path: &mut PathBuf,
patch: &mut Patch,
) {
// Removals: keys in `a` but not `b`. Emit in stable key order for determinism.
for key in a.keys() {
if !b.contains_key(key) {
path.push_key(key);
patch.push(Op::Remove { path: path.as_string() });
path.pop();
}
}
// Additions + recursion: iterate `b` in its key order.
for (key, b_val) in b {
path.push_key(key);
match a.get(key) {
None => patch.push(Op::Add {
path: path.as_string(),
value: b_val.clone(),
}),
Some(a_val) => diff_recurse(a_val, b_val, path, patch),
}
path.pop();
}
}
fn diff_arrays(a: &[Value], b: &[Value], path: &mut PathBuf, patch: &mut Patch) {
let common = a.len().min(b.len());
// Recurse on overlapping prefix.
for i in 0..common {
path.push_index(i);
diff_recurse(&a[i], &b[i], path, patch);
path.pop();
}
if a.len() > b.len() {
emit_array_truncate(a.len(), b.len(), path, patch);
} else if b.len() > a.len() {
emit_array_append(b, a.len(), path, patch);
}
}
// Remove trailing indices highest-first so surviving indices don't shift.
fn emit_array_truncate(old_len: usize, new_len: usize, path: &mut PathBuf, patch: &mut Patch) {
for i in (new_len..old_len).rev() {
path.push_index(i);
patch.push(Op::Remove { path: path.as_string() });
path.pop();
}
}
// Append new tail. Emit in ascending order so each add references the
// just-created length as the next insertion point.
fn emit_array_append(b: &[Value], old_len: usize, path: &mut PathBuf, patch: &mut Patch) {
for i in old_len..b.len() {
path.push_index(i);
patch.push(Op::Add {
path: path.as_string(),
value: b[i].clone(),
});
path.pop();
}
}

View file

@ -0,0 +1,22 @@
//! kei-diff — structural JSON diff (RFC 6902 subset: add/remove/replace).
//!
//! ## Design
//! * Emits ONLY `add`, `remove`, `replace`. No `copy`/`move`/`test`.
//! * Arrays diffed by index (not LCS) — matches drift-detection semantics.
//! * Paths are RFC 6901 JSON Pointers (`~` → `~0`, `/` → `~1`).
//! * Correctness invariant: `apply(old, diff(old, new)) == new`.
//!
//! Consumed by `kei-replay` (drift detection between DNA-scoped agent runs)
//! and `kei-cache` (invalidation signals). Pure compute, zero sibling deps.
mod apply;
mod apply_error;
mod diff;
mod op;
mod path;
pub use apply::apply;
pub use apply_error::ApplyError;
pub use diff::diff;
pub use op::{Op, Patch};
pub use path::PathBuf as PointerBuf;

View file

@ -0,0 +1,80 @@
//! kei-diff CLI.
//!
//! Usage:
//! kei-diff diff --old <path> --new <path> # prints RFC 6902 patch
//! kei-diff apply --base <path> --patch <path> # prints result document
//!
//! No external arg-parser dep — this is a two-verb tool with fixed flag sets,
//! hand-rolling keeps the crate zero-dep beyond serde/serde_json.
use std::process::ExitCode;
fn main() -> ExitCode {
let args: Vec<String> = std::env::args().skip(1).collect();
match run(&args) {
Ok(out) => {
println!("{out}");
ExitCode::SUCCESS
}
Err(e) => {
eprintln!("kei-diff: {e}");
ExitCode::FAILURE
}
}
}
fn run(args: &[String]) -> Result<String, String> {
match args.first().map(String::as_str) {
Some("diff") => cmd_diff(&args[1..]),
Some("apply") => cmd_apply(&args[1..]),
Some("help") | Some("--help") | Some("-h") | None => Ok(usage()),
Some(other) => Err(format!("unknown subcommand {other:?}\n{}", usage())),
}
}
fn cmd_diff(args: &[String]) -> Result<String, String> {
let old_path = flag(args, "--old")?;
let new_path = flag(args, "--new")?;
let old = read_json(&old_path)?;
let new = read_json(&new_path)?;
let patch = kei_diff::diff(&old, &new);
serde_json::to_string_pretty(&patch).map_err(|e| e.to_string())
}
fn cmd_apply(args: &[String]) -> Result<String, String> {
let base_path = flag(args, "--base")?;
let patch_path = flag(args, "--patch")?;
let base = read_json(&base_path)?;
let patch_json = std::fs::read_to_string(&patch_path)
.map_err(|e| format!("read {patch_path}: {e}"))?;
let patch: kei_diff::Patch = serde_json::from_str(&patch_json)
.map_err(|e| format!("parse patch: {e}"))?;
let out = kei_diff::apply(&base, &patch).map_err(|e| e.to_string())?;
serde_json::to_string_pretty(&out).map_err(|e| e.to_string())
}
fn flag(args: &[String], name: &str) -> Result<String, String> {
let mut iter = args.iter();
while let Some(a) = iter.next() {
if a == name {
return iter
.next()
.cloned()
.ok_or_else(|| format!("flag {name} requires a value"));
}
}
Err(format!("missing required flag {name}"))
}
fn read_json(path: &str) -> Result<serde_json::Value, String> {
let txt = std::fs::read_to_string(path).map_err(|e| format!("read {path}: {e}"))?;
serde_json::from_str(&txt).map_err(|e| format!("parse {path}: {e}"))
}
fn usage() -> String {
"kei-diff — structural JSON diff (RFC 6902 add/remove/replace)\n\n\
USAGE:\n \
kei-diff diff --old <file> --new <file>\n \
kei-diff apply --base <file> --patch <file>\n"
.to_string()
}

View file

@ -0,0 +1,134 @@
//! Patch operation types + RFC 6902 JSON serialization.
//!
//! We emit only the minimal trio (`add`, `remove`, `replace`). Custom Serialize
//! keeps the wire format stable and self-documenting (no need for serde tag
//! gymnastics).
use serde::de::{self, MapAccess, Visitor};
use serde::ser::{SerializeMap, SerializeSeq};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_json::Value;
use std::fmt;
/// A single RFC 6902 patch operation (subset).
#[derive(Debug, Clone, PartialEq)]
pub enum Op {
Add { path: String, value: Value },
Remove { path: String },
Replace { path: String, value: Value },
}
/// An ordered list of `Op`s. Serializes as a JSON array per RFC 6902.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Patch(pub Vec<Op>);
impl Patch {
pub fn new() -> Self {
Self(Vec::new())
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn push(&mut self, op: Op) {
self.0.push(op);
}
pub fn len(&self) -> usize {
self.0.len()
}
}
impl Serialize for Op {
fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
match self {
Op::Add { path, value } => {
let mut m = s.serialize_map(Some(3))?;
m.serialize_entry("op", "add")?;
m.serialize_entry("path", path)?;
m.serialize_entry("value", value)?;
m.end()
}
Op::Remove { path } => {
let mut m = s.serialize_map(Some(2))?;
m.serialize_entry("op", "remove")?;
m.serialize_entry("path", path)?;
m.end()
}
Op::Replace { path, value } => {
let mut m = s.serialize_map(Some(3))?;
m.serialize_entry("op", "replace")?;
m.serialize_entry("path", path)?;
m.serialize_entry("value", value)?;
m.end()
}
}
}
}
impl Serialize for Patch {
fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
let mut seq = s.serialize_seq(Some(self.0.len()))?;
for op in &self.0 {
seq.serialize_element(op)?;
}
seq.end()
}
}
impl<'de> Deserialize<'de> for Op {
fn deserialize<D: Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
d.deserialize_map(OpVisitor)
}
}
impl<'de> Deserialize<'de> for Patch {
fn deserialize<D: Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
Vec::<Op>::deserialize(d).map(Patch)
}
}
struct OpVisitor;
impl<'de> Visitor<'de> for OpVisitor {
type Value = Op;
fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("an RFC 6902 JSON Patch operation object")
}
fn visit_map<M: MapAccess<'de>>(self, mut map: M) -> Result<Op, M::Error> {
let mut op: Option<String> = None;
let mut path: Option<String> = None;
let mut value: Option<Value> = None;
while let Some(key) = map.next_key::<String>()? {
match key.as_str() {
"op" => op = Some(map.next_value()?),
"path" => path = Some(map.next_value()?),
"value" => value = Some(map.next_value()?),
_ => {
let _: serde::de::IgnoredAny = map.next_value()?;
}
}
}
let op = op.ok_or_else(|| de::Error::missing_field("op"))?;
let path = path.ok_or_else(|| de::Error::missing_field("path"))?;
build_op::<M::Error>(&op, path, value)
}
}
fn build_op<E: de::Error>(op: &str, path: String, value: Option<Value>) -> Result<Op, E> {
match op {
"add" => Ok(Op::Add {
path,
value: value.ok_or_else(|| E::missing_field("value"))?,
}),
"remove" => Ok(Op::Remove { path }),
"replace" => Ok(Op::Replace {
path,
value: value.ok_or_else(|| E::missing_field("value"))?,
}),
other => Err(E::unknown_variant(other, &["add", "remove", "replace"])),
}
}

View file

@ -0,0 +1,105 @@
//! RFC 6901 JSON Pointer path builder.
//!
//! Root is `""`. Segments join with `/`. Inside a segment, `~` encodes as
//! `~0` and `/` encodes as `~1`. Order matters: `~` must be escaped first
//! when encoding, and `~1` must be decoded before `~0`.
/// Incremental pointer builder. Use `push`/`pop` during recursive traversal;
/// `as_str` yields the current RFC 6901 pointer.
#[derive(Debug, Default, Clone)]
pub struct PathBuf {
segments: Vec<String>, // already-encoded segments (no leading '/')
}
impl PathBuf {
pub fn new() -> Self {
Self { segments: Vec::new() }
}
/// Push an object key. Performs RFC 6901 escaping.
pub fn push_key(&mut self, key: &str) {
self.segments.push(encode_segment(key));
}
/// Push an array index. Always emitted as decimal digits.
pub fn push_index(&mut self, idx: usize) {
self.segments.push(idx.to_string());
}
pub fn pop(&mut self) {
self.segments.pop();
}
/// Current pointer as a String. Empty string if at root.
pub fn as_string(&self) -> String {
if self.segments.is_empty() {
return String::new();
}
let mut out = String::with_capacity(self.segments.iter().map(|s| s.len() + 1).sum());
for seg in &self.segments {
out.push('/');
out.push_str(seg);
}
out
}
}
fn encode_segment(raw: &str) -> String {
// ~ must be escaped BEFORE / so we don't double-encode.
raw.replace('~', "~0").replace('/', "~1")
}
/// Parse an RFC 6901 pointer into decoded segments. `""` → `[]`.
/// Returns `None` if pointer is malformed (e.g. doesn't start with `/`
/// and is non-empty).
pub fn parse_pointer(ptr: &str) -> Option<Vec<String>> {
if ptr.is_empty() {
return Some(Vec::new());
}
if !ptr.starts_with('/') {
return None;
}
let segs = ptr[1..]
.split('/')
.map(decode_segment)
.collect::<Vec<_>>();
Some(segs)
}
fn decode_segment(raw: &str) -> String {
// ~1 must be decoded BEFORE ~0 per RFC 6901.
raw.replace("~1", "/").replace("~0", "~")
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn root_is_empty_string() {
let p = PathBuf::new();
assert_eq!(p.as_string(), "");
}
#[test]
fn escape_tilde_and_slash() {
let mut p = PathBuf::new();
p.push_key("a/b");
p.push_key("c~d");
assert_eq!(p.as_string(), "/a~1b/c~0d");
}
#[test]
fn roundtrip_encode_decode() {
let mut p = PathBuf::new();
p.push_key("weird~/key");
let s = p.as_string();
let decoded = parse_pointer(&s).unwrap();
assert_eq!(decoded, vec!["weird~/key".to_string()]);
}
#[test]
fn parse_rejects_malformed() {
assert!(parse_pointer("no-leading-slash").is_none());
}
}

View file

@ -0,0 +1,278 @@
//! Integration tests for kei-diff.
//!
//! Core property: `apply(old, diff(old, new)) == new` for every fixture.
//! Plus edge cases on pointer escaping, array edits, apply errors, and
//! the RFC 6902 wire format.
use kei_diff::{apply, diff, ApplyError, Op, Patch};
use serde_json::{json, Value};
fn rt(old: Value, new: Value) {
let patch = diff(&old, &new);
let applied = apply(&old, &patch).expect("apply failed");
assert_eq!(
applied, new,
"round-trip failed\n old = {old}\n new = {new}\n patch = {}",
serde_json::to_string(&patch).unwrap()
);
}
#[test]
fn equal_values_produce_empty_patch() {
let patch = diff(&json!({"a": 1}), &json!({"a": 1}));
assert!(patch.is_empty());
rt(json!({"a": 1}), json!({"a": 1}));
}
#[test]
fn scalar_replace() {
rt(json!(1), json!(2));
rt(json!("a"), json!("b"));
rt(json!(true), json!(false));
}
#[test]
fn type_change_emits_replace() {
let old = json!("hello");
let new = json!(42);
let patch = diff(&old, &new);
assert_eq!(patch.len(), 1);
assert!(matches!(patch.0[0], Op::Replace { .. }));
rt(old, new);
}
#[test]
fn object_add_key() {
let old = json!({"a": 1});
let new = json!({"a": 1, "b": 2});
let patch = diff(&old, &new);
assert_eq!(patch.len(), 1);
assert!(matches!(&patch.0[0], Op::Add { path, .. } if path == "/b"));
rt(old, new);
}
#[test]
fn object_remove_key() {
let old = json!({"a": 1, "b": 2});
let new = json!({"a": 1});
let patch = diff(&old, &new);
assert_eq!(patch.len(), 1);
assert!(matches!(&patch.0[0], Op::Remove { path } if path == "/b"));
rt(old, new);
}
#[test]
fn object_replace_value() {
let old = json!({"a": 1});
let new = json!({"a": 2});
let patch = diff(&old, &new);
assert_eq!(patch.len(), 1);
assert!(matches!(&patch.0[0], Op::Replace { path, .. } if path == "/a"));
rt(old, new);
}
#[test]
fn nested_object_replace() {
let old = json!({"a": {"b": {"c": 1}}});
let new = json!({"a": {"b": {"c": 2}}});
let patch = diff(&old, &new);
assert_eq!(patch.len(), 1);
assert!(matches!(&patch.0[0], Op::Replace { path, .. } if path == "/a/b/c"));
rt(old, new);
}
#[test]
fn array_append() {
let old = json!([1, 2]);
let new = json!([1, 2, 3, 4]);
let patch = diff(&old, &new);
assert_eq!(patch.len(), 2);
assert!(matches!(&patch.0[0], Op::Add { path, .. } if path == "/2"));
assert!(matches!(&patch.0[1], Op::Add { path, .. } if path == "/3"));
rt(old, new);
}
#[test]
fn array_truncate() {
let old = json!([1, 2, 3, 4]);
let new = json!([1, 2]);
let patch = diff(&old, &new);
// Expect removals highest-first (/3 then /2) so indices stay valid.
assert_eq!(patch.len(), 2);
assert!(matches!(&patch.0[0], Op::Remove { path } if path == "/3"));
assert!(matches!(&patch.0[1], Op::Remove { path } if path == "/2"));
rt(old, new);
}
#[test]
fn array_element_replace() {
let old = json!([1, 2, 3]);
let new = json!([1, 99, 3]);
let patch = diff(&old, &new);
assert_eq!(patch.len(), 1);
assert!(matches!(&patch.0[0], Op::Replace { path, .. } if path == "/1"));
rt(old, new);
}
#[test]
fn nested_array_inside_object() {
let old = json!({"xs": [1, 2, 3], "y": "z"});
let new = json!({"xs": [1, 7, 3, 4], "y": "z"});
rt(old, new);
}
#[test]
fn deeply_nested_mixed() {
let old = json!({
"meta": {"ts": 100, "tags": ["a", "b"]},
"items": [{"id": 1}, {"id": 2}],
});
let new = json!({
"meta": {"ts": 200, "tags": ["a", "b", "c"]},
"items": [{"id": 1}, {"id": 3}, {"id": 4}],
"extra": true,
});
rt(old, new);
}
#[test]
fn pointer_escapes_slash_and_tilde() {
let old = json!({"a/b": 1, "c~d": 2});
let new = json!({"a/b": 9, "c~d": 2});
let patch = diff(&old, &new);
assert_eq!(patch.len(), 1);
let expected_path = "/a~1b";
match &patch.0[0] {
Op::Replace { path, .. } => assert_eq!(path, expected_path),
other => panic!("expected Replace, got {other:?}"),
}
rt(old, new);
let old2 = json!({"c~d": 1});
let new2 = json!({"c~d": 2});
let p2 = diff(&old2, &new2);
match &p2.0[0] {
Op::Replace { path, .. } => assert_eq!(path, "/c~0d"),
other => panic!("expected Replace, got {other:?}"),
}
rt(old2, new2);
}
#[test]
fn apply_missing_path_errors() {
let doc = json!({"a": 1});
let patch = Patch(vec![Op::Replace {
path: "/nope".into(),
value: json!(9),
}]);
let err = apply(&doc, &patch).unwrap_err();
assert!(matches!(err, ApplyError::MissingTarget(_)));
}
#[test]
fn apply_remove_missing_errors() {
let doc = json!({"a": 1});
let patch = Patch(vec![Op::Remove { path: "/ghost".into() }]);
assert!(matches!(
apply(&doc, &patch).unwrap_err(),
ApplyError::MissingTarget(_)
));
}
#[test]
fn apply_replace_root() {
let doc = json!({"a": 1});
let patch = Patch(vec![Op::Replace {
path: "".into(),
value: json!([1, 2, 3]),
}]);
let out = apply(&doc, &patch).unwrap();
assert_eq!(out, json!([1, 2, 3]));
}
#[test]
fn empty_patch_is_identity() {
let doc = json!({"a": [1, 2, {"b": true}]});
let out = apply(&doc, &Patch::new()).unwrap();
assert_eq!(out, doc);
}
#[test]
fn apply_add_on_root_errors() {
let doc = json!({"a": 1});
let patch = Patch(vec![Op::Add {
path: "".into(),
value: json!(2),
}]);
assert!(matches!(
apply(&doc, &patch).unwrap_err(),
ApplyError::CannotAddToRoot
));
}
#[test]
fn apply_remove_root_errors() {
let doc = json!([1, 2]);
let patch = Patch(vec![Op::Remove { path: "".into() }]);
assert!(matches!(
apply(&doc, &patch).unwrap_err(),
ApplyError::CannotRemoveRoot
));
}
#[test]
fn wire_format_matches_rfc_6902() {
let patch = Patch(vec![
Op::Add {
path: "/x".into(),
value: json!(1),
},
Op::Remove { path: "/y".into() },
Op::Replace {
path: "/z".into(),
value: json!("hi"),
},
]);
let txt = serde_json::to_string(&patch).unwrap();
let parsed: Value = serde_json::from_str(&txt).unwrap();
assert_eq!(
parsed,
json!([
{"op": "add", "path": "/x", "value": 1},
{"op": "remove", "path": "/y"},
{"op": "replace", "path": "/z", "value": "hi"},
])
);
}
#[test]
fn patch_roundtrip_through_serde() {
let p1 = Patch(vec![
Op::Add {
path: "/a".into(),
value: json!({"nested": [1, 2]}),
},
Op::Remove { path: "/b/0".into() },
]);
let txt = serde_json::to_string(&p1).unwrap();
let p2: Patch = serde_json::from_str(&txt).unwrap();
assert_eq!(p1, p2);
}
#[test]
fn array_of_objects_element_replace() {
let old = json!([{"id": 1, "v": "a"}, {"id": 2, "v": "b"}]);
let new = json!([{"id": 1, "v": "a"}, {"id": 2, "v": "z"}]);
let patch = diff(&old, &new);
// Should be a single deep replace at /1/v
assert_eq!(patch.len(), 1);
assert!(matches!(&patch.0[0], Op::Replace { path, .. } if path == "/1/v"));
rt(old, new);
}
#[test]
fn null_to_value_is_replace() {
rt(json!(null), json!({"x": 1}));
rt(json!({"x": null}), json!({"x": 1}));
rt(json!({"x": 1}), json!({"x": null}));
}

View file

@ -0,0 +1,34 @@
[package]
name = "kei-scheduler"
version = "0.1.0"
edition = "2021"
rust-version = "1.75"
description = "Durable task scheduler (cron / at / interval) — metadata primitive."
[package.metadata.keisei]
backend = "sqlite"
db_env = "KEI_SCHEDULER_DB"
db_default = "~/.claude/scheduler/scheduler.sqlite"
schema_version = 1
[[bin]]
name = "kei-scheduler"
path = "src/main.rs"
[lib]
name = "kei_scheduler"
path = "src/lib.rs"
[dependencies]
kei-entity-store = { path = "../kei-entity-store" }
rusqlite = { version = "0.31", features = ["bundled"] }
clap = { version = "4", features = ["derive"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
anyhow = "1"
thiserror = "1"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
cron = "0.15"
[dev-dependencies]
tempfile = "3"

View file

@ -0,0 +1,51 @@
//! Typed errors for kei-scheduler. `Error` is the public wrapper;
//! `ParseError` surfaces trigger-spec parse failures separately so
//! callers (and tests) can discriminate without string-matching.
use thiserror::Error;
/// Trigger-spec parse failures. Pure function — no DB contact.
#[derive(Debug, Error, PartialEq, Eq)]
pub enum ParseError {
#[error("unknown trigger_kind '{0}' — expected cron / at / interval")]
UnknownKind(String),
#[error("invalid cron expression '{0}': {1}")]
InvalidCron(String, String),
#[error("invalid ISO-8601 datetime '{0}' — expected YYYY-MM-DDTHH:MM:SSZ")]
InvalidIsoDatetime(String),
#[error("invalid interval '{0}' — expected positive integer seconds")]
InvalidInterval(String),
}
/// Public scheduler error. Wraps rusqlite + anyhow + ParseError.
#[derive(Debug, Error)]
pub enum Error {
#[error(transparent)]
Parse(#[from] ParseError),
#[error("sqlite: {0}")]
Sqlite(#[from] rusqlite::Error),
#[error("task not found: id={0}")]
NotFound(i64),
#[error("task name already exists: '{0}'")]
NameExists(String),
#[error("io: {0}")]
Io(#[from] std::io::Error),
#[error(transparent)]
Other(#[from] anyhow::Error),
}
impl Error {
/// Inspect a rusqlite error and reclassify `UNIQUE constraint
/// failed: scheduler_tasks.name` into a typed `NameExists`. Other
/// SQLite errors pass through unchanged.
pub fn from_insert(err: rusqlite::Error, name: &str) -> Self {
let msg = err.to_string();
if msg.contains("UNIQUE constraint failed")
&& msg.contains("scheduler_tasks.name")
{
Self::NameExists(name.to_string())
} else {
Self::Sqlite(err)
}
}
}

View file

@ -0,0 +1,51 @@
//! kei-scheduler — durable task scheduler primitive (cron / at /
//! interval triggers). Metadata store only; execution is the caller's
//! responsibility. `kei-pipe` or a cron-wrapper agent pumps
//! `list_due` → invoke → `mark_run` on an external cadence.
//!
//! Shape mirrors the sibling kei-task / kei-chat-store pattern: the
//! `kei-entity-store` engine owns DDL + migrations, and this crate adds
//! the scheduler-specific SQL helpers (`schedule`, `cancel`, `list_due`,
//! `mark_run`) on top of its `Store` shim.
//!
//! Public API surface (all I/O is synchronous rusqlite; no runtime):
//! - [`open`] / [`open_memory`] — build a `Store` with the scheduler schema.
//! - [`schedule`] — insert a new task + pre-compute `next_run_at`.
//! - [`cancel`] — set status=cancelled, clear `next_run_at`.
//! - [`list_due`] — rows where `next_run_at <= now` AND status is
//! pending/scheduled.
//! - [`mark_run`] — stamp last_run / last_exit_code / advance schedule.
//! - [`compute_next`] — pure function, no DB.
pub mod error;
pub mod query;
pub mod run;
pub mod schedule;
pub mod schema;
pub mod store;
pub mod task;
pub mod trigger;
pub use error::{Error, ParseError};
pub use query::{get_by_name, get_task, list_due};
pub use run::mark_run;
pub use schedule::{cancel, schedule};
pub use schema::{ALL_SCHEMAS, SCHEDULER_SCHEMA};
pub use store::Store;
pub use task::{status as task_status, Task};
pub use trigger::{compute_next, validate_kind, AT, CRON, INTERVAL};
use std::path::Path;
/// Convenience constructor — opens the scheduler DB at `path`, creating
/// parent dirs + running migrations. Wraps `Store::open` so callers who
/// only need the raw Store don't import the submodule.
pub fn open(path: &Path) -> anyhow::Result<Store> {
Store::open(path)
}
/// In-memory scheduler — used by unit tests and by callers who want a
/// throwaway queue (e.g. a dry-run planner).
pub fn open_memory() -> anyhow::Result<Store> {
Store::open_memory()
}

View file

@ -0,0 +1,141 @@
//! kei-scheduler CLI — schedule / cancel / list-due / mark-run / tick.
//!
//! Exit-code contract:
//! - 0 — success
//! - 1 — IO / storage / usage
//! - 2 — validation (bad trigger kind / spec / unknown id)
use chrono::Utc;
use clap::{Parser, Subcommand};
use kei_scheduler::{
cancel, get_task, list_due, mark_run, schedule, Error, Store,
};
use std::path::PathBuf;
use std::process::ExitCode;
struct CliError {
code: u8,
msg: String,
}
impl CliError {
fn io(msg: impl Into<String>) -> Self { Self { code: 1, msg: msg.into() } }
fn validation(msg: impl Into<String>) -> Self { Self { code: 2, msg: msg.into() } }
}
impl From<anyhow::Error> for CliError {
fn from(e: anyhow::Error) -> Self { Self::io(format!("{e:#}")) }
}
impl From<Error> for CliError {
fn from(e: Error) -> Self {
match &e {
Error::Parse(_) | Error::NotFound(_) | Error::NameExists(_) =>
Self::validation(format!("{e}")),
_ => Self::io(format!("{e}")),
}
}
}
#[derive(Parser)]
#[command(name = "kei-scheduler", version, about = "Durable task scheduler (cron/at/interval)")]
struct Cli {
#[arg(long)]
db: Option<PathBuf>,
#[command(subcommand)]
cmd: Cmd,
}
#[derive(Subcommand)]
enum Cmd {
/// Insert a new scheduled task.
Schedule {
#[arg(long)] name: String,
#[arg(long)] kind: String,
#[arg(long)] spec: String,
#[arg(long)] cmd: String,
},
/// Cancel a task by id.
Cancel { #[arg(long)] id: i64 },
/// Print due tasks as a JSON array (reads `now = Utc::now`).
ListDue,
/// Record a run's exit code and advance next_run_at.
MarkRun {
#[arg(long)] id: i64,
#[arg(long)] exit: i64,
},
/// Convenience: `list-due` for the current wall clock.
Tick,
/// Print one task as JSON.
Get { #[arg(long)] id: i64 },
}
fn db_path(cli_db: Option<PathBuf>) -> PathBuf {
if let Some(p) = cli_db { return p; }
if let Ok(e) = std::env::var("KEI_SCHEDULER_DB") { return PathBuf::from(e); }
let home = std::env::var("HOME").unwrap_or_else(|_| ".".into());
PathBuf::from(home).join(".claude/scheduler/scheduler.sqlite")
}
fn run() -> Result<(), CliError> {
let cli = Cli::parse();
let store = Store::open(&db_path(cli.db))?;
dispatch(&store, cli.cmd)
}
fn dispatch(store: &Store, cmd: Cmd) -> Result<(), CliError> {
match cmd {
Cmd::Schedule { name, kind, spec, cmd } =>
cmd_schedule(store, &name, &kind, &spec, &cmd),
Cmd::Cancel { id } => { cancel(store.conn(), id)?; println!("cancelled {id}"); Ok(()) }
Cmd::ListDue | Cmd::Tick => cmd_list_due(store),
Cmd::MarkRun { id, exit } => {
mark_run(store.conn(), id, exit, Utc::now().timestamp())?;
println!("marked run {id} exit={exit}");
Ok(())
}
Cmd::Get { id } => cmd_get(store, id),
}
}
fn cmd_schedule(
store: &Store,
name: &str,
kind: &str,
spec: &str,
cmd: &str,
) -> Result<(), CliError> {
let id = schedule(store.conn(), name, kind, spec, cmd)?;
println!("{id}");
Ok(())
}
fn cmd_list_due(store: &Store) -> Result<(), CliError> {
let now = Utc::now().timestamp();
let rows = list_due(store.conn(), now)?;
let json = serde_json::to_string_pretty(&rows).map_err(|e| CliError::io(e.to_string()))?;
println!("{json}");
Ok(())
}
fn cmd_get(store: &Store, id: i64) -> Result<(), CliError> {
match get_task(store.conn(), id)? {
Some(t) => {
let json = serde_json::to_string_pretty(&t)
.map_err(|e| CliError::io(e.to_string()))?;
println!("{json}");
Ok(())
}
None => Err(CliError::validation(format!("task not found: id={id}"))),
}
}
fn main() -> ExitCode {
match run() {
Ok(()) => ExitCode::SUCCESS,
Err(CliError { code, msg }) => {
eprintln!("{msg}");
ExitCode::from(code)
}
}
}

View file

@ -0,0 +1,62 @@
//! Read-side queries: `list_due` and `get_task`.
//!
//! `list_due` is the hot path driven by external tickers (`kei-pipe`,
//! cron-wrapper agents). It selects rows whose `next_run_at <= now` AND
//! `status IN (pending, scheduled)` AND `next_run_at IS NOT NULL`. The
//! NOT-NULL filter excludes cancelled + one-shot-completed rows.
use crate::error::Error;
use crate::task::{status, Task, SELECT_COLS};
use rusqlite::{params, Connection};
/// Fetch all rows whose `next_run_at <= now` and status makes them
/// eligible to run. Ordered by `next_run_at ASC` so the earliest-due
/// task surfaces first.
pub fn list_due(conn: &Connection, now: i64) -> Result<Vec<Task>, Error> {
let sql = format!(
"SELECT {cols} FROM scheduler_tasks \
WHERE next_run_at IS NOT NULL \
AND next_run_at <= ?1 \
AND status IN (?2, ?3) \
ORDER BY next_run_at ASC, id ASC",
cols = SELECT_COLS,
);
let mut stmt = conn.prepare(&sql)?;
let rows = stmt
.query_map(params![now, status::PENDING, status::SCHEDULED], Task::from_row)?;
let mut out = Vec::new();
for r in rows {
out.push(r?);
}
Ok(out)
}
/// Fetch a single task by id. `Ok(None)` if no such row.
pub fn get_task(conn: &Connection, id: i64) -> Result<Option<Task>, Error> {
let sql = format!(
"SELECT {cols} FROM scheduler_tasks WHERE id = ?1",
cols = SELECT_COLS,
);
let mut stmt = conn.prepare(&sql)?;
let mut rows = stmt.query_map(params![id], Task::from_row)?;
match rows.next() {
Some(r) => Ok(Some(r?)),
None => Ok(None),
}
}
/// Fetch a single task by unique name. Used by the CLI's `cancel --name`
/// convenience; kept thin so the query-layer responsibilities stay in
/// one module.
pub fn get_by_name(conn: &Connection, name: &str) -> Result<Option<Task>, Error> {
let sql = format!(
"SELECT {cols} FROM scheduler_tasks WHERE name = ?1",
cols = SELECT_COLS,
);
let mut stmt = conn.prepare(&sql)?;
let mut rows = stmt.query_map(params![name], Task::from_row)?;
match rows.next() {
Some(r) => Ok(Some(r?)),
None => Ok(None),
}
}

View file

@ -0,0 +1,86 @@
//! `mark_run` — record completion of a triggered execution.
//!
//! Caller supplies `now` explicitly so tests are deterministic. The new
//! `next_run_at` is re-computed from `now` using the task's stored
//! trigger_kind / trigger_spec:
//! - `interval` → `now + secs` (never terminal).
//! - `cron` → next cron occurrence after `now` (falls back to
//! terminal `done` if no future occurrence exists).
//! - `at` → one-shot; status → `done`, next_run_at → NULL.
//!
//! Status transitions: cancelled rows are immutable (function returns
//! `Error::NotFound` to keep the surface minimal — the caller should
//! not be marking runs on cancelled tasks).
use crate::error::Error;
use crate::query::get_task;
use crate::task::status;
use crate::trigger::{compute_next, AT};
use rusqlite::{params, Connection};
/// Record a run outcome and advance the schedule.
///
/// Returns `Ok(())` on success, `Error::NotFound` if `id` doesn't
/// exist or refers to a cancelled task, `Error::Parse` if the stored
/// trigger spec is no longer parseable (should not happen if the row
/// was created via `schedule`).
pub fn mark_run(
conn: &Connection,
id: i64,
exit_code: i64,
now: i64,
) -> Result<(), Error> {
let task = match get_task(conn, id)? {
Some(t) if t.status == status::CANCELLED => return Err(Error::NotFound(id)),
Some(t) => t,
None => return Err(Error::NotFound(id)),
};
let (next, next_status) = advance(&task.trigger_kind, &task.trigger_spec, exit_code, now)?;
write_run(conn, id, exit_code, now, next, next_status)
}
/// Compute the next `(next_run_at, status)` pair given the trigger +
/// run outcome. Exit-code 0 on `at` → `done`; non-zero → `failed`.
/// `cron`/`interval` ignore exit code when scheduling next fire.
fn advance(
kind: &str,
spec: &str,
exit_code: i64,
now: i64,
) -> Result<(Option<i64>, &'static str), Error> {
if kind == AT {
let s = if exit_code == 0 { status::DONE } else { status::FAILED };
return Ok((None, s));
}
let next = compute_next(kind, spec, now)?;
let status_next = match next {
Some(_) => status::SCHEDULED,
// Cron schedule with no future occurrence — treat as terminal.
None => status::DONE,
};
Ok((next, status_next))
}
fn write_run(
conn: &Connection,
id: i64,
exit_code: i64,
now: i64,
next_run_at: Option<i64>,
next_status: &str,
) -> Result<(), Error> {
let rows = conn.execute(
"UPDATE scheduler_tasks SET \
last_run_at = ?1, \
last_exit_code = ?2, \
next_run_at = ?3, \
status = ?4, \
updated_at = ?5 \
WHERE id = ?6",
params![now, exit_code, next_run_at, next_status, now, id],
)?;
if rows == 0 {
return Err(Error::NotFound(id));
}
Ok(())
}

View file

@ -0,0 +1,70 @@
//! `schedule` + `cancel` operations. INSERT / UPDATE on the
//! `scheduler_tasks` table with trigger validation and initial
//! `next_run_at` computed from `compute_next`.
use crate::error::Error;
use crate::task::status;
use crate::trigger::{compute_next, validate_kind};
use chrono::Utc;
use rusqlite::{params, Connection};
/// Insert a new task row. Validates the trigger spec and pre-computes
/// `next_run_at` from `now = Utc::now().timestamp()`.
///
/// Returns the new row's `id`. Errors:
/// - `Error::Parse(...)` — invalid kind / spec
/// - `Error::NameExists(name)` — name UNIQUE violation
/// - `Error::Sqlite(...)` — other DB failures
pub fn schedule(
conn: &Connection,
name: &str,
trigger_kind: &str,
trigger_spec: &str,
command: &str,
) -> Result<i64, Error> {
let kind = validate_kind(trigger_kind)?;
let now = Utc::now().timestamp();
let next = compute_next(kind, trigger_spec, now)?;
insert_row(conn, name, kind, trigger_spec, command, next, now)
}
fn insert_row(
conn: &Connection,
name: &str,
kind: &str,
spec: &str,
command: &str,
next_run_at: Option<i64>,
now: i64,
) -> Result<i64, Error> {
let sql = "INSERT INTO scheduler_tasks \
(name, trigger_kind, trigger_spec, command, status, \
last_run_at, next_run_at, last_exit_code, created_at, updated_at) \
VALUES (?1, ?2, ?3, ?4, ?5, NULL, ?6, NULL, ?7, ?7)";
let status = status::PENDING;
let result = conn.execute(
sql,
params![name, kind, spec, command, status, next_run_at, now],
);
match result {
Ok(_) => Ok(conn.last_insert_rowid()),
Err(e) => Err(Error::from_insert(e, name)),
}
}
/// Mark a task cancelled. Clears `next_run_at` so it cannot match
/// `list_due` again even if somebody re-activates the row manually.
/// Idempotent: cancelling an already-cancelled task is a no-op.
/// Missing id → `Error::NotFound`.
pub fn cancel(conn: &Connection, id: i64) -> Result<(), Error> {
let now = Utc::now().timestamp();
let rows = conn.execute(
"UPDATE scheduler_tasks SET status = ?1, next_run_at = NULL, updated_at = ?2 \
WHERE id = ?3",
params![status::CANCELLED, now, id],
)?;
if rows == 0 {
return Err(Error::NotFound(id));
}
Ok(())
}

View file

@ -0,0 +1,52 @@
//! kei-scheduler EntitySchema — declarative spec consumed by
//! `kei_entity_store::Store`.
//!
//! Schema matches the sibling pattern (kei-task, kei-chat-store): one
//! primary table with standard CRUD fields plus scheduler-specific
//! trigger + run-tracking columns. The `name` UNIQUE constraint rides
//! the engine's `custom_migrations` slot because `FieldDef` doesn't
//! expose a UNIQUE flag — a unique index on the column provides the
//! same semantics.
use kei_entity_store::schema::{EdgeKeyKind, EntitySchema, FieldDef};
static FIELDS: &[FieldDef] = &[
FieldDef::pk("id"),
FieldDef::text_nn("name"),
FieldDef::text_nn("trigger_kind"),
FieldDef::text_nn("trigger_spec"),
FieldDef::text_nn("command"),
FieldDef::text_default("status", "pending"),
FieldDef::integer("last_run_at"),
FieldDef::integer("next_run_at"),
FieldDef::integer("last_exit_code"),
FieldDef::created_at(),
FieldDef::updated_at(),
];
const DDL_SECONDARY: &str = r#"
CREATE UNIQUE INDEX IF NOT EXISTS idx_scheduler_tasks_name
ON scheduler_tasks(name);
CREATE INDEX IF NOT EXISTS idx_scheduler_tasks_status
ON scheduler_tasks(status);
CREATE INDEX IF NOT EXISTS idx_scheduler_tasks_next_run
ON scheduler_tasks(next_run_at);
"#;
pub static SCHEDULER_SCHEMA: EntitySchema = EntitySchema {
name: "scheduler_task",
table: "scheduler_tasks",
fields: FIELDS,
// Generic verbs are disabled — scheduler-specific ops live in
// `schedule.rs` / `query.rs` / `run.rs`. The engine still owns DDL.
enabled_verbs: &[],
fts_columns: None,
edge_table: None,
edge_key_kind: EdgeKeyKind::IntegerPair,
archived_field: None,
custom_migrations: &[DDL_SECONDARY],
};
/// Full schema list passed to the engine when opening a Store. Declared
/// here so the shim in `store.rs` stays a one-liner.
pub static ALL_SCHEMAS: &[&EntitySchema] = &[&SCHEDULER_SCHEMA];

View file

@ -0,0 +1,32 @@
//! Scheduler store — thin shim over `kei_entity_store::Store`.
//!
//! Mirrors the kei-chat-store pattern: the engine owns DDL + migration
//! transactions, and this crate adds scheduler-specific SQL helpers
//! (`schedule`, `cancel`, `list_due`, `mark_run`) that live in sibling
//! modules.
use crate::schema::ALL_SCHEMAS;
use anyhow::Result;
use kei_entity_store::Store as EntityStore;
use rusqlite::Connection;
use std::path::Path;
pub struct Store {
inner: EntityStore,
}
impl Store {
pub fn open(path: &Path) -> Result<Self> {
let inner = EntityStore::open(path, ALL_SCHEMAS)?;
Ok(Self { inner })
}
pub fn open_memory() -> Result<Self> {
let inner = EntityStore::open_memory(ALL_SCHEMAS)?;
Ok(Self { inner })
}
pub fn conn(&self) -> &Connection {
self.inner.conn()
}
}

View file

@ -0,0 +1,61 @@
//! `Task` — in-memory snapshot of a `scheduler_tasks` row.
//!
//! Serializable for the CLI (`list-due` prints JSON). Status is a plain
//! String so callers can introduce new sentinels without a type bump.
use rusqlite::Row;
use serde::{Deserialize, Serialize};
/// Canonical task status sentinels. Schema default is `pending`;
/// lifecycle: `pending` → `scheduled` (optional staging) → `running` →
/// `done` / `failed`. `cancelled` is terminal and set by `cancel()`.
pub mod status {
pub const PENDING: &str = "pending";
pub const SCHEDULED: &str = "scheduled";
pub const RUNNING: &str = "running";
pub const DONE: &str = "done";
pub const FAILED: &str = "failed";
pub const CANCELLED: &str = "cancelled";
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Task {
pub id: i64,
pub name: String,
pub trigger_kind: String,
pub trigger_spec: String,
pub command: String,
pub status: String,
pub last_run_at: Option<i64>,
pub next_run_at: Option<i64>,
pub last_exit_code: Option<i64>,
pub created_at: i64,
pub updated_at: i64,
}
impl Task {
/// Column order MUST match the SELECT in `query.rs::SELECT_ALL`.
/// rusqlite returns `NULL` for INTEGER columns as 0 unless we read
/// into `Option<i64>` explicitly, which is what we want for the
/// nullable timestamps + exit code.
pub fn from_row(r: &Row) -> rusqlite::Result<Self> {
Ok(Self {
id: r.get(0)?,
name: r.get(1)?,
trigger_kind: r.get(2)?,
trigger_spec: r.get(3)?,
command: r.get(4)?,
status: r.get(5)?,
last_run_at: r.get(6)?,
next_run_at: r.get(7)?,
last_exit_code: r.get(8)?,
created_at: r.get(9)?,
updated_at: r.get(10)?,
})
}
}
/// SELECT column list used by `query.rs` and `run.rs`. Exported so
/// callers building custom queries stay in sync with `Task::from_row`.
pub const SELECT_COLS: &str = "id, name, trigger_kind, trigger_spec, command, status, \
last_run_at, next_run_at, last_exit_code, created_at, updated_at";

View file

@ -0,0 +1,105 @@
//! Trigger parsing + `compute_next` core.
//!
//! Three trigger kinds:
//! - `cron` — 5-field or 6-field cron expression. 5-field inputs get a
//! leading `"0 "` for seconds (standard cron semantics: trigger at
//! the start of the minute).
//! - `at` — one-shot ISO-8601 datetime with trailing `Z` (UTC).
//! - `interval` — repeat every N seconds (positive integer).
//!
//! `compute_next(kind, spec, from)` is pure — no DB, no clock read.
//! Callers pass `from` explicitly so tests are deterministic.
use crate::error::ParseError;
use chrono::{DateTime, TimeZone, Utc};
use cron::Schedule;
use std::str::FromStr;
/// Canonical trigger-kind names. Schema stores these as TEXT; this
/// helper keeps the spelling in one place and rejects anything else at
/// the boundary.
pub const CRON: &str = "cron";
pub const AT: &str = "at";
pub const INTERVAL: &str = "interval";
/// Validate a trigger kind string. Returned as `&'static str` so callers
/// can store the canonical form without allocating.
pub fn validate_kind(kind: &str) -> Result<&'static str, ParseError> {
match kind {
CRON => Ok(CRON),
AT => Ok(AT),
INTERVAL => Ok(INTERVAL),
other => Err(ParseError::UnknownKind(other.to_string())),
}
}
/// Compute the next fire time (unix seconds, UTC) for a trigger given a
/// reference `from` timestamp. Returns `Ok(None)` when no future fire
/// exists (e.g. `at` timestamp already in the past, or cron schedule
/// with no upcoming occurrence in `chrono`'s representable range).
pub fn compute_next(
kind: &str,
spec: &str,
from: i64,
) -> Result<Option<i64>, ParseError> {
match validate_kind(kind)? {
CRON => next_cron(spec, from),
AT => next_at(spec, from),
INTERVAL => next_interval(spec, from),
_ => unreachable!("validate_kind rejects other variants"),
}
}
fn next_cron(spec: &str, from: i64) -> Result<Option<i64>, ParseError> {
let canon = canonicalize_cron(spec);
let sched = Schedule::from_str(&canon)
.map_err(|e| ParseError::InvalidCron(spec.to_string(), e.to_string()))?;
let from_dt = ts_to_utc(from)
.ok_or_else(|| ParseError::InvalidCron(spec.to_string(), "ref-ts overflow".into()))?;
Ok(sched.after(&from_dt).next().map(|dt| dt.timestamp()))
}
/// Accept both classic 5-field (`min hour dom mon dow`) and the
/// cron-crate's 6-field form (`sec min hour dom mon dow`). 7-field
/// expressions (with year) pass through untouched.
fn canonicalize_cron(spec: &str) -> String {
let fields = spec.split_whitespace().count();
if fields == 5 {
format!("0 {}", spec)
} else {
spec.to_string()
}
}
fn next_at(spec: &str, from: i64) -> Result<Option<i64>, ParseError> {
let ts = parse_iso_z(spec)
.ok_or_else(|| ParseError::InvalidIsoDatetime(spec.to_string()))?;
if ts > from {
Ok(Some(ts))
} else {
Ok(None)
}
}
fn next_interval(spec: &str, from: i64) -> Result<Option<i64>, ParseError> {
let secs: i64 = spec
.parse::<u64>()
.map_err(|_| ParseError::InvalidInterval(spec.to_string()))?
.try_into()
.map_err(|_| ParseError::InvalidInterval(spec.to_string()))?;
if secs == 0 {
return Err(ParseError::InvalidInterval(spec.to_string()));
}
Ok(Some(from.saturating_add(secs)))
}
/// Parse `YYYY-MM-DDTHH:MM:SSZ` (no fractional seconds, no offset other
/// than literal `Z`). `DateTime::parse_from_rfc3339` accepts that form.
fn parse_iso_z(spec: &str) -> Option<i64> {
let dt = DateTime::parse_from_rfc3339(spec).ok()?;
Some(dt.with_timezone(&Utc).timestamp())
}
fn ts_to_utc(ts: i64) -> Option<DateTime<Utc>> {
Utc.timestamp_opt(ts, 0).single()
}

View file

@ -0,0 +1,189 @@
//! Integration tests for kei-scheduler. Uses `Store::open_memory` so
//! each test owns a throwaway DB and a deterministic wall clock
//! (`now` passed explicitly where the API allows).
//!
//! `schedule()` + `cancel()` internally read `Utc::now()` once; that's
//! fine because we check relative ordering (`next_run_at` compared to
//! a post-call `Utc::now()` lower bound), not absolute values.
use chrono::Utc;
use kei_scheduler::{
cancel, compute_next, get_task, list_due, mark_run, open_memory, schedule,
task_status, ParseError, Store, AT, CRON, INTERVAL,
};
fn store() -> Store {
open_memory().expect("in-memory store opens clean")
}
#[test]
fn cron_schedule_sets_future_next_run() {
let s = store();
let before = Utc::now().timestamp();
let id = schedule(s.conn(), "cron1", CRON, "*/5 * * * *", "echo").unwrap();
let t = get_task(s.conn(), id).unwrap().unwrap();
assert_eq!(t.name, "cron1");
assert_eq!(t.trigger_kind, "cron");
assert_eq!(t.status, task_status::PENDING);
let next = t.next_run_at.expect("cron trigger must produce a next_run_at");
assert!(next >= before, "next_run_at {next} must be >= launch time {before}");
}
#[test]
fn at_schedule_with_future_ts_matches_iso_parse() {
let s = store();
// 2030-01-01T00:00:00Z → unix 1893456000 (verified via
// chrono::DateTime::parse_from_rfc3339 at test time).
let id = schedule(s.conn(), "at1", AT, "2030-01-01T00:00:00Z", "echo").unwrap();
let t = get_task(s.conn(), id).unwrap().unwrap();
assert_eq!(t.next_run_at, Some(1_893_456_000));
assert_eq!(t.trigger_kind, "at");
}
#[test]
fn interval_schedule_sets_now_plus_secs() {
let s = store();
let before = Utc::now().timestamp();
let id = schedule(s.conn(), "int1", INTERVAL, "3600", "echo").unwrap();
let t = get_task(s.conn(), id).unwrap().unwrap();
let next = t.next_run_at.expect("interval trigger must set next_run_at");
let after = Utc::now().timestamp();
assert!(next >= before + 3600);
assert!(next <= after + 3600);
}
#[test]
fn cancel_sets_status_and_clears_next_run() {
let s = store();
let id = schedule(s.conn(), "tcan", INTERVAL, "60", "echo").unwrap();
cancel(s.conn(), id).unwrap();
let t = get_task(s.conn(), id).unwrap().unwrap();
assert_eq!(t.status, task_status::CANCELLED);
assert_eq!(t.next_run_at, None);
}
#[test]
fn list_due_returns_eligible_pending_tasks() {
let s = store();
// An interval with spec=60 produces next_run ≈ now+60. Query with
// now+120 to make sure it's due.
schedule(s.conn(), "due1", INTERVAL, "60", "echo").unwrap();
let query_ts = Utc::now().timestamp() + 120;
let due = list_due(s.conn(), query_ts).unwrap();
assert_eq!(due.len(), 1);
assert_eq!(due[0].name, "due1");
assert_eq!(due[0].status, task_status::PENDING);
}
#[test]
fn list_due_excludes_cancelled_tasks() {
let s = store();
let id = schedule(s.conn(), "cx", INTERVAL, "60", "echo").unwrap();
cancel(s.conn(), id).unwrap();
let due = list_due(s.conn(), Utc::now().timestamp() + 100_000).unwrap();
assert!(due.is_empty(), "cancelled tasks must not appear in list_due");
}
#[test]
fn mark_run_on_interval_advances_next_run() {
let s = store();
let id = schedule(s.conn(), "mrint", INTERVAL, "3600", "echo").unwrap();
let now = 2_000_000_000;
mark_run(s.conn(), id, 0, now).unwrap();
let t = get_task(s.conn(), id).unwrap().unwrap();
assert_eq!(t.last_run_at, Some(now));
assert_eq!(t.last_exit_code, Some(0));
assert_eq!(t.next_run_at, Some(now + 3600));
assert_eq!(t.status, task_status::SCHEDULED);
}
#[test]
fn mark_run_on_at_completes_task() {
let s = store();
let id = schedule(s.conn(), "mrat", AT, "2030-01-01T00:00:00Z", "echo").unwrap();
mark_run(s.conn(), id, 0, 1_893_456_005).unwrap();
let t = get_task(s.conn(), id).unwrap().unwrap();
assert_eq!(t.status, task_status::DONE);
assert_eq!(t.next_run_at, None);
assert_eq!(t.last_exit_code, Some(0));
}
#[test]
fn mark_run_on_cron_recomputes_next() {
let s = store();
let id = schedule(s.conn(), "mrcron", CRON, "*/5 * * * *", "echo").unwrap();
let now: i64 = 2_000_000_000;
mark_run(s.conn(), id, 0, now).unwrap();
let t = get_task(s.conn(), id).unwrap().unwrap();
let next = t.next_run_at.expect("cron mark_run must recompute next_run_at");
// `*/5 * * * *` = every 5 minutes at second-0; next must be within
// 300 seconds of `now` and strictly greater.
assert!(next > now);
assert!(next <= now + 300, "next {next} must be within 5m of now {now}");
assert_eq!(t.status, task_status::SCHEDULED);
}
#[test]
fn mark_run_with_nonzero_exit_at_sets_failed() {
let s = store();
let id = schedule(s.conn(), "failat", AT, "2030-06-15T12:00:00Z", "echo").unwrap();
mark_run(s.conn(), id, 17, 1_910_000_000).unwrap();
let t = get_task(s.conn(), id).unwrap().unwrap();
assert_eq!(t.status, task_status::FAILED);
assert_eq!(t.last_exit_code, Some(17));
assert_eq!(t.next_run_at, None);
}
#[test]
fn invalid_cron_returns_parse_error() {
let err = compute_next(CRON, "not a cron expression", 0).unwrap_err();
assert!(
matches!(err, ParseError::InvalidCron(_, _)),
"expected InvalidCron, got {err:?}"
);
}
#[test]
fn invalid_iso_datetime_returns_parse_error() {
let err = compute_next(AT, "not-a-date", 0).unwrap_err();
assert!(
matches!(err, ParseError::InvalidIsoDatetime(_)),
"expected InvalidIsoDatetime, got {err:?}"
);
}
#[test]
fn malformed_trigger_kind_is_rejected() {
let s = store();
let err = schedule(s.conn(), "bad", "weekly", "whatever", "echo")
.expect_err("unknown kind must fail");
assert!(
matches!(err, kei_scheduler::Error::Parse(ParseError::UnknownKind(_))),
"expected UnknownKind, got {err:?}"
);
}
#[test]
fn duplicate_name_is_rejected_typed() {
let s = store();
schedule(s.conn(), "dup", INTERVAL, "60", "echo").unwrap();
let err = schedule(s.conn(), "dup", INTERVAL, "120", "echo")
.expect_err("unique-name collision must fail");
assert!(
matches!(err, kei_scheduler::Error::NameExists(ref n) if n == "dup"),
"expected NameExists(dup), got {err:?}"
);
}
#[test]
fn zero_interval_is_rejected() {
let err = compute_next(INTERVAL, "0", 100).unwrap_err();
assert!(matches!(err, ParseError::InvalidInterval(_)));
}
#[test]
fn at_in_the_past_returns_none() {
// 2020-01-01 with `from = 2030-era` → no future fire.
let next = compute_next(AT, "2020-01-01T00:00:00Z", 1_893_456_000).unwrap();
assert_eq!(next, None);
}

View file

@ -0,0 +1,23 @@
[package]
name = "kei-watch"
version = "0.1.0"
edition = "2021"
rust-version = "1.77"
description = "Filesystem watcher primitive — thin canonical wrapper around notify"
[[bin]]
name = "kei-watch"
path = "src/main.rs"
[lib]
name = "kei_watch"
path = "src/lib.rs"
[dependencies]
notify = "8"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
clap = { version = "4", features = ["derive"] }
[dev-dependencies]
tempfile = "3"

View file

@ -0,0 +1,100 @@
//! Coarse debounce: collapse duplicate `(path, kind)` events fired
//! within [`DEBOUNCE_WINDOW`] of the previous one.
//!
//! Intent: swallow FS-level bursts (editor-write double-fire, compiler
//! rewrite patterns). NOT a replacement for notify-debouncer-full — we
//! don't do event reordering or close/write correlation, just per-key
//! rate-limiting.
use crate::event::{Event, EventKind};
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::{Duration, Instant};
/// Collapse window for duplicate `(path, kind)` pairs.
pub const DEBOUNCE_WINDOW: Duration = Duration::from_millis(50);
/// Per-key last-seen state. Small enough to live in a `HashMap` — pruned
/// opportunistically when entries exceed [`PRUNE_THRESHOLD`] (keeps
/// long-running watchers from growing unboundedly).
pub struct Debouncer {
last_seen: HashMap<(PathBuf, EventKind), Instant>,
}
const PRUNE_THRESHOLD: usize = 1024;
impl Debouncer {
pub fn new() -> Self {
Self { last_seen: HashMap::new() }
}
/// Should this event pass through?
///
/// Returns `true` on first occurrence of `(path, kind)` or if the
/// last occurrence was ≥ `DEBOUNCE_WINDOW` ago. Updates internal
/// state regardless of outcome.
pub fn accept(&mut self, ev: &Event) -> bool {
let key = (ev.path.clone(), ev.kind);
let now = Instant::now();
let decision = match self.last_seen.get(&key) {
Some(&prev) if now.duration_since(prev) < DEBOUNCE_WINDOW => false,
_ => true,
};
self.last_seen.insert(key, now);
if self.last_seen.len() > PRUNE_THRESHOLD {
self.prune(now);
}
decision
}
/// Drop entries older than 10× the debounce window. Called
/// opportunistically when the map grows large.
fn prune(&mut self, now: Instant) {
let cutoff = DEBOUNCE_WINDOW * 10;
self.last_seen.retain(|_, &mut t| now.duration_since(t) < cutoff);
}
}
impl Default for Debouncer {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread::sleep;
fn ev(kind: EventKind, path: &str) -> Event {
Event::new(kind, PathBuf::from(path), None)
}
#[test]
fn first_event_passes() {
let mut d = Debouncer::new();
assert!(d.accept(&ev(EventKind::Modified, "/a")));
}
#[test]
fn rapid_duplicate_is_dropped() {
let mut d = Debouncer::new();
assert!(d.accept(&ev(EventKind::Modified, "/a")));
assert!(!d.accept(&ev(EventKind::Modified, "/a")));
}
#[test]
fn different_kind_is_not_debounced() {
let mut d = Debouncer::new();
assert!(d.accept(&ev(EventKind::Modified, "/a")));
assert!(d.accept(&ev(EventKind::Created, "/a")));
}
#[test]
fn after_window_event_passes_again() {
let mut d = Debouncer::new();
assert!(d.accept(&ev(EventKind::Modified, "/a")));
sleep(DEBOUNCE_WINDOW + Duration::from_millis(20));
assert!(d.accept(&ev(EventKind::Modified, "/a")));
}
}

View file

@ -0,0 +1,89 @@
//! Error type for the kei-watch primitive.
//!
//! Wraps `notify` + `io` errors behind a stable surface so downstream
//! consumers don't bind to notify's error hierarchy.
use std::fmt;
use std::path::PathBuf;
/// Failure modes for [`crate::Watcher`] operations.
#[derive(Debug)]
pub enum WatchError {
/// Underlying OS I/O failure.
Io(std::io::Error),
/// notify backend failed to start or watch.
NotifyBackend(String),
/// Path given to `watch` does not exist on disk.
PathNotFound(PathBuf),
/// `unwatch` called on a path that was never registered.
WatchNotFound(PathBuf),
}
impl fmt::Display for WatchError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
WatchError::Io(e) => write!(f, "io: {e}"),
WatchError::NotifyBackend(s) => write!(f, "notify backend: {s}"),
WatchError::PathNotFound(p) => write!(f, "path not found: {}", p.display()),
WatchError::WatchNotFound(p) => write!(f, "watch not found: {}", p.display()),
}
}
}
impl std::error::Error for WatchError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
WatchError::Io(e) => Some(e),
_ => None,
}
}
}
impl From<std::io::Error> for WatchError {
fn from(e: std::io::Error) -> Self {
WatchError::Io(e)
}
}
/// Convert a `notify::Error` into `WatchError`, preserving the path hint
/// for path-related errors where possible.
pub fn from_notify(err: notify::Error) -> WatchError {
use notify::ErrorKind as NK;
let first_path = err.paths.first().cloned();
match err.kind {
NK::PathNotFound => {
WatchError::PathNotFound(first_path.unwrap_or_default())
}
NK::WatchNotFound => {
WatchError::WatchNotFound(first_path.unwrap_or_default())
}
NK::Io(ioe) => WatchError::Io(ioe),
NK::Generic(s) => WatchError::NotifyBackend(s),
NK::InvalidConfig(_) => {
WatchError::NotifyBackend("invalid config".into())
}
NK::MaxFilesWatch => {
WatchError::NotifyBackend("OS watch limit reached".into())
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn display_has_prefix() {
let e = WatchError::PathNotFound(PathBuf::from("/nope"));
assert!(format!("{e}").starts_with("path not found"));
}
#[test]
fn notify_path_not_found_maps() {
let ne = notify::Error::path_not_found().add_path(PathBuf::from("/x"));
match from_notify(ne) {
WatchError::PathNotFound(p) => assert_eq!(p, PathBuf::from("/x")),
other => panic!("expected PathNotFound, got {other:?}"),
}
}
}

View file

@ -0,0 +1,85 @@
//! Canonical event type emitted by [`crate::Watcher`].
//!
//! Decoupled from `notify::Event` so downstream consumers don't bind to
//! notify's evolving hierarchy. Only four kinds are emitted:
//! Created / Modified / Deleted / Renamed.
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
/// Coarse event classification. All notify sub-kinds fold into these four.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum EventKind {
Created,
Modified,
Deleted,
Renamed,
}
impl EventKind {
/// Short lowercase tag (matches CLI JSON `kind` field).
pub fn as_str(&self) -> &'static str {
match self {
EventKind::Created => "Created",
EventKind::Modified => "Modified",
EventKind::Deleted => "Deleted",
EventKind::Renamed => "Renamed",
}
}
}
/// Filesystem event emitted by the watcher.
///
/// `from_path` is `Some(..)` only for `Renamed` events where both endpoints
/// are known at emission time (backend-dependent — see
/// `map::from_notify` for the folding rules).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Event {
pub kind: EventKind,
pub path: PathBuf,
#[serde(skip_serializing_if = "Option::is_none", default)]
pub from_path: Option<PathBuf>,
/// Unix seconds since epoch.
pub timestamp: i64,
}
impl Event {
/// Construct a new event; timestamp is captured here.
pub fn new(kind: EventKind, path: PathBuf, from_path: Option<PathBuf>) -> Self {
Self {
kind,
path,
from_path,
timestamp: unix_now(),
}
}
}
fn unix_now() -> i64 {
use std::time::{SystemTime, UNIX_EPOCH};
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs() as i64)
.unwrap_or(0)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn kind_as_str_is_stable() {
assert_eq!(EventKind::Created.as_str(), "Created");
assert_eq!(EventKind::Modified.as_str(), "Modified");
assert_eq!(EventKind::Deleted.as_str(), "Deleted");
assert_eq!(EventKind::Renamed.as_str(), "Renamed");
}
#[test]
fn event_constructs_with_timestamp() {
let ev = Event::new(EventKind::Created, PathBuf::from("/tmp/x"), None);
assert!(ev.timestamp > 0);
assert_eq!(ev.kind, EventKind::Created);
assert!(ev.from_path.is_none());
}
}

View file

@ -0,0 +1,50 @@
//! kei-watch — filesystem watcher primitive.
//!
//! Thin, synchronous wrapper around the [`notify`] crate. Emits a stable
//! canonical event format so downstream consumers (kei-pipe hot-reload,
//! kei-replay drift detection, dev-loop cache invalidation) don't bind
//! to notify's evolving [`notify::EventKind`] hierarchy.
//!
//! # Surface
//!
//! | Type | Role |
//! |------|------|
//! | [`Watcher`] | owns notify backend + pump thread |
//! | [`Event`] | canonical event ({kind, path, from_path, timestamp}) |
//! | [`EventKind`] | `Created` / `Modified` / `Deleted` / `Renamed` |
//! | [`WatchError`] | failure modes (Io / NotifyBackend / PathNotFound / WatchNotFound) |
//!
//! # Example
//! ```no_run
//! use kei_watch::{Watcher, EventKind};
//! use std::{path::Path, time::Duration};
//!
//! let mut w = Watcher::new().unwrap();
//! w.watch(Path::new("."), true).unwrap();
//! while let Some(ev) = w.next_event(Duration::from_secs(1)) {
//! if ev.kind == EventKind::Modified {
//! println!("{}", ev.path.display());
//! }
//! }
//! ```
//!
//! # Platform notes
//!
//! Rename semantics differ by backend. On macOS/Windows,
//! `RenameMode::Both` is emitted with both endpoints and we populate
//! `from_path`. On Linux (inotify), rename fires as two events
//! (`RenameMode::From` then `RenameMode::To`) correlated by tracker;
//! kei-watch emits each as a separate `Renamed` with `from_path=None`.
//! Downstream code that needs strict from→to pairing should fall back
//! to notify-debouncer-full.
pub mod debounce;
pub mod error;
pub mod event;
pub mod map;
pub mod pump;
pub mod watcher;
pub use error::WatchError;
pub use event::{Event, EventKind};
pub use watcher::Watcher;

View file

@ -0,0 +1,136 @@
//! kei-watch CLI — streams canonical FS events as JSON Lines.
//!
//! Usage:
//! ```text
//! kei-watch watch --path <DIR> [--recursive] [--timeout-ms <N>]
//! ```
//!
//! Each event is one JSON object per line, flushed per event:
//! `{"kind":"Modified","path":"/abs/path","from":null,"ts":1712345678}`.
//!
//! Exits after `--timeout-ms` of no activity. Without the flag, runs
//! until killed (Ctrl-C).
use clap::{Parser, Subcommand};
use kei_watch::{Event, Watcher};
use std::io::Write;
use std::path::PathBuf;
use std::process::ExitCode;
use std::time::Duration;
#[derive(Parser)]
#[command(name = "kei-watch", version, about = "Filesystem watcher primitive")]
struct Cli {
#[command(subcommand)]
cmd: Cmd,
}
#[derive(Subcommand)]
enum Cmd {
/// Watch a path and emit JSON-line events to stdout.
Watch {
/// Path to watch (file or directory).
#[arg(long)]
path: PathBuf,
/// Recurse into subdirectories.
#[arg(long)]
recursive: bool,
/// Exit after this many ms without activity. Omit → run forever.
#[arg(long)]
timeout_ms: Option<u64>,
},
}
fn err(msg: &str) -> ExitCode {
eprintln!("kei-watch: {msg}");
ExitCode::from(1)
}
fn event_to_json_line(ev: &Event) -> String {
// Compact, stable shape — not using serde_json::to_string on Event
// because we want `from` (short) rather than `from_path` (long).
let from = match &ev.from_path {
Some(p) => serde_json::Value::String(p.to_string_lossy().into_owned()),
None => serde_json::Value::Null,
};
let obj = serde_json::json!({
"kind": ev.kind.as_str(),
"path": ev.path.to_string_lossy(),
"from": from,
"ts": ev.timestamp,
});
obj.to_string()
}
fn emit_event(ev: &Event) -> std::io::Result<()> {
let line = event_to_json_line(ev);
let stdout = std::io::stdout();
let mut lock = stdout.lock();
writeln!(lock, "{line}")?;
lock.flush()
}
fn run_watch(path: PathBuf, recursive: bool, timeout_ms: Option<u64>) -> ExitCode {
let mut watcher = match Watcher::new() {
Ok(w) => w,
Err(e) => return err(&format!("new: {e}")),
};
if let Err(e) = watcher.watch(&path, recursive) {
return err(&format!("watch {}: {e}", path.display()));
}
let step = Duration::from_millis(500);
let limit = timeout_ms.unwrap_or(u64::MAX);
let mut idle_ms: u64 = 0;
loop {
match watcher.next_event(step) {
Some(ev) => {
if emit_event(&ev).is_err() {
return ExitCode::SUCCESS;
}
idle_ms = 0;
}
None => {
idle_ms = idle_ms.saturating_add(step.as_millis() as u64);
if idle_ms >= limit {
return ExitCode::SUCCESS;
}
}
}
}
}
fn main() -> ExitCode {
let cli = Cli::parse();
match cli.cmd {
Cmd::Watch { path, recursive, timeout_ms } => run_watch(path, recursive, timeout_ms),
}
}
#[cfg(test)]
mod tests {
use super::*;
use kei_watch::EventKind;
use std::path::PathBuf;
#[test]
fn json_line_has_required_fields() {
let ev = Event::new(EventKind::Modified, PathBuf::from("/x"), None);
let line = event_to_json_line(&ev);
assert!(line.contains("\"kind\":\"Modified\""));
assert!(line.contains("\"path\":\"/x\""));
assert!(line.contains("\"from\":null"));
assert!(line.contains("\"ts\":"));
}
#[test]
fn json_line_includes_from_when_renamed() {
let ev = Event::new(
EventKind::Renamed,
PathBuf::from("/b"),
Some(PathBuf::from("/a")),
);
let line = event_to_json_line(&ev);
assert!(line.contains("\"from\":\"/a\""));
assert!(line.contains("\"path\":\"/b\""));
}
}

View file

@ -0,0 +1,123 @@
//! Mapping: `notify::Event` → zero or more canonical [`Event`].
//!
//! Folding rules:
//! - `Create(*)` → `EventKind::Created`
//! - `Modify(Data*)` / `Modify(Any)` / `Modify(Other)` → `EventKind::Modified`
//! - `Remove(*)` → `EventKind::Deleted`
//! - `Modify(Name(*))` → `EventKind::Renamed` (from_path populated if both
//! endpoints present in `paths`; else None)
//! - `Access(*)` / `Modify(Metadata(*))` / `Other` / `Any` → SKIP
//!
//! Rationale: Access events fire constantly on macOS fsevents and are
//! rarely what a hot-reload / drift-detection consumer wants. Metadata
//! changes (mtime-only touch) are likewise noise.
use crate::event::{Event, EventKind};
use notify::event::{EventKind as NK, ModifyKind, RenameMode};
/// Convert one `notify::Event` into 0..N canonical [`Event`]s.
///
/// Returns `Vec` because a single notify event may carry multiple paths
/// (primarily for `Rename::Both`, which we still emit as a single event
/// with `from_path` populated — but we fold multi-path Create/Remove
/// sensibly too, one emitted event per path).
pub fn from_notify(ev: &notify::Event) -> Vec<Event> {
match ev.kind {
NK::Create(_) => fan_out(EventKind::Created, ev),
NK::Remove(_) => fan_out(EventKind::Deleted, ev),
NK::Modify(ModifyKind::Name(rm)) => rename(rm, ev),
NK::Modify(ModifyKind::Data(_))
| NK::Modify(ModifyKind::Any)
| NK::Modify(ModifyKind::Other) => fan_out(EventKind::Modified, ev),
// Skip: Access, Modify(Metadata(*)), Other, Any.
_ => Vec::new(),
}
}
/// Emit one canonical event per path in `ev.paths`.
fn fan_out(kind: EventKind, ev: &notify::Event) -> Vec<Event> {
ev.paths
.iter()
.map(|p| Event::new(kind, p.clone(), None))
.collect()
}
/// Rename mapping. `RenameMode::Both` carries `[from, to]` in paths;
/// other modes may carry only a single path (backend-dependent — see
/// crate-level docs). Callers receive partial information on those.
fn rename(mode: RenameMode, ev: &notify::Event) -> Vec<Event> {
match mode {
RenameMode::Both if ev.paths.len() >= 2 => {
let from = ev.paths[0].clone();
let to = ev.paths[1].clone();
vec![Event::new(EventKind::Renamed, to, Some(from))]
}
// RenameMode::To: path is the destination; no `from` known here.
// RenameMode::From: path is the origin; this event effectively
// says "this path moved away" — we surface it as Renamed with
// only `path` populated (no destination known yet).
// RenameMode::Any / Other: same — emit whatever path we have.
_ => ev
.paths
.iter()
.map(|p| Event::new(EventKind::Renamed, p.clone(), None))
.collect(),
}
}
#[cfg(test)]
mod tests {
use super::*;
use notify::event::{AccessKind, CreateKind, MetadataKind, RemoveKind};
use std::path::PathBuf;
fn nev(kind: NK, paths: Vec<PathBuf>) -> notify::Event {
let mut e = notify::Event::new(kind);
e.paths = paths;
e
}
#[test]
fn create_maps_to_created() {
let e = nev(NK::Create(CreateKind::File), vec!["/a".into()]);
let out = from_notify(&e);
assert_eq!(out.len(), 1);
assert_eq!(out[0].kind, EventKind::Created);
}
#[test]
fn access_is_skipped() {
let e = nev(NK::Access(AccessKind::Read), vec!["/a".into()]);
assert!(from_notify(&e).is_empty());
}
#[test]
fn metadata_is_skipped() {
let e = nev(
NK::Modify(ModifyKind::Metadata(MetadataKind::AccessTime)),
vec!["/a".into()],
);
assert!(from_notify(&e).is_empty());
}
#[test]
fn rename_both_populates_from_path() {
let e = nev(
NK::Modify(ModifyKind::Name(RenameMode::Both)),
vec!["/a".into(), "/b".into()],
);
let out = from_notify(&e);
assert_eq!(out.len(), 1);
assert_eq!(out[0].kind, EventKind::Renamed);
assert_eq!(out[0].path, PathBuf::from("/b"));
assert_eq!(out[0].from_path, Some(PathBuf::from("/a")));
}
#[test]
fn remove_maps_to_deleted() {
let e = nev(NK::Remove(RemoveKind::File), vec!["/a".into()]);
let out = from_notify(&e);
assert_eq!(out.len(), 1);
assert_eq!(out[0].kind, EventKind::Deleted);
}
}

View file

@ -0,0 +1,72 @@
//! Single-threaded pump: reads `Result<notify::Event>` from notify's
//! channel, maps + debounces, pushes canonical [`Event`] to the output
//! channel consumed by `next_event` / `drain`.
//!
//! Exactly one thread is spawned per [`crate::Watcher`] instance. The
//! thread exits cleanly when notify's sender is dropped (closing the
//! input channel), which happens when the `notify::RecommendedWatcher`
//! is dropped inside [`crate::Watcher::drop`].
use crate::debounce::Debouncer;
use crate::event::Event;
use crate::map;
use std::sync::mpsc::{Receiver, Sender};
use std::thread::{self, JoinHandle};
/// Spawn the pump thread.
///
/// `notify_rx` — source end of notify's event channel.
/// `out_tx` — destination channel; each accepted canonical event is
/// forwarded here.
/// Returns the thread handle so the watcher can join on drop.
pub fn spawn(
notify_rx: Receiver<notify::Result<notify::Event>>,
out_tx: Sender<Event>,
) -> JoinHandle<()> {
thread::Builder::new()
.name("kei-watch-pump".into())
.spawn(move || run(notify_rx, out_tx))
.expect("spawn kei-watch pump thread")
}
fn run(notify_rx: Receiver<notify::Result<notify::Event>>, out_tx: Sender<Event>) {
let mut deb = Debouncer::new();
while let Ok(res) = notify_rx.recv() {
let Ok(notify_ev) = res else { continue };
for canon in map::from_notify(&notify_ev) {
if !deb.accept(&canon) {
continue;
}
// out_tx dropped (watcher released) → exit cleanly.
if out_tx.send(canon).is_err() {
return;
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::event::EventKind;
use notify::event::{CreateKind, EventKind as NK};
use std::path::PathBuf;
use std::sync::mpsc;
use std::time::Duration;
#[test]
fn pump_forwards_canonical_event() {
let (n_tx, n_rx) = mpsc::channel::<notify::Result<notify::Event>>();
let (o_tx, o_rx) = mpsc::channel::<Event>();
let h = spawn(n_rx, o_tx);
let mut e = notify::Event::new(NK::Create(CreateKind::File));
e.paths = vec![PathBuf::from("/tmp/x")];
n_tx.send(Ok(e)).unwrap();
drop(n_tx);
let ev = o_rx.recv_timeout(Duration::from_millis(500)).unwrap();
assert_eq!(ev.kind, EventKind::Created);
h.join().unwrap();
}
}

View file

@ -0,0 +1,114 @@
//! Public [`Watcher`] type — owns the notify backend and the pump thread.
//!
//! Layout invariants:
//! - exactly one pump thread per watcher
//! - pump thread exits when `notify::Watcher` is dropped (closes
//! notify's sender, which closes the pump's `recv`)
//! - `Drop` explicitly drops the notify watcher first, then joins the
//! pump — preventing zombie threads
use crate::error::{from_notify, WatchError};
use crate::event::Event;
use crate::pump;
use notify::{RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher};
use std::path::Path;
use std::sync::mpsc::{self, Receiver};
use std::thread::JoinHandle;
use std::time::Duration;
/// Filesystem watcher. Synchronous API only; see crate docs.
pub struct Watcher {
inner: Option<RecommendedWatcher>,
out_rx: Receiver<Event>,
pump_handle: Option<JoinHandle<()>>,
}
impl Watcher {
/// Construct a new watcher. Spawns one background thread for the
/// event pump and initialises the notify backend with its own
/// internal thread(s).
pub fn new() -> Result<Self, WatchError> {
let (n_tx, n_rx) = mpsc::channel::<notify::Result<notify::Event>>();
let (o_tx, o_rx) = mpsc::channel::<Event>();
let inner = notify::recommended_watcher(n_tx).map_err(from_notify)?;
let pump_handle = pump::spawn(n_rx, o_tx);
Ok(Self {
inner: Some(inner),
out_rx: o_rx,
pump_handle: Some(pump_handle),
})
}
/// Begin watching `path`. When `recursive=true`, all descendants
/// are watched too; otherwise only the path itself (and its
/// immediate children if a directory).
pub fn watch(&mut self, path: &Path, recursive: bool) -> Result<(), WatchError> {
let mode = if recursive {
RecursiveMode::Recursive
} else {
RecursiveMode::NonRecursive
};
let inner = self.inner.as_mut().expect("watcher initialised");
inner.watch(path, mode).map_err(from_notify)
}
/// Stop watching `path`.
pub fn unwatch(&mut self, path: &Path) -> Result<(), WatchError> {
let inner = self.inner.as_mut().expect("watcher initialised");
inner.unwatch(path).map_err(from_notify)
}
/// Block until either an event arrives or `timeout` elapses.
/// Returns `None` on timeout or channel closure.
pub fn next_event(&self, timeout: Duration) -> Option<Event> {
self.out_rx.recv_timeout(timeout).ok()
}
/// Non-blocking: drain all currently queued events.
pub fn drain(&self) -> Vec<Event> {
let mut out = Vec::new();
while let Ok(ev) = self.out_rx.try_recv() {
out.push(ev);
}
out
}
}
impl Drop for Watcher {
fn drop(&mut self) {
// Dropping the notify watcher closes the pump's input channel;
// the pump loop exits, and we can join its thread cleanly.
drop(self.inner.take());
if let Some(h) = self.pump_handle.take() {
let _ = h.join();
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
#[test]
fn new_and_drop_is_clean() {
let w = Watcher::new().unwrap();
drop(w);
}
#[test]
fn watch_missing_path_is_error() {
let mut w = Watcher::new().unwrap();
let r = w.watch(Path::new("/definitely/does/not/exist/kei-watch"), false);
assert!(r.is_err());
}
#[test]
fn drain_on_idle_is_empty() {
let d = tempdir().unwrap();
let mut w = Watcher::new().unwrap();
w.watch(d.path(), false).unwrap();
// No activity → drain returns empty.
assert!(w.drain().is_empty());
}
}

View file

@ -0,0 +1,54 @@
//! Shared helpers for the integration tests.
//!
//! `tests/` files are separate crates; common code lives under
//! `tests/common/mod.rs` per cargo convention (not a top-level
//! `tests/common.rs`, which would itself be a test binary).
use kei_watch::Watcher;
use std::fs;
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
pub const EVENT_TIMEOUT: Duration = Duration::from_secs(3);
/// Pull events until one matches `pred` or the global timeout elapses.
pub fn wait_for<F: Fn(&kei_watch::Event) -> bool>(
w: &Watcher,
pred: F,
) -> Option<kei_watch::Event> {
let deadline = Instant::now() + EVENT_TIMEOUT;
while Instant::now() < deadline {
let remaining = deadline.saturating_duration_since(Instant::now());
let step = std::cmp::min(remaining, Duration::from_millis(200));
if let Some(ev) = w.next_event(step) {
if pred(&ev) {
return Some(ev);
}
}
}
None
}
/// macOS reports paths under `/private/var/...`; tempdirs live at `/var/...`.
/// Canonicalise both sides before compare. When the target file has been
/// deleted, `canonicalize` fails — fall back to symlink-free parent match.
pub fn same_path(a: &Path, b: &Path) -> bool {
if a == b {
return true;
}
let ca = fs::canonicalize(a).ok();
let cb = fs::canonicalize(b).ok();
match (ca, cb) {
(Some(x), Some(y)) => x == y,
_ => canonicalize_parent(a) == canonicalize_parent(b),
}
}
fn canonicalize_parent(p: &Path) -> PathBuf {
let parent = p.parent().and_then(|q| fs::canonicalize(q).ok());
let file = p.file_name().map(|s| s.to_owned());
match (parent, file) {
(Some(par), Some(f)) => par.join(f),
_ => p.to_path_buf(),
}
}

View file

@ -0,0 +1,91 @@
//! Flow-control integration tests: rename semantics, `drain` behaviour,
//! `next_event` timeout, `unwatch`.
mod common;
use common::{same_path, wait_for};
use kei_watch::{EventKind, Watcher};
use std::fs;
use std::thread::sleep;
use std::time::{Duration, Instant};
use tempfile::tempdir;
#[test]
fn rename_file_emits_renamed() {
// Platform-flexible: macOS fsevents emits Modify(Name(Both)) with
// both paths; Linux inotify emits Modify(Name(From)) +
// Modify(Name(To)) as two events. The test accepts any Renamed
// referencing either endpoint.
let d = tempdir().expect("tempdir");
let from = d.path().join("src.txt");
let to = d.path().join("dst.txt");
fs::write(&from, b"x").expect("seed");
let mut w = Watcher::new().expect("new");
w.watch(d.path(), true).expect("watch");
sleep(Duration::from_millis(150));
fs::rename(&from, &to).expect("rename");
let got = wait_for(&w, |e| {
e.kind == EventKind::Renamed
&& (same_path(&e.path, &from)
|| same_path(&e.path, &to)
|| e.from_path
.as_ref()
.is_some_and(|p| same_path(p, &from)))
});
assert!(got.is_some(), "expected any Renamed event referencing src/dst");
}
#[test]
fn drain_is_non_blocking() {
let d = tempdir().expect("tempdir");
let mut w = Watcher::new().expect("new");
w.watch(d.path(), true).expect("watch");
let start = Instant::now();
let out = w.drain();
assert!(start.elapsed() < Duration::from_millis(100));
assert!(out.is_empty());
}
#[test]
fn next_event_times_out_on_idle() {
let d = tempdir().expect("tempdir");
let mut w = Watcher::new().expect("new");
w.watch(d.path(), true).expect("watch");
sleep(Duration::from_millis(100));
let _ = w.drain();
let start = Instant::now();
let ev = w.next_event(Duration::from_millis(200));
let elapsed = start.elapsed();
assert!(ev.is_none(), "expected None on idle, got {ev:?}");
assert!(
elapsed >= Duration::from_millis(150),
"next_event returned too fast: {elapsed:?}"
);
}
#[test]
fn unwatch_stops_events() {
let d = tempdir().expect("tempdir");
let mut w = Watcher::new().expect("new");
w.watch(d.path(), true).expect("watch");
sleep(Duration::from_millis(100));
w.unwatch(d.path()).expect("unwatch");
sleep(Duration::from_millis(100));
let _ = w.drain();
fs::write(d.path().join("ghost.txt"), b"boo").unwrap();
sleep(Duration::from_millis(400));
let after: Vec<_> = w
.drain()
.into_iter()
.filter(|e| same_path(&e.path, &d.path().join("ghost.txt")))
.collect();
assert!(
after.is_empty(),
"expected zero events after unwatch, got {after:?}"
);
}

View file

@ -0,0 +1,116 @@
//! Smoke-level integration tests: construction, error paths,
//! create / modify / delete events, debounce behaviour.
//!
//! Rename + flow-control tests live in `tests/watch_flow.rs`.
mod common;
use common::{same_path, wait_for};
use kei_watch::{EventKind, Watcher};
use std::fs;
use std::io::Write;
use std::path::Path;
use std::thread::sleep;
use std::time::{Duration, Instant};
use tempfile::tempdir;
#[test]
fn new_and_drop_does_not_panic() {
let w = Watcher::new().expect("new");
drop(w);
}
#[test]
fn watch_nonexistent_path_returns_error() {
let mut w = Watcher::new().expect("new");
let bogus = Path::new("/definitely/does/not/exist/kei-watch-test-xxx");
assert!(w.watch(bogus, false).is_err());
}
#[test]
fn create_file_emits_created() {
let d = tempdir().expect("tempdir");
let mut w = Watcher::new().expect("new");
w.watch(d.path(), true).expect("watch");
sleep(Duration::from_millis(100));
let f = d.path().join("new.txt");
fs::write(&f, b"hello").expect("write");
let got = wait_for(&w, |e| {
e.kind == EventKind::Created && same_path(&e.path, &f)
});
assert!(got.is_some(), "expected Created for {}", f.display());
}
#[test]
fn modify_file_emits_modified() {
let d = tempdir().expect("tempdir");
let f = d.path().join("m.txt");
fs::write(&f, b"v1").expect("seed");
let mut w = Watcher::new().expect("new");
w.watch(d.path(), true).expect("watch");
sleep(Duration::from_millis(150));
let mut fh = fs::OpenOptions::new().append(true).open(&f).unwrap();
fh.write_all(b"v2").unwrap();
fh.flush().unwrap();
drop(fh);
let got = wait_for(&w, |e| {
e.kind == EventKind::Modified && same_path(&e.path, &f)
});
assert!(got.is_some(), "expected Modified for {}", f.display());
}
#[test]
fn delete_file_emits_deleted() {
let d = tempdir().expect("tempdir");
let f = d.path().join("del.txt");
fs::write(&f, b"doomed").expect("seed");
// Let the seed-create event flush before the watcher starts.
sleep(Duration::from_millis(100));
let mut w = Watcher::new().expect("new");
w.watch(d.path(), true).expect("watch");
sleep(Duration::from_millis(200));
let _ = w.drain();
fs::remove_file(&f).expect("remove");
let got = wait_for(&w, |e| {
e.kind == EventKind::Deleted && same_path(&e.path, &f)
});
assert!(got.is_some(), "expected Deleted for {}", f.display());
}
#[test]
fn rapid_modifies_are_debounced() {
let d = tempdir().expect("tempdir");
let f = d.path().join("burst.txt");
fs::write(&f, b"seed").expect("seed");
let mut w = Watcher::new().expect("new");
w.watch(d.path(), true).expect("watch");
sleep(Duration::from_millis(150));
let start = Instant::now();
for i in 0..5 {
fs::write(&f, format!("v{i}")).unwrap();
}
assert!(start.elapsed() < Duration::from_millis(50));
sleep(Duration::from_millis(300));
let drained: Vec<_> = w
.drain()
.into_iter()
.filter(|e| e.kind == EventKind::Modified && same_path(&e.path, &f))
.collect();
assert!(
drained.len() <= 2,
"expected ≤2 Modified events after debounce, got {}: {:?}",
drained.len(),
drained
);
}