diff --git a/Cargo.lock b/Cargo.lock index cc0b85be..ce0c3b03 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1176,6 +1176,7 @@ dependencies = [ "test-log", "thiserror 2.0.18", "tokio", + "tokio-util", "which", "winapi", "winsafe 0.0.24", @@ -1200,6 +1201,7 @@ dependencies = [ "rustc-hash", "serde", "tokio", + "tokio-util", "toml", ] @@ -3874,6 +3876,7 @@ dependencies = [ "nix 0.30.1", "once_cell", "owo-colors", + "petgraph", "pty_terminal_test_client", "rayon", "rusqlite", @@ -3883,6 +3886,7 @@ dependencies = [ "tempfile", "thiserror 2.0.18", "tokio", + "tokio-util", "tracing", "twox-hash", "vite_path", @@ -3892,6 +3896,7 @@ dependencies = [ "vite_task_plan", "vite_workspace", "wax", + "winapi", ] [[package]] diff --git a/crates/fspy/Cargo.toml b/crates/fspy/Cargo.toml index abcc4b4c..7269f0f9 100644 --- a/crates/fspy/Cargo.toml +++ b/crates/fspy/Cargo.toml @@ -20,6 +20,7 @@ rustc-hash = { workspace = true } tempfile = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["net", "process", "io-util", "sync", "rt"] } +tokio-util = { workspace = true } which = { workspace = true, features = ["tracing"] } xxhash-rust = { workspace = true } diff --git a/crates/fspy/examples/cli.rs b/crates/fspy/examples/cli.rs index 1de519c0..4ae34790 100644 --- a/crates/fspy/examples/cli.rs +++ b/crates/fspy/examples/cli.rs @@ -18,7 +18,7 @@ async fn main() -> anyhow::Result<()> { let mut command = fspy::Command::new(program); command.envs(std::env::vars_os()).args(args); - let child = command.spawn().await?; + let child = command.spawn(tokio_util::sync::CancellationToken::new()).await?; let termination = child.wait_handle.await?; let mut path_count = 0usize; diff --git a/crates/fspy/src/command.rs b/crates/fspy/src/command.rs index 5e9900c9..2a72c67d 100644 --- a/crates/fspy/src/command.rs +++ b/crates/fspy/src/command.rs @@ -8,6 +8,7 @@ use std::{ use fspy_shared_unix::exec::Exec; use rustc_hash::FxHashMap; use tokio::process::Command as 
TokioCommand; +use tokio_util::sync::CancellationToken; use crate::{SPY_IMPL, TrackedChild, error::SpawnError}; @@ -167,9 +168,12 @@ impl Command { /// # Errors /// /// Returns [`SpawnError`] if program resolution fails or the process cannot be spawned. - pub async fn spawn(mut self) -> Result { + pub async fn spawn( + mut self, + cancellation_token: CancellationToken, + ) -> Result { self.resolve_program()?; - SPY_IMPL.spawn(self).await + SPY_IMPL.spawn(self, cancellation_token).await } /// Resolve program name to full path using `PATH` and cwd. diff --git a/crates/fspy/src/lib.rs b/crates/fspy/src/lib.rs index 8a25ca06..70756003 100644 --- a/crates/fspy/src/lib.rs +++ b/crates/fspy/src/lib.rs @@ -54,6 +54,13 @@ pub struct TrackedChild { /// The future that resolves to exit status and path accesses when the process exits. pub wait_handle: BoxFuture<'static, io::Result>, + + /// A duplicated process handle of the child, captured before the tokio `Child` + /// is moved into the background wait task. This is an independently owned handle + /// (via `DuplicateHandle`) so it remains valid even after tokio closes its copy. + /// Callers can use this to assign the process to a Win32 Job Object. 
+ #[cfg(windows)] + pub process_handle: std::os::windows::io::OwnedHandle, } pub(crate) static SPY_IMPL: LazyLock = LazyLock::new(|| { diff --git a/crates/fspy/src/unix/mod.rs b/crates/fspy/src/unix/mod.rs index 6e77f9cb..7f946005 100644 --- a/crates/fspy/src/unix/mod.rs +++ b/crates/fspy/src/unix/mod.rs @@ -22,6 +22,7 @@ use futures_util::FutureExt; #[cfg(target_os = "linux")] use syscall_handler::SyscallHandler; use tokio::task::spawn_blocking; +use tokio_util::sync::CancellationToken; use crate::{ ChildTermination, Command, TrackedChild, @@ -80,7 +81,11 @@ impl SpyImpl { }) } - pub(crate) async fn spawn(&self, mut command: Command) -> Result { + pub(crate) async fn spawn( + &self, + mut command: Command, + cancellation_token: CancellationToken, + ) -> Result { #[cfg(target_os = "linux")] let supervisor = supervise::().map_err(SpawnError::Supervisor)?; @@ -143,7 +148,13 @@ impl SpyImpl { // Keep polling for the child to exit in the background even if `wait_handle` is not awaited, // because we need to stop the supervisor and lock the channel as soon as the child exits. wait_handle: tokio::spawn(async move { - let status = child.wait().await?; + let status = tokio::select! { + status = child.wait() => status?, + () = cancellation_token.cancelled() => { + child.start_kill()?; + child.wait().await? + } + }; let arenas = std::iter::once(exec_resolve_accesses); // Stop the supervisor and collect path accesses from it. 
diff --git a/crates/fspy/src/windows/mod.rs b/crates/fspy/src/windows/mod.rs index b4f1c75e..22560f40 100644 --- a/crates/fspy/src/windows/mod.rs +++ b/crates/fspy/src/windows/mod.rs @@ -13,6 +13,7 @@ use fspy_shared::{ windows::{PAYLOAD_ID, Payload}, }; use futures_util::FutureExt; +use tokio_util::sync::CancellationToken; use winapi::{ shared::minwindef::TRUE, um::{processthreadsapi::ResumeThread, winbase::CREATE_SUSPENDED}, @@ -73,7 +74,11 @@ impl SpyImpl { } #[expect(clippy::unused_async, reason = "async signature required by SpyImpl trait")] - pub(crate) async fn spawn(&self, mut command: Command) -> Result { + pub(crate) async fn spawn( + &self, + mut command: Command, + cancellation_token: CancellationToken, + ) -> Result { let ansi_dll_path_with_nul = Arc::clone(&self.ansi_dll_path_with_nul); command.env("FSPY", "1"); let mut command = command.into_tokio_command(); @@ -135,14 +140,32 @@ impl SpyImpl { if *spawn_success { SpawnError::OsSpawn(err) } else { SpawnError::Injection(err) } })?; + // Duplicate the process handle before the child is moved into the background + // task. The duplicate is independently owned (its own ref count), so it stays + // valid even after tokio closes its copy when the process exits. + let process_handle = { + use std::os::windows::io::BorrowedHandle; + // SAFETY: The child was just spawned and hasn't been moved yet, so its + // raw handle is valid. `borrow_raw` creates a temporary borrow. + let borrowed = unsafe { BorrowedHandle::borrow_raw(child.raw_handle().unwrap()) }; + borrowed.try_clone_to_owned().map_err(SpawnError::OsSpawn)? + }; + Ok(TrackedChild { stdin: child.stdin.take(), stdout: child.stdout.take(), stderr: child.stderr.take(), + process_handle, // Keep polling for the child to exit in the background even if `wait_handle` is not awaited, // because we need to stop the supervisor and lock the channel as soon as the child exits. 
wait_handle: tokio::spawn(async move { - let status = child.wait().await?; + let status = tokio::select! { + status = child.wait() => status?, + () = cancellation_token.cancelled() => { + child.start_kill()?; + child.wait().await? + } + }; // Lock the ipc channel after the child has exited. // We are not interested in path accesses from descendants after the main child has exited. let ipc_receiver_lock_guard = OwnedReceiverLockGuard::lock_async(receiver).await?; diff --git a/crates/fspy/tests/cancellation.rs b/crates/fspy/tests/cancellation.rs new file mode 100644 index 00000000..ff65cf2c --- /dev/null +++ b/crates/fspy/tests/cancellation.rs @@ -0,0 +1,32 @@ +use std::process::Stdio; + +use tokio::io::AsyncReadExt as _; +use tokio_util::sync::CancellationToken; + +#[test_log::test(tokio::test)] +async fn cancellation_kills_tracked_child() -> anyhow::Result<()> { + let cmd = subprocess_test::command_for_fn!((), |()| { + use std::io::Write as _; + // Signal readiness via stdout + std::io::stdout().write_all(b"ready\n").unwrap(); + std::io::stdout().flush().unwrap(); + // Block on stdin — will be killed by cancellation + let _ = std::io::stdin().read_line(&mut String::new()); + }); + let token = CancellationToken::new(); + let mut fspy_cmd = fspy::Command::from(cmd); + fspy_cmd.stdout(Stdio::piped()).stdin(Stdio::piped()); + let mut child = fspy_cmd.spawn(token.clone()).await?; + + // Wait for child to signal readiness + let mut stdout = child.stdout.take().unwrap(); + let mut buf = vec![0u8; 64]; + let n = stdout.read(&mut buf).await?; + assert!(std::str::from_utf8(&buf[..n])?.contains("ready")); + + // Cancel — fspy background task calls start_kill + token.cancel(); + let termination = child.wait_handle.await?; + assert!(!termination.status.success()); + Ok(()) +} diff --git a/crates/fspy/tests/node_fs.rs b/crates/fspy/tests/node_fs.rs index bbbb2bb5..6f2afb95 100644 --- a/crates/fspy/tests/node_fs.rs +++ b/crates/fspy/tests/node_fs.rs @@ -16,7 +16,7 @@ async fn 
track_node_script(script: &str, args: &[&OsStr]) -> anyhow::Result anyhow::Result) -> PathAccessIterable cmd.current_dir(cwd); } cmd.args(args); - let tracked_child = cmd.spawn().await.unwrap(); + let tracked_child = cmd.spawn(tokio_util::sync::CancellationToken::new()).await.unwrap(); let termination = tracked_child.wait_handle.await.unwrap(); assert!(termination.status.success()); diff --git a/crates/fspy/tests/test_utils/mod.rs b/crates/fspy/tests/test_utils/mod.rs index 20e7a2c4..cfa46c4a 100644 --- a/crates/fspy/tests/test_utils/mod.rs +++ b/crates/fspy/tests/test_utils/mod.rs @@ -78,7 +78,11 @@ macro_rules! track_fn { )] #[allow(dead_code, reason = "used by track_fn! macro; not all test files use this macro")] pub async fn spawn_command(cmd: subprocess_test::Command) -> anyhow::Result { - let termination = fspy::Command::from(cmd).spawn().await?.wait_handle.await?; + let termination = fspy::Command::from(cmd) + .spawn(tokio_util::sync::CancellationToken::new()) + .await? + .wait_handle + .await?; assert!(termination.status.success()); Ok(termination.path_accesses) } diff --git a/crates/fspy_e2e/Cargo.toml b/crates/fspy_e2e/Cargo.toml index db8833ee..90ece05e 100644 --- a/crates/fspy_e2e/Cargo.toml +++ b/crates/fspy_e2e/Cargo.toml @@ -9,6 +9,7 @@ fspy = { workspace = true } rustc-hash = { workspace = true } serde = { workspace = true, features = ["derive"] } tokio = { workspace = true, features = ["full"] } +tokio-util = { workspace = true } toml = { workspace = true } [lints] diff --git a/crates/fspy_e2e/src/main.rs b/crates/fspy_e2e/src/main.rs index 8e9f1c59..9d6a3052 100644 --- a/crates/fspy_e2e/src/main.rs +++ b/crates/fspy_e2e/src/main.rs @@ -84,7 +84,8 @@ async fn main() { .stderr(Stdio::piped()) .current_dir(&dir); - let mut tracked_child = cmd.spawn().await.unwrap(); + let mut tracked_child = + cmd.spawn(tokio_util::sync::CancellationToken::new()).await.unwrap(); let mut stdout_bytes = Vec::::new(); 
tracked_child.stdout.take().unwrap().read_to_end(&mut stdout_bytes).await.unwrap(); diff --git a/crates/vite_task/Cargo.toml b/crates/vite_task/Cargo.toml index 6fedfaa3..6f6dc571 100644 --- a/crates/vite_task/Cargo.toml +++ b/crates/vite_task/Cargo.toml @@ -22,6 +22,7 @@ fspy = { workspace = true } futures-util = { workspace = true } once_cell = { workspace = true } owo-colors = { workspace = true } +petgraph = { workspace = true } pty_terminal_test_client = { workspace = true } rayon = { workspace = true } rusqlite = { workspace = true, features = ["bundled"] } @@ -30,6 +31,7 @@ serde = { workspace = true, features = ["derive", "rc"] } serde_json = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "io-std", "io-util", "macros", "sync"] } +tokio-util = { workspace = true } tracing = { workspace = true } twox-hash = { workspace = true } vite_path = { workspace = true } @@ -46,5 +48,8 @@ tempfile = { workspace = true } [target.'cfg(unix)'.dependencies] nix = { workspace = true } +[target.'cfg(windows)'.dependencies] +winapi = { workspace = true, features = ["handleapi", "jobapi2", "winnt"] } + [lib] doctest = false diff --git a/crates/vite_task/src/session/execute/mod.rs b/crates/vite_task/src/session/execute/mod.rs index f520ed35..74c90580 100644 --- a/crates/vite_task/src/session/execute/mod.rs +++ b/crates/vite_task/src/session/execute/mod.rs @@ -2,13 +2,17 @@ pub mod fingerprint; pub mod glob_inputs; pub mod spawn; -use std::{collections::BTreeMap, io::Write as _, process::Stdio, sync::Arc}; +use std::{cell::RefCell, collections::BTreeMap, io::Write as _, process::Stdio, sync::Arc}; -use futures_util::FutureExt; +use futures_util::{FutureExt, StreamExt, future::LocalBoxFuture, stream::FuturesUnordered}; +use petgraph::Direction; +use rustc_hash::FxHashMap; +use tokio::sync::Semaphore; +use tokio_util::sync::CancellationToken; use vite_path::AbsolutePath; use vite_task_plan::{ ExecutionGraph, 
ExecutionItemDisplay, ExecutionItemKind, LeafExecutionKind, SpawnCommand, - SpawnExecution, + SpawnExecution, execution_graph::ExecutionNodeIndex, }; use self::{ @@ -45,80 +49,147 @@ pub enum SpawnOutcome { Failed, } -/// Holds mutable references needed during graph execution. +/// Maximum number of tasks that can execute concurrently within a single +/// execution graph level. +const CONCURRENCY_LIMIT: usize = 10; + +/// Holds shared references needed during graph execution. +/// +/// The `reporter` field is wrapped in `RefCell` because concurrent futures +/// (via `FuturesUnordered`) need shared access to create leaf reporters. +/// Since all futures run on a single thread (no `tokio::spawn`), `RefCell` +/// is sufficient for interior mutability. /// -/// The `reporter` field is used to create leaf reporters for individual executions. /// Cache fields are passed through to [`execute_spawn`] for cache-aware execution. struct ExecutionContext<'a> { /// The graph-level reporter, used to create leaf reporters via `new_leaf_execution()`. - reporter: &'a mut dyn GraphExecutionReporter, + /// Wrapped in `RefCell` for shared access from concurrent task futures. + reporter: &'a RefCell>, /// The execution cache for looking up and storing cached results. cache: &'a ExecutionCache, /// Base path for resolving relative paths in cache entries. /// Typically the workspace root. cache_base_path: &'a Arc, + /// Token for cancelling in-flight child processes. + cancellation_token: CancellationToken, } impl ExecutionContext<'_> { - /// Execute all tasks in an execution graph in dependency order. + /// Execute all tasks in an execution graph concurrently, respecting dependencies. /// - /// `ExecutionGraph` guarantees acyclicity at construction time. - /// We compute a topological order and iterate in reverse to get execution order - /// (dependencies before dependents). 
+ /// Uses a DAG scheduler: tasks whose dependencies have all completed are scheduled + /// onto a `FuturesUnordered`, bounded by a per-graph `Semaphore` with + /// [`CONCURRENCY_LIMIT`] permits. Each recursive `Expanded` graph creates its own + /// semaphore, so nested graphs have independent concurrency limits. /// - /// Fast-fail: if any task fails (non-zero exit or infrastructure error), remaining - /// tasks and `&&`-chained items are skipped. Leaf-level errors are reported through - /// the reporter. Cycle detection is handled at plan time. - /// - /// Returns `true` if all tasks succeeded, `false` if any task failed. + /// Fast-fail: if any task fails, `execute_leaf` cancels the `CancellationToken` + /// (killing in-flight child processes). This method detects the cancellation, + /// closes the semaphore, drains remaining futures, and returns. #[tracing::instrument(level = "debug", skip_all)] - async fn execute_expanded_graph(&mut self, graph: &ExecutionGraph) -> bool { - // `compute_topological_order()` returns nodes in topological order: for every - // edge A→B, A appears before B. Since our edges mean "A depends on B", - // dependencies (B) appear after their dependents (A). We iterate in reverse - // to get execution order where dependencies run first. - - // Execute tasks in dependency-first order. Each task may have multiple items - // (from `&&`-split commands), which are executed sequentially. - // If any task fails, subsequent tasks and items are skipped (fast-fail). 
- let topo_order = graph.compute_topological_order(); - for &node_ix in topo_order.iter().rev() { - let task_execution = &graph[node_ix]; - - for item in &task_execution.items { - let failed = match &item.kind { - ExecutionItemKind::Leaf(leaf_kind) => { - self.execute_leaf(&item.execution_item_display, leaf_kind) - .boxed_local() - .await - } - ExecutionItemKind::Expanded(nested_graph) => { - !self.execute_expanded_graph(nested_graph).boxed_local().await - } - }; - if failed { - return false; + async fn execute_expanded_graph(&self, graph: &ExecutionGraph) { + if graph.node_count() == 0 { + return; + } + + let semaphore = Arc::new(Semaphore::new(CONCURRENCY_LIMIT)); + + // Compute dependency count for each node. + // Edge A→B means "A depends on B", so A's dependency count = outgoing edge count. + let mut dep_count: FxHashMap = FxHashMap::default(); + for node_ix in graph.node_indices() { + dep_count.insert(node_ix, graph.neighbors(node_ix).count()); + } + + let mut futures = FuturesUnordered::new(); + + // Schedule initially ready nodes (no dependencies). + for (&node_ix, &count) in &dep_count { + if count == 0 { + futures.push(self.spawn_node(graph, node_ix, &semaphore)); + } + } + + // Process completions and schedule newly ready dependents. + // On failure, `execute_leaf` cancels the token — we detect it here, close + // the semaphore (so pending acquires fail immediately), and drain. + while let Some(completed_ix) = futures.next().await { + if self.cancellation_token.is_cancelled() { + semaphore.close(); + while futures.next().await.is_some() {} + return; + } + + // Find dependents of the completed node (nodes that depend on it). + // Edge X→completed means "X depends on completed", so X is a predecessor + // in graph direction = neighbor in Incoming direction. 
+ for dependent in graph.neighbors_directed(completed_ix, Direction::Incoming) { + let count = dep_count.get_mut(&dependent).expect("all nodes are in dep_count"); + *count -= 1; + if *count == 0 { + futures.push(self.spawn_node(graph, dependent, &semaphore)); + } + } + } + } + + /// Create a future that acquires a semaphore permit, then executes a graph node. + /// + /// On failure, `execute_node` cancels the `CancellationToken` — the caller + /// detects this after the future completes. On semaphore closure or prior + /// cancellation, the node is skipped. + fn spawn_node<'a>( + &'a self, + graph: &'a ExecutionGraph, + node_ix: ExecutionNodeIndex, + semaphore: &Arc, + ) -> LocalBoxFuture<'a, ExecutionNodeIndex> { + let sem = semaphore.clone(); + async move { + if let Ok(_permit) = sem.acquire_owned().await + && !self.cancellation_token.is_cancelled() + { + self.execute_node(graph, node_ix).await; + } + node_ix + } + .boxed_local() + } + + /// Execute a single node's items sequentially. + /// + /// A node may have multiple items (from `&&`-split commands). Items are executed + /// in order; if any item fails, `execute_leaf` cancels the `CancellationToken` + /// and remaining items are skipped (preserving `&&` semantics). + async fn execute_node(&self, graph: &ExecutionGraph, node_ix: ExecutionNodeIndex) { + let task_execution = &graph[node_ix]; + + for item in &task_execution.items { + if self.cancellation_token.is_cancelled() { + return; + } + match &item.kind { + ExecutionItemKind::Leaf(leaf_kind) => { + self.execute_leaf(&item.execution_item_display, leaf_kind).boxed_local().await; + } + ExecutionItemKind::Expanded(nested_graph) => { + self.execute_expanded_graph(nested_graph).boxed_local().await; } } } - true } /// Execute a single leaf item (in-process command or spawned process). /// /// Creates a [`LeafExecutionReporter`] from the graph reporter and delegates - /// to the appropriate execution method. 
- /// - /// Returns `true` if the execution failed (non-zero exit or infrastructure error). + /// to the appropriate execution method. On failure (non-zero exit or + /// infrastructure error), cancels the `CancellationToken`. #[tracing::instrument(level = "debug", skip_all)] - async fn execute_leaf( - &mut self, - display: &ExecutionItemDisplay, - leaf_kind: &LeafExecutionKind, - ) -> bool { - let mut leaf_reporter = self.reporter.new_leaf_execution(display, leaf_kind); - - match leaf_kind { + async fn execute_leaf(&self, display: &ExecutionItemDisplay, leaf_kind: &LeafExecutionKind) { + // Borrow the reporter briefly to create the leaf reporter, then drop + // the RefCell guard before any `.await` point. + let mut leaf_reporter = self.reporter.borrow_mut().new_leaf_execution(display, leaf_kind); + + let failed = match leaf_kind { LeafExecutionKind::InProcess(in_process_execution) => { // In-process (built-in) commands: caching is disabled, execute synchronously let mut stdio_config = leaf_reporter @@ -141,15 +212,23 @@ impl ExecutionContext<'_> { clippy::large_futures, reason = "spawn execution with cache management creates large futures" )] - let outcome = - execute_spawn(leaf_reporter, spawn_execution, self.cache, self.cache_base_path) - .await; + let outcome = execute_spawn( + leaf_reporter, + spawn_execution, + self.cache, + self.cache_base_path, + self.cancellation_token.clone(), + ) + .await; match outcome { SpawnOutcome::CacheHit => false, SpawnOutcome::Spawned(status) => !status.success(), SpawnOutcome::Failed => true, } } + }; + if failed { + self.cancellation_token.cancel(); } } } @@ -178,6 +257,7 @@ pub async fn execute_spawn( spawn_execution: &SpawnExecution, cache: &ExecutionCache, cache_base_path: &Arc, + cancellation_token: CancellationToken, ) -> SpawnOutcome { let cache_metadata = spawn_execution.cache_metadata.as_ref(); @@ -270,7 +350,7 @@ pub async fn execute_spawn( // while the child also writes to the same FD. 
drop(stdio_config); - match spawn_inherited(&spawn_execution.spawn_command).await { + match spawn_inherited(&spawn_execution.spawn_command, cancellation_token).await { Ok(result) => { leaf_reporter.finish( Some(result.exit_status), @@ -341,6 +421,7 @@ pub async fn execute_spawn( std_outputs.as_mut(), path_accesses.as_mut(), &resolved_negatives, + cancellation_token, ) .await { @@ -438,7 +519,10 @@ pub async fn execute_spawn( /// The child process will see `is_terminal() == true` for stdout/stderr when the /// parent is running in a terminal. This is expected behavior. #[tracing::instrument(level = "debug", skip_all)] -async fn spawn_inherited(spawn_command: &SpawnCommand) -> anyhow::Result { +async fn spawn_inherited( + spawn_command: &SpawnCommand, + cancellation_token: CancellationToken, +) -> anyhow::Result { let mut cmd = fspy::Command::new(spawn_command.program_path.as_path()); cmd.args(spawn_command.args.iter().map(vite_str::Str::as_str)); cmd.envs(spawn_command.all_envs.iter()); @@ -480,11 +564,124 @@ async fn spawn_inherited(spawn_command: &SpawnCommand) -> anyhow::Result status?, + () = cancellation_token.cancelled() => { + child.start_kill()?; + child.wait().await? + } + }; Ok(SpawnResult { exit_status, duration: start.elapsed() }) } +/// Win32 Job Object utilities for process tree management. +/// +/// On Windows, `TerminateProcess` only kills the direct child process, not its +/// descendants. This module creates a Job Object with `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE`, +/// which automatically terminates all processes in the job when the handle is dropped. 
+#[cfg(windows)] +mod win_job { + use std::{io, os::windows::io::RawHandle}; + + use winapi::{ + shared::minwindef::FALSE, + um::{ + handleapi::CloseHandle, + jobapi2::{ + AssignProcessToJobObject, CreateJobObjectW, SetInformationJobObject, + TerminateJobObject, + }, + winnt::{ + HANDLE, JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE, JOBOBJECT_EXTENDED_LIMIT_INFORMATION, + }, + }, + }; + + /// RAII wrapper around a Win32 Job Object `HANDLE` that closes it on drop. + pub(super) struct OwnedJobHandle(HANDLE); + + impl OwnedJobHandle { + /// Immediately terminate all processes in the job. + /// + /// This is needed when pipes to a grandchild process must be closed before + /// the job handle is dropped (e.g., to unblock pipe reads in `spawn_with_tracking`). + pub(super) fn terminate(&self) { + // SAFETY: self.0 is a valid job handle from CreateJobObjectW. + unsafe { TerminateJobObject(self.0, 1) }; + } + } + + impl Drop for OwnedJobHandle { + fn drop(&mut self) { + // SAFETY: self.0 is a valid handle obtained from CreateJobObjectW. + unsafe { CloseHandle(self.0) }; + } + } + + /// Create a Job Object with `KILL_ON_JOB_CLOSE` and assign a process to it. + /// + /// Returns the job handle wrapped in an RAII guard. When dropped, all processes + /// in the job (the child and its descendants) are terminated. + pub(super) fn assign_to_kill_on_close_job( + process_handle: RawHandle, + ) -> io::Result { + // SAFETY: Creating an anonymous job object with no security attributes. + let job = unsafe { CreateJobObjectW(std::ptr::null_mut(), std::ptr::null()) }; + if job.is_null() { + return Err(io::Error::last_os_error()); + } + let job = OwnedJobHandle(job); + + // Configure the job to kill all processes when the handle is closed. + // SAFETY: JOBOBJECT_EXTENDED_LIMIT_INFORMATION is a plain C struct (no pointers + // in the zeroed fields). Zeroing then setting LimitFlags is the standard pattern. 
+ let mut info = unsafe { + let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = std::mem::zeroed(); + info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE; + info + }; + + // SAFETY: info is a valid JOBOBJECT_EXTENDED_LIMIT_INFORMATION, job.0 is a valid handle. + let ok = unsafe { + SetInformationJobObject( + job.0, + // JobObjectExtendedLimitInformation = 9 + 9, + std::ptr::from_mut(&mut info).cast(), + std::mem::size_of::().try_into().unwrap(), + ) + }; + if ok == FALSE { + return Err(io::Error::last_os_error()); + } + + // SAFETY: Both handles are valid — job from CreateJobObjectW, process handle + // from the caller. + let ok = unsafe { AssignProcessToJobObject(job.0, process_handle as HANDLE) }; + if ok == FALSE { + return Err(io::Error::last_os_error()); + } + + Ok(job) + } +} + impl Session<'_> { /// Execute an execution graph, reporting events through the provided reporter builder. /// @@ -512,12 +709,13 @@ impl Session<'_> { } }; - let mut reporter = builder.build(); + let reporter = RefCell::new(builder.build()); - let mut execution_context = ExecutionContext { - reporter: &mut *reporter, + let execution_context = ExecutionContext { + reporter: &reporter, cache, cache_base_path: &self.workspace_path, + cancellation_token: CancellationToken::new(), }; // Execute the graph with fast-fail: if any task fails, remaining tasks @@ -526,6 +724,6 @@ impl Session<'_> { // Leaf-level errors and non-zero exit statuses are tracked internally // by the reporter. 
- reporter.finish() + reporter.into_inner().finish() } } diff --git a/crates/vite_task/src/session/execute/spawn.rs b/crates/vite_task/src/session/execute/spawn.rs index 99ebae87..03a055bb 100644 --- a/crates/vite_task/src/session/execute/spawn.rs +++ b/crates/vite_task/src/session/execute/spawn.rs @@ -12,6 +12,7 @@ use fspy::AccessMode; use rustc_hash::FxHashSet; use serde::Serialize; use tokio::io::AsyncReadExt as _; +use tokio_util::sync::CancellationToken; use vite_path::{AbsolutePath, RelativePathBuf}; use vite_task_plan::SpawnCommand; use wax::Program as _; @@ -56,6 +57,15 @@ pub struct TrackedPathAccesses { pub path_writes: FxHashSet, } +/// How the child process is awaited after stdout/stderr are drained. +enum ChildWait { + /// fspy tracking enabled — fspy manages cancellation internally. + Fspy(fspy::TrackedChild), + + /// Plain tokio process — cancellation is handled in the pipe read loop. + Tokio(tokio::process::Child), +} + /// Spawn a command with optional file system tracking via fspy, using piped stdio. /// /// Returns the execution result including exit status and duration. @@ -70,6 +80,10 @@ pub struct TrackedPathAccesses { clippy::too_many_lines, reason = "spawn logic is inherently sequential and splitting would reduce clarity" )] +#[expect( + clippy::too_many_arguments, + reason = "spawn parameters are all distinct concerns that don't form a natural group" +)] pub async fn spawn_with_tracking( spawn_command: &SpawnCommand, workspace_root: &AbsolutePath, @@ -78,38 +92,46 @@ pub async fn spawn_with_tracking( std_outputs: Option<&mut Vec>, path_accesses: Option<&mut TrackedPathAccesses>, resolved_negatives: &[wax::Glob<'static>], + cancellation_token: CancellationToken, ) -> anyhow::Result { - /// The tracking state of the spawned process. - /// Determined by whether `path_accesses` is `Some` (fspy enabled) or `None` (fspy disabled). 
- enum TrackingState { - /// fspy tracking is enabled - FspyEnabled(fspy::TrackedChild), - - /// fspy tracking is disabled, using plain tokio process - FspyDisabled(tokio::process::Child), - } - let mut cmd = fspy::Command::new(spawn_command.program_path.as_path()); cmd.args(spawn_command.args.iter().map(vite_str::Str::as_str)); cmd.envs(spawn_command.all_envs.iter()); cmd.current_dir(&*spawn_command.cwd); cmd.stdin(Stdio::null()).stdout(Stdio::piped()).stderr(Stdio::piped()); - let mut tracking_state = if path_accesses.is_some() { - // path_accesses is Some, spawn with fspy tracking enabled - TrackingState::FspyEnabled(cmd.spawn().await?) - } else { - // path_accesses is None, spawn without fspy - TrackingState::FspyDisabled(cmd.into_tokio_command().spawn()?) - }; + // On Windows, assign the child to a Job Object so that killing the child also + // kills all descendant processes (e.g., node.exe spawned by a .cmd shim). + #[cfg(windows)] + let job; - let mut child_stdout = match &mut tracking_state { - TrackingState::FspyEnabled(tracked_child) => tracked_child.stdout.take().unwrap(), - TrackingState::FspyDisabled(tokio_child) => tokio_child.stdout.take().unwrap(), - }; - let mut child_stderr = match &mut tracking_state { - TrackingState::FspyEnabled(tracked_child) => tracked_child.stderr.take().unwrap(), - TrackingState::FspyDisabled(tokio_child) => tokio_child.stderr.take().unwrap(), + let (mut child_stdout, mut child_stderr, mut child_wait) = if path_accesses.is_some() { + // fspy tracking enabled — fspy manages cancellation internally via a clone + // of the token. We keep the original for the pipe read loop. 
+ let mut tracked_child = cmd.spawn(cancellation_token.clone()).await?; + let stdout = tracked_child.stdout.take().unwrap(); + let stderr = tracked_child.stderr.take().unwrap(); + #[cfg(windows)] + { + use std::os::windows::io::AsRawHandle; + job = super::win_job::assign_to_kill_on_close_job( + tracked_child.process_handle.as_raw_handle(), + )?; + } + (stdout, stderr, ChildWait::Fspy(tracked_child)) + } else { + let mut child = cmd.into_tokio_command().spawn()?; + let stdout = child.stdout.take().unwrap(); + let stderr = child.stderr.take().unwrap(); + #[cfg(windows)] + { + use std::os::windows::io::{AsRawHandle, BorrowedHandle}; + // SAFETY: The child was just spawned, so its raw handle is valid. + let borrowed = unsafe { BorrowedHandle::borrow_raw(child.raw_handle().unwrap()) }; + let owned = borrowed.try_clone_to_owned()?; + job = super::win_job::assign_to_kill_on_close_job(owned.as_raw_handle())?; + } + (stdout, stderr, ChildWait::Tokio(child)) }; // Output capturing is independent of fspy tracking @@ -122,7 +144,12 @@ pub async fn spawn_with_tracking( let start = Instant::now(); // Read from both stdout and stderr concurrently using select! + // Cancellation is handled directly in the loop: kill the child process (and + // on Windows, terminate the Job Object to kill grandchildren holding pipes). loop { + if stdout_done && stderr_done { + break; + } tokio::select! { result = child_stdout.read(&mut stdout_buf), if !stdout_done => { match result? { @@ -166,86 +193,110 @@ pub async fn spawn_with_tracking( } } } - else => break, + () = cancellation_token.cancelled() => { + // Kill the direct child (no-op for fspy which handles it internally). + if let ChildWait::Tokio(ref mut child) = child_wait { + let _ = child.start_kill(); + } + // On Windows, terminate the entire process tree so grandchild + // processes release their pipe handles. 
+ #[cfg(windows)] + job.terminate(); + break; + } } } - // Wait for process termination and process path accesses if fspy was enabled - let (termination, path_accesses) = match tracking_state { - TrackingState::FspyEnabled(tracked_child) => { + // Wait for process termination and collect results. + // The child may have closed its pipes without exiting (e.g., daemonized), + // so we still need a cancellation arm here. + match child_wait { + ChildWait::Fspy(tracked_child) => { + // fspy's wait_handle already monitors the cancellation token internally, + // so no additional select! is needed here. let termination = tracked_child.wait_handle.await?; + let duration = start.elapsed(); + // path_accesses must be Some when fspy is enabled (they're set together) let path_accesses = path_accesses.ok_or_else(|| { anyhow::anyhow!("internal error: fspy enabled but path_accesses is None") })?; - (termination, path_accesses) - } - TrackingState::FspyDisabled(mut tokio_child) => { - let exit_status = tokio_child.wait().await?; - return Ok(SpawnResult { exit_status, duration: start.elapsed() }); - } - }; - let duration = start.elapsed(); - let path_reads = &mut path_accesses.path_reads; - let path_writes = &mut path_accesses.path_writes; - - for access in termination.path_accesses.iter() { - // Strip workspace root, clean `..` components, and filter in one pass. - // fspy may report paths like `packages/sub-pkg/../shared/dist/output.js`. - let relative_path = access.path.strip_path_prefix(workspace_root, |strip_result| { - let Ok(stripped_path) = strip_result else { - return None; - }; - // On Windows, paths are possible to be still absolute after stripping the workspace root. - // For example: c:\workspace\subdir\c:\workspace\subdir - // Just ignore those accesses. - let relative = RelativePathBuf::new(stripped_path).ok()?; - - // Clean `..` components — fspy may report paths like - // `packages/sub-pkg/../shared/dist/output.js`. 
Normalize them for - // consistent behavior across platforms and clean user-facing messages. - let relative = relative.clean(); - - // Skip .git directory accesses (workaround for tools like oxlint) - if relative.as_path().strip_prefix(".git").is_ok() { - return None; - } + let path_reads = &mut path_accesses.path_reads; + let path_writes = &mut path_accesses.path_writes; - if !resolved_negatives.is_empty() - && resolved_negatives.iter().any(|neg| neg.is_match(relative.as_str())) - { - return None; - } + for access in termination.path_accesses.iter() { + // Strip workspace root, clean `..` components, and filter in one pass. + // fspy may report paths like `packages/sub-pkg/../shared/dist/output.js`. + let relative_path = access.path.strip_path_prefix(workspace_root, |strip_result| { + let Ok(stripped_path) = strip_result else { + return None; + }; + // On Windows, paths are possible to be still absolute after stripping the workspace root. + // For example: c:\workspace\subdir\c:\workspace\subdir + // Just ignore those accesses. + let relative = RelativePathBuf::new(stripped_path).ok()?; - Some(relative) - }); + // Clean `..` components — fspy may report paths like + // `packages/sub-pkg/../shared/dist/output.js`. Normalize them for + // consistent behavior across platforms and clean user-facing messages. 
+ let relative = relative.clean(); - let Some(relative_path) = relative_path else { - continue; - }; + // Skip .git directory accesses (workaround for tools like oxlint) + if relative.as_path().strip_prefix(".git").is_ok() { + return None; + } - if access.mode.contains(AccessMode::READ) { - path_reads.entry(relative_path.clone()).or_insert(PathRead { read_dir_entries: false }); - } - if access.mode.contains(AccessMode::WRITE) { - path_writes.insert(relative_path.clone()); - } - if access.mode.contains(AccessMode::READ_DIR) { - match path_reads.entry(relative_path) { - Entry::Occupied(mut occupied) => occupied.get_mut().read_dir_entries = true, - Entry::Vacant(vacant) => { - vacant.insert(PathRead { read_dir_entries: true }); + if !resolved_negatives.is_empty() + && resolved_negatives.iter().any(|neg| neg.is_match(relative.as_str())) + { + return None; + } + + Some(relative) + }); + + let Some(relative_path) = relative_path else { + continue; + }; + + if access.mode.contains(AccessMode::READ) { + path_reads + .entry(relative_path.clone()) + .or_insert(PathRead { read_dir_entries: false }); + } + if access.mode.contains(AccessMode::WRITE) { + path_writes.insert(relative_path.clone()); + } + if access.mode.contains(AccessMode::READ_DIR) { + match path_reads.entry(relative_path) { + Entry::Occupied(mut occupied) => { + occupied.get_mut().read_dir_entries = true; + } + Entry::Vacant(vacant) => { + vacant.insert(PathRead { read_dir_entries: true }); + } + } } } - } - } - tracing::debug!( - "spawn finished, path_reads: {}, path_writes: {}, exit_status: {}", - path_reads.len(), - path_writes.len(), - termination.status, - ); + tracing::debug!( + "spawn finished, path_reads: {}, path_writes: {}, exit_status: {}", + path_reads.len(), + path_writes.len(), + termination.status, + ); - Ok(SpawnResult { exit_status: termination.status, duration }) + Ok(SpawnResult { exit_status: termination.status, duration }) + } + ChildWait::Tokio(mut child) => { + let exit_status = 
tokio::select! { + status = child.wait() => status?, + () = cancellation_token.cancelled() => { + child.start_kill()?; + child.wait().await? + } + }; + Ok(SpawnResult { exit_status, duration: start.elapsed() }) + } + } } diff --git a/crates/vite_task/src/session/mod.rs b/crates/vite_task/src/session/mod.rs index 1c0b58a7..c378794f 100644 --- a/crates/vite_task/src/session/mod.rs +++ b/crates/vite_task/src/session/mod.rs @@ -630,6 +630,7 @@ impl<'a> Session<'a> { &spawn_execution, cache, &self.workspace_path, + tokio_util::sync::CancellationToken::new(), ) .await; match outcome { diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/package.json b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/package.json new file mode 100644 index 00000000..e6a3b536 --- /dev/null +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/package.json @@ -0,0 +1,4 @@ +{ + "name": "concurrent-execution-test", + "private": true +} diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/packages/a/package.json b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/packages/a/package.json new file mode 100644 index 00000000..6a46b3fa --- /dev/null +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/packages/a/package.json @@ -0,0 +1,8 @@ +{ + "name": "@concurrent/a", + "scripts": { + "build": "barrier ../../.barrier sync 2", + "test": "barrier ../../.barrier test-sync 2 --exit=1", + "daemon": "barrier ../../.barrier daemon-sync 2 --exit=1" + } +} diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/packages/b/package.json b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/packages/b/package.json new file mode 100644 index 00000000..40e49f3c --- /dev/null +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/packages/b/package.json @@ -0,0 +1,8 @@ +{ + "name": 
"@concurrent/b", + "scripts": { + "build": "barrier ../../.barrier sync 2", + "test": "barrier ../../.barrier test-sync 2 --hang", + "daemon": "barrier ../../.barrier daemon-sync 2 --daemonize" + } +} diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/pnpm-workspace.yaml b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/pnpm-workspace.yaml new file mode 100644 index 00000000..924b55f4 --- /dev/null +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/pnpm-workspace.yaml @@ -0,0 +1,2 @@ +packages: + - packages/* diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots.toml b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots.toml new file mode 100644 index 00000000..b9064498 --- /dev/null +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots.toml @@ -0,0 +1,29 @@ +# Tests that independent tasks execute concurrently. +# Packages a and b have no dependency relationship. +# Both use a barrier that requires 2 participants — if run sequentially, +# the first would wait forever and the test would timeout. + +[[e2e]] +name = "independent tasks run concurrently" +steps = ["vt run -r build"] + +# Both tasks use a single-command barrier (no && splitting). Task a exits with +# code 1 after the barrier, task b hangs after the barrier. Since both +# participate in the same barrier, b is guaranteed to be running when a fails. +# The test completing without timeout proves cancellation kills b. +[[e2e]] +name = "failure kills concurrent tasks" +steps = ["vt run -r test"] + +# Same as above but with --cache to exercise the piped stdio / fspy path +# (spawn_with_tracking) instead of the inherited stdio path (spawn_inherited). +[[e2e]] +name = "failure kills concurrent cached tasks" +steps = ["vt run -r --cache test"] + +# Task b closes stdout/stderr after the barrier but stays alive (daemonizes). 
+# The pipe reads EOF but the process doesn't exit. The runner must still be +# able to kill it via the cancellation token + Job Object. +[[e2e]] +name = "failure kills daemonized concurrent tasks" +steps = ["vt run -r --cache daemon"] diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots/failure kills concurrent cached tasks.snap b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots/failure kills concurrent cached tasks.snap new file mode 100644 index 00000000..71c7dc36 --- /dev/null +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots/failure kills concurrent cached tasks.snap @@ -0,0 +1,11 @@ +--- +source: crates/vite_task_bin/tests/e2e_snapshots/main.rs +expression: e2e_outputs +--- +[1]> vt run -r --cache test +~/packages/a$ barrier ../../.barrier test-sync 2 --exit=1 +~/packages/b$ barrier ../../.barrier test-sync 2 --hang + + +--- +vt run: 0/2 cache hit (0%), 2 failed. (Run `vt run --last-details` for full details) diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots/failure kills concurrent tasks.snap b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots/failure kills concurrent tasks.snap new file mode 100644 index 00000000..6fd09500 --- /dev/null +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots/failure kills concurrent tasks.snap @@ -0,0 +1,11 @@ +--- +source: crates/vite_task_bin/tests/e2e_snapshots/main.rs +expression: e2e_outputs +--- +[1]> vt run -r test +~/packages/a$ barrier ../../.barrier test-sync 2 --exit=1 ⊘ cache disabled +~/packages/b$ barrier ../../.barrier test-sync 2 --hang ⊘ cache disabled + + +--- +vt run: 0/2 cache hit (0%), 2 failed. 
(Run `vt run --last-details` for full details) diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots/failure kills daemonized concurrent tasks.snap b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots/failure kills daemonized concurrent tasks.snap new file mode 100644 index 00000000..4713ed55 --- /dev/null +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots/failure kills daemonized concurrent tasks.snap @@ -0,0 +1,11 @@ +--- +source: crates/vite_task_bin/tests/e2e_snapshots/main.rs +expression: e2e_outputs +--- +[1]> vt run -r --cache daemon +~/packages/a$ barrier ../../.barrier daemon-sync 2 --exit=1 +~/packages/b$ barrier ../../.barrier daemon-sync 2 --daemonize + + +--- +vt run: 0/2 cache hit (0%), 2 failed. (Run `vt run --last-details` for full details) diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots/independent tasks run concurrently.snap b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots/independent tasks run concurrently.snap new file mode 100644 index 00000000..c8b10252 --- /dev/null +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots/independent tasks run concurrently.snap @@ -0,0 +1,11 @@ +--- +source: crates/vite_task_bin/tests/e2e_snapshots/main.rs +expression: e2e_outputs +--- +> vt run -r build +~/packages/a$ barrier ../../.barrier sync 2 ⊘ cache disabled +~/packages/b$ barrier ../../.barrier sync 2 ⊘ cache disabled + + +--- +vt run: 0/2 cache hit (0%). 
(Run `vt run --last-details` for full details) diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/vite-task.json b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/vite-task.json new file mode 100644 index 00000000..b39113d0 --- /dev/null +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/vite-task.json @@ -0,0 +1,3 @@ +{ + "cache": false +} diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/grouped-stdio/package.json b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/grouped-stdio/package.json index 3850a3ac..21dde7e8 100644 --- a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/grouped-stdio/package.json +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/grouped-stdio/package.json @@ -1,4 +1,7 @@ { "name": "grouped-stdio-test", - "private": true + "private": true, + "dependencies": { + "other": "workspace:*" + } } diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/input-read-write-not-cached/packages/rw-pkg/package.json b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/input-read-write-not-cached/packages/rw-pkg/package.json index 88a7b0e0..72ebe436 100644 --- a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/input-read-write-not-cached/packages/rw-pkg/package.json +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/input-read-write-not-cached/packages/rw-pkg/package.json @@ -4,6 +4,6 @@ "task": "replace-file-content src/data.txt original modified" }, "dependencies": { - "@test/normal-pkg": "workspace:*" + "@test/touch-pkg": "workspace:*" } } diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/input-read-write-not-cached/packages/touch-pkg/package.json b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/input-read-write-not-cached/packages/touch-pkg/package.json index a34c6a22..ebf215e5 100644 --- a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/input-read-write-not-cached/packages/touch-pkg/package.json +++ 
b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/input-read-write-not-cached/packages/touch-pkg/package.json @@ -2,5 +2,8 @@ "name": "@test/touch-pkg", "scripts": { "task": "touch-file src/data.txt" + }, + "dependencies": { + "@test/normal-pkg": "workspace:*" } } diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/interleaved-stdio/package.json b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/interleaved-stdio/package.json index 59356dd6..85362482 100644 --- a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/interleaved-stdio/package.json +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/interleaved-stdio/package.json @@ -1,4 +1,7 @@ { "name": "interleaved-stdio-test", - "private": true + "private": true, + "dependencies": { + "other": "workspace:*" + } } diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/labeled-stdio/package.json b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/labeled-stdio/package.json index ee07db4a..8d842493 100644 --- a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/labeled-stdio/package.json +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/labeled-stdio/package.json @@ -1,4 +1,7 @@ { "name": "labeled-stdio-test", - "private": true + "private": true, + "dependencies": { + "other": "workspace:*" + } } diff --git a/packages/tools/package.json b/packages/tools/package.json index 90810f51..e1e6acc3 100644 --- a/packages/tools/package.json +++ b/packages/tools/package.json @@ -2,6 +2,7 @@ "name": "vite-task-tools", "private": true, "bin": { + "barrier": "./src/barrier.js", "check-tty": "./src/check-tty.js", "json-edit": "./src/json-edit.ts", "print": "./src/print.ts", diff --git a/packages/tools/src/barrier.js b/packages/tools/src/barrier.js new file mode 100755 index 00000000..87f06aa1 --- /dev/null +++ b/packages/tools/src/barrier.js @@ -0,0 +1,81 @@ +#!/usr/bin/env node + +// barrier [--exit=] [--hang] [--daemonize] +// +// Cross-platform concurrency barrier for testing. 
+// Creates <dir>/<prefix>_<pid>, then waits (via fs.watch) for files
+// matching <prefix>_* to exist in <dir>.
+//
+// Options:
+// --exit=<code> Exit with the given code after the barrier is met.
+// --hang Keep process alive after the barrier (for kill tests).
+// --daemonize Close stdout/stderr but keep process alive (for daemon kill tests).
+//
+// If tasks run concurrently, all participants arrive and the barrier resolves.
+// If tasks run sequentially, the first participant waits forever (test timeout).
+
+import fs from 'node:fs';
+import path from 'node:path';
+
+const positional = [];
+let exitCode = 0;
+let hang = false;
+let daemonize = false;
+
+for (const arg of process.argv.slice(2)) {
+  if (arg.startsWith('--exit=')) {
+    exitCode = parseInt(arg.slice(7), 10);
+  } else if (arg === '--hang') {
+    hang = true;
+  } else if (arg === '--daemonize') {
+    daemonize = true;
+  } else {
+    positional.push(arg);
+  }
+}
+
+const [dir, prefix, countStr] = positional;
+const count = parseInt(countStr, 10);
+
+fs.mkdirSync(dir, { recursive: true });
+
+// Create this participant's marker file.
+const markerName = `${prefix}_${process.pid}`;
+fs.writeFileSync(path.join(dir, markerName), '');
+
+function countMatches() {
+  return fs.readdirSync(dir).filter((f) => f.startsWith(`${prefix}_`)).length;
+}
+
+function onBarrierMet() {
+  if (daemonize) {
+    // Close stdout/stderr but keep the process alive. Simulates a daemon that
+    // detaches from stdio — tests that the runner can still kill such processes.
+    process.stdout.end();
+    process.stderr.end();
+    setInterval(() => {}, 1 << 30);
+    return;
+  }
+  if (hang) {
+    // Keep the process alive indefinitely — killed via signal when the runner cancels.
+    // Use setInterval rather than stdin.resume() for cross-platform reliability.
+    setInterval(() => {}, 1 << 30);
+    return;
+  }
+  process.exit(exitCode);
+}
+
+// Start watching before the initial check to avoid missing events
+// between the check and the watch setup. 
+const watcher = fs.watch(dir, () => { + if (countMatches() >= count) { + watcher.close(); + onBarrierMet(); + } +}); + +// Check immediately in case all participants already arrived. +if (countMatches() >= count) { + watcher.close(); + onBarrierMet(); +}