diff --git a/Cargo.lock b/Cargo.lock index bbec97ed7ff3d..9a541f6383162 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -443,7 +443,7 @@ version = "58.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c30a1365d7a7dc50cc847e54154e6af49e4c4b0fddc9f607b687f29212082743" dependencies = [ - "bitflags", + "bitflags 2.11.0", "serde", "serde_core", "serde_json", @@ -1028,6 +1028,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.11.0" @@ -1083,7 +1089,7 @@ checksum = "ee04c4c84f1f811b017f2fbb7dd8815c976e7ca98593de9c1e2afad0f636bff4" dependencies = [ "async-stream", "base64 0.22.1", - "bitflags", + "bitflags 2.11.0", "bollard-buildkit-proto", "bollard-stubs", "bytes", @@ -1824,6 +1830,7 @@ dependencies = [ "serde_json", "snmalloc-rs", "tokio", + "tokio-uring", "tokio-util", ] @@ -3019,7 +3026,7 @@ version = "25.12.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35f6839d7b3b98adde531effaf34f0c2badc6f4735d26fe74709d8e513a96ef3" dependencies = [ - "bitflags", + "bitflags 2.11.0", "rustc_version", ] @@ -3538,7 +3545,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2", + "socket2 0.6.3", "tokio", "tower-service", "tracing", @@ -3766,6 +3773,16 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "io-uring" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "595a0399f411a508feb2ec1e970a4a30c249351e30208960d58298de8660b0e5" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "ipnet" version = "2.12.0" @@ -3992,7 +4009,7 @@ version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1744e39d1d6a9948f4f388969627434e31128196de472883b39f148769bfe30a" dependencies = [ - "bitflags", + "bitflags 2.11.0", "libc", "plain", "redox_syscall 0.7.3", @@ -4147,7 +4164,7 @@ version = "0.31.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d6d0705320c1e6ba1d912b5e37cf18071b6c2e9b7fa8215a1e8a7651966f5d3" dependencies = [ - "bitflags", + "bitflags 2.11.0", "cfg-if", "cfg_aliases", "libc", @@ -4266,7 +4283,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a180dd8642fa45cdb7dd721cd4c11b1cadd4929ce112ebd8b9f5803cc79d536" dependencies = [ - "bitflags", + "bitflags 2.11.0", ] [[package]] @@ -4887,7 +4904,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls", - "socket2", + "socket2 0.6.3", "thiserror", "tokio", "tracing", @@ -4924,7 +4941,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2", + "socket2 0.6.3", "tracing", "windows-sys 0.60.2", ] @@ -5092,7 +5109,7 @@ version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" dependencies = [ - "bitflags", + "bitflags 2.11.0", ] [[package]] @@ -5101,7 +5118,7 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ce70a74e890531977d37e532c34d45e9055d2409ed08ddba14529471ed0be16" dependencies = [ - "bitflags", + "bitflags 2.11.0", ] [[package]] @@ -5312,7 +5329,7 @@ version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" dependencies = [ - "bitflags", + "bitflags 2.11.0", "errno", "libc", "linux-raw-sys", @@ -5381,7 +5398,7 @@ version = "18.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a990b25f351b25139ddc7f21ee3f6f56f86d6846b74ac8fad3a719a287cd4a0" dependencies = [ - "bitflags", + "bitflags 2.11.0", "cfg-if", "clipboard-win", "home", @@ -5480,7 +5497,7 @@ version = "3.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7f4bc775c73d9a02cde8bf7b2ec4c9d12743edf609006c7facc23998404cd1d" dependencies = [ - "bitflags", + "bitflags 2.11.0", "core-foundation", "core-foundation-sys", "libc", @@ -5765,6 +5782,16 @@ dependencies = [ "cmake", ] +[[package]] +name = "socket2" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "socket2" version = "0.6.3" @@ -6189,7 +6216,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.6.3", "tokio-macros", "windows-sys 0.61.2", ] @@ -6225,7 +6252,7 @@ dependencies = [ "postgres-protocol", "postgres-types", "rand 0.10.1", - "socket2", + "socket2 0.6.3", "tokio", "tokio-util", "whoami", @@ -6253,6 +6280,20 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-uring" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "748482e3e13584a34664a710168ad5068e8cb1d968aa4ffa887e83ca6dd27967" +dependencies = [ + "futures-util", + "io-uring", + "libc", + "slab", + "socket2 0.4.10", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.18" @@ -6315,7 +6356,7 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", - "socket2", + "socket2 0.6.3", "sync_wrapper", "tokio", "tokio-stream", @@ -6361,7 +6402,7 @@ version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" dependencies = [ - "bitflags", + "bitflags 2.11.0", "bytes", "futures-util", "http 1.4.0", @@ -6892,7 +6933,7 @@ version = "0.244.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" dependencies = [ - "bitflags", + "bitflags 2.11.0", "hashbrown 0.15.5", "indexmap 2.14.0", "semver", @@ -7304,7 +7345,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" dependencies = [ "anyhow", - "bitflags", + "bitflags 2.11.0", "indexmap 2.14.0", "log", "serde", diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index f82f1c0a03e3d..1c67146091586 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -60,5 +60,8 @@ snmalloc-rs = { version = "0.7", optional = true } tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] } tokio-util = { version = "0.7.17" } +[target.'cfg(target_os = "linux")'.dependencies] +tokio-uring = "0.5" + [dev-dependencies] datafusion-proto = { workspace = true } diff --git a/benchmarks/src/clickbench.rs b/benchmarks/src/clickbench.rs index 70aaeb7d2d192..3252a78ebeea1 100644 --- a/benchmarks/src/clickbench.rs +++ b/benchmarks/src/clickbench.rs @@ -18,9 +18,13 @@ use std::fs; use std::io::ErrorKind; use std::path::{Path, PathBuf}; +#[cfg(target_os = "linux")] +use std::sync::Arc; use crate::util::{BenchmarkRun, CommonOpt, QueryResult, print_memory_stats}; use clap::Args; +#[cfg(target_os = "linux")] +use datafusion::execution::object_store::ObjectStoreUrl; use datafusion::logical_expr::{ExplainFormat, ExplainOption}; use datafusion::{ error::{DataFusionError, Result}, @@ -28,6 +32,8 @@ use datafusion::{ }; use datafusion_common::exec_datafusion_err; use datafusion_common::instant::Instant; +#[cfg(target_os = "linux")] +use futures::StreamExt; /// SQL to create the hits view with proper EventDate casting. /// @@ -208,6 +214,25 @@ impl RunOpt { } let rt = self.common.build_runtime()?; + + // If `--tokio-uring-workers` was set, spawn the pool and + // register a TokioUringObjectStore for file:// so every read + // travels through one of the pool's io_uring runtimes. + #[cfg(target_os = "linux")] + let uring_pool = { + let pool = self.common.tokio_uring_pool()?; + if let Some(pool) = pool.as_ref() { + use crate::util::tokio_uring_store::TokioUringObjectStore; + let store: Arc = + Arc::new(TokioUringObjectStore::new(Arc::clone(pool))); + let url = ObjectStoreUrl::parse("file:///")?; + rt.register_object_store(url.as_ref(), store); + } + pool + }; + #[cfg(not(target_os = "linux"))] + let uring_pool: Option<()> = self.common.tokio_uring_pool()?; + let ctx = SessionContext::new_with_config_rt(config, rt); self.register_hits(&ctx).await?; @@ -225,7 +250,17 @@ impl RunOpt { break; }; benchmark_run.start_new_case(&format!("Query {query_id}")); - let query_run = self.benchmark_query(&sql, query_id, &ctx).await; + #[cfg(target_os = "linux")] + let query_run = if let Some(pool) = uring_pool.as_ref() { + self.benchmark_query_uring(&sql, query_id, &ctx, pool).await + } else { + self.benchmark_query(&sql, query_id, &ctx).await + }; + #[cfg(not(target_os = "linux"))] + let query_run = { + let _ = &uring_pool; + self.benchmark_query(&sql, query_id, &ctx).await + }; match query_run { Ok(query_results) => { for iter in query_results { @@ -243,6 +278,144 @@ impl RunOpt { Ok(()) } + /// Execute the query with each fan-out partition dispatched to a + /// `tokio-uring` worker, so plan execution itself happens on the + /// pool's io_uring runtimes in parallel. + /// + /// If the root is a [`CoalescePartitionsExec`] or + /// [`SortPreservingMergeExec`] the root is stripped and its + /// children's `N` partitions are fanned out instead; the final + /// merge then runs as a single follow-up task on the pool. + #[cfg(target_os = "linux")] + async fn benchmark_query_uring( + &self, + sql: &str, + query_id: usize, + ctx: &SessionContext, + pool: &Arc, + ) -> Result> { + use datafusion::datasource::memory::MemorySourceConfig; + use datafusion::physical_plan::ExecutionPlan; + use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; + use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; + + println!("Q{query_id}: {sql}"); + + let mut millis = Vec::with_capacity(self.iterations()); + let mut query_results = vec![]; + for i in 0..self.iterations() { + let start = Instant::now(); + + // Planning runs on the main Tokio runtime — it's CPU-bound + // and produces a Send, 'static physical plan we can hand off. + let df = ctx.sql(sql).await?; + let full_plan = df.create_physical_plan().await?; + let task_ctx = ctx.task_ctx(); + + // If the root collapses N partitions into 1 (Coalesce / SPM), + // strip it so we can fan the children out across the pool — + // otherwise only one worker would do any real work. + enum FinalMerge { + None, + Concat, + SortPreserving(datafusion::physical_expr::LexOrdering, Option), + } + + let (child_plan, final_merge): (Arc, FinalMerge) = + if let Some(spm) = full_plan.downcast_ref::() { + ( + Arc::clone(spm.input()), + FinalMerge::SortPreserving(spm.expr().clone(), spm.fetch()), + ) + } else if let Some(c) = full_plan.downcast_ref::() + { + (Arc::clone(c.input()), FinalMerge::Concat) + } else { + (Arc::clone(&full_plan), FinalMerge::None) + }; + + let n_partitions = child_plan + .properties() + .output_partitioning() + .partition_count(); + + // Each child partition runs on its own pool worker. + let mut handles = Vec::with_capacity(n_partitions); + for p in 0..n_partitions { + let plan = Arc::clone(&child_plan); + let task_ctx = Arc::clone(&task_ctx); + handles.push(pool.spawn(move || async move { + let mut stream = plan.execute(p, task_ctx)?; + let mut batches = Vec::new(); + while let Some(batch) = stream.next().await { + batches.push(batch?); + } + Ok::<_, DataFusionError>(batches) + })); + } + + let mut partitions: Vec> = + Vec::with_capacity(n_partitions); + for h in handles { + partitions.push(h.await??); + } + + // Reassemble according to the root we stripped. + let row_count = match final_merge { + FinalMerge::None | FinalMerge::Concat => partitions + .iter() + .flat_map(|p| p.iter()) + .map(|b| b.num_rows()) + .sum::(), + FinalMerge::SortPreserving(expr, fetch) => { + let schema = child_plan.schema(); + let mem = + MemorySourceConfig::try_new_exec(&partitions, schema, None)?; + let mut merge = SortPreservingMergeExec::new(expr, mem); + if let Some(f) = fetch { + merge = merge.with_fetch(Some(f)); + } + let merge: Arc = Arc::new(merge); + let task_ctx = Arc::clone(&task_ctx); + let batches = pool + .spawn(move || async move { + let mut stream = merge.execute(0, task_ctx)?; + let mut batches = Vec::new(); + while let Some(b) = stream.next().await { + batches.push(b?); + } + Ok::<_, DataFusionError>(batches) + }) + .await??; + batches.iter().map(|b| b.num_rows()).sum::() + } + }; + + let elapsed = start.elapsed(); + let ms = elapsed.as_secs_f64() * 1000.0; + millis.push(ms); + println!( + "Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows ({n_partitions} partitions)" + ); + query_results.push(QueryResult { elapsed, row_count }); + } + + if self.common.debug { + ctx.sql(sql) + .await? + .explain_with_options( + ExplainOption::default().with_format(ExplainFormat::Tree), + )? + .show() + .await?; + } + let avg = millis.iter().sum::() / millis.len() as f64; + println!("Query {query_id} avg time: {avg:.2} ms (tokio-uring pool)"); + print_memory_stats(); + + Ok(query_results) + } + async fn benchmark_query( &self, sql: &str, diff --git a/benchmarks/src/util/mod.rs b/benchmarks/src/util/mod.rs index 6dc11c0f425bd..9d8eaae7f8e8e 100644 --- a/benchmarks/src/util/mod.rs +++ b/benchmarks/src/util/mod.rs @@ -20,6 +20,10 @@ pub mod latency_object_store; mod memory; mod options; mod run; +#[cfg(target_os = "linux")] +pub mod tokio_uring_pool; +#[cfg(target_os = "linux")] +pub mod tokio_uring_store; pub use memory::print_memory_stats; pub use options::CommonOpt; diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs index a50a5268c0bfe..1162881ea417a 100644 --- a/benchmarks/src/util/options.rs +++ b/benchmarks/src/util/options.rs @@ -70,6 +70,19 @@ pub struct CommonOpt { /// Adds random latency in the range 20-200ms to each object store operation. #[arg(long = "simulate-latency")] pub simulate_latency: bool, + + /// Number of dedicated `tokio-uring` worker threads. Each worker + /// owns its own `tokio_uring::start` runtime (a current-thread Tokio + /// reactor with an `io_uring` driver) and executes a partition of + /// the plan, while the custom `TokioUringObjectStore` dispatches + /// reads across the same pool — a thread-per-core model with + /// native `io_uring`. + /// + /// If unset, defaults to `std::thread::available_parallelism()` on + /// Linux and is disabled elsewhere. Pass `0` to force the legacy + /// tokio multi-thread path. + #[arg(long = "tokio-uring-workers")] + pub tokio_uring_workers: Option, } impl CommonOpt { @@ -145,8 +158,41 @@ impl CommonOpt { "Simulating S3-like object store latency (get: 25-200ms, list: 40-400ms)" ); } + Ok(rt) } + + /// Build a [`TokioUringPool`](super::tokio_uring_pool::TokioUringPool) + /// if the benchmark should use the thread-per-core `tokio-uring` path. + /// + /// Defaults: + /// * On Linux, if `--tokio-uring-workers` is unset, spawn one worker + /// per available CPU (thread-per-core). + /// * `--tokio-uring-workers 0` forces the legacy tokio MT path. + /// * Non-Linux platforms always return `Ok(None)`. + #[cfg(target_os = "linux")] + pub fn tokio_uring_pool( + &self, + ) -> Result>> { + let workers = match self.tokio_uring_workers { + Some(0) => return Ok(None), + Some(n) => n, + None => std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1), + }; + let pool = super::tokio_uring_pool::TokioUringPool::new(workers)?; + println!("Spawned tokio-uring pool with {workers} worker runtime(s)"); + Ok(Some(pool)) + } + + #[cfg(not(target_os = "linux"))] + pub fn tokio_uring_pool(&self) -> Result> { + if matches!(self.tokio_uring_workers, Some(n) if n > 0) { + println!("--tokio-uring-workers is Linux-only; ignoring on this platform"); + } + Ok(None) + } } /// Parse capacity limit from string to number of bytes by allowing units: K, M and G. @@ -190,6 +236,7 @@ mod tests { sort_spill_reservation_bytes: None, debug: false, simulate_latency: false, + tokio_uring_workers: None, }; // With env var set, builder should succeed and have a memory pool diff --git a/benchmarks/src/util/tokio_uring_pool.rs b/benchmarks/src/util/tokio_uring_pool.rs new file mode 100644 index 0000000000000..68a03e88237c8 --- /dev/null +++ b/benchmarks/src/util/tokio_uring_pool.rs @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Pool of dedicated `tokio-uring` runtimes. +//! +//! Each pool worker owns an OS thread that runs its own +//! [`tokio_uring::start`] runtime (a current-thread Tokio reactor with +//! an `io_uring` driver). The pool exposes a single +//! [`TokioUringPool::spawn`] entry point that dispatches an arbitrary +//! `Send + 'static` future round-robin to a worker. +//! +//! This primitive powers two things in the benchmark: +//! +//! * The [`TokioUringObjectStore`](crate::util::tokio_uring_store::TokioUringObjectStore) +//! uses it to run its `get_ranges` reads. +//! * The ClickBench runner uses it to execute per-partition +//! [`SendableRecordBatchStream`](datafusion::physical_plan::SendableRecordBatchStream) +//! jobs, so plan execution itself happens on the `io_uring` reactors +//! — giving genuine I/O + compute parallelism across `N` threads. + +#![cfg(target_os = "linux")] + +use std::cell::Cell; +use std::future::Future; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use datafusion::error::{DataFusionError, Result}; +use tokio::sync::{mpsc, oneshot}; + +/// A job queued onto a worker: a `Send` closure that, when invoked on +/// the worker thread, spawns the real (possibly `!Send`) future onto +/// the local `tokio-uring` runtime. +type BoxedJob = Box; + +/// Pool of dedicated `tokio-uring` runtimes. +pub struct TokioUringPool { + workers: Vec>, + next: AtomicUsize, +} + +impl TokioUringPool { + /// Spawn `workers` OS threads, each hosting its own + /// [`tokio_uring::start`] runtime. + pub fn new(workers: usize) -> Result> { + assert!(workers > 0, "TokioUringPool requires at least one worker"); + + let mut senders = Vec::with_capacity(workers); + for i in 0..workers { + let (tx, rx) = mpsc::unbounded_channel::(); + std::thread::Builder::new() + .name(format!("tokio-uring-{i}")) + .spawn(move || { + tokio_uring::start(run_worker(rx)); + }) + .map_err(|e| { + DataFusionError::External( + format!("failed to spawn tokio-uring worker: {e}").into(), + ) + })?; + senders.push(tx); + } + + Ok(Arc::new(Self { + workers: senders, + next: AtomicUsize::new(0), + })) + } + + /// Number of worker runtimes in the pool. + pub fn worker_count(&self) -> usize { + self.workers.len() + } + + /// Run a future on the next worker (round-robin). + /// + /// The caller passes a `Send + 'static` **closure** that builds the + /// future. The closure is shipped to the worker, invoked there, and + /// the resulting future is spawned onto that worker's `tokio-uring` + /// runtime. The future itself may be `!Send` (e.g. hold a + /// [`tokio_uring::fs::File`]) — it never crosses threads because it + /// is constructed on the worker. + /// + /// The returned future resolves to the future's output (wrapped in + /// `Result` to surface dead-worker / cancellation errors). + pub fn spawn( + self: &Arc, + make_future: R, + ) -> impl Future> + Send + 'static + where + R: FnOnce() -> F + Send + 'static, + F: Future + 'static, + O: Send + 'static, + { + let (tx, rx) = oneshot::channel::(); + let idx = self.next.fetch_add(1, Ordering::Relaxed) % self.workers.len(); + + let send_result = self.workers[idx].send(Box::new(move || { + tokio_uring::spawn(async move { + let out = make_future().await; + let _ = tx.send(out); + }); + })); + + async move { + send_result.map_err(|_| { + DataFusionError::External("tokio-uring worker thread is gone".into()) + })?; + rx.await.map_err(|_| { + DataFusionError::External( + "tokio-uring job was cancelled before completing".into(), + ) + }) + } + } +} + +thread_local! { + /// `true` while the current thread is running a pool worker's + /// [`tokio_uring::start`] reactor. Consumers (e.g. the object + /// store) use this to stay **local** — spawning on the current + /// ring instead of hopping to a round-robin pool worker. + static IN_WORKER: Cell = const { Cell::new(false) }; +} + +/// `true` if the current thread is inside a pool worker's reactor. +pub fn in_worker() -> bool { + IN_WORKER.with(|flag| flag.get()) +} + +async fn run_worker(mut rx: mpsc::UnboundedReceiver) { + IN_WORKER.with(|flag| flag.set(true)); + while let Some(job) = rx.recv().await { + job(); + } +} diff --git a/benchmarks/src/util/tokio_uring_store.rs b/benchmarks/src/util/tokio_uring_store.rs new file mode 100644 index 0000000000000..e892f63c670a9 --- /dev/null +++ b/benchmarks/src/util/tokio_uring_store.rs @@ -0,0 +1,298 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Local [`ObjectStore`] that performs reads via a shared +//! [`TokioUringPool`]. Writes, list, copy, and delete delegate to +//! [`LocalFileSystem`]. + +#![cfg(target_os = "linux")] + +use std::fmt; +use std::ops::Range; +use std::path::{Path as FsPath, PathBuf}; +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::StreamExt; +use futures::stream::BoxStream; +use object_store::local::LocalFileSystem; +use object_store::path::Path; +use object_store::{ + Attributes, CopyOptions, GetOptions, GetResult, GetResultPayload, ListResult, + MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, + PutPayload, PutResult, Result, +}; + +use tokio::sync::oneshot; + +use super::tokio_uring_pool::{TokioUringPool, in_worker}; + +pub struct TokioUringObjectStore { + inner: Arc, + pool: Arc, +} + +impl TokioUringObjectStore { + pub fn new(pool: Arc) -> Self { + Self { + inner: Arc::new(LocalFileSystem::new()), + pool, + } + } + + async fn read_ranges_uring( + &self, + path: PathBuf, + ranges: Vec>, + ) -> Result> { + // Fast path: the caller is already on one of the pool's + // `tokio-uring` reactors (e.g. plan execution dispatched by + // ClickBench). Spawn the read on the *current* ring so the + // bytes land on the same core that will consume them — no + // mpsc/oneshot hop to another worker. + if in_worker() { + let (tx, rx) = oneshot::channel::>>(); + tokio_uring::spawn(async move { + let _ = tx.send(read_ranges_inner(&path, &ranges).await); + }); + return rx.await.map_err(|_| object_store::Error::Generic { + store: "TokioUringObjectStore", + source: "tokio-uring local task was cancelled".into(), + })?; + } + + // Slow path: the caller is on a non-uring runtime (e.g. the + // main tokio MT runtime used during planning). Ship the work + // to a pool worker via round-robin. + let job = self + .pool + .spawn(move || async move { read_ranges_inner(&path, &ranges).await }); + job.await.map_err(|e| object_store::Error::Generic { + store: "TokioUringObjectStore", + source: Box::new(e), + })? + } +} + +impl fmt::Debug for TokioUringObjectStore { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TokioUringObjectStore") + .field("workers", &self.pool.worker_count()) + .finish() + } +} + +impl fmt::Display for TokioUringObjectStore { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "TokioUringObjectStore(workers={})", + self.pool.worker_count() + ) + } +} + +/// Open the file on the worker's runtime, then read every range +/// concurrently via `tokio_uring::spawn` on the same ring. +async fn read_ranges_inner(path: &FsPath, ranges: &[Range]) -> Result> { + if ranges.is_empty() { + return Ok(Vec::new()); + } + + let file = tokio_uring::fs::File::open(path.to_path_buf()) + .await + .map_err(|e| { + if e.kind() == std::io::ErrorKind::NotFound { + object_store::Error::NotFound { + path: path.display().to_string(), + source: e.into(), + } + } else { + object_store::Error::Generic { + store: "TokioUringObjectStore", + source: e.into(), + } + } + })?; + let file = Arc::new(file); + + let mut handles = Vec::with_capacity(ranges.len()); + for r in ranges { + let file = Arc::clone(&file); + let range = r.clone(); + handles.push(tokio_uring::spawn( + async move { read_one(file, range).await }, + )); + } + + let mut out = Vec::with_capacity(ranges.len()); + for h in handles { + out.push(h.await.map_err(|e| object_store::Error::Generic { + store: "TokioUringObjectStore", + source: Box::new(e), + })??); + } + + if let Ok(file) = Arc::try_unwrap(file) { + let _ = file.close().await; + } + + Ok(out) +} + +async fn read_one(file: Arc, range: Range) -> Result { + let total = range.end.saturating_sub(range.start) as usize; + let mut out: Vec = Vec::with_capacity(total); + + while out.len() < total { + let remaining = total - out.len(); + let offset = range.start + out.len() as u64; + // tokio-uring requires owned buffers: we hand the Vec to the + // kernel and get it back alongside the result. + let buf = vec![0u8; remaining]; + let (res, buf) = file.read_at(buf, offset).await; + let n = res.map_err(|e| object_store::Error::Generic { + store: "TokioUringObjectStore", + source: e.into(), + })?; + if n == 0 { + return Err(object_store::Error::Generic { + store: "TokioUringObjectStore", + source: format!("unexpected EOF reading {}..{}", range.start, range.end) + .into(), + }); + } + out.extend_from_slice(&buf[..n]); + } + + Ok(Bytes::from(out)) +} + +#[async_trait] +impl ObjectStore for TokioUringObjectStore { + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: PutOptions, + ) -> Result { + self.inner.put_opts(location, payload, opts).await + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOptions, + ) -> Result> { + self.inner.put_multipart_opts(location, opts).await + } + + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { + if options.head { + return self.inner.get_opts(location, options).await; + } + + // Delegate a HEAD through the inner store so conditional options + // (if_match, if_modified_since, …) are honored. + let head_opts = GetOptions { + head: true, + range: None, + ..options.clone() + }; + let meta = self.inner.get_opts(location, head_opts).await?.meta; + let file_size = meta.size; + + let range = match &options.range { + Some(r) => { + r.as_range(file_size) + .map_err(|e| object_store::Error::Generic { + store: "TokioUringObjectStore", + source: Box::new(e), + })? + } + None => 0..file_size, + }; + + if range.start == range.end { + let stream = futures::stream::once(async { Ok(Bytes::new()) }).boxed(); + return Ok(GetResult { + payload: GetResultPayload::Stream(stream), + meta, + range, + attributes: Attributes::new(), + }); + } + + let fs_path = self.inner.path_to_filesystem(location)?; + let mut results = self.read_ranges_uring(fs_path, vec![range.clone()]).await?; + let bytes = results.remove(0); + let stream = futures::stream::once(async { Ok(bytes) }).boxed(); + + Ok(GetResult { + payload: GetResultPayload::Stream(stream), + meta, + range, + attributes: Attributes::new(), + }) + } + + async fn get_ranges( + &self, + location: &Path, + ranges: &[Range], + ) -> Result> { + if ranges.is_empty() { + return Ok(Vec::new()); + } + for r in ranges { + if r.start > r.end { + return Err(object_store::Error::Generic { + store: "TokioUringObjectStore", + source: format!("invalid range: start {} > end {}", r.start, r.end) + .into(), + }); + } + } + let fs_path = self.inner.path_to_filesystem(location)?; + self.read_ranges_uring(fs_path, ranges.to_vec()).await + } + + fn delete_stream( + &self, + locations: BoxStream<'static, Result>, + ) -> BoxStream<'static, Result> { + self.inner.delete_stream(locations) + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { + self.inner.list(prefix) + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { + self.inner.list_with_delimiter(prefix).await + } + + async fn copy_opts( + &self, + from: &Path, + to: &Path, + options: CopyOptions, + ) -> Result<()> { + self.inner.copy_opts(from, to, options).await + } +}