fix(kei-cortex/test): serial_test on env-mutating openai tests + wiremock warm-up
Previous wiremock conversion fixed the listener-lifecycle race but left the underlying problem unsolved: `ensure_env()` mutates the process-global ANTHROPIC_ENDPOINT, and parallel `cargo test` threads race on that write. Manifested as 502 / "error sending request for url …" on the first concurrent test pair under both macOS and Linux. Annotate every #[tokio::test] in openai_loop_wiring.rs + openai_compat.rs with `#[serial_test::serial]` — these are the only tests that touch ANTHROPIC_ENDPOINT via shared_mock_anthropic. serial_test enforces process-wide ordering so the env mutation + HTTP request pair is atomic per test. All other tests stay parallel. Stress: 5 parallel `cargo test` runs all green.
This commit is contained in:
parent
1b8b2197fb
commit
95ccf56988
5 changed files with 97 additions and 10 deletions
42
_primitives/_rust/Cargo.lock
generated
42
_primitives/_rust/Cargo.lock
generated
|
|
@ -3412,6 +3412,7 @@ dependencies = [
|
|||
"rusqlite",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serial_test",
|
||||
"shell-words",
|
||||
"tempfile",
|
||||
"thiserror 1.0.69",
|
||||
|
|
@ -6241,6 +6242,15 @@ dependencies = [
|
|||
"winapi-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scc"
|
||||
version = "2.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "46e6f046b7fef48e2660c57ed794263155d713de679057f2d0c169bfc6e756cc"
|
||||
dependencies = [
|
||||
"sdd",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "schannel"
|
||||
version = "0.1.29"
|
||||
|
|
@ -6266,6 +6276,12 @@ dependencies = [
|
|||
"untrusted",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sdd"
|
||||
version = "3.0.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "490dcfcbfef26be6800d11870ff2df8774fa6e86d047e3e8c8a76b25655e41ca"
|
||||
|
||||
[[package]]
|
||||
name = "secrecy"
|
||||
version = "0.8.0"
|
||||
|
|
@ -6555,6 +6571,32 @@ dependencies = [
|
|||
"serial-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serial_test"
|
||||
version = "3.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "911bd979bf1070a3f3aa7b691a3b3e9968f339ceeec89e08c280a8a22207a32f"
|
||||
dependencies = [
|
||||
"futures-executor",
|
||||
"futures-util",
|
||||
"log",
|
||||
"once_cell",
|
||||
"parking_lot 0.12.5",
|
||||
"scc",
|
||||
"serial_test_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serial_test_derive"
|
||||
version = "3.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0a7d91949b85b0d2fb687445e448b40d322b6b3e4af6b44a29b21d9a5f33e6d9"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.117",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sha1"
|
||||
version = "0.10.6"
|
||||
|
|
|
|||
|
|
@ -67,3 +67,4 @@ kei-token-tracker = { path = "../kei-token-tracker" }
|
|||
[dev-dependencies]
|
||||
reqwest = { workspace = true, features = ["blocking"] }
|
||||
wiremock = { workspace = true }
|
||||
serial_test = "3"
|
||||
|
|
|
|||
|
|
@ -153,6 +153,12 @@ impl MockAnthropicServer {
|
|||
/// Spin up a wiremock server mounted with a canned `/v1/messages`
|
||||
/// reply. Bind happens on `127.0.0.1:0` via wiremock's own listener,
|
||||
/// which is reliable across macOS / Linux GitHub runners.
|
||||
///
|
||||
/// Includes a warm-up GET against the bound port so we only return
|
||||
/// once the listener is actually accepting connections — this closes
|
||||
/// the race where the first test under parallel `cargo test`
|
||||
/// dispatches an HTTP request to the mock before its acceptor loop
|
||||
/// is ready (manifests as `error sending request for url …`).
|
||||
async fn build_mock(text: &str) -> MockAnthropicServer {
|
||||
use wiremock::matchers::{method, path};
|
||||
use wiremock::{Mock, MockServer, ResponseTemplate};
|
||||
|
|
@ -167,31 +173,54 @@ async fn build_mock(text: &str) -> MockAnthropicServer {
|
|||
.respond_with(ResponseTemplate::new(200).set_body_json(body))
|
||||
.mount(&server)
|
||||
.await;
|
||||
// Warm-up probe: a HEAD against an unmounted route returns 404 but
|
||||
// confirms the listener is accepting. Retry briefly under cold CI.
|
||||
let probe_url = server.uri();
|
||||
for attempt in 0..20 {
|
||||
if reqwest::Client::new()
|
||||
.head(&probe_url)
|
||||
.timeout(std::time::Duration::from_millis(200))
|
||||
.send()
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
break;
|
||||
}
|
||||
if attempt == 19 {
|
||||
panic!("wiremock listener did not respond to warm-up probe after 1s");
|
||||
}
|
||||
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||
}
|
||||
let uri = format!("{}/v1/messages", server.uri());
|
||||
MockAnthropicServer { server, uri }
|
||||
}
|
||||
|
||||
/// Per-call mock variant. Spawns a fresh wiremock instance with the
|
||||
/// given canned reply text. Each instance keeps its server alive for
|
||||
/// the lifetime of the returned handle (drop = stop).
|
||||
/// the lifetime of the returned handle.
|
||||
///
|
||||
/// Implementation note: wiremock's hyper server is tied to the runtime
|
||||
/// that called `MockServer::start().await`. We build a dedicated
|
||||
/// **multi-thread** runtime on a helper OS-thread and keep it alive by
|
||||
/// blocking on `pending()` — this guarantees the runtime keeps driving
|
||||
/// async tasks (the previous `std::thread::park()` froze the worker
|
||||
/// thread, starving wiremock of CPU and producing
|
||||
/// `error sending request for url …` on CI under parallel tests).
|
||||
pub fn mock_anthropic_responding_with(text: &'static str) -> MockAnthropicServer {
|
||||
// The caller is inside a `#[tokio::test]` runtime; build on it via
|
||||
// a one-shot thread + current-thread runtime to avoid nested-runtime
|
||||
// panics on tests that already hold a multi-thread runtime.
|
||||
let (tx, rx) = std::sync::mpsc::channel::<MockAnthropicServer>();
|
||||
let owned = text.to_string();
|
||||
std::thread::spawn(move || {
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
let rt = tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.worker_threads(2)
|
||||
.build()
|
||||
.expect("mock-runtime build");
|
||||
let mock = rt.block_on(async move { build_mock(&owned).await });
|
||||
tx.send(mock).expect("send mock back");
|
||||
// Keep this runtime alive — wiremock's internal hyper server is
|
||||
// tied to it. We park the thread; `MockAnthropicServer` is now
|
||||
// owned by the caller and will Drop normally when test scope
|
||||
// ends. The runtime drops with the thread.
|
||||
std::thread::park();
|
||||
// Hold the runtime open for the rest of the process — wiremock
|
||||
// needs it to accept incoming requests. `pending()` yields
|
||||
// forever; worker threads keep processing the hyper server.
|
||||
rt.block_on(std::future::pending::<()>());
|
||||
});
|
||||
rx.recv().expect("mock channel closed")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@
|
|||
//! is picked up without any source-side wiring.
|
||||
|
||||
mod common;
|
||||
use serial_test::serial;
|
||||
|
||||
use axum::body::{to_bytes, Body};
|
||||
use axum::http::{Request, StatusCode};
|
||||
|
|
@ -53,6 +54,7 @@ fn auth_request(method: &str, uri: &str, body: Body) -> Request<Body> {
|
|||
.unwrap()
|
||||
}
|
||||
|
||||
#[serial]
|
||||
#[tokio::test]
|
||||
async fn list_models_returns_kei_cortex() {
|
||||
ensure_api_key();
|
||||
|
|
@ -68,6 +70,7 @@ async fn list_models_returns_kei_cortex() {
|
|||
assert!(body["data"].as_array().unwrap().len() >= 1);
|
||||
}
|
||||
|
||||
#[serial]
|
||||
#[tokio::test]
|
||||
async fn unauthorized_without_bearer() {
|
||||
// Use the same key value as `ensure_api_key` so this test does not
|
||||
|
|
@ -88,6 +91,7 @@ async fn unauthorized_without_bearer() {
|
|||
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
|
||||
}
|
||||
|
||||
#[serial]
|
||||
#[tokio::test]
|
||||
async fn chat_completions_sync_returns_choices() {
|
||||
ensure_api_key();
|
||||
|
|
@ -114,6 +118,7 @@ async fn chat_completions_sync_returns_choices() {
|
|||
assert!(v["choices"][0]["message"]["content"].is_string());
|
||||
}
|
||||
|
||||
#[serial]
|
||||
#[tokio::test]
|
||||
async fn chat_completions_stream_emits_sse() {
|
||||
ensure_api_key();
|
||||
|
|
@ -140,6 +145,7 @@ async fn chat_completions_stream_emits_sse() {
|
|||
assert!(s.contains("data:"), "expected SSE data frame, got: {s}");
|
||||
}
|
||||
|
||||
#[serial]
|
||||
#[tokio::test]
|
||||
async fn run_create_returns_202_and_id() {
|
||||
ensure_api_key();
|
||||
|
|
@ -177,6 +183,7 @@ async fn run_create_returns_202_and_id() {
|
|||
));
|
||||
}
|
||||
|
||||
#[serial]
|
||||
#[tokio::test]
|
||||
async fn responses_get_unknown_id_returns_404() {
|
||||
ensure_api_key();
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@
|
|||
//! file stays focused (router shape there, loop wiring here).
|
||||
|
||||
mod common;
|
||||
use serial_test::serial;
|
||||
|
||||
use axum::body::{to_bytes, Body};
|
||||
use axum::http::{Request, StatusCode};
|
||||
|
|
@ -58,6 +59,7 @@ fn auth_request(method: &str, uri: &str, body: Body) -> Request<Body> {
|
|||
/// Sync /v1/chat/completions — response carries the mock's "hi" and NOT
|
||||
/// the legacy stub echo. Confirms `agent_runner::collect_reply` is the
|
||||
/// production code path.
|
||||
#[serial]
|
||||
#[tokio::test]
|
||||
async fn sync_chat_completions_runs_real_loop_not_stub() {
|
||||
ensure_env();
|
||||
|
|
@ -94,6 +96,7 @@ async fn sync_chat_completions_runs_real_loop_not_stub() {
|
|||
|
||||
/// Streaming /v1/chat/completions — SSE body carries `delta` chunks fed
|
||||
/// by the real loop. No stub marker; finish frame present.
|
||||
#[serial]
|
||||
#[tokio::test]
|
||||
async fn streaming_chat_completions_runs_real_loop_not_stub() {
|
||||
ensure_env();
|
||||
|
|
@ -144,6 +147,7 @@ async fn streaming_chat_completions_runs_real_loop_not_stub() {
|
|||
/// 4. The terminal `[DONE]` sentinel is the last `data:` line.
|
||||
/// 5. The finish-chunk index is strictly less than the sentinel index
|
||||
/// (proves ordering: finish, then sentinel — not bundled).
|
||||
#[serial]
|
||||
#[tokio::test]
|
||||
async fn streaming_chat_completions_emits_chunked_frames_not_single_blob() {
|
||||
ensure_env();
|
||||
|
|
@ -216,6 +220,7 @@ async fn streaming_chat_completions_emits_chunked_frames_not_single_blob() {
|
|||
/// /v1/responses — sync mode — response.output[0].text carries the
|
||||
/// mock reply, not the stub echo. Witnesses `responses::handle_sync`
|
||||
/// is wired through `agent_runner::collect_reply` (P1.1.c).
|
||||
#[serial]
|
||||
#[tokio::test]
|
||||
async fn sync_responses_runs_real_loop_not_stub() {
|
||||
ensure_env();
|
||||
|
|
@ -250,6 +255,7 @@ async fn sync_responses_runs_real_loop_not_stub() {
|
|||
/// frames produced by the real loop and finishes with `response.completed`.
|
||||
/// Witnesses `responses::handle_stream` is wired through
|
||||
/// `agent_runner::stream_events` + `stream_forwarder::forward_responses`.
|
||||
#[serial]
|
||||
#[tokio::test]
|
||||
async fn streaming_responses_runs_real_loop_not_stub() {
|
||||
ensure_env();
|
||||
|
|
@ -290,6 +296,7 @@ async fn streaming_responses_runs_real_loop_not_stub() {
|
|||
/// are wired through `agent_runner::stream_events` +
|
||||
/// `stream_forwarder::forward_runs` (P1.1.d). The previous `run_stub`
|
||||
/// would have leaked `[run stub] echo:` into the delta payload.
|
||||
#[serial]
|
||||
#[tokio::test]
|
||||
async fn runs_events_stream_runs_real_loop_not_stub() {
|
||||
ensure_env();
|
||||
|
|
@ -354,6 +361,7 @@ async fn runs_events_stream_runs_real_loop_not_stub() {
|
|||
/// consumed, the registry slot's receiver is taken; a second GET must
|
||||
/// 404. Witnesses the `take_receiver` first-subscriber-wins contract is
|
||||
/// preserved by the real-loop wiring (it was true under the stub too).
|
||||
#[serial]
|
||||
#[tokio::test]
|
||||
async fn runs_events_second_subscriber_returns_not_found() {
|
||||
ensure_env();
|
||||
|
|
|
|||
Loading…
Reference in a new issue