feat: concurrent task execution with DAG scheduler#288
Draft
branchseer wants to merge 11 commits intomainfrom
Draft
feat: concurrent task execution with DAG scheduler#288branchseer wants to merge 11 commits intomainfrom
branchseer wants to merge 11 commits intomainfrom
Conversation
…n_with_tracking` Accept a `tokio_util::sync::CancellationToken` in the spawn pipeline so in-flight child processes can be killed when cancellation is signalled. For fspy-tracked processes, the token is passed into fspy's background task which selects between `child.wait()` and `token.cancelled()`. For plain tokio processes, `spawn_with_tracking` spawns its own background task with the same pattern. Both kill the child via `Child::start_kill()` then await normal exit — no PID-based killing. The read loop needs no cancellation branch: killing the child closes its pipes and reads return EOF naturally. `spawn_inherited` uses the select pattern directly since it has no read loop. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replace the sequential execution loop with a concurrent DAG scheduler. Independent tasks now run in parallel, bounded by a per-graph semaphore (limit 10). Failure cancels all in-flight tasks via CancellationToken. - Use FuturesUnordered + tokio::sync::Semaphore for bounded concurrency - Each nested ExecutionGraph (Expanded items) gets its own semaphore - Wrap reporter in RefCell for shared access from concurrent futures - On failure, close semaphore so pending acquires fail immediately - Add `barrier` test tool (fs.watch-based cross-platform barrier) - Add e2e tests proving concurrency (barrier) and kill-on-failure - Stabilize existing e2e tests by adding deps between independent packages Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
|
You have reached your Codex usage limits for code reviews. You can see your limits in the Codex usage dashboard. |
- Collapse nested if-let + if in spawn_node (clippy::collapsible_if) - Use setInterval instead of stdin.resume() in barrier --hang for cross-platform reliability (stdin.resume may not keep process alive on Windows in PTY environments) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
On Windows, TerminateProcess only kills the direct child, leaving grandchildren (e.g., node.exe spawned by a .cmd shim) alive. This caused the "failure kills concurrent tasks" e2e test to timeout. Add a Win32 Job Object with JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE to spawn_inherited. The child process and all its descendants are assigned to the job; when the handle drops, the entire tree is killed. This makes the kill-on-failure test cross-platform (no platform split). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Remove unused DWORD import - Use pub(super) visibility for OwnedHandle and assign_child_to_kill_on_close_job Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Use the child's existing process handle directly via AsRawHandle instead of re-opening it by PID. Simpler, removes the OpenProcess call and PROCESS_SET_QUOTA/PROCESS_TERMINATE permissions. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Apply assign_to_kill_on_close_job to spawn_with_tracking (piped/fspy path) in addition to spawn_inherited, covering all spawn modes on Windows - Expose duplicated process_handle on fspy::TrackedChild (Windows) so callers can assign it to a Job Object without modifying fspy internals - Use DuplicateHandle (via try_clone_to_owned) so the handle stays valid after tokio closes its copy when the process exits - Add "failure kills concurrent cached tasks" e2e test exercising the --cache (piped stdio / fspy) path Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
On Windows with piped stdio, killing the direct child (cmd.exe) doesn't close pipes held by grandchild processes (node.exe). The pipe read loop in spawn_with_tracking blocks forever waiting for EOF. Fix: add a cancellation branch to the pipe read loop that calls TerminateJobObject to kill the entire process tree, closing all pipes. Also add TerminateJobObject import and terminate() method on OwnedJobHandle. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The `else => break` in tokio::select! only fires when ALL other arms are disabled. The `cancelled()` arm stays pending (not disabled) even when both pipes have EOF'd, preventing `else` from ever triggering. This caused every piped-stdio task to hang on Windows. Fix: check the exit condition (both pipes done) at the top of the loop instead of relying on the `else` arm. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Remove the background tokio::spawn for the non-fspy path. Instead, handle cancellation directly in the pipe read loop alongside pipe reads. This eliminates: - The WaitState enum and background task indirection - The cancellation_for_pipes token clone - The need for Send on the Job Object handle The pipe read loop now has a unified cancellation arm (all platforms) that kills the direct child and terminates the Job Object on Windows. The exit condition is checked at the top of the loop to avoid the tokio::select! else-arm issue with the always-pending cancelled() future. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add cancellation-aware wait after pipe reads in spawn_with_tracking. If a child closes stdout/stderr but stays alive (e.g., daemonizes), the pipe reads EOF but child.wait() would block forever without cancellation support. Add --daemonize flag to barrier test tool and e2e test verifying that daemonized concurrent tasks are properly killed on failure. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
FuturesUnordered+tokio::sync::Semaphore(limit 10 per graph level)Expandedgraph gets its own semaphore for independent concurrency limitsCancellationToken— any task failure immediately kills all in-flight processesbarriertest tool (cross-platform,fs.watch-based) for concurrency testingTest plan
concurrent-executionfixture proves independent tasks run concurrently (barrier with 2 participants — would timeout if sequential)cargo clippycleancargo test -p vite_taskunit tests pass🤖 Generated with Claude Code