Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions crates/execution/assets/v8-bridge.source.js
Original file line number Diff line number Diff line change
Expand Up @@ -10127,6 +10127,18 @@ var __bridge = (() => {
var secureExecUndiciDispatcher = null;
function createSecureExecUndiciDispatcher() {
return new UndiciAgent({
// Bound the per-origin connection pool. With an unbounded pool, requests that
// overlap while the pool's clients are still connecting each find every client
// marked kNeedDrain and spawn a brand-new Client+socket (HTTP/2: a whole new
// session) instead of reusing one -- and the bridge's synchronous socket reads
// widen that connect window. Over a long, many-call turn (e.g. an LLM agent flow)
// those abandoned clients accumulate their listener sets (connect/close/drain/
// error/finish/readable/end/terminated) without bound, tripping
// MaxListenersExceededWarning and degrading the HTTP/2 path until requests abort.
// Capping connections makes excess requests queue on existing clients (HTTP/2
// multiplexes within one), so sockets/sessions/listeners stay bounded. 6 mirrors
// the browser per-origin connection limit; HTTP/2 multiplexes within each.
connections: 6,
connect(options, callback) {
try {
let protocol = options?.protocol === "https:" || options?.protocol === "https" ? "https:" : "http:";
Expand Down Expand Up @@ -21810,6 +21822,15 @@ ${headerLines}\r
if (!(target._maxListenersWarned instanceof Set)) {
target._maxListenersWarned = /* @__PURE__ */ new Set();
}
// An emitter can acquire `_events` without going through our constructor (e.g.
// a subclass that sets up its own event storage). If `_maxListeners` was never
// initialized, `maybeWarnEventEmitterListeners` would treat `total <= undefined`
// as false and fire a spurious MaxListenersExceededWarning on the *first*
// listener (reported count "1"). Default it to the standard limit so the
// threshold check is meaningful.
if (typeof target._maxListeners !== "number") {
target._maxListeners = eventsDefaultMaxListeners;
}
}
function createMaxListenersExceededWarning(emitter, event, total) {
const maxListeners = Number.isFinite(emitter._maxListeners) ? emitter._maxListeners : eventsDefaultMaxListeners;
Expand Down
30 changes: 30 additions & 0 deletions crates/sidecar/src/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8171,6 +8171,36 @@ fn sync_host_directory_tree_to_kernel_inner(
)
});
let desired_mode = host_shadow_mode(&metadata);
// Fast path: skip the expensive re-read + re-write when the kernel already
// holds a copy of this shadow file that matches on size, mode, and mtime.
//
// Every read-side fs op (exists/stat/readFile/...) triggers a full
// shadow-tree reconciliation walk. Without this skip the walk re-reads every
// file's bytes from the host and re-writes them into the kernel VFS on every
// op -- O(whole tree) per op, and super-linear as the VM's shadow grows,
// which is a dominant source of session-creation/runtime latency on
// populated VMs.
//
// This is a (size, mode, mtime) quick-check, the same heuristic rsync uses
// by default. It needs no separate cache to invalidate -- it compares against
// the kernel's own stat, so a kernel reset (e.g. a layer swap) or any host
// change that moves size/mode/mtime forces a resync. Limitation: mtime is
// compared at the millisecond granularity the kernel stores (utimes truncates
// to ms), so a host-side rewrite that preserves byte length AND mode AND lands
// in the same wall-clock millisecond can be skipped and leave stale bytes.
// That window is sub-millisecond same-length edits; if it ever matters here,
// upgrade this to a content digest (or full-precision mtime) for files whose
// mtime is within the last few ms of `now`.
if let Ok(existing) = vm.kernel.lstat(&guest_path) {
if !existing.is_directory
&& !existing.is_symbolic_link
&& existing.size == metadata.len()
&& (existing.mode & 0o7777) == (desired_mode & 0o7777)
&& existing.mtime_ms == mtime_ms
{
continue;
}
}
let bytes = read_host_shadow_file(&host_path, desired_mode).map_err(|error| {
SidecarError::Io(format!(
"failed to read host shadow file {}: {error}",
Expand Down
231 changes: 230 additions & 1 deletion crates/sidecar/tests/fetch_via_undici.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use std::collections::HashMap;
use std::io::{Read, Write};
use std::net::TcpListener;
use std::process::Command;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use support::{
Expand All @@ -13,7 +15,233 @@ use support::{
open_session_wire, temp_dir, write_fixture,
};

const FETCH_VIA_UNDICI_CASES: &[&str] = &["fetch", "abort"];
const FETCH_VIA_UNDICI_CASES: &[&str] = &["fetch", "abort", "keepalive_no_listener_leak"];

// Regression guard for the net-bridge socket-listener leak: a long-lived VM that
// makes many keep-alive HTTP requests over one reused bridge socket must not
// accumulate per-request socket/undici listeners. Each leaked listener trips the
// guest EventEmitter's `MaxListenersExceededWarning` via `console.error`, which
// lands on the process stderr -- so an empty stderr after N >> 10 reused requests
// proves listeners are added and removed symmetrically per request.
fn javascript_fetch_keepalive_does_not_leak_socket_listeners() {
assert_node_available();

// Concurrency is the trigger: with an UNBOUNDED per-origin pool, overlapping
// requests dispatched during the connect window each find every existing client
// still `kNeedDrain` and spawn a fresh Client+socket (undici pool.rs kGetDispatcher),
// accumulating their listener sets without bound. The fix bounds the agent's
// `connections`, so excess requests queue on existing clients instead of spawning
// new ones -- the host-side connection count (each guest socket = one accepted TCP
// connection) then stays a small multiple of the concurrency instead of growing
// toward the total request count.
const CONCURRENCY: usize = 8;
const ROUNDS: usize = 20;
const REQUESTS: usize = CONCURRENCY * ROUNDS;

let mut sidecar = new_sidecar("fetch-keepalive-leak");
let cwd = temp_dir("fetch-keepalive-leak-cwd");
let entry = cwd.join("fetch-keepalive-entry.mjs");

let listener = TcpListener::bind("127.0.0.1:0").expect("bind host http listener");
let port = listener.local_addr().expect("listener addr").port();
let served = Arc::new(AtomicUsize::new(0));
let connections = Arc::new(AtomicUsize::new(0));
let served_server = served.clone();
let connections_server = connections.clone();
let server = thread::spawn(move || {
listener
.set_nonblocking(true)
.expect("configure nonblocking listener");
let deadline = Instant::now() + Duration::from_secs(25);
let mut handlers = Vec::new();
// Accept every connection the guest opens (one reused keep-alive socket in the
// common case, but robust to a fresh socket per request) and serve keep-alive
// responses on each until the guest closes it. Reading to the client's EOF
// means teardown is a clean FIN, never an ECONNRESET test artifact.
while served_server.load(Ordering::SeqCst) < REQUESTS && Instant::now() < deadline {
match listener.accept() {
Ok((mut stream, _)) => {
connections_server.fetch_add(1, Ordering::SeqCst);
let served_handler = served.clone();
handlers.push(thread::spawn(move || {
stream
.set_read_timeout(Some(Duration::from_millis(200)))
.expect("configure read timeout");
let handler_deadline = Instant::now() + Duration::from_secs(20);
let mut buffer: Vec<u8> = Vec::new();
let mut chunk = [0_u8; 4096];
while Instant::now() < handler_deadline {
match stream.read(&mut chunk) {
Ok(0) => break, // guest closed this connection
Ok(n) => {
buffer.extend_from_slice(&chunk[..n]);
// GET requests have no body: each ends at CRLFCRLF.
while let Some(pos) = buffer
.windows(4)
.position(|window| window == b"\r\n\r\n")
{
buffer.drain(..pos + 4);
if stream
.write_all(
b"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 11\r\nConnection: keep-alive\r\n\r\nhello world",
)
.is_err()
{
return;
}
let _ = stream.flush();
served_handler.fetch_add(1, Ordering::SeqCst);
}
}
Err(error)
if error.kind() == std::io::ErrorKind::WouldBlock
|| error.kind() == std::io::ErrorKind::TimedOut =>
{
continue
}
Err(_) => break,
}
}
}));
}
Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => {
thread::sleep(Duration::from_millis(10));
}
Err(error) => panic!("accept http request: {error}"),
}
}
// Let in-flight handlers drain to the guest's clean close.
for handler in handlers {
let _ = handler.join();
}
(
served_server.load(Ordering::SeqCst),
connections_server.load(Ordering::SeqCst),
)
});

write_fixture(
&entry,
format!(
r#"
const ROUNDS = {ROUNDS};
const CONCURRENCY = {CONCURRENCY};
// Surface Node process warnings (MaxListenersExceededWarning et al.) onto stderr so
// the host can assert on them -- there is no default "warning" handler otherwise, so a
// listener-leak warning would be emitted-and-dropped instead of observed.
process.on("warning", (warning) => {{
console.error(`PROCESS_WARNING ${{warning.name}}: ${{warning.message}}`);
}});
let done = 0;
// Overlapping requests per round (not strictly sequential) so they dispatch while
// the pool's clients are still connecting -- the condition that makes an unbounded
// pool spawn a fresh client+socket per request.
for (let round = 0; round < ROUNDS; round++) {{
await Promise.all(Array.from({{ length: CONCURRENCY }}, async () => {{
const response = await fetch("http://127.0.0.1:{port}/health", {{
headers: {{ accept: "text/plain" }},
}});
const body = await response.text();
if (response.status !== 200 || body !== "hello world") {{
throw new Error(`request failed: status=${{response.status}} body=${{body}}`);
}}
done++;
}}));
}}
console.log(JSON.stringify({{ ok: true, count: done }}));
"#,
),
);

let connection_id = authenticate_wire(&mut sidecar, "conn-1");
let session_id = open_session_wire(&mut sidecar, 2, &connection_id);
let mut metadata = HashMap::new();
metadata.insert(
String::from("env.AGENTOS_LOOPBACK_EXEMPT_PORTS"),
format!("[{port}]"),
);
let (vm_id, _) = create_vm_wire_with_metadata(
&mut sidecar,
3,
&connection_id,
&session_id,
GuestRuntimeKind::JavaScript,
&cwd,
metadata,
);

execute_wire(
&mut sidecar,
4,
&connection_id,
&session_id,
&vm_id,
"fetch-keepalive-process",
GuestRuntimeKind::JavaScript,
&entry,
Vec::new(),
);

let (stdout, stderr, exit_code) = collect_process_output_wire_with_timeout(
&mut sidecar,
&connection_id,
&session_id,
&vm_id,
"fetch-keepalive-process",
Duration::from_secs(30),
);
dispose_vm_and_close_session(&mut sidecar, &connection_id, &session_id, &vm_id);
let server_result = server.join();

assert_eq!(exit_code, 0, "stdout:\n{stdout}\nstderr:\n{stderr}");
// The leak guard: a MaxListenersExceededWarning (re-surfaced via the fixture's
// process.on("warning") handler) lands on stderr. The fixture's only other stderr
// output would be an uncaught error, so stderr must be empty on the happy path.
assert!(
stderr.trim().is_empty(),
"keep-alive request loop produced unexpected stderr (possible listener leak):\n{stderr}"
);
assert!(
!stderr.contains("MaxListenersExceededWarning"),
"socket-listener leak detected after {REQUESTS} keep-alive requests:\n{stderr}"
);
let json_line = stdout
.lines()
.rev()
.find(|line| !line.trim().is_empty())
.expect("stdout json line");
let payload: serde_json::Value =
serde_json::from_str(json_line).expect("parse keepalive fetch result");
assert_eq!(payload["ok"], true);
assert_eq!(payload["count"], REQUESTS);
let (served_count, connection_count) = server_result
.unwrap_or_else(|_| panic!("server thread failed\nstdout:\n{stdout}\nstderr:\n{stderr}"));
assert_eq!(
served_count, REQUESTS,
"server should have served every request"
);
// The real leak guard: the per-origin pool must be BOUNDED. Each guest socket is one
// accepted TCP connection here, so an unbounded pool that spawns a fresh client+socket
// per overlapping request makes `connection_count` grow with the total request count;
// a bounded pool keeps it to a small multiple of the concurrency regardless of how
// many requests run. (The MaxListenersExceededWarning guard above is necessary but not
// sufficient -- the leaked listeners spread across many per-client emitters, so no
// single one may cross the threshold even while sockets grow unbounded.)
// With the bounded pool, connections stay at/under the cap (+ a small margin for any
// mid-run reconnect) regardless of how many requests run. Observed: 6 with the cap vs
// ~2x the concurrency (16) without it for this 8-way / 160-request load.
let connection_bound = CONCURRENCY + 2;
assert!(
connection_count <= connection_bound,
"undici client-per-request leak: {served_count} requests over {connection_count} \
connections (expected <= {connection_bound} for concurrency {CONCURRENCY}); an \
unbounded pool spawns a fresh client+socket per overlapping request"
);
eprintln!(
"[keepalive-leak] served {served_count} requests over {connection_count} connection(s) \
(bound {connection_bound})"
);
}

fn javascript_fetch_uses_guest_undici_over_kernel_tcp_socket() {
assert_node_available();
Expand Down Expand Up @@ -326,6 +554,7 @@ fn run_named_case(case_name: &str) {
match case_name {
"fetch" => javascript_fetch_uses_guest_undici_over_kernel_tcp_socket(),
"abort" => javascript_fetch_honors_abortsignal_timeout_and_manual_abort(),
"keepalive_no_listener_leak" => javascript_fetch_keepalive_does_not_leak_socket_listeners(),
other => panic!("unknown fetch_via_undici case: {other}"),
}
}
Expand Down
Loading
Loading