feat(stream-b): kei-task pilot — 3 atoms (create/search/add-dependency)

Pilot refactor per locked substrate schema. kei-task migrated to atom
layout:

- atoms/<verb>.md — YAML frontmatter + human body for 3 verbs
- atoms/schemas/<verb>-{input,output}.json — JSON Schema draft-07
- src/atoms/<verb>.rs — typed Input/Output/Error + pub fn run()
- src/atoms/mod.rs — module registry
- Cargo.toml [package.metadata.keisei] — crate-level substrate data
- src/main.rs — dispatcher for 3 pilot commands via atoms::

Zero behaviour change: 7/7 integration tests pass before and after
(create_and_get, update_persists, cycle_detected, milestone_linking,
dependency_chain_traversal, task_graph_edges, search_finds_task).

main.rs still has 5 non-migrated subcommands (update, graph,
dependency-chain, milestone, link-milestone) — scope discipline, they
migrate in later passes. main.rs 120 → 132 LOC.

Stream B pilot reference — other crates follow this pattern in v0.24+.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Parfii-bot 2026-04-23 00:09:55 +08:00
parent 9f6ba0cbfc
commit ae82bc6242
16 changed files with 669 additions and 12 deletions

View file

@ -5,6 +5,13 @@ edition = "2021"
rust-version = "1.75"
description = "Task DAG with deps + milestones (SQLite). Port of LBM internal/task."
[package.metadata.keisei]
backend = "sqlite"
db_env = "KEI_TASK_DB"
db_default = "~/.claude/task/task.sqlite"
migrations_dir = "migrations/"
schema_version = 1
[[bin]]
name = "kei-task"
path = "src/main.rs"

View file

@ -0,0 +1,76 @@
---
atom: kei-task::add-dependency
kind: command
version: "0.1.0"
input:
schema: schemas/add-dependency-input.json
required: [from, to]
example: { from: 42, to: 17, dep_type: "blocks" }
output:
schema: schemas/add-dependency-output.json
example: { ok: true }
errors:
- code: SelfDependency
http_analog: 400
description: "A task cannot depend on itself"
- code: InvalidDepType
http_analog: 400
description: "dep_type must be one of: blocks, feeds_into, subtask_of, milestone_of, assigned_to, depends_on"
- code: CycleDetected
http_analog: 409
description: "The new edge would close a cycle in the task DAG"
- code: StoreError
http_analog: 500
description: "Underlying SQLite store failed to insert the dependency row"
side_effects:
- { op: write, domain: kei-task-db }
idempotent: false
timeout_ms: 5000
deprecated: null
stability: stable
keywords: [task, dependency, dag, blocks, graph]
related:
- "[[kei-task::create]]"
---
# kei-task::add-dependency
Inserts a typed edge `from -> to` in the task DAG, rejecting cycles
and self-loops at write time. Edge is stored idempotently via
`INSERT OR IGNORE` — re-adding the same triple is a no-op.
## Example
kei-task add-dependency 42 17 --dep-type blocks
Stdout:
dep: 42 -> 17 (blocks)
Programmatic callers receive:
{ "ok": true }
## Gotchas
- `dep_type` defaults to `"blocks"`. Empty string is also treated as
`"blocks"` for CLI convenience.
- Cycle check is transitive — the implementation walks the existing
DAG from `to` and refuses if it can reach `from`.
- Both task ids must already exist; missing ids do NOT surface a
dedicated error code in the current impl and bubble up as
`StoreError` via foreign-key violation.
- Re-adding an existing edge is silently idempotent (`INSERT OR
IGNORE`) even though the atom declares `idempotent: false` — that
flag reflects the CONTRACT (callers should not rely on retry
semantics), not the specific SQL behaviour.
## Related
- [[kei-task::create]] — create endpoints of the edge first

View file

