diff --git a/Cargo.lock b/Cargo.lock index bbec97ed7ff3d..9a541f6383162 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -443,7 +443,7 @@ version = "58.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c30a1365d7a7dc50cc847e54154e6af49e4c4b0fddc9f607b687f29212082743" dependencies = [ - "bitflags", + "bitflags 2.11.0", "serde", "serde_core", "serde_json", @@ -1028,6 +1028,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.11.0" @@ -1083,7 +1089,7 @@ checksum = "ee04c4c84f1f811b017f2fbb7dd8815c976e7ca98593de9c1e2afad0f636bff4" dependencies = [ "async-stream", "base64 0.22.1", - "bitflags", + "bitflags 2.11.0", "bollard-buildkit-proto", "bollard-stubs", "bytes", @@ -1824,6 +1830,7 @@ dependencies = [ "serde_json", "snmalloc-rs", "tokio", + "tokio-uring", "tokio-util", ] @@ -3019,7 +3026,7 @@ version = "25.12.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35f6839d7b3b98adde531effaf34f0c2badc6f4735d26fe74709d8e513a96ef3" dependencies = [ - "bitflags", + "bitflags 2.11.0", "rustc_version", ] @@ -3538,7 +3545,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2", + "socket2 0.6.3", "tokio", "tower-service", "tracing", @@ -3766,6 +3773,16 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "io-uring" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "595a0399f411a508feb2ec1e970a4a30c249351e30208960d58298de8660b0e5" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "ipnet" version = "2.12.0" @@ -3992,7 +4009,7 @@ version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1744e39d1d6a9948f4f388969627434e31128196de472883b39f148769bfe30a" dependencies = [ - "bitflags", + "bitflags 2.11.0", "libc", "plain", "redox_syscall 0.7.3", @@ -4147,7 +4164,7 @@ version = "0.31.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d6d0705320c1e6ba1d912b5e37cf18071b6c2e9b7fa8215a1e8a7651966f5d3" dependencies = [ - "bitflags", + "bitflags 2.11.0", "cfg-if", "cfg_aliases", "libc", @@ -4266,7 +4283,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a180dd8642fa45cdb7dd721cd4c11b1cadd4929ce112ebd8b9f5803cc79d536" dependencies = [ - "bitflags", + "bitflags 2.11.0", ] [[package]] @@ -4887,7 +4904,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls", - "socket2", + "socket2 0.6.3", "thiserror", "tokio", "tracing", @@ -4924,7 +4941,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2", + "socket2 0.6.3", "tracing", "windows-sys 0.60.2", ] @@ -5092,7 +5109,7 @@ version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" dependencies = [ - "bitflags", + "bitflags 2.11.0", ] [[package]] @@ -5101,7 +5118,7 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ce70a74e890531977d37e532c34d45e9055d2409ed08ddba14529471ed0be16" dependencies = [ - "bitflags", + "bitflags 2.11.0", ] [[package]] @@ -5312,7 +5329,7 @@ version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" dependencies = [ - "bitflags", + "bitflags 2.11.0", "errno", "libc", "linux-raw-sys", @@ -5381,7 +5398,7 @@ version = "18.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a990b25f351b25139ddc7f21ee3f6f56f86d6846b74ac8fad3a719a287cd4a0" dependencies = [ - "bitflags", + "bitflags 2.11.0", "cfg-if", "clipboard-win", "home", @@ -5480,7 +5497,7 @@ version = "3.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7f4bc775c73d9a02cde8bf7b2ec4c9d12743edf609006c7facc23998404cd1d" dependencies = [ - "bitflags", + "bitflags 2.11.0", "core-foundation", "core-foundation-sys", "libc", @@ -5765,6 +5782,16 @@ dependencies = [ "cmake", ] +[[package]] +name = "socket2" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "socket2" version = "0.6.3" @@ -6189,7 +6216,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.6.3", "tokio-macros", "windows-sys 0.61.2", ] @@ -6225,7 +6252,7 @@ dependencies = [ "postgres-protocol", "postgres-types", "rand 0.10.1", - "socket2", + "socket2 0.6.3", "tokio", "tokio-util", "whoami", @@ -6253,6 +6280,20 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-uring" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "748482e3e13584a34664a710168ad5068e8cb1d968aa4ffa887e83ca6dd27967" +dependencies = [ + "futures-util", + "io-uring", + "libc", + "slab", + "socket2 0.4.10", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.18" @@ -6315,7 +6356,7 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", - "socket2", + "socket2 0.6.3", "sync_wrapper", "tokio", "tokio-stream", @@ -6361,7 +6402,7 @@ version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" dependencies = [ - "bitflags", + "bitflags 2.11.0", "bytes", "futures-util", "http 1.4.0", @@ -6892,7 +6933,7 @@ version = "0.244.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" dependencies = [ - "bitflags", + "bitflags 2.11.0", "hashbrown 0.15.5", "indexmap 2.14.0", "semver", @@ -7304,7 +7345,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" dependencies = [ "anyhow", - "bitflags", + "bitflags 2.11.0", "indexmap 2.14.0", "log", "serde", diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index f82f1c0a03e3d..1c67146091586 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -60,5 +60,8 @@ snmalloc-rs = { version = "0.7", optional = true } tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] } tokio-util = { version = "0.7.17" } +[target.'cfg(target_os = "linux")'.dependencies] +tokio-uring = "0.5" + [dev-dependencies] datafusion-proto = { workspace = true } diff --git a/benchmarks/src/clickbench.rs b/benchmarks/src/clickbench.rs index 70aaeb7d2d192..050fb8a7e7230 100644 --- a/benchmarks/src/clickbench.rs +++ b/benchmarks/src/clickbench.rs @@ -18,9 +18,13 @@ use std::fs; use std::io::ErrorKind; use std::path::{Path, PathBuf}; +#[cfg(target_os = "linux")] +use std::sync::Arc; use crate::util::{BenchmarkRun, CommonOpt, QueryResult, print_memory_stats}; use clap::Args; +#[cfg(target_os = "linux")] +use datafusion::execution::object_store::ObjectStoreUrl; use datafusion::logical_expr::{ExplainFormat, ExplainOption}; use datafusion::{ error::{DataFusionError, Result}, @@ -28,6 +32,8 @@ use datafusion::{ }; use datafusion_common::exec_datafusion_err; use datafusion_common::instant::Instant; +#[cfg(target_os = "linux")] +use futures::StreamExt; /// SQL to create the hits view with proper EventDate casting. /// @@ -208,6 +214,25 @@ impl RunOpt { } let rt = self.common.build_runtime()?; + + // If `--tokio-uring-workers` was set, spawn the pool and + // register a TokioUringObjectStore for file:// so every read + // travels through one of the pool's io_uring runtimes. + #[cfg(target_os = "linux")] + let uring_pool = { + let pool = self.common.tokio_uring_pool()?; + if let Some(pool) = pool.as_ref() { + use crate::util::tokio_uring_store::TokioUringObjectStore; + let store: Arc = + Arc::new(TokioUringObjectStore::new(Arc::clone(pool))); + let url = ObjectStoreUrl::parse("file:///")?; + rt.register_object_store(url.as_ref(), store); + } + pool + }; + #[cfg(not(target_os = "linux"))] + let uring_pool: Option<()> = self.common.tokio_uring_pool()?; + let ctx = SessionContext::new_with_config_rt(config, rt); self.register_hits(&ctx).await?; @@ -225,7 +250,17 @@ impl RunOpt { break; }; benchmark_run.start_new_case(&format!("Query {query_id}")); - let query_run = self.benchmark_query(&sql, query_id, &ctx).await; + #[cfg(target_os = "linux")] + let query_run = if let Some(pool) = uring_pool.as_ref() { + self.benchmark_query_uring(&sql, query_id, &ctx, pool).await + } else { + self.benchmark_query(&sql, query_id, &ctx).await + }; + #[cfg(not(target_os = "linux"))] + let query_run = { + let _ = &uring_pool; + self.benchmark_query(&sql, query_id, &ctx).await + }; match query_run { Ok(query_results) => { for iter in query_results { @@ -243,6 +278,156 @@ impl RunOpt { Ok(()) } + /// Execute the query with each fan-out partition dispatched to a + /// `tokio-uring` worker, so plan execution itself happens on the + /// pool's io_uring runtimes in parallel. + /// + /// If the root is a [`CoalescePartitionsExec`] or + /// [`SortPreservingMergeExec`] the root is stripped and its + /// children's `N` partitions are fanned out instead; the final + /// merge then runs as a single follow-up task on the pool. + #[cfg(target_os = "linux")] + async fn benchmark_query_uring( + &self, + sql: &str, + query_id: usize, + ctx: &SessionContext, + pool: &Arc, + ) -> Result> { + use datafusion::datasource::memory::MemorySourceConfig; + use datafusion::physical_plan::ExecutionPlan; + use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; + use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; + + println!("Q{query_id}: {sql}"); + + let mut millis = Vec::with_capacity(self.iterations()); + let mut query_results = vec![]; + for i in 0..self.iterations() { + let start = Instant::now(); + + // Planning runs on the main Tokio runtime — it's CPU-bound + // and produces a Send, 'static physical plan we can hand off. + let df = ctx.sql(sql).await?; + let full_plan = df.create_physical_plan().await?; + let task_ctx = ctx.task_ctx(); + + // If the root collapses N partitions into 1 (Coalesce / SPM), + // strip it so we can fan the children out across the pool — + // otherwise only one worker would do any real work. + enum FinalMerge { + None, + Concat, + SortPreserving(datafusion::physical_expr::LexOrdering, Option), + } + + let (child_plan, final_merge): (Arc, FinalMerge) = + if let Some(spm) = full_plan.downcast_ref::() { + ( + Arc::clone(spm.input()), + FinalMerge::SortPreserving(spm.expr().clone(), spm.fetch()), + ) + } else if let Some(c) = full_plan.downcast_ref::() + { + (Arc::clone(c.input()), FinalMerge::Concat) + } else { + (Arc::clone(&full_plan), FinalMerge::None) + }; + + // Wrap the child of every `RepartitionExec` with + // `PoolDispatchExec` so each input subtree (partial + // aggregation + scan) runs on its own round-robin pool + // worker. Otherwise all `N` input-fetcher tasks spawn on + // the current-thread runtime of whichever worker first + // polls the repartition, serializing the CPU-heavy partial + // aggregation on one core. + let child_plan = + crate::util::tokio_uring_dispatch::wrap_repartition_inputs_with_pool_dispatch( + child_plan, pool, + )?; + + let n_partitions = child_plan + .properties() + .output_partitioning() + .partition_count(); + + // Each child partition runs on its own pool worker. + let mut handles = Vec::with_capacity(n_partitions); + for p in 0..n_partitions { + let plan = Arc::clone(&child_plan); + let task_ctx = Arc::clone(&task_ctx); + handles.push(pool.spawn(move || async move { + let mut stream = plan.execute(p, task_ctx)?; + let mut batches = Vec::new(); + while let Some(batch) = stream.next().await { + batches.push(batch?); + } + Ok::<_, DataFusionError>(batches) + })); + } + + let mut partitions: Vec> = + Vec::with_capacity(n_partitions); + for h in handles { + partitions.push(h.await??); + } + + // Reassemble according to the root we stripped. + let row_count = match final_merge { + FinalMerge::None | FinalMerge::Concat => partitions + .iter() + .flat_map(|p| p.iter()) + .map(|b| b.num_rows()) + .sum::(), + FinalMerge::SortPreserving(expr, fetch) => { + let schema = child_plan.schema(); + let mem = + MemorySourceConfig::try_new_exec(&partitions, schema, None)?; + let mut merge = SortPreservingMergeExec::new(expr, mem); + if let Some(f) = fetch { + merge = merge.with_fetch(Some(f)); + } + let merge: Arc = Arc::new(merge); + let task_ctx = Arc::clone(&task_ctx); + let batches = pool + .spawn(move || async move { + let mut stream = merge.execute(0, task_ctx)?; + let mut batches = Vec::new(); + while let Some(b) = stream.next().await { + batches.push(b?); + } + Ok::<_, DataFusionError>(batches) + }) + .await??; + batches.iter().map(|b| b.num_rows()).sum::() + } + }; + + let elapsed = start.elapsed(); + let ms = elapsed.as_secs_f64() * 1000.0; + millis.push(ms); + println!( + "Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows ({n_partitions} partitions)" + ); + query_results.push(QueryResult { elapsed, row_count }); + } + + if self.common.debug { + ctx.sql(sql) + .await? + .explain_with_options( + ExplainOption::default().with_format(ExplainFormat::Tree), + )? + .show() + .await?; + } + let avg = millis.iter().sum::() / millis.len() as f64; + println!("Query {query_id} avg time: {avg:.2} ms (tokio-uring pool)"); + print_memory_stats(); + + Ok(query_results) + } + async fn benchmark_query( &self, sql: &str, diff --git a/benchmarks/src/util/mod.rs b/benchmarks/src/util/mod.rs index 6dc11c0f425bd..dcdb97901a9e2 100644 --- a/benchmarks/src/util/mod.rs +++ b/benchmarks/src/util/mod.rs @@ -20,6 +20,12 @@ pub mod latency_object_store; mod memory; mod options; mod run; +#[cfg(target_os = "linux")] +pub mod tokio_uring_dispatch; +#[cfg(target_os = "linux")] +pub mod tokio_uring_pool; +#[cfg(target_os = "linux")] +pub mod tokio_uring_store; pub use memory::print_memory_stats; pub use options::CommonOpt; diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs index a50a5268c0bfe..1162881ea417a 100644 --- a/benchmarks/src/util/options.rs +++ b/benchmarks/src/util/options.rs @@ -70,6 +70,19 @@ pub struct CommonOpt { /// Adds random latency in the range 20-200ms to each object store operation. #[arg(long = "simulate-latency")] pub simulate_latency: bool, + + /// Number of dedicated `tokio-uring` worker threads. Each worker + /// owns its own `tokio_uring::start` runtime (a current-thread Tokio + /// reactor with an `io_uring` driver) and executes a partition of + /// the plan, while the custom `TokioUringObjectStore` dispatches + /// reads across the same pool — a thread-per-core model with + /// native `io_uring`. + /// + /// If unset, defaults to `std::thread::available_parallelism()` on + /// Linux and is disabled elsewhere. Pass `0` to force the legacy + /// tokio multi-thread path. + #[arg(long = "tokio-uring-workers")] + pub tokio_uring_workers: Option, } impl CommonOpt { @@ -145,8 +158,41 @@ impl CommonOpt { "Simulating S3-like object store latency (get: 25-200ms, list: 40-400ms)" ); } + Ok(rt) } + + /// Build a [`TokioUringPool`](super::tokio_uring_pool::TokioUringPool) + /// if the benchmark should use the thread-per-core `tokio-uring` path. + /// + /// Defaults: + /// * On Linux, if `--tokio-uring-workers` is unset, spawn one worker + /// per available CPU (thread-per-core). + /// * `--tokio-uring-workers 0` forces the legacy tokio MT path. + /// * Non-Linux platforms always return `Ok(None)`. + #[cfg(target_os = "linux")] + pub fn tokio_uring_pool( + &self, + ) -> Result>> { + let workers = match self.tokio_uring_workers { + Some(0) => return Ok(None), + Some(n) => n, + None => std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1), + }; + let pool = super::tokio_uring_pool::TokioUringPool::new(workers)?; + println!("Spawned tokio-uring pool with {workers} worker runtime(s)"); + Ok(Some(pool)) + } + + #[cfg(not(target_os = "linux"))] + pub fn tokio_uring_pool(&self) -> Result> { + if matches!(self.tokio_uring_workers, Some(n) if n > 0) { + println!("--tokio-uring-workers is Linux-only; ignoring on this platform"); + } + Ok(None) + } } /// Parse capacity limit from string to number of bytes by allowing units: K, M and G. @@ -190,6 +236,7 @@ mod tests { sort_spill_reservation_bytes: None, debug: false, simulate_latency: false, + tokio_uring_workers: None, }; // With env var set, builder should succeed and have a memory pool diff --git a/benchmarks/src/util/tokio_uring_dispatch.rs b/benchmarks/src/util/tokio_uring_dispatch.rs new file mode 100644 index 0000000000000..98a3abc7f8de9 --- /dev/null +++ b/benchmarks/src/util/tokio_uring_dispatch.rs @@ -0,0 +1,205 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`PoolDispatchExec`]: wraps an [`ExecutionPlan`] so that each +//! `execute(partition, ...)` call runs on a [`TokioUringPool`] worker, +//! regardless of which worker the caller is on. +//! +//! # Why +//! +//! `RepartitionExec` spawns its `M` input-fetcher tasks with +//! `tokio::spawn` on the *current* runtime the first time any output +//! partition is polled. On a pool of thread-per-core `tokio-uring` +//! reactors that means all `M` fetchers pile onto whichever worker +//! won the race, and the reads and decodes they drive serialize on +//! that one ring. +//! +//! Wrapping the scan-heavy child (typically the `DataSourceExec` at +//! the bottom of the plan) with [`PoolDispatchExec`] fixes this: every +//! input partition is `pool.spawn`-ed across the pool, so `io_uring` +//! submissions and Parquet decode run on different workers in +//! parallel. The wrapper is transparent w.r.t. schema, partitioning, +//! ordering, and statistics — it only redirects where `execute` runs. + +#![cfg(target_os = "linux")] + +use std::fmt::{self, Formatter}; +use std::sync::Arc; + +use datafusion::arrow::datatypes::SchemaRef; +use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; +use datafusion::error::{DataFusionError, Result}; +use datafusion::execution::TaskContext; +use datafusion::physical_expr::PhysicalExpr; +use datafusion::physical_plan::repartition::RepartitionExec; +use datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, + SendableRecordBatchStream, +}; +use futures::StreamExt; + +use super::tokio_uring_pool::TokioUringPool; + +/// Transparent wrapper that dispatches `execute` onto a +/// [`TokioUringPool`] worker. +pub struct PoolDispatchExec { + inner: Arc, + pool: Arc, +} + +impl fmt::Debug for PoolDispatchExec { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("PoolDispatchExec") + .field("inner", &self.inner) + .finish() + } +} + +impl PoolDispatchExec { + pub fn new(inner: Arc, pool: Arc) -> Self { + Self { inner, pool } + } +} + +impl DisplayAs for PoolDispatchExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> fmt::Result { + write!(f, "PoolDispatchExec") + } +} + +impl ExecutionPlan for PoolDispatchExec { + fn name(&self) -> &str { + "PoolDispatchExec" + } + + fn properties(&self) -> &Arc { + self.inner.properties() + } + + fn schema(&self) -> SchemaRef { + self.inner.schema() + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.inner] + } + + fn apply_expressions( + &self, + _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, + ) -> Result { + Ok(TreeNodeRecursion::Continue) + } + + fn with_new_children( + self: Arc, + mut children: Vec>, + ) -> Result> { + if children.len() != 1 { + return Err(DataFusionError::Internal(format!( + "PoolDispatchExec expects exactly one child, got {}", + children.len() + ))); + } + let child = children.pop().expect("len checked above"); + Ok(Arc::new(PoolDispatchExec::new( + child, + Arc::clone(&self.pool), + ))) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + let inner = Arc::clone(&self.inner); + let pool = Arc::clone(&self.pool); + let schema = self.inner.schema(); + + // Bridge the inner stream back to the caller via a bounded + // channel. Batches produced on the pool worker are delivered + // to whoever is polling the outer stream (typically a + // RepartitionExec fetcher task on a different worker). + let mut builder = RecordBatchReceiverStreamBuilder::new(schema, 2); + let tx = builder.tx(); + + builder.spawn(async move { + let job = pool.spawn(move || async move { + let mut stream = match inner.execute(partition, context) { + Ok(s) => s, + Err(e) => { + let _ = tx.send(Err(e)).await; + return Ok::<(), DataFusionError>(()); + } + }; + while let Some(batch) = stream.next().await { + if tx.send(batch).await.is_err() { + // Consumer dropped; abandon. + break; + } + } + Ok(()) + }); + job.await??; + Ok(()) + }); + + Ok(builder.build()) + } +} + +/// Walk `plan` bottom-up and wrap the **child** of every +/// [`RepartitionExec`] with a [`PoolDispatchExec`]. +/// +/// `RepartitionExec` lazily spawns one input-fetcher task per input +/// partition on the *current* runtime the first time any output +/// partition is polled. On a thread-per-core pool that means all `N` +/// fetchers land on whichever worker won the race — and every input +/// subtree (partial aggregation, filter, scan) runs serially on that +/// one core. Wrapping each fetcher's root with `PoolDispatchExec` +/// redirects the actual work to round-robin pool workers, so the `N` +/// input pipelines run in parallel. +/// +/// This also covers leaf I/O: the scan sits *inside* each +/// dispatched subtree, so its decode runs on the same pool worker as +/// its partial aggregation (one cross-thread hop per batch into the +/// repartition channel, instead of two). +pub fn wrap_repartition_inputs_with_pool_dispatch( + plan: Arc, + pool: &Arc, +) -> Result> { + plan.transform_up(|node| { + if node.downcast_ref::().is_none() { + return Ok(Transformed::no(node)); + } + let children = node.children(); + if children.len() != 1 { + return Ok(Transformed::no(node)); + } + let child: Arc = Arc::clone(children[0]); + if child.downcast_ref::().is_some() { + return Ok(Transformed::no(node)); + } + let wrapped: Arc = + Arc::new(PoolDispatchExec::new(child, Arc::clone(pool))); + let new_node = node.with_new_children(vec![wrapped])?; + Ok(Transformed::yes(new_node)) + }) + .map(|t| t.data) +} diff --git a/benchmarks/src/util/tokio_uring_pool.rs b/benchmarks/src/util/tokio_uring_pool.rs new file mode 100644 index 0000000000000..68a03e88237c8 --- /dev/null +++ b/benchmarks/src/util/tokio_uring_pool.rs @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Pool of dedicated `tokio-uring` runtimes. +//! +//! Each pool worker owns an OS thread that runs its own +//! [`tokio_uring::start`] runtime (a current-thread Tokio reactor with +//! an `io_uring` driver). The pool exposes a single +//! [`TokioUringPool::spawn`] entry point that dispatches an arbitrary +//! `Send + 'static` future round-robin to a worker. +//! +//! This primitive powers two things in the benchmark: +//! +//! * The [`TokioUringObjectStore`](crate::util::tokio_uring_store::TokioUringObjectStore) +//! uses it to run its `get_ranges` reads. +//! * The ClickBench runner uses it to execute per-partition +//! [`SendableRecordBatchStream`](datafusion::physical_plan::SendableRecordBatchStream) +//! jobs, so plan execution itself happens on the `io_uring` reactors +//! — giving genuine I/O + compute parallelism across `N` threads. + +#![cfg(target_os = "linux")] + +use std::cell::Cell; +use std::future::Future; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use datafusion::error::{DataFusionError, Result}; +use tokio::sync::{mpsc, oneshot}; + +/// A job queued onto a worker: a `Send` closure that, when invoked on +/// the worker thread, spawns the real (possibly `!Send`) future onto +/// the local `tokio-uring` runtime. +type BoxedJob = Box; + +/// Pool of dedicated `tokio-uring` runtimes. +pub struct TokioUringPool { + workers: Vec>, + next: AtomicUsize, +} + +impl TokioUringPool { + /// Spawn `workers` OS threads, each hosting its own + /// [`tokio_uring::start`] runtime. + pub fn new(workers: usize) -> Result> { + assert!(workers > 0, "TokioUringPool requires at least one worker"); + + let mut senders = Vec::with_capacity(workers); + for i in 0..workers { + let (tx, rx) = mpsc::unbounded_channel::(); + std::thread::Builder::new() + .name(format!("tokio-uring-{i}")) + .spawn(move || { + tokio_uring::start(run_worker(rx)); + }) + .map_err(|e| { + DataFusionError::External( + format!("failed to spawn tokio-uring worker: {e}").into(), + ) + })?; + senders.push(tx); + } + + Ok(Arc::new(Self { + workers: senders, + next: AtomicUsize::new(0), + })) + } + + /// Number of worker runtimes in the pool. + pub fn worker_count(&self) -> usize { + self.workers.len() + } + + /// Run a future on the next worker (round-robin). + /// + /// The caller passes a `Send + 'static` **closure** that builds the + /// future. The closure is shipped to the worker, invoked there, and + /// the resulting future is spawned onto that worker's `tokio-uring` + /// runtime. The future itself may be `!Send` (e.g. hold a + /// [`tokio_uring::fs::File`]) — it never crosses threads because it + /// is constructed on the worker. + /// + /// The returned future resolves to the future's output (wrapped in + /// `Result` to surface dead-worker / cancellation errors). + pub fn spawn( + self: &Arc, + make_future: R, + ) -> impl Future> + Send + 'static + where + R: FnOnce() -> F + Send + 'static, + F: Future + 'static, + O: Send + 'static, + { + let (tx, rx) = oneshot::channel::(); + let idx = self.next.fetch_add(1, Ordering::Relaxed) % self.workers.len(); + + let send_result = self.workers[idx].send(Box::new(move || { + tokio_uring::spawn(async move { + let out = make_future().await; + let _ = tx.send(out); + }); + })); + + async move { + send_result.map_err(|_| { + DataFusionError::External("tokio-uring worker thread is gone".into()) + })?; + rx.await.map_err(|_| { + DataFusionError::External( + "tokio-uring job was cancelled before completing".into(), + ) + }) + } + } +} + +thread_local! { + /// `true` while the current thread is running a pool worker's + /// [`tokio_uring::start`] reactor. Consumers (e.g. the object + /// store) use this to stay **local** — spawning on the current + /// ring instead of hopping to a round-robin pool worker. + static IN_WORKER: Cell = const { Cell::new(false) }; +} + +/// `true` if the current thread is inside a pool worker's reactor. +pub fn in_worker() -> bool { + IN_WORKER.with(|flag| flag.get()) +} + +async fn run_worker(mut rx: mpsc::UnboundedReceiver) { + IN_WORKER.with(|flag| flag.set(true)); + while let Some(job) = rx.recv().await { + job(); + } +} diff --git a/benchmarks/src/util/tokio_uring_store.rs b/benchmarks/src/util/tokio_uring_store.rs new file mode 100644 index 0000000000000..e892f63c670a9 --- /dev/null +++ b/benchmarks/src/util/tokio_uring_store.rs @@ -0,0 +1,298 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Local [`ObjectStore`] that performs reads via a shared +//! [`TokioUringPool`]. Writes, list, copy, and delete delegate to +//! [`LocalFileSystem`]. + +#![cfg(target_os = "linux")] + +use std::fmt; +use std::ops::Range; +use std::path::{Path as FsPath, PathBuf}; +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::StreamExt; +use futures::stream::BoxStream; +use object_store::local::LocalFileSystem; +use object_store::path::Path; +use object_store::{ + Attributes, CopyOptions, GetOptions, GetResult, GetResultPayload, ListResult, + MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, + PutPayload, PutResult, Result, +}; + +use tokio::sync::oneshot; + +use super::tokio_uring_pool::{TokioUringPool, in_worker}; + +pub struct TokioUringObjectStore { + inner: Arc, + pool: Arc, +} + +impl TokioUringObjectStore { + pub fn new(pool: Arc) -> Self { + Self { + inner: Arc::new(LocalFileSystem::new()), + pool, + } + } + + async fn read_ranges_uring( + &self, + path: PathBuf, + ranges: Vec>, + ) -> Result> { + // Fast path: the caller is already on one of the pool's + // `tokio-uring` reactors (e.g. plan execution dispatched by + // ClickBench). Spawn the read on the *current* ring so the + // bytes land on the same core that will consume them — no + // mpsc/oneshot hop to another worker. + if in_worker() { + let (tx, rx) = oneshot::channel::>>(); + tokio_uring::spawn(async move { + let _ = tx.send(read_ranges_inner(&path, &ranges).await); + }); + return rx.await.map_err(|_| object_store::Error::Generic { + store: "TokioUringObjectStore", + source: "tokio-uring local task was cancelled".into(), + })?; + } + + // Slow path: the caller is on a non-uring runtime (e.g. the + // main tokio MT runtime used during planning). Ship the work + // to a pool worker via round-robin. + let job = self + .pool + .spawn(move || async move { read_ranges_inner(&path, &ranges).await }); + job.await.map_err(|e| object_store::Error::Generic { + store: "TokioUringObjectStore", + source: Box::new(e), + })? + } +} + +impl fmt::Debug for TokioUringObjectStore { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TokioUringObjectStore") + .field("workers", &self.pool.worker_count()) + .finish() + } +} + +impl fmt::Display for TokioUringObjectStore { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "TokioUringObjectStore(workers={})", + self.pool.worker_count() + ) + } +} + +/// Open the file on the worker's runtime, then read every range +/// concurrently via `tokio_uring::spawn` on the same ring. +async fn read_ranges_inner(path: &FsPath, ranges: &[Range]) -> Result> { + if ranges.is_empty() { + return Ok(Vec::new()); + } + + let file = tokio_uring::fs::File::open(path.to_path_buf()) + .await + .map_err(|e| { + if e.kind() == std::io::ErrorKind::NotFound { + object_store::Error::NotFound { + path: path.display().to_string(), + source: e.into(), + } + } else { + object_store::Error::Generic { + store: "TokioUringObjectStore", + source: e.into(), + } + } + })?; + let file = Arc::new(file); + + let mut handles = Vec::with_capacity(ranges.len()); + for r in ranges { + let file = Arc::clone(&file); + let range = r.clone(); + handles.push(tokio_uring::spawn( + async move { read_one(file, range).await }, + )); + } + + let mut out = Vec::with_capacity(ranges.len()); + for h in handles { + out.push(h.await.map_err(|e| object_store::Error::Generic { + store: "TokioUringObjectStore", + source: Box::new(e), + })??); + } + + if let Ok(file) = Arc::try_unwrap(file) { + let _ = file.close().await; + } + + Ok(out) +} + +async fn read_one(file: Arc, range: Range) -> Result { + let total = range.end.saturating_sub(range.start) as usize; + let mut out: Vec = Vec::with_capacity(total); + + while out.len() < total { + let remaining = total - out.len(); + let offset = range.start + out.len() as u64; + // tokio-uring requires owned buffers: we hand the Vec to the + // kernel and get it back alongside the result. + let buf = vec![0u8; remaining]; + let (res, buf) = file.read_at(buf, offset).await; + let n = res.map_err(|e| object_store::Error::Generic { + store: "TokioUringObjectStore", + source: e.into(), + })?; + if n == 0 { + return Err(object_store::Error::Generic { + store: "TokioUringObjectStore", + source: format!("unexpected EOF reading {}..{}", range.start, range.end) + .into(), + }); + } + out.extend_from_slice(&buf[..n]); + } + + Ok(Bytes::from(out)) +} + +#[async_trait] +impl ObjectStore for TokioUringObjectStore { + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: PutOptions, + ) -> Result { + self.inner.put_opts(location, payload, opts).await + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOptions, + ) -> Result> { + self.inner.put_multipart_opts(location, opts).await + } + + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { + if options.head { + return self.inner.get_opts(location, options).await; + } + + // Delegate a HEAD through the inner store so conditional options + // (if_match, if_modified_since, …) are honored. + let head_opts = GetOptions { + head: true, + range: None, + ..options.clone() + }; + let meta = self.inner.get_opts(location, head_opts).await?.meta; + let file_size = meta.size; + + let range = match &options.range { + Some(r) => { + r.as_range(file_size) + .map_err(|e| object_store::Error::Generic { + store: "TokioUringObjectStore", + source: Box::new(e), + })? + } + None => 0..file_size, + }; + + if range.start == range.end { + let stream = futures::stream::once(async { Ok(Bytes::new()) }).boxed(); + return Ok(GetResult { + payload: GetResultPayload::Stream(stream), + meta, + range, + attributes: Attributes::new(), + }); + } + + let fs_path = self.inner.path_to_filesystem(location)?; + let mut results = self.read_ranges_uring(fs_path, vec![range.clone()]).await?; + let bytes = results.remove(0); + let stream = futures::stream::once(async { Ok(bytes) }).boxed(); + + Ok(GetResult { + payload: GetResultPayload::Stream(stream), + meta, + range, + attributes: Attributes::new(), + }) + } + + async fn get_ranges( + &self, + location: &Path, + ranges: &[Range], + ) -> Result> { + if ranges.is_empty() { + return Ok(Vec::new()); + } + for r in ranges { + if r.start > r.end { + return Err(object_store::Error::Generic { + store: "TokioUringObjectStore", + source: format!("invalid range: start {} > end {}", r.start, r.end) + .into(), + }); + } + } + let fs_path = self.inner.path_to_filesystem(location)?; + self.read_ranges_uring(fs_path, ranges.to_vec()).await + } + + fn delete_stream( + &self, + locations: BoxStream<'static, Result>, + ) -> BoxStream<'static, Result> { + self.inner.delete_stream(locations) + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { + self.inner.list(prefix) + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { + self.inner.list_with_delimiter(prefix).await + } + + async fn copy_opts( + &self, + from: &Path, + to: &Path, + options: CopyOptions, + ) -> Result<()> { + self.inner.copy_opts(from, to, options).await + } +}