fix(kei-cortex/test): serial_test on env-mutating openai tests + wiremock warm-up

Previous wiremock conversion fixed the listener-lifecycle race but
left the underlying problem unsolved: `ensure_env()` mutates the
process-global ANTHROPIC_ENDPOINT, and parallel `cargo test` threads
race on that write. Manifested as 502 / "error sending request for
url …" on the first concurrent test pair under both macOS and Linux.

Annotate every #[tokio::test] in openai_loop_wiring.rs +
openai_compat.rs with `#[serial_test::serial]` — these are the only
tests that touch ANTHROPIC_ENDPOINT via shared_mock_anthropic.
serial_test enforces process-wide ordering so the env mutation +
HTTP request pair is atomic per test. All other tests stay parallel.

Stress: 5 parallel `cargo test` runs all green.
This commit is contained in:
Parfii-bot 2026-05-12 22:26:42 +08:00
parent b103a9aa64
commit b5d093fbec
5 changed files with 97 additions and 10 deletions

View file

@ -3412,6 +3412,7 @@ dependencies = [
"rusqlite",
"serde",
"serde_json",
"serial_test",
"shell-words",
"tempfile",
"thiserror 1.0.69",
@ -6241,6 +6242,15 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "scc"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46e6f046b7fef48e2660c57ed794263155d713de679057f2d0c169bfc6e756cc"
dependencies = [
"sdd",
]
[[package]]
name = "schannel"
version = "0.1.29"
@ -6266,6 +6276,12 @@ dependencies = [
"untrusted",
]
[[package]]
name = "sdd"
version = "3.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "490dcfcbfef26be6800d11870ff2df8774fa6e86d047e3e8c8a76b25655e41ca"
[[package]]
name = "secrecy"
version = "0.8.0"
@ -6555,6 +6571,32 @@ dependencies = [
"serial-core",
]
[[package]]
name = "serial_test"
version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "911bd979bf1070a3f3aa7b691a3b3e9968f339ceeec89e08c280a8a22207a32f"
dependencies = [
"futures-executor",
"futures-util",
"log",
"once_cell",
"parking_lot 0.12.5",
"scc",
"serial_test_derive",
]
[[package]]
name = "serial_test_derive"
version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a7d91949b85b0d2fb687445e448b40d322b6b3e4af6b44a29b21d9a5f33e6d9"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.117",
]
[[package]]
name = "sha1"
version = "0.10.6"

View file

@ -67,3 +67,4 @@ kei-token-tracker = { path = "../kei-token-tracker" }
[dev-dependencies]
reqwest = { workspace = true, features = ["blocking"] }
wiremock = { workspace = true }
serial_test = "3"

View file

