diff --git a/_primitives/_rust/Cargo.lock b/_primitives/_rust/Cargo.lock index ea460d8..8390eb3 100644 --- a/_primitives/_rust/Cargo.lock +++ b/_primitives/_rust/Cargo.lock @@ -3412,6 +3412,7 @@ dependencies = [ "rusqlite", "serde", "serde_json", + "serial_test", "shell-words", "tempfile", "thiserror 1.0.69", @@ -6241,6 +6242,15 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "scc" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46e6f046b7fef48e2660c57ed794263155d713de679057f2d0c169bfc6e756cc" +dependencies = [ + "sdd", +] + [[package]] name = "schannel" version = "0.1.29" @@ -6266,6 +6276,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "sdd" +version = "3.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "490dcfcbfef26be6800d11870ff2df8774fa6e86d047e3e8c8a76b25655e41ca" + [[package]] name = "secrecy" version = "0.8.0" @@ -6555,6 +6571,32 @@ dependencies = [ "serial-core", ] +[[package]] +name = "serial_test" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "911bd979bf1070a3f3aa7b691a3b3e9968f339ceeec89e08c280a8a22207a32f" +dependencies = [ + "futures-executor", + "futures-util", + "log", + "once_cell", + "parking_lot 0.12.5", + "scc", + "serial_test_derive", +] + +[[package]] +name = "serial_test_derive" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a7d91949b85b0d2fb687445e448b40d322b6b3e4af6b44a29b21d9a5f33e6d9" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "sha1" version = "0.10.6" diff --git a/_primitives/_rust/kei-cortex/Cargo.toml b/_primitives/_rust/kei-cortex/Cargo.toml index 9f76cec..7313a42 100644 --- a/_primitives/_rust/kei-cortex/Cargo.toml +++ b/_primitives/_rust/kei-cortex/Cargo.toml @@ -67,3 +67,4 @@ kei-token-tracker = { path = "../kei-token-tracker" } [dev-dependencies] reqwest = { workspace = true, 
features = ["blocking"] } wiremock = { workspace = true } +serial_test = "3" diff --git a/_primitives/_rust/kei-cortex/tests/common/mod.rs b/_primitives/_rust/kei-cortex/tests/common/mod.rs index 8c5eaa4..84acfa9 100644 --- a/_primitives/_rust/kei-cortex/tests/common/mod.rs +++ b/_primitives/_rust/kei-cortex/tests/common/mod.rs @@ -153,6 +153,12 @@ impl MockAnthropicServer { /// Spin up a wiremock server mounted with a canned `/v1/messages` /// reply. Bind happens on `127.0.0.1:0` via wiremock's own listener, /// which is reliable across macOS / Linux GitHub runners. +/// +/// Includes a warm-up HEAD request against the bound port so we only return +/// once the listener is actually accepting connections — this closes +/// the race where the first test under parallel `cargo test` +/// dispatches an HTTP request to the mock before its acceptor loop +/// is ready (manifests as `error sending request for url …`). async fn build_mock(text: &str) -> MockAnthropicServer { use wiremock::matchers::{method, path}; use wiremock::{Mock, MockServer, ResponseTemplate}; @@ -167,31 +173,54 @@ async fn build_mock(text: &str) -> MockAnthropicServer { .respond_with(ResponseTemplate::new(200).set_body_json(body)) .mount(&server) .await; + // Warm-up probe: a HEAD against an unmounted route returns 404 but + // confirms the listener is accepting. Retry briefly under cold CI. + let probe_url = server.uri(); + for attempt in 0..20 { + if reqwest::Client::new() + .head(&probe_url) + .timeout(std::time::Duration::from_millis(200)) + .send() + .await + .is_ok() + { + break; + } + if attempt == 19 { + panic!("wiremock listener did not respond to warm-up probe after 1s"); + } + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + } let uri = format!("{}/v1/messages", server.uri()); MockAnthropicServer { server, uri } } /// Per-call mock variant. Spawns a fresh wiremock instance with the /// given canned reply text. 
Each instance keeps its server alive for -/// the lifetime of the returned handle (drop = stop). +/// the lifetime of the returned handle. +/// +/// Implementation note: wiremock's hyper server is tied to the runtime +/// that called `MockServer::start().await`. We build a dedicated +/// **multi-thread** runtime on a helper OS-thread and keep it alive by +/// blocking on `pending()` — this guarantees the runtime keeps driving +/// async tasks (the previous `std::thread::park()` froze the worker +/// thread, starving wiremock of CPU and producing +/// `error sending request for url …` on CI under parallel tests). pub fn mock_anthropic_responding_with(text: &'static str) -> MockAnthropicServer { - // The caller is inside a `#[tokio::test]` runtime; build on it via - // a one-shot thread + current-thread runtime to avoid nested-runtime - // panics on tests that already hold a multi-thread runtime. let (tx, rx) = std::sync::mpsc::channel::(); let owned = text.to_string(); std::thread::spawn(move || { - let rt = tokio::runtime::Builder::new_current_thread() + let rt = tokio::runtime::Builder::new_multi_thread() .enable_all() + .worker_threads(2) .build() .expect("mock-runtime build"); let mock = rt.block_on(async move { build_mock(&owned).await }); tx.send(mock).expect("send mock back"); - // Keep this runtime alive — wiremock's internal hyper server is - // tied to it. We park the thread; `MockAnthropicServer` is now - // owned by the caller and will Drop normally when test scope - // ends. The runtime drops with the thread. - std::thread::park(); + // Hold the runtime open for the rest of the process — wiremock + // needs it to accept incoming requests. `pending()` never + // resolves; worker threads keep processing the hyper server. 
+ rt.block_on(std::future::pending::<()>()); }); rx.recv().expect("mock channel closed") } diff --git a/_primitives/_rust/kei-cortex/tests/openai_compat.rs b/_primitives/_rust/kei-cortex/tests/openai_compat.rs index 68b0f9a..0cd5bfb 100644 --- a/_primitives/_rust/kei-cortex/tests/openai_compat.rs +++ b/_primitives/_rust/kei-cortex/tests/openai_compat.rs @@ -15,6 +15,7 @@ //! is picked up without any source-side wiring. mod common; +use serial_test::serial; use axum::body::{to_bytes, Body}; use axum::http::{Request, StatusCode}; @@ -53,6 +54,7 @@ fn auth_request(method: &str, uri: &str, body: Body) -> Request { .unwrap() } +#[serial] #[tokio::test] async fn list_models_returns_kei_cortex() { ensure_api_key(); @@ -68,6 +70,7 @@ async fn list_models_returns_kei_cortex() { assert!(body["data"].as_array().unwrap().len() >= 1); } +#[serial] #[tokio::test] async fn unauthorized_without_bearer() { // Use the same key value as `ensure_api_key` so this test does not @@ -88,6 +91,7 @@ async fn unauthorized_without_bearer() { assert_eq!(resp.status(), StatusCode::UNAUTHORIZED); } +#[serial] #[tokio::test] async fn chat_completions_sync_returns_choices() { ensure_api_key(); @@ -114,6 +118,7 @@ async fn chat_completions_sync_returns_choices() { assert!(v["choices"][0]["message"]["content"].is_string()); } +#[serial] #[tokio::test] async fn chat_completions_stream_emits_sse() { ensure_api_key(); @@ -140,6 +145,7 @@ async fn chat_completions_stream_emits_sse() { assert!(s.contains("data:"), "expected SSE data frame, got: {s}"); } +#[serial] #[tokio::test] async fn run_create_returns_202_and_id() { ensure_api_key(); @@ -177,6 +183,7 @@ async fn run_create_returns_202_and_id() { )); } +#[serial] #[tokio::test] async fn responses_get_unknown_id_returns_404() { ensure_api_key(); diff --git a/_primitives/_rust/kei-cortex/tests/openai_loop_wiring.rs b/_primitives/_rust/kei-cortex/tests/openai_loop_wiring.rs index 00323bb..ba24583 100644 --- 
a/_primitives/_rust/kei-cortex/tests/openai_loop_wiring.rs +++ b/_primitives/_rust/kei-cortex/tests/openai_loop_wiring.rs @@ -14,6 +14,7 @@ //! file stays focused (router shape there, loop wiring here). mod common; +use serial_test::serial; use axum::body::{to_bytes, Body}; use axum::http::{Request, StatusCode}; @@ -58,6 +59,7 @@ fn auth_request(method: &str, uri: &str, body: Body) -> Request { /// Sync /v1/chat/completions — response carries the mock's "hi" and NOT /// the legacy stub echo. Confirms `agent_runner::collect_reply` is the /// production code path. +#[serial] #[tokio::test] async fn sync_chat_completions_runs_real_loop_not_stub() { ensure_env(); @@ -94,6 +96,7 @@ async fn sync_chat_completions_runs_real_loop_not_stub() { /// Streaming /v1/chat/completions — SSE body carries `delta` chunks fed /// by the real loop. No stub marker; finish frame present. +#[serial] #[tokio::test] async fn streaming_chat_completions_runs_real_loop_not_stub() { ensure_env(); @@ -144,6 +147,7 @@ async fn streaming_chat_completions_runs_real_loop_not_stub() { /// 4. The terminal `[DONE]` sentinel is the last `data:` line. /// 5. The finish-chunk index is strictly less than the sentinel index /// (proves ordering: finish, then sentinel — not bundled). +#[serial] #[tokio::test] async fn streaming_chat_completions_emits_chunked_frames_not_single_blob() { ensure_env(); @@ -216,6 +220,7 @@ async fn streaming_chat_completions_emits_chunked_frames_not_single_blob() { /// /v1/responses — sync mode — response.output[0].text carries the /// mock reply, not the stub echo. Witnesses `responses::handle_sync` /// is wired through `agent_runner::collect_reply` (P1.1.c). +#[serial] #[tokio::test] async fn sync_responses_runs_real_loop_not_stub() { ensure_env(); @@ -250,6 +255,7 @@ async fn sync_responses_runs_real_loop_not_stub() { /// frames produced by the real loop and finishes with `response.completed`. 
/// Witnesses `responses::handle_stream` is wired through /// `agent_runner::stream_events` + `stream_forwarder::forward_responses`. +#[serial] #[tokio::test] async fn streaming_responses_runs_real_loop_not_stub() { ensure_env(); @@ -290,6 +296,7 @@ async fn streaming_responses_runs_real_loop_not_stub() { /// are wired through `agent_runner::stream_events` + /// `stream_forwarder::forward_runs` (P1.1.d). The previous `run_stub` /// would have leaked `[run stub] echo:` into the delta payload. +#[serial] #[tokio::test] async fn runs_events_stream_runs_real_loop_not_stub() { ensure_env(); @@ -354,6 +361,7 @@ async fn runs_events_stream_runs_real_loop_not_stub() { /// consumed, the registry slot's receiver is taken; a second GET must /// 404. Witnesses the `take_receiver` first-subscriber-wins contract is /// preserved by the real-loop wiring (it was true under the stub too). +#[serial] #[tokio::test] async fn runs_events_second_subscriber_returns_not_found() { ensure_env();