@ -0,0 +1,73 @@
---
atom: kei-task::create
kind: command
version: "0.1.0"
input:
schema: schemas/create-input.json
required: [title]
example: { title: "Fix auth bug", priority: "high" }
output:
schema: schemas/create-output.json
example: { id: 42, created_at: 1714000000 }
errors:
- code: InvalidPriority
http_analog: 400
description: "Priority must be one of: critical, high, medium, low"
- code: InvalidTitle
http_analog: 400
description: "Title must be non-empty"
- code: StoreError
http_analog: 500
description: "Underlying SQLite store failed to insert the task"
side_effects:
- { op: write, domain: kei-task-db }
idempotent: false
timeout_ms: 5000
deprecated: null
stability: stable
keywords: [task, todo, create, dag, planning]
related:
- "[[kei-task::add-dependency]]"
- "[[kei-task::search]]"
---
# kei-task::create
Creates a new task row in the kei-task SQLite DAG. Returns the inserted
row id and the `created_at` unix timestamp. Also indexes title +
description into the FTS table used by `kei-task::search`.
## Example
kei-task create "Fix auth bug" --priority high --description "Token rotation fails on leap second"
Returns the new task id on stdout:
42
Programmatic callers (runtime invocation) receive:
{ "id": 42, "created_at": 1714000000 }
## Gotchas
- `priority` defaults to `"medium"` if omitted. Case sensitive —
`High` returns `InvalidPriority`.
- `description` defaults to empty string; blank descriptions are still
indexed by FTS but yield no search hits until populated.
- `milestone_id` in the input schema is reserved for future use; the
current CLI does NOT accept it — link via `kei-task link-milestone`
after creation.
- Title uniqueness is NOT enforced at DB level; duplicate titles are
allowed and will all be returned by `kei-task::search`.
## Related
- [[kei-task::add-dependency]] — wire the new task into the DAG
- [[kei-task::search]] — look up by title / description

View file

@ -0,0 +1,36 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "kei-task/atoms/schemas/add-dependency-input.json",
"title": "kei-task::add-dependency input",
"type": "object",
"required": ["from", "to"],
"properties": {
"from": {
"type": "integer",
"minimum": 1,
"description": "Source task id (the task that has the dependency)"
},
"to": {
"type": "integer",
"minimum": 1,
"description": "Target task id (the task being depended on)"
},
"dep_type": {
"type": "string",
"enum": [
"blocks",
"feeds_into",
"subtask_of",
"milestone_of",
"assigned_to",
"depends_on"
],
"default": "blocks"
}
},
"additionalProperties": false,
"examples": [
{ "from": 42, "to": 17, "dep_type": "blocks" },
{ "from": 3, "to": 1 }
]
}

View file

@ -0,0 +1,17 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "kei-task/atoms/schemas/add-dependency-output.json",
"title": "kei-task::add-dependency output",
"type": "object",
"required": ["ok"],
"properties": {
"ok": {
"type": "boolean",
"description": "True when the edge was written (or already present — idempotent via INSERT OR IGNORE)"
}
},
"additionalProperties": false,
"examples": [
{ "ok": true }
]
}

View file

@ -0,0 +1,22 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "kei-task/atoms/schemas/create-input.json",
"title": "kei-task::create input",
"type": "object",
"required": ["title"],
"properties": {
"title": { "type": "string", "minLength": 1, "maxLength": 500 },
"description": { "type": "string", "maxLength": 8000, "default": "" },
"priority": {
"type": "string",
"enum": ["critical", "high", "medium", "low"],
"default": "medium"
},
"milestone_id": { "type": "integer", "minimum": 1 }
},
"additionalProperties": false,
"examples": [
{ "title": "Fix auth bug", "priority": "high" },
{ "title": "Refactor router", "description": "Split monolith", "priority": "medium" }
]
}

View file

@ -0,0 +1,18 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "kei-task/atoms/schemas/create-output.json",
"title": "kei-task::create output",
"type": "object",
"required": ["id", "created_at"],
"properties": {
"id": { "type": "integer", "minimum": 1 },
"created_at": {
"type": "integer",
"description": "Unix timestamp (seconds since epoch) when the row was inserted"
}
},
"additionalProperties": false,
"examples": [
{ "id": 42, "created_at": 1714000000 }
]
}

View file

@ -0,0 +1,25 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "kei-task/atoms/schemas/search-input.json",
"title": "kei-task::search input",
"type": "object",
"required": ["query"],
"properties": {
"query": {
"type": "string",
"minLength": 1,
"description": "FTS5 query syntax over task title + description"
},
"limit": {
"type": "integer",
"minimum": 1,
"maximum": 10000,
"default": 20
}
},
"additionalProperties": false,
"examples": [
{ "query": "refactor", "limit": 20 },
{ "query": "auth OR token" }
]
}

View file

