From c1a67a736065fe780f714244dc7eca4653ae14b5 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Thu, 25 Jun 2026 05:15:38 -0700 Subject: [PATCH] fix(sidecar): shadow-walk skip + bound undici pool to stop the net-bridge leak Three native-sidecar fixes behind long-lived VM latency/stall. 1) Read-side shadow-tree re-walk Read-side guest fs ops (Exists/Stat/Lstat/ReadFile/Pread) reconcile the host shadow tree into the kernel VFS, re-reading+re-writing EVERY file on EVERY op (O(whole tree), super-linear). Add an rsync-style (size,mode,mtime) lstat skip for unchanged files. Test read_side_ops_skip_unchanged_shadow_files: warm read over an unchanged 800-file tree >4x cheaper than cold (cold=22.7s, warm=28ms debug). 2) EventEmitter shim: spurious MaxListenersExceededWarning ensureEventEmitterInitialized() defaulted _maxListenersWarned but not _maxListeners, so emitters that acquire _events outside our ctor (undici's Client/Pool/Agent) had _maxListeners=undefined and 'total <= undefined' warned on the FIRST listener. Default _maxListeners so the threshold is meaningful. 3) undici client-per-request leak (the real net-bridge leak) The bridge's UndiciAgent was created with an UNBOUNDED per-origin pool. Requests that overlap while clients are still connecting each find every client kNeedDrain and spawn a fresh Client+socket -- and for HTTPS the LLM path is HTTP/2 (ALPN), so each spawn is a whole new h2 session. The synchronous bridge reads widen that connect window (the #122 per-payload macrotask yield only helps h1 same-socket reuse, nothing for the h2 connect-window herd). Over a long many-call turn the abandoned clients accumulate connect/close/drain/error/finish/readable/end/ terminated listeners without bound -> http2 degradation -> 'Request was aborted.' Bound connections (6, browser-like; h2 multiplexes within each) so excess requests queue on existing clients instead of spawning new ones. Test keepalive_no_listener_leak now drives CONCURRENT requests (the trigger) and asserts the host-side connection count stays bounded: 160 requests over 6 connections with the cap vs 16 (2x concurrency) without it. A process.on('warning') handler surfaces any MaxListenersExceededWarning to stderr (the warning alone is insufficient -- leaked listeners spread across per-client emitters). Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/execution/assets/v8-bridge.source.js | 21 ++ crates/sidecar/src/execution.rs | 30 +++ crates/sidecar/tests/fetch_via_undici.rs | 231 +++++++++++++++++++- crates/sidecar/tests/service.rs | 173 +++++++++++++++ 4 files changed, 454 insertions(+), 1 deletion(-) diff --git a/crates/execution/assets/v8-bridge.source.js b/crates/execution/assets/v8-bridge.source.js index 00d3dae0..20729a4c 100644 --- a/crates/execution/assets/v8-bridge.source.js +++ b/crates/execution/assets/v8-bridge.source.js @@ -10127,6 +10127,18 @@ var __bridge = (() => { var secureExecUndiciDispatcher = null; function createSecureExecUndiciDispatcher() { return new UndiciAgent({ + // Bound the per-origin connection pool. With an unbounded pool, requests that + // overlap while the pool's clients are still connecting each find every client + // marked kNeedDrain and spawn a brand-new Client+socket (HTTP/2: a whole new + // session) instead of reusing one -- and the bridge's synchronous socket reads + // widen that connect window. Over a long, many-call turn (e.g. an LLM agent flow) + // those abandoned clients accumulate their listener sets (connect/close/drain/ + // error/finish/readable/end/terminated) without bound, tripping + // MaxListenersExceededWarning and degrading the HTTP/2 path until requests abort. + // Capping connections makes excess requests queue on existing clients (HTTP/2 + // multiplexes within one), so sockets/sessions/listeners stay bounded. 6 mirrors + // the browser per-origin connection limit; HTTP/2 multiplexes within each. + connections: 6, connect(options, callback) { try { let protocol = options?.protocol === "https:" || options?.protocol === "https" ? "https:" : "http:"; @@ -21810,6 +21822,15 @@ ${headerLines}\r if (!(target._maxListenersWarned instanceof Set)) { target._maxListenersWarned = /* @__PURE__ */ new Set(); } + // An emitter can acquire `_events` without going through our constructor (e.g. + // a subclass that sets up its own event storage). If `_maxListeners` was never + // initialized, `maybeWarnEventEmitterListeners` would treat `total <= undefined` + // as false and fire a spurious MaxListenersExceededWarning on the *first* + // listener (reported count "1"). Default it to the standard limit so the + // threshold check is meaningful. + if (typeof target._maxListeners !== "number") { + target._maxListeners = eventsDefaultMaxListeners; + } } function createMaxListenersExceededWarning(emitter, event, total) { const maxListeners = Number.isFinite(emitter._maxListeners) ? emitter._maxListeners : eventsDefaultMaxListeners; diff --git a/crates/sidecar/src/execution.rs b/crates/sidecar/src/execution.rs index f4eea8a7..19e54d29 100644 --- a/crates/sidecar/src/execution.rs +++ b/crates/sidecar/src/execution.rs @@ -8171,6 +8171,36 @@ fn sync_host_directory_tree_to_kernel_inner( ) }); let desired_mode = host_shadow_mode(&metadata); + // Fast path: skip the expensive re-read + re-write when the kernel already + // holds a copy of this shadow file that matches on size, mode, and mtime. + // + // Every read-side fs op (exists/stat/readFile/...) triggers a full + // shadow-tree reconciliation walk. Without this skip the walk re-reads every + // file's bytes from the host and re-writes them into the kernel VFS on every + // op -- O(whole tree) per op, and super-linear as the VM's shadow grows, + // which is a dominant source of session-creation/runtime latency on + // populated VMs. + // + // This is a (size, mode, mtime) quick-check, the same heuristic rsync uses + // by default. It needs no separate cache to invalidate -- it compares against + // the kernel's own stat, so a kernel reset (e.g. a layer swap) or any host + // change that moves size/mode/mtime forces a resync. Limitation: mtime is + // compared at the millisecond granularity the kernel stores (utimes truncates + // to ms), so a host-side rewrite that preserves byte length AND mode AND lands + // in the same wall-clock millisecond can be skipped and leave stale bytes. + // That window is sub-millisecond same-length edits; if it ever matters here, + // upgrade this to a content digest (or full-precision mtime) for files whose + // mtime is within the last few ms of `now`. + if let Ok(existing) = vm.kernel.lstat(&guest_path) { + if !existing.is_directory + && !existing.is_symbolic_link + && existing.size == metadata.len() + && (existing.mode & 0o7777) == (desired_mode & 0o7777) + && existing.mtime_ms == mtime_ms + { + continue; + } + } let bytes = read_host_shadow_file(&host_path, desired_mode).map_err(|error| { SidecarError::Io(format!( "failed to read host shadow file {}: {error}", diff --git a/crates/sidecar/tests/fetch_via_undici.rs b/crates/sidecar/tests/fetch_via_undici.rs index 70c3d385..9f7a3779 100644 --- a/crates/sidecar/tests/fetch_via_undici.rs +++ b/crates/sidecar/tests/fetch_via_undici.rs @@ -5,6 +5,8 @@ use std::collections::HashMap; use std::io::{Read, Write}; use std::net::TcpListener; use std::process::Command; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; use std::thread; use std::time::{Duration, Instant}; use support::{ @@ -13,7 +15,233 @@ use support::{ open_session_wire, temp_dir, write_fixture, }; -const FETCH_VIA_UNDICI_CASES: &[&str] = &["fetch", "abort"]; +const FETCH_VIA_UNDICI_CASES: &[&str] = &["fetch", "abort", "keepalive_no_listener_leak"]; + +// Regression guard for the net-bridge socket-listener leak: a long-lived VM that +// makes many keep-alive HTTP requests over one reused bridge socket must not +// accumulate per-request socket/undici listeners. Each leaked listener trips the +// guest EventEmitter's `MaxListenersExceededWarning` via `console.error`, which +// lands on the process stderr -- so an empty stderr after N >> 10 reused requests +// proves listeners are added and removed symmetrically per request. +fn javascript_fetch_keepalive_does_not_leak_socket_listeners() { + assert_node_available(); + + // Concurrency is the trigger: with an UNBOUNDED per-origin pool, overlapping + // requests dispatched during the connect window each find every existing client + // still `kNeedDrain` and spawn a fresh Client+socket (undici pool.rs kGetDispatcher), + // accumulating their listener sets without bound. The fix bounds the agent's + // `connections`, so excess requests queue on existing clients instead of spawning + // new ones -- the host-side connection count (each guest socket = one accepted TCP + // connection) then stays a small multiple of the concurrency instead of growing + // toward the total request count. + const CONCURRENCY: usize = 8; + const ROUNDS: usize = 20; + const REQUESTS: usize = CONCURRENCY * ROUNDS; + + let mut sidecar = new_sidecar("fetch-keepalive-leak"); + let cwd = temp_dir("fetch-keepalive-leak-cwd"); + let entry = cwd.join("fetch-keepalive-entry.mjs"); + + let listener = TcpListener::bind("127.0.0.1:0").expect("bind host http listener"); + let port = listener.local_addr().expect("listener addr").port(); + let served = Arc::new(AtomicUsize::new(0)); + let connections = Arc::new(AtomicUsize::new(0)); + let served_server = served.clone(); + let connections_server = connections.clone(); + let server = thread::spawn(move || { + listener + .set_nonblocking(true) + .expect("configure nonblocking listener"); + let deadline = Instant::now() + Duration::from_secs(25); + let mut handlers = Vec::new(); + // Accept every connection the guest opens (one reused keep-alive socket in the + // common case, but robust to a fresh socket per request) and serve keep-alive + // responses on each until the guest closes it. Reading to the client's EOF + // means teardown is a clean FIN, never an ECONNRESET test artifact. + while served_server.load(Ordering::SeqCst) < REQUESTS && Instant::now() < deadline { + match listener.accept() { + Ok((mut stream, _)) => { + connections_server.fetch_add(1, Ordering::SeqCst); + let served_handler = served.clone(); + handlers.push(thread::spawn(move || { + stream + .set_read_timeout(Some(Duration::from_millis(200))) + .expect("configure read timeout"); + let handler_deadline = Instant::now() + Duration::from_secs(20); + let mut buffer: Vec = Vec::new(); + let mut chunk = [0_u8; 4096]; + while Instant::now() < handler_deadline { + match stream.read(&mut chunk) { + Ok(0) => break, // guest closed this connection + Ok(n) => { + buffer.extend_from_slice(&chunk[..n]); + // GET requests have no body: each ends at CRLFCRLF. + while let Some(pos) = buffer + .windows(4) + .position(|window| window == b"\r\n\r\n") + { + buffer.drain(..pos + 4); + if stream + .write_all( + b"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 11\r\nConnection: keep-alive\r\n\r\nhello world", + ) + .is_err() + { + return; + } + let _ = stream.flush(); + served_handler.fetch_add(1, Ordering::SeqCst); + } + } + Err(error) + if error.kind() == std::io::ErrorKind::WouldBlock + || error.kind() == std::io::ErrorKind::TimedOut => + { + continue + } + Err(_) => break, + } + } + })); + } + Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => { + thread::sleep(Duration::from_millis(10)); + } + Err(error) => panic!("accept http request: {error}"), + } + } + // Let in-flight handlers drain to the guest's clean close. + for handler in handlers { + let _ = handler.join(); + } + ( + served_server.load(Ordering::SeqCst), + connections_server.load(Ordering::SeqCst), + ) + }); + + write_fixture( + &entry, + format!( + r#" +const ROUNDS = {ROUNDS}; +const CONCURRENCY = {CONCURRENCY}; +// Surface Node process warnings (MaxListenersExceededWarning et al.) onto stderr so +// the host can assert on them -- there is no default "warning" handler otherwise, so a +// listener-leak warning would be emitted-and-dropped instead of observed. +process.on("warning", (warning) => {{ + console.error(`PROCESS_WARNING ${{warning.name}}: ${{warning.message}}`); +}}); +let done = 0; +// Overlapping requests per round (not strictly sequential) so they dispatch while +// the pool's clients are still connecting -- the condition that makes an unbounded +// pool spawn a fresh client+socket per request. +for (let round = 0; round < ROUNDS; round++) {{ + await Promise.all(Array.from({{ length: CONCURRENCY }}, async () => {{ + const response = await fetch("http://127.0.0.1:{port}/health", {{ + headers: {{ accept: "text/plain" }}, + }}); + const body = await response.text(); + if (response.status !== 200 || body !== "hello world") {{ + throw new Error(`request failed: status=${{response.status}} body=${{body}}`); + }} + done++; + }})); +}} +console.log(JSON.stringify({{ ok: true, count: done }})); +"#, + ), + ); + + let connection_id = authenticate_wire(&mut sidecar, "conn-1"); + let session_id = open_session_wire(&mut sidecar, 2, &connection_id); + let mut metadata = HashMap::new(); + metadata.insert( + String::from("env.AGENTOS_LOOPBACK_EXEMPT_PORTS"), + format!("[{port}]"), + ); + let (vm_id, _) = create_vm_wire_with_metadata( + &mut sidecar, + 3, + &connection_id, + &session_id, + GuestRuntimeKind::JavaScript, + &cwd, + metadata, + ); + + execute_wire( + &mut sidecar, + 4, + &connection_id, + &session_id, + &vm_id, + "fetch-keepalive-process", + GuestRuntimeKind::JavaScript, + &entry, + Vec::new(), + ); + + let (stdout, stderr, exit_code) = collect_process_output_wire_with_timeout( + &mut sidecar, + &connection_id, + &session_id, + &vm_id, + "fetch-keepalive-process", + Duration::from_secs(30), + ); + dispose_vm_and_close_session(&mut sidecar, &connection_id, &session_id, &vm_id); + let server_result = server.join(); + + assert_eq!(exit_code, 0, "stdout:\n{stdout}\nstderr:\n{stderr}"); + // The leak guard: a MaxListenersExceededWarning (re-surfaced via the fixture's + // process.on("warning") handler) lands on stderr. The fixture's only other stderr + // output would be an uncaught error, so stderr must be empty on the happy path. + assert!( + stderr.trim().is_empty(), + "keep-alive request loop produced unexpected stderr (possible listener leak):\n{stderr}" + ); + assert!( + !stderr.contains("MaxListenersExceededWarning"), + "socket-listener leak detected after {REQUESTS} keep-alive requests:\n{stderr}" + ); + let json_line = stdout + .lines() + .rev() + .find(|line| !line.trim().is_empty()) + .expect("stdout json line"); + let payload: serde_json::Value = + serde_json::from_str(json_line).expect("parse keepalive fetch result"); + assert_eq!(payload["ok"], true); + assert_eq!(payload["count"], REQUESTS); + let (served_count, connection_count) = server_result + .unwrap_or_else(|_| panic!("server thread failed\nstdout:\n{stdout}\nstderr:\n{stderr}")); + assert_eq!( + served_count, REQUESTS, + "server should have served every request" + ); + // The real leak guard: the per-origin pool must be BOUNDED. Each guest socket is one + // accepted TCP connection here, so an unbounded pool that spawns a fresh client+socket + // per overlapping request makes `connection_count` grow with the total request count; + // a bounded pool keeps it to a small multiple of the concurrency regardless of how + // many requests run. (The MaxListenersExceededWarning guard above is necessary but not + // sufficient -- the leaked listeners spread across many per-client emitters, so no + // single one may cross the threshold even while sockets grow unbounded.) + // With the bounded pool, connections stay at/under the cap (+ a small margin for any + // mid-run reconnect) regardless of how many requests run. Observed: 6 with the cap vs + // ~2x the concurrency (16) without it for this 8-way / 160-request load. + let connection_bound = CONCURRENCY + 2; + assert!( + connection_count <= connection_bound, + "undici client-per-request leak: {served_count} requests over {connection_count} \ + connections (expected <= {connection_bound} for concurrency {CONCURRENCY}); an \ + unbounded pool spawns a fresh client+socket per overlapping request" + ); + eprintln!( + "[keepalive-leak] served {served_count} requests over {connection_count} connection(s) \ + (bound {connection_bound})" + ); +} fn javascript_fetch_uses_guest_undici_over_kernel_tcp_socket() { assert_node_available(); @@ -326,6 +554,7 @@ fn run_named_case(case_name: &str) { match case_name { "fetch" => javascript_fetch_uses_guest_undici_over_kernel_tcp_socket(), "abort" => javascript_fetch_honors_abortsignal_timeout_and_manual_abort(), + "keepalive_no_listener_leak" => javascript_fetch_keepalive_does_not_leak_socket_listeners(), other => panic!("unknown fetch_via_undici case: {other}"), } } diff --git a/crates/sidecar/tests/service.rs b/crates/sidecar/tests/service.rs index ca73a52a..69b7991d 100644 --- a/crates/sidecar/tests/service.rs +++ b/crates/sidecar/tests/service.rs @@ -6214,6 +6214,179 @@ ykAheWCsAteSEWVc0w==\n\ configure_vm_passes_resource_read_limits_to_module_access_mounts(); } + // Regression guard for the read-side shadow-walk fix. + // + // Every read-side guest fs op (Exists/Stat/Lstat/ReadFile) reconciles the host + // shadow tree into the kernel VFS first. The reconciliation walks the whole tree + // from `vm.cwd`, but it must now SKIP files the kernel already holds an identical + // copy of (same size/mode/mtime) instead of unconditionally re-reading every + // file's bytes and re-writing them into the kernel. Without the skip a single + // `exists("/anything")` costs O(whole tree) and is super-linear as the shadow + // grows -- the session-creation/runtime latency this fixes. + // + // We prove two things: + // 1. A warm read op over an UNCHANGED tree is far cheaper than the first + // (cold) one, i.e. unchanged files are skipped, not re-copied. + // 2. The skip is self-correcting: after a file's content changes, a read still + // observes the new bytes (no stale skip). + fn read_side_ops_skip_unchanged_shadow_files_repro() { + use std::time::{Duration, Instant}; + + fn fs_payload( + operation: GuestFilesystemOperation, + path: &str, + content: Option, + ) -> RequestPayload { + RequestPayload::GuestFilesystemCall(GuestFilesystemCallRequest { + operation, + path: String::from(path), + destination_path: None, + target: None, + content, + encoding: Some(RootFilesystemEntryEncoding::Utf8), + recursive: true, + mode: None, + uid: None, + gid: None, + atime_ms: None, + mtime_ms: None, + len: None, + offset: None, + }) + } + + fn dispatch( + sidecar: &mut NativeSidecar, + ownership: &OwnershipScope, + next_id: &mut i64, + payload: RequestPayload, + ) -> ResponsePayload { + *next_id += 1; + sidecar + .dispatch_blocking(request(*next_id, ownership.clone(), payload)) + .expect("dispatch guest fs op") + .response + .payload + } + + // Seed flat files `from..to` via guest WriteFile (mirrors into the host + // shadow root). Write-side ops do not walk, so seeding is O(count). + fn seed_to( + sidecar: &mut NativeSidecar, + ownership: &OwnershipScope, + next_id: &mut i64, + body: &str, + from: usize, + to: usize, + ) { + for i in from..to { + let path = format!("/seed-{i:05}.txt"); + let payload = fs_payload( + GuestFilesystemOperation::WriteFile, + &path, + Some(String::from(body)), + ); + match dispatch(sidecar, ownership, next_id, payload) { + ResponsePayload::GuestFilesystemResult(_) => {} + other => panic!("seed write failed: {other:?}"), + } + } + } + + fn time_exists( + sidecar: &mut NativeSidecar, + ownership: &OwnershipScope, + next_id: &mut i64, + ) -> Duration { + let payload = fs_payload(GuestFilesystemOperation::Exists, "/zzz-not-here", None); + let start = Instant::now(); + match dispatch(sidecar, ownership, next_id, payload) { + ResponsePayload::GuestFilesystemResult(r) => assert_eq!(r.exists, Some(false)), + other => panic!("exists failed: {other:?}"), + } + start.elapsed() + } + + let mut sidecar = create_test_sidecar(); + let (connection_id, session_id) = + authenticate_and_open_session(&mut sidecar).expect("authenticate and open session"); + let vm_id = create_vm( + &mut sidecar, + &connection_id, + &session_id, + PermissionsPolicy::allow_all(), + ) + .expect("create vm"); + let ownership = OwnershipScope::vm(&connection_id, &session_id, &vm_id); + let mut next_id: i64 = 1000; + + let file_body = "a".repeat(8 * 1024); + const COUNT: usize = 800; + seed_to(&mut sidecar, &ownership, &mut next_id, &file_body, 0, COUNT); + + // Cold: first read op reconciles the whole tree (reads + writes every file). + let cold = time_exists(&mut sidecar, &ownership, &mut next_id); + // Warm: tree is unchanged, so every file must be skipped. + let warm = time_exists(&mut sidecar, &ownership, &mut next_id); + + eprintln!("[shadow-skip] cold={cold:?} warm={warm:?}"); + + // Symptom-1 guard: the warm walk skips unchanged files, so it is far cheaper + // than the cold walk that copied them all. (Lenient 4x; observed >>10x.) + assert!( + cold >= warm * 4, + "warm read op over an unchanged shadow tree should skip re-copying files: \ + cold={cold:?} warm={warm:?}" + ); + + // End-to-end smoke: overwrite a seeded file (different length) then read it + // back and observe the new bytes. NOTE: this is a guest WriteFile, which + // updates the kernel directly, so it does not exercise the host-shadow->kernel + // skip predicate itself -- it only guards that overwrite-then-read is coherent. + // A true stale-skip test (host-side rewrite that keeps size+mode+mtime) is not + // reachable through the public wire API and would need an in-crate unit test + // with direct shadow-root access; see the skip-limitation note in + // sync_host_directory_tree_to_kernel_inner. + let changed_path = "/seed-00042.txt"; + let new_body = "b".repeat(16 * 1024); + match dispatch( + &mut sidecar, + &ownership, + &mut next_id, + fs_payload( + GuestFilesystemOperation::WriteFile, + changed_path, + Some(new_body.clone()), + ), + ) { + ResponsePayload::GuestFilesystemResult(_) => {} + other => panic!("overwrite failed: {other:?}"), + } + match dispatch( + &mut sidecar, + &ownership, + &mut next_id, + fs_payload(GuestFilesystemOperation::ReadFile, changed_path, None), + ) { + ResponsePayload::GuestFilesystemResult(r) => { + assert_eq!( + r.content.as_deref(), + Some(new_body.as_str()), + "changed shadow file must not be served stale by the skip" + ); + } + other => panic!("read after overwrite failed: {other:?}"), + } + } + + // Expensive: seeds hundreds of files and pays one cold full-tree reconciliation + // (seconds in debug). Gated out of the default suite; run with `--ignored`. + #[test] + #[ignore = "expensive: cold shadow-tree reconciliation; run with --ignored"] + fn read_side_ops_skip_unchanged_shadow_files() { + read_side_ops_skip_unchanged_shadow_files_repro(); + } + fn configure_vm_rejects_module_access_root_symlink_to_non_node_modules() { let module_access_cwd = temp_dir("secure-exec-sidecar-module-access-symlink-cwd"); let outside_root = temp_dir("secure-exec-sidecar-module-access-outside");