bench: demonstrate tokio-uring thread-per-core execution in dfbench #21717
Dandandan wants to merge 4 commits into apache:main
Conversation
Adds a demo integration of `tokio-uring` into `dfbench`:
* `util::tokio_uring_pool` — spawns N OS threads, each running its own
`tokio_uring::start` runtime (a current-thread Tokio reactor driven
by `io_uring`). `pool.spawn(|| async { ... })` ships a Send closure
to a round-robin worker, where it builds a possibly-`!Send` future
that runs locally on that worker's ring.
* `util::tokio_uring_store::TokioUringObjectStore` — local
`ObjectStore` whose `get_opts` and `get_ranges` drive reads through
`tokio_uring::fs::File::read_at`, dispatched across the pool so
every read lands on the next worker's ring. Writes, list, copy, and
delete delegate to `LocalFileSystem`.
* ClickBench uses the pool by default on Linux (one worker per CPU).
Each output partition of the physical plan is `pool.spawn`-ed, so
plan execution itself happens on the io_uring runtimes. Top-level
`CoalescePartitionsExec` / `SortPreservingMergeExec` are stripped
so their N-partition children fan out across workers; for SPM the
merge is rebuilt over a `MemorySourceConfig` and re-executed on one
worker.
Enabled via `--tokio-uring-workers N` (default: `available_parallelism()`
on Linux, disabled elsewhere; `0` forces the legacy tokio MT path).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
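The round-robin pool described above can be sketched in plain `std` Rust. This is a simplified illustration, not the PR's actual code: each worker here runs a bare job loop where the real integration runs `tokio_uring::start`, and the `UringPool` name and its `spawn` signature are assumptions for the sketch. The key mechanism is the same: a `Send` closure crosses threads, and whatever it builds runs locally on the target worker.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::thread;

// Simplified sketch of a round-robin thread-per-core pool. In the real
// integration each worker thread would run its own `tokio_uring::start`
// runtime instead of the plain job loop below.
struct UringPool {
    senders: Vec<mpsc::Sender<Box<dyn FnOnce() + Send>>>,
    next: AtomicUsize,
}

impl UringPool {
    fn new(workers: usize) -> Self {
        let senders = (0..workers)
            .map(|_| {
                let (tx, rx) = mpsc::channel::<Box<dyn FnOnce() + Send>>();
                // Each worker owns its thread. The closure it receives is
                // `Send`, but anything the closure constructs (e.g. a
                // `!Send` future over tokio_uring's fs::File) stays local.
                thread::spawn(move || {
                    for job in rx {
                        job(); // real version: drive the built future on this ring
                    }
                });
                tx
            })
            .collect();
        Self { senders, next: AtomicUsize::new(0) }
    }

    // Ship a Send closure to the next worker, round-robin.
    fn spawn(&self, job: impl FnOnce() + Send + 'static) {
        let i = self.next.fetch_add(1, Ordering::Relaxed) % self.senders.len();
        self.senders[i].send(Box::new(job)).unwrap();
    }
}
```

The builder-closure shape is what keeps `!Send` types out of the channel: only the closure crosses threads, never the value it produces.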
When the ObjectStore is called from inside a pool worker (the common case — plan execution is already dispatched onto the pool), hop-free local dispatch is strictly better than the round-robin `pool.spawn`: bytes land on the same core that will consume them, and we skip the mpsc + oneshot round-trip.

Adds an `IN_WORKER` thread-local set by `run_worker`, and a fast path in `TokioUringObjectStore::read_ranges_uring` that uses `tokio_uring::spawn` on the current ring when `in_worker()` is true. The round-robin path is kept for callers on a non-uring runtime (e.g. during planning on the main tokio MT).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
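The `IN_WORKER` flag can be sketched as a thread-local `Cell`. This is a minimal sketch assuming the commit's naming (`IN_WORKER`, `in_worker()`); the `enter_worker` helper and the dispatch comment are illustrative, and the actual tokio_uring calls are omitted.

```rust
use std::cell::Cell;

thread_local! {
    // Set by each pool worker's run loop before it starts its ring;
    // false on every other thread.
    static IN_WORKER: Cell<bool> = Cell::new(false);
}

// Called once at the top of a worker's run loop (sketch of `run_worker`).
fn enter_worker() {
    IN_WORKER.with(|f| f.set(true));
}

// Fast-path check: are we already on a pool reactor?
fn in_worker() -> bool {
    IN_WORKER.with(|f| f.get())
}

// Dispatch decision, schematically:
//   if in_worker() { /* tokio_uring::spawn(fut) — stay on this ring */ }
//   else           { /* pool.spawn(builder)    — round-robin hop    */ }
```

Because the flag is thread-local, the check is a plain load with no synchronization, so the fast path costs essentially nothing for callers on a non-uring runtime.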
`RepartitionExec` spawns its input-fetcher tasks with `tokio::spawn` on the current runtime the first time any output partition is polled. On a pool of thread-per-core `tokio-uring` reactors, those fetchers pile onto whichever worker won the race, serializing Parquet I/O + decode on one ring.

Add a transparent `PoolDispatchExec` wrapper that redirects `execute(partition, …)` onto a pool worker via a channel-backed bridge stream. `wrap_leaves_with_pool_dispatch` walks the plan bottom-up and inserts it around every leaf (typically the `DataSourceExec` scan), so per-partition scans fan out across the pool regardless of where `RepartitionExec`'s fetcher task runs. The wrapper is transparent to schema, partitioning, ordering, and statistics — it only redirects where `execute` runs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
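The channel-backed bridge can be pictured with a synchronous analogue. In the real wrapper, `child.execute(partition, …)` runs on a pool worker and record batches are forwarded over a bounded channel back to the caller's stream; in this sketch an iterator stands in for the `SendableRecordBatchStream` and a plain thread stands in for the pool worker, so the `bridge` function below is illustrative rather than the PR's API.

```rust
use std::sync::mpsc;
use std::thread;

// Synchronous analogue of the channel-backed bridge stream: produce on
// one thread (standing in for a pool worker), consume on another.
fn bridge<T, I>(source: I) -> mpsc::IntoIter<T>
where
    T: Send + 'static,
    I: Iterator<Item = T> + Send + 'static,
{
    // A bounded channel gives backpressure: the producer blocks when
    // the consumer falls behind, as the async bridge would by awaiting.
    let (tx, rx) = mpsc::sync_channel(2);
    thread::spawn(move || {
        for item in source {
            if tx.send(item).is_err() {
                break; // consumer dropped the stream; stop producing
            }
        }
    });
    rx.into_iter()
}
```

Only the items cross threads; the producing stream itself never leaves the worker, which is what lets a `!Send` stream feed a caller on another runtime.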
run benchmarks

🤖 Benchmark running (GKE): comparing tokio-uring-benchmark-demo (31e2b2f) to 3b5008a (merge-base) using tpcds, clickbench_partitioned, and tpch.
🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch.

🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch.
Wrapping only leaves left `Agg(Partial)` running in RepartitionExec's input-fetcher task — which on a thread-per-core pool spawns all N fetchers onto whichever worker first polled the repartition, serializing the partial-aggregation CPU on one core.

Wrap each RepartitionExec's child instead, so every input pipeline dispatches to a round-robin pool worker and runs in parallel.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
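The rewrite rule this commit describes — wrap the child of every RepartitionExec rather than only the leaves — can be shown on a toy plan tree. The `Plan` enum and `Dispatch` variant are invented for the sketch and do not match DataFusion's `ExecutionPlan` API; only the recursion shape is the point.

```rust
// Toy plan tree: wrap every RepartitionExec child in a dispatch node so
// each input pipeline (e.g. Agg(Partial) over a scan) runs on its own
// pool worker instead of inside the repartition's fetcher task.
#[derive(Debug, PartialEq)]
enum Plan {
    Scan,
    Agg(Box<Plan>),
    Repartition(Box<Plan>),
    Dispatch(Box<Plan>),
}

fn wrap_repartition_children(plan: Plan) -> Plan {
    match plan {
        Plan::Scan => Plan::Scan,
        // The rule: insert Dispatch directly under Repartition, after
        // recursing so nested repartitions are handled too.
        Plan::Repartition(child) => Plan::Repartition(Box::new(Plan::Dispatch(
            Box::new(wrap_repartition_children(*child)),
        ))),
        Plan::Agg(child) => Plan::Agg(Box::new(wrap_repartition_children(*child))),
        Plan::Dispatch(child) => {
            Plan::Dispatch(Box::new(wrap_repartition_children(*child)))
        }
    }
}
```

With only-leaf wrapping, `Repartition(Agg(Scan))` would become `Repartition(Agg(Dispatch(Scan)))` and the aggregation would still run in the fetcher task; wrapping the child yields `Repartition(Dispatch(Agg(Scan)))`, moving the whole input pipeline onto a pool worker.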
run benchmarks

🤖 Benchmark running (GKE): comparing tokio-uring-benchmark-demo (3ee889f) to 3b5008a (merge-base) using tpcds, tpch, and clickbench_partitioned.
🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch.

🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch.
Summary
Demo integration of `tokio-uring` into `dfbench` showing a thread-per-core + `io_uring` execution model for DataFusion.

* `util::tokio_uring_pool::TokioUringPool` — spawns N OS threads, each hosting its own `tokio_uring::start` runtime (a current-thread Tokio reactor driven by `io_uring`). `pool.spawn(|| async { ... })` ships a `Send` closure to a round-robin worker which constructs the (possibly-`!Send`) future locally, so tokio-uring's `!Send` `fs::File` can live inside it. A thread-local `IN_WORKER` flag lets callers detect when they are already on a pool reactor.
* `util::tokio_uring_store::TokioUringObjectStore` — a local `ObjectStore` that services `get_opts` / `get_ranges` via `tokio_uring::fs::File::read_at`. If the caller is already on a pool worker (the common case), the read stays local via `tokio_uring::spawn` on the current ring — no cross-thread hop. Otherwise it falls back to round-robin `pool.spawn`.
* `util::tokio_uring_dispatch::PoolDispatchExec` — transparent `ExecutionPlan` wrapper. `wrap_leaves_with_pool_dispatch` walks the plan bottom-up and inserts it around every leaf (typically the `DataSourceExec` scan) so `RepartitionExec`'s lazily-spawned input fetchers pull from streams running on different pool workers. Without this, all M fetchers pile onto the worker that won the lazy-spawn race, serializing I/O + decode.
* ClickBench uses the pool by default on Linux: each output partition is `pool.spawn`-ed, so plan execution itself happens on the io_uring runtimes. Top-level `CoalescePartitionsExec` / `SortPreservingMergeExec` are stripped so their N-partition children fan out across workers; for SPM the merge is rebuilt over a `MemorySourceConfig` and re-executed on one worker.

Enabled via `--tokio-uring-workers` (default: `std::thread::available_parallelism()` on Linux, disabled elsewhere; `0` forces the legacy tokio MT path).

Design: thread-per-core with tokio-uring

* Each output partition is `pool.spawn`-ed; DataFusion's `SendableRecordBatchStream` is polled to completion on one worker.
* `PoolDispatchExec` redirects each `DataSourceExec.execute(partition, ...)` onto a pool worker, so Parquet reads + decode parallelize even when the caller is a `RepartitionExec` fetcher concentrated on one worker.
* `TokioUringObjectStore` stays local to the caller's ring when already on a pool worker — no mpsc + oneshot round-trip for every `get_ranges`.
* `!Send`-safe: `pool.spawn` takes a builder closure that runs on the target worker, so `tokio_uring::fs::File` never crosses threads.

Known limitations

* `RepartitionExec`'s hash/round-robin shuffling still runs on whichever worker first polled it. `PoolDispatchExec` distributes the scan CPU, not the shuffle. A cleaner fix would need DataFusion to expose a spawner hook.
* Other operators (`JoinExec` variants, etc.) behave identically to the MT path but are driven by the pool instead.
* Writes delegate to `LocalFileSystem` — the demo focuses on reads.

Test plan

* `cargo check` on macOS (non-Linux path, LocalFS fallback).
* `cargo zigbuild --target x86_64-unknown-linux-gnu --no-default-features --lib --tests` (Linux `tokio-uring` path).
* `cargo clippy -p datafusion-benchmarks --all-targets -- -D warnings`.
* `./target/release/dfbench clickbench -p /path/to/hits.parquet` on Linux, comparing wall time against `--tokio-uring-workers 0`.
* `perf stat -e io_uring:*` to confirm ring submissions during a ClickBench run.

🤖 Generated with Claude Code