@ -0,0 +1,52 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "kei-task/atoms/schemas/search-output.json",
"title": "kei-task::search output",
"type": "object",
"required": ["results"],
"properties": {
"results": {
"type": "array",
"items": {
"type": "object",
"required": ["id", "title", "status", "priority"],
"properties": {
"id": { "type": "integer", "minimum": 1 },
"title": { "type": "string" },
"description": { "type": "string" },
"status": { "type": "string" },
"priority": { "type": "string" },
"task_type": { "type": "string" },
"parent_id": { "type": "integer" },
"assigned_to": { "type": "string" },
"due_date": { "type": "integer" },
"completed_at": { "type": "integer" },
"created_at": { "type": "integer" },
"updated_at": { "type": "integer" }
},
"additionalProperties": false
}
}
},
"additionalProperties": false,
"examples": [
{
"results": [
{
"id": 42,
"title": "refactor router",
"description": "split monolith",
"status": "pending",
"priority": "high",
"task_type": "",
"parent_id": 0,
"assigned_to": "",
"due_date": 0,
"completed_at": 0,
"created_at": 1714000000,
"updated_at": 1714000000
}
]
}
]
}

View file

@ -0,0 +1,74 @@
---
atom: kei-task::search
kind: query
version: "0.1.0"
input:
schema: schemas/search-input.json
required: [query]
example: { query: "refactor", limit: 20 }
output:
schema: schemas/search-output.json
example:
results:
- { id: 42, title: "refactor router", status: "pending", priority: "high" }
errors:
- code: InvalidQuery
http_analog: 400
description: "FTS5 rejected the query syntax"
- code: StoreError
http_analog: 500
description: "Underlying SQLite store failed during read"
side_effects: []
idempotent: true
timeout_ms: 5000
deprecated: null
stability: stable
keywords: [task, search, fts, lookup, query]
related:
- "[[kei-task::create]]"
- "[[kei-task::add-dependency]]"
---
# kei-task::search
Runs a FTS5 full-text search over task titles + descriptions and
returns matches ordered by `rank` (FTS5 BM25 relevance).
## Example
kei-task search "refactor" --limit 10
Tab-separated on stdout, one row per hit:
42 pending refactor router
57 in_progress refactor auth layer
Programmatic callers receive a typed array:
{
"results": [
{ "id": 42, "title": "refactor router", "status": "pending", ... }
]
}
## Gotchas
- `limit` defaults to 20 and is clamped to a positive integer — pass
`0` or negative and the implementation silently uses 20.
- Query uses FTS5 syntax — phrase search needs double quotes inside
the query string (shell escape required).
- Returned rows always include the full `Task` shape; callers that
only need `id` should project client-side.
- Results are ordered by FTS rank, NOT by `created_at` — recent tasks
may be returned in the middle of the result set.
## Related
- [[kei-task::create]] — tasks only appear here once created
- [[kei-task::add-dependency]] — traverse from a search hit into the DAG

View file

@ -0,0 +1,77 @@
//! kei-task::add-dependency atom — see atoms/add-dependency.md for contract.
use crate::deps::add_dependency as add_dep_impl;
use crate::store::Store;
use crate::types::{is_valid_dep, VALID_DEP_TYPES};
use serde::{Deserialize, Serialize};
use std::fmt;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Input {
pub from: i64,
pub to: i64,
#[serde(default)]
pub dep_type: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Output {
pub ok: bool,
}
#[derive(Debug)]
pub enum Error {
SelfDependency,
InvalidDepType(String),
CycleDetected,
StoreError(anyhow::Error),
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Error::SelfDependency => write!(f, "SelfDependency: task cannot depend on itself"),
Error::InvalidDepType(t) => write!(
f, "InvalidDepType: {t} (allowed: {})", VALID_DEP_TYPES.join(", ")
),
Error::CycleDetected => write!(f, "CycleDetected: edge would close a cycle"),
Error::StoreError(e) => write!(f, "StoreError: {e:#}"),
}
}
}
impl std::error::Error for Error {}
pub fn run(store: &Store, input: Input) -> Result<Output, Error> {
validate(&input)?;
let dep = normalize_dep(&input.dep_type);
add_dep_impl(store, input.from, input.to, &dep).map_err(classify_error)?;
Ok(Output { ok: true })
}
fn validate(input: &Input) -> Result<(), Error> {
if input.from == input.to {
return Err(Error::SelfDependency);
}
if !input.dep_type.is_empty() && !is_valid_dep(&input.dep_type) {
return Err(Error::InvalidDepType(input.dep_type.clone()));
}
Ok(())
}
fn normalize_dep(raw: &str) -> String {
if raw.is_empty() { "blocks".into() } else { raw.to_string() }
}
fn classify_error(e: anyhow::Error) -> Error {
let msg = format!("{e:#}");
if msg.contains("cycle") {
Error::CycleDetected
} else if msg.contains("self-dependency") {
Error::SelfDependency
} else if msg.contains("invalid dep type") {
Error::InvalidDepType(msg)
} else {
Error::StoreError(e)
}
}