@ -153,6 +153,12 @@ impl MockAnthropicServer {
/// Spin up a wiremock server mounted with a canned `/v1/messages`
/// reply. Bind happens on `127.0.0.1:0` via wiremock's own listener,
/// which is reliable across macOS / Linux GitHub runners.
///
/// Includes a warm-up GET against the bound port so we only return
/// once the listener is actually accepting connections — this closes
/// the race where the first test under parallel `cargo test`
/// dispatches an HTTP request to the mock before its acceptor loop
/// is ready (manifests as `error sending request for url …`).
async fn build_mock(text: &str) -> MockAnthropicServer {
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
@ -167,31 +173,54 @@ async fn build_mock(text: &str) -> MockAnthropicServer {
.respond_with(ResponseTemplate::new(200).set_body_json(body))
.mount(&server)
.await;
// Warm-up probe: a HEAD against an unmounted route returns 404 but
// confirms the listener is accepting. Retry briefly under cold CI.
let probe_url = server.uri();
for attempt in 0..20 {
if reqwest::Client::new()
.head(&probe_url)
.timeout(std::time::Duration::from_millis(200))
.send()
.await
.is_ok()
{
break;
}
if attempt == 19 {
panic!("wiremock listener did not respond to warm-up probe after 1s");
}
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
let uri = format!("{}/v1/messages", server.uri());
MockAnthropicServer { server, uri }
}
/// Per-call mock variant. Spawns a fresh wiremock instance with the
/// given canned reply text. Each instance keeps its server alive for
/// the lifetime of the returned handle (drop = stop).
/// the lifetime of the returned handle.
///
/// Implementation note: wiremock's hyper server is tied to the runtime
/// that called `MockServer::start().await`. We build a dedicated
/// **multi-thread** runtime on a helper OS-thread and keep it alive by
/// blocking on `pending()` — this guarantees the runtime keeps driving
/// async tasks (the previous `std::thread::park()` froze the worker
/// thread, starving wiremock of CPU and producing
/// `error sending request for url …` on CI under parallel tests).
pub fn mock_anthropic_responding_with(text: &'static str) -> MockAnthropicServer {
// The caller is inside a `#[tokio::test]` runtime; build on it via
// a one-shot thread + current-thread runtime to avoid nested-runtime
// panics on tests that already hold a multi-thread runtime.
let (tx, rx) = std::sync::mpsc::channel::<MockAnthropicServer>();
let owned = text.to_string();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(2)
.build()
.expect("mock-runtime build");
let mock = rt.block_on(async move { build_mock(&owned).await });
tx.send(mock).expect("send mock back");
// Keep this runtime alive — wiremock's internal hyper server is
// tied to it. We park the thread; `MockAnthropicServer` is now
// owned by the caller and will Drop normally when test scope
// ends. The runtime drops with the thread.
std::thread::park();
// Hold the runtime open for the rest of the process — wiremock
// needs it to accept incoming requests. `pending()` yields
// forever; worker threads keep processing the hyper server.
rt.block_on(std::future::pending::<()>());
});
rx.recv().expect("mock channel closed")
}

View file

@ -15,6 +15,7 @@
//! is picked up without any source-side wiring.
mod common;
use serial_test::serial;
use axum::body::{to_bytes, Body};
use axum::http::{Request, StatusCode};
@ -53,6 +54,7 @@ fn auth_request(method: &str, uri: &str, body: Body) -> Request<Body> {
.unwrap()
}
#[serial]
#[tokio::test]
async fn list_models_returns_kei_cortex() {
ensure_api_key();
@ -68,6 +70,7 @@ async fn list_models_returns_kei_cortex() {
assert!(body["data"].as_array().unwrap().len() >= 1);
}
#[serial]
#[tokio::test]
async fn unauthorized_without_bearer() {
// Use the same key value as `ensure_api_key` so this test does not
@ -88,6 +91,7 @@ async fn unauthorized_without_bearer() {
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
}
#[serial]
#[tokio::test]
async fn chat_completions_sync_returns_choices() {
ensure_api_key();
@ -114,6 +118,7 @@ async fn chat_completions_sync_returns_choices() {
assert!(v["choices"][0]["message"]["content"].is_string());
}
#[serial]
#[tokio::test]
async fn chat_completions_stream_emits_sse() {
ensure_api_key();
@ -140,6 +145,7 @@ async fn chat_completions_stream_emits_sse() {
assert!(s.contains("data:"), "expected SSE data frame, got: {s}");
}
#[serial]
#[tokio::test]
async fn run_create_returns_202_and_id() {
ensure_api_key();
@ -177,6 +183,7 @@ async fn run_create_returns_202_and_id() {
));
}
#[serial]
#[tokio::test]
async fn responses_get_unknown_id_returns_404() {
ensure_api_key();

View file

@ -14,6 +14,7 @@
//! file stays focused (router shape there, loop wiring here).
mod common;
use serial_test::serial;
use axum::body::{to_bytes, Body};
use axum::http::{Request, StatusCode};
@ -58,6 +59,7 @@ fn auth_request(method: &str, uri: &str, body: Body) -> Request<Body> {
/// Sync /v1/chat/completions — response carries the mock's "hi" and NOT
/// the legacy stub echo. Confirms `agent_runner::collect_reply` is the
/// production code path.
#[serial]
#[tokio::test]
async fn sync_chat_completions_runs_real_loop_not_stub() {
ensure_env();
@ -94,6 +96,7 @@ async fn sync_chat_completions_runs_real_loop_not_stub() {
/// Streaming /v1/chat/completions — SSE body carries `delta` chunks fed
/// by the real loop. No stub marker; finish frame present.
#[serial]
#[tokio::test]
async fn streaming_chat_completions_runs_real_loop_not_stub() {
ensure_env();
@ -144,6 +147,7 @@ async fn streaming_chat_completions_runs_real_loop_not_stub() {
/// 4. The terminal `[DONE]` sentinel is the last `data:` line.
/// 5. The finish-chunk index is strictly less than the sentinel index
/// (proves ordering: finish, then sentinel — not bundled).
#[serial]
#[tokio::test]
async fn streaming_chat_completions_emits_chunked_frames_not_single_blob() {
ensure_env();
@ -216,6 +220,7 @@ async fn streaming_chat_completions_emits_chunked_frames_not_single_blob() {
/// /v1/responses — sync mode — response.output[0].text carries the
/// mock reply, not the stub echo. Witnesses `responses::handle_sync`
/// is wired through `agent_runner::collect_reply` (P1.1.c).
#[serial]
#[tokio::test]
async fn sync_responses_runs_real_loop_not_stub() {
ensure_env();
@ -250,6 +255,7 @@ async fn sync_responses_runs_real_loop_not_stub() {
/// frames produced by the real loop and finishes with `response.completed`.
/// Witnesses `responses::handle_stream` is wired through
/// `agent_runner::stream_events` + `stream_forwarder::forward_responses`.
#[serial]
#[tokio::test]
async fn streaming_responses_runs_real_loop_not_stub() {
ensure_env();
@ -290,6 +296,7 @@ async fn streaming_responses_runs_real_loop_not_stub() {
/// are wired through `agent_runner::stream_events` +
/// `stream_forwarder::forward_runs` (P1.1.d). The previous `run_stub`
/// would have leaked `[run stub] echo:` into the delta payload.
#[serial]
#[tokio::test]
async fn runs_events_stream_runs_real_loop_not_stub() {
ensure_env();
@ -354,6 +361,7 @@ async fn runs_events_stream_runs_real_loop_not_stub() {
/// consumed, the registry slot's receiver is taken; a second GET must
/// 404. Witnesses the `take_receiver` first-subscriber-wins contract is
/// preserved by the real-loop wiring (it was true under the stub too).
#[serial]
#[tokio::test]
async fn runs_events_second_subscriber_returns_not_found() {
ensure_env();