View file

@ -0,0 +1,78 @@
//! kei-task::create atom — see atoms/create.md for contract.
use crate::store::Store;
use crate::types::{is_valid_priority, Task};
use serde::{Deserialize, Serialize};
use std::fmt;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Input {
pub title: String,
#[serde(default)]
pub description: String,
#[serde(default)]
pub priority: String,
#[serde(default)]
pub milestone_id: Option<i64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Output {
pub id: i64,
pub created_at: i64,
}
#[derive(Debug)]
pub enum Error {
InvalidTitle,
InvalidPriority(String),
StoreError(anyhow::Error),
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Error::InvalidTitle => write!(f, "InvalidTitle: title must be non-empty"),
Error::InvalidPriority(p) => write!(f, "InvalidPriority: {p}"),
Error::StoreError(e) => write!(f, "StoreError: {e:#}"),
}
}
}
impl std::error::Error for Error {}
pub fn run(store: &Store, input: Input) -> Result<Output, Error> {
validate(&input)?;
let priority = normalize_priority(&input.priority);
let task = Task {
title: input.title,
description: input.description,
priority,
..Default::default()
};
let id = store.create_task(&task).map_err(Error::StoreError)?;
let created = read_created_at(store, id)?;
Ok(Output { id, created_at: created })
}
fn validate(input: &Input) -> Result<(), Error> {
if input.title.trim().is_empty() {
return Err(Error::InvalidTitle);
}
if !input.priority.is_empty() && !is_valid_priority(&input.priority) {
return Err(Error::InvalidPriority(input.priority.clone()));
}
Ok(())
}
fn normalize_priority(raw: &str) -> String {
if raw.is_empty() { "medium".into() } else { raw.to_string() }
}
fn read_created_at(store: &Store, id: i64) -> Result<i64, Error> {
let t = store
.get_task(id)
.map_err(Error::StoreError)?
.ok_or_else(|| Error::StoreError(anyhow::anyhow!("task {id} not found after insert")))?;
Ok(t.created_at)
}

View file

@ -0,0 +1,10 @@
//! kei-task atoms — one file per verb, each exposing
//! `pub fn run(store, input) -> Result<Output, Error>`.
//!
//! Reference implementation for the substrate schema (see
//! `docs/SUBSTRATE-SCHEMA.md`). Every other kei-* crate will follow
//! this shape in v0.24+.
pub mod add_dependency;
pub mod create;
pub mod search;

View file

@ -0,0 +1,79 @@
//! kei-task::search atom — see atoms/search.md for contract.
use crate::search as search_impl;
use crate::store::Store;
use crate::types::Task;
use serde::{Deserialize, Serialize};
use std::fmt;
const DEFAULT_LIMIT: i64 = 20;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Input {
pub query: String,
#[serde(default)]
pub limit: Option<i64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SearchHit {
pub id: i64,
pub title: String,
pub description: String,
pub status: String,
pub priority: String,
pub task_type: String,
pub parent_id: i64,
pub assigned_to: String,
pub due_date: i64,
pub completed_at: i64,
pub created_at: i64,
pub updated_at: i64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Output {
pub results: Vec<SearchHit>,
}
#[derive(Debug)]
pub enum Error {
InvalidQuery,
StoreError(anyhow::Error),
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Error::InvalidQuery => write!(f, "InvalidQuery: query must be non-empty"),
Error::StoreError(e) => write!(f, "StoreError: {e:#}"),
}
}
}
impl std::error::Error for Error {}
pub fn run(store: &Store, input: Input) -> Result<Output, Error> {
if input.query.trim().is_empty() {
return Err(Error::InvalidQuery);
}
let limit = normalize_limit(input.limit);
let hits = search_impl::search(store, &input.query, limit).map_err(Error::StoreError)?;
Ok(Output { results: hits.into_iter().map(task_to_hit).collect() })
}
fn normalize_limit(raw: Option<i64>) -> i64 {
match raw {
Some(n) if n > 0 => n,
_ => DEFAULT_LIMIT,
}
}
fn task_to_hit(t: Task) -> SearchHit {
SearchHit {
id: t.id, title: t.title, description: t.description, status: t.status,
priority: t.priority, task_type: t.task_type, parent_id: t.parent_id,
assigned_to: t.assigned_to, due_date: t.due_date, completed_at: t.completed_at,
created_at: t.created_at, updated_at: t.updated_at,
}
}

View file

@ -1,5 +1,6 @@
//! kei-task — tasks with typed deps (DAG, cycle-detected), milestones, FTS search.
pub mod atoms;
pub mod deps;
pub mod graph;
pub mod milestones;

View file

@ -1,11 +1,15 @@
//! kei-task CLI — create / update / add-dep / graph / dependency-chain.
//!
//! Pilot refactor (Stream B): `create`, `search`, `add-dependency` now
//! dispatch through `kei_task::atoms::*`. Remaining subcommands call
//! legacy module functions directly — they migrate in a later pass.
use clap::{Parser, Subcommand};
use kei_task::deps::{add_dependency, dependency_chain};
use kei_task::atoms;
use kei_task::deps::dependency_chain;
use kei_task::graph::list_edges;
use kei_task::milestones::{create_milestone, link_task_to_milestone};
use kei_task::search::search;
use kei_task::{Milestone, Store, Task};
use kei_task::{Milestone, Store};
use std::path::PathBuf;
use std::process::ExitCode;
@ -49,10 +53,10 @@ fn dispatch(s: &Store, cmd: Cmd) -> anyhow::Result<()> {
cmd_create(s, title, description, priority),
Cmd::Update { id, status, title } => cmd_update(s, id, status, title),
Cmd::AddDependency { from_id, to_id, dep_type } =>
cmd_add_dep(s, from_id, to_id, &dep_type),
cmd_add_dep(s, from_id, to_id, dep_type),
Cmd::Graph => cmd_graph(s),
Cmd::DependencyChain { id } => cmd_chain(s, id),
Cmd::Search { query, limit } => cmd_search(s, &query, limit),
Cmd::Search { query, limit } => cmd_search(s, query, limit),
Cmd::Milestone { name, description } => cmd_milestone(s, name, description),
Cmd::LinkMilestone { task_id, milestone_id } =>
cmd_link_milestone(s, task_id, milestone_id),
@ -60,8 +64,10 @@ fn dispatch(s: &Store, cmd: Cmd) -> anyhow::Result<()> {
}
fn cmd_create(s: &Store, title: String, description: String, priority: String) -> anyhow::Result<()> {
let id = s.create_task(&Task { title, description, priority, ..Default::default() })?;
println!("{}", id);
let out = atoms::create::run(s, atoms::create::Input {
title, description, priority, milestone_id: None,
}).map_err(|e| anyhow::anyhow!("{e}"))?;
println!("{}", out.id);
Ok(())
}
@ -74,9 +80,12 @@ fn cmd_update(s: &Store, id: i64, status: Option<String>, title: Option<String>)
Ok(())
}
fn cmd_add_dep(s: &Store, from_id: i64, to_id: i64, dep_type: &str) -> anyhow::Result<()> {
add_dependency(s, from_id, to_id, dep_type)?;
println!("dep: {} -> {} ({})", from_id, to_id, dep_type);
fn cmd_add_dep(s: &Store, from_id: i64, to_id: i64, dep_type: String) -> anyhow::Result<()> {
let dep_display = if dep_type.is_empty() { "blocks".to_string() } else { dep_type.clone() };
atoms::add_dependency::run(s, atoms::add_dependency::Input {
from: from_id, to: to_id, dep_type,
}).map_err(|e| anyhow::anyhow!("{e}"))?;
println!("dep: {} -> {} ({})", from_id, to_id, dep_display);
Ok(())
}
@ -92,8 +101,11 @@ fn cmd_chain(s: &Store, id: i64) -> anyhow::Result<()> {
Ok(())
}
fn cmd_search(s: &Store, query: &str, limit: i64) -> anyhow::Result<()> {
for t in search(s, query, limit)? {
fn cmd_search(s: &Store, query: String, limit: i64) -> anyhow::Result<()> {
let out = atoms::search::run(s, atoms::search::Input {
query, limit: Some(limit),
}).map_err(|e| anyhow::anyhow!("{e}"))?;
for t in out.results {
println!("{}\t{}\t{}", t.id, t.status, t.title);
}
Ok(())