diff --git a/Cargo.lock b/Cargo.lock index bbec97ed7ff3d..9e0940b5f068b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2148,6 +2148,7 @@ dependencies = [ "dashmap", "datafusion-common", "datafusion-expr", + "datafusion-object-store-iouring", "datafusion-physical-expr-common", "futures", "insta", @@ -2367,6 +2368,20 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "datafusion-object-store-iouring" +version = "53.0.0" +dependencies = [ + "async-trait", + "bytes", + "futures", + "io-uring", + "log", + "object_store", + "tempfile", + "tokio", +] + [[package]] name = "datafusion-optimizer" version = "53.0.0" @@ -3766,6 +3781,17 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "io-uring" +version = "0.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdd7bddefd0a8833b88a4b68f90dae22c7450d11b354198baee3874fd811b344" +dependencies = [ + "bitflags", + "cfg-if", + "libc", +] + [[package]] name = "ipnet" version = "2.12.0" diff --git a/Cargo.toml b/Cargo.toml index 7e75bb59b68f2..daf3269e23008 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,6 +65,7 @@ members = [ "benchmarks", "datafusion/macros", "datafusion/doc", + "datafusion/object-store-iouring", ] exclude = ["dev/depcheck"] resolver = "2" @@ -140,6 +141,7 @@ datafusion-functions-table = { path = "datafusion/functions-table", version = "5 datafusion-functions-window = { path = "datafusion/functions-window", version = "53.0.0" } datafusion-functions-window-common = { path = "datafusion/functions-window-common", version = "53.0.0" } datafusion-macros = { path = "datafusion/macros", version = "53.0.0" } +datafusion-object-store-iouring = { path = "datafusion/object-store-iouring", version = "53.0.0" } datafusion-optimizer = { path = "datafusion/optimizer", version = "53.0.0", default-features = false } datafusion-physical-expr = { path = "datafusion/physical-expr", version = "53.0.0", default-features = false } datafusion-physical-expr-adapter = { path = "datafusion/physical-expr-adapter", version = "53.0.0", default-features = false } @@ -153,7 +155,6 @@ datafusion-session = { path = "datafusion/session", version = "53.0.0" } datafusion-spark = { path = "datafusion/spark", version = "53.0.0" } datafusion-sql = { path = "datafusion/sql", version = "53.0.0" } datafusion-substrait = { path = "datafusion/substrait", version = "53.0.0" } - doc-comment = "0.3" env_logger = "0.11" flate2 = "1.1.9" diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index a2a07d4598b0a..be3f06f15b49d 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -67,8 +67,10 @@ default = [ "parquet", "recursive_protection", "sql", + "io-uring", ] encoding_expressions = ["datafusion-functions/encoding_expressions"] +io-uring = ["datafusion-execution/io-uring"] # Used for testing ONLY: causes all values to hash to the same value (test for collisions) force_hash_collisions = ["datafusion-physical-plan/force_hash_collisions", "datafusion-common/force_hash_collisions"] math_expressions = ["datafusion-functions/math_expressions"] diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml index 06c84d8acb493..e40addb2c1dfb 100644 --- a/datafusion/execution/Cargo.toml +++ b/datafusion/execution/Cargo.toml @@ -41,7 +41,7 @@ workspace = true name = "datafusion_execution" [features] -default = ["sql"] +default = ["sql", "io-uring"] parquet_encryption = [ "parquet/encryption", @@ -50,6 +50,7 @@ arrow_buffer_pool = [ "arrow-buffer/pool", ] sql = [] +io-uring = ["datafusion-object-store-iouring"] [dependencies] arrow = { workspace = true } @@ -58,6 +59,7 @@ async-trait = { workspace = true } dashmap = { workspace = true } datafusion-common = { workspace = true, default-features = false } datafusion-expr = { workspace = true, default-features = false } +datafusion-object-store-iouring = { workspace = true, optional = true } datafusion-physical-expr-common = { workspace = true, default-features = false } futures = { workspace = true } log = { workspace = true } diff --git a/datafusion/execution/src/object_store.rs b/datafusion/execution/src/object_store.rs index 22ce1f0cf2bbf..f607400ae9ce5 100644 --- a/datafusion/execution/src/object_store.rs +++ b/datafusion/execution/src/object_store.rs @@ -24,7 +24,7 @@ use datafusion_common::{ DataFusionError, Result, exec_err, internal_datafusion_err, not_impl_err, }; use object_store::ObjectStore; -#[cfg(not(target_arch = "wasm32"))] +#[cfg(all(not(target_arch = "wasm32"), not(feature = "io-uring")))] use object_store::local::LocalFileSystem; use std::sync::Arc; use url::Url; @@ -205,11 +205,30 @@ impl Default for DefaultObjectStoreRegistry { } impl DefaultObjectStoreRegistry { - /// This will register [`LocalFileSystem`] to handle `file://` paths + /// Registers a local filesystem object store to handle `file://` paths. + /// + /// When the `io-uring` feature is enabled, registers an + /// `IoUringObjectStore`, which uses io_uring for batched local + /// file reads. + /// + /// Panics if the `io-uring` feature is enabled but the io_uring worker + /// thread cannot be created — this typically indicates a misconfigured + /// sandbox / seccomp profile and is a deployment bug rather than + /// something to silently paper over. #[cfg(not(target_arch = "wasm32"))] pub fn new() -> Self { let object_stores: DashMap> = DashMap::new(); - object_stores.insert("file://".to_string(), Arc::new(LocalFileSystem::new())); + + #[cfg(feature = "io-uring")] + let local: Arc = Arc::new( + datafusion_object_store_iouring::IoUringObjectStore::new() + .expect("failed to initialize IoUringObjectStore"), + ); + + #[cfg(not(feature = "io-uring"))] + let local: Arc = Arc::new(LocalFileSystem::new()); + + object_stores.insert("file://".to_string(), local); Self { object_stores } } @@ -223,7 +242,7 @@ impl DefaultObjectStoreRegistry { /// /// Stores are registered based on the scheme, host and port of the provided URL -/// with a [`LocalFileSystem::new`] automatically registered for `file://` (if the +/// with a local filesystem store automatically registered for `file://` (if the /// target arch is not `wasm32`). /// /// For example: diff --git a/datafusion/object-store-iouring/Cargo.toml b/datafusion/object-store-iouring/Cargo.toml new file mode 100644 index 0000000000000..099cbdebcdb5d --- /dev/null +++ b/datafusion/object-store-iouring/Cargo.toml @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "datafusion-object-store-iouring" +description = "io-uring based ObjectStore for DataFusion local file I/O" +keywords = ["arrow", "query", "sql", "io-uring"] +readme = "README.md" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +license = { workspace = true } +authors = { workspace = true } +rust-version = { workspace = true } + +[lints] +workspace = true + +[lib] +name = "datafusion_object_store_iouring" + +[dependencies] +async-trait = { workspace = true } +bytes = { workspace = true } +futures = { workspace = true } +log = { workspace = true } +object_store = { workspace = true, features = ["fs"] } +tokio = { workspace = true, features = ["sync"] } + +[target.'cfg(target_os = "linux")'.dependencies] +io-uring = "0.7" + +[dev-dependencies] +tempfile = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/datafusion/object-store-iouring/src/lib.rs b/datafusion/object-store-iouring/src/lib.rs new file mode 100644 index 0000000000000..431afe98b6757 --- /dev/null +++ b/datafusion/object-store-iouring/src/lib.rs @@ -0,0 +1,549 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! io-uring based [`ObjectStore`] implementation for DataFusion. +//! +//! Provides [`IoUringObjectStore`] which uses Linux's io_uring interface +//! for high-performance local file reads. A dedicated thread runs an +//! io_uring event loop, and read requests are dispatched via channels +//! from async [`ObjectStore`] methods. +//! +//! On non-Linux platforms, [`IoUringObjectStore`] delegates all operations +//! to [`LocalFileSystem`] without io_uring acceleration. +//! +//! # Performance +//! +//! The main benefit is **batched syscalls**: multiple byte-range reads +//! (e.g., Parquet column chunks) are submitted as a single +//! `io_uring_enter()` call instead of individual `pread()` calls. + +#[cfg(target_os = "linux")] +mod uring; + +use std::fmt; +use std::ops::Range; +use std::path::PathBuf; +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +#[cfg(target_os = "linux")] +use futures::StreamExt; +use futures::stream::BoxStream; +use object_store::local::LocalFileSystem; +use object_store::path::Path; +#[cfg(target_os = "linux")] +use object_store::{Attributes, GetResultPayload}; +use object_store::{ + CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, + ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, +}; + +/// ObjectStore implementation that uses io_uring for local file reads on Linux. +/// +/// Write, list, copy, and delete operations are delegated to [`LocalFileSystem`]. +/// Read operations (`get_opts`, `get_ranges`) use a dedicated io_uring thread +/// for batched, zero-copy I/O. +/// +/// # Example +/// +/// ```no_run +/// use datafusion_object_store_iouring::IoUringObjectStore; +/// use object_store::ObjectStore; +/// +/// let store = IoUringObjectStore::new().unwrap(); +/// ``` +pub struct IoUringObjectStore { + inner: Arc, + root: PathBuf, + #[cfg(target_os = "linux")] + uring_sender: tokio::sync::mpsc::UnboundedSender, +} + +impl IoUringObjectStore { + /// Create a new `IoUringObjectStore` with root at `/`. + /// + /// Returns an error if the io_uring worker thread cannot be spawned on + /// Linux. + #[expect( + clippy::result_large_err, + reason = "matches object_store::Result signature" + )] + pub fn new() -> Result { + Self::new_with_root(PathBuf::from("/")) + } + + /// Create a new `IoUringObjectStore` with the given root directory. + /// + /// Returns an error if `root` is not a usable `LocalFileSystem` prefix, + /// or if the io_uring worker thread cannot be spawned. + #[expect( + clippy::result_large_err, + reason = "matches object_store::Result signature" + )] + pub fn new_with_root(root: PathBuf) -> Result { + let inner = if root == std::path::Path::new("/") { + Arc::new(LocalFileSystem::new()) + } else { + Arc::new(LocalFileSystem::new_with_prefix(&root)?) + }; + + #[cfg(target_os = "linux")] + { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + std::thread::Builder::new() + .name("io-uring-worker".to_string()) + .spawn(move || uring::run_uring_loop(rx)) + .map_err(|e| object_store::Error::Generic { + store: "IoUringObjectStore", + source: Box::new(e), + })?; + + Ok(Self { + inner, + root, + uring_sender: tx, + }) + } + + #[cfg(not(target_os = "linux"))] + { + Ok(Self { inner, root }) + } + } + + /// Resolve an [`object_store::path::Path`] to an absolute filesystem path + /// using the same rules as the inner [`LocalFileSystem`] (prefix joining, + /// percent decoding of segments, rejection of `..` and control chars). + #[cfg(target_os = "linux")] + #[expect( + clippy::result_large_err, + reason = "matches object_store::Result signature" + )] + fn resolve_path(&self, location: &Path) -> Result { + self.inner.path_to_filesystem(location) + } +} + +impl fmt::Debug for IoUringObjectStore { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("IoUringObjectStore") + .field("root", &self.root) + .finish() + } +} + +impl fmt::Display for IoUringObjectStore { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "IoUringObjectStore({})", self.root.display()) + } +} + +// ============================================================ +// Linux: io_uring accelerated reads +// ============================================================ + +#[cfg(target_os = "linux")] +impl IoUringObjectStore { + async fn get_opts_uring( + &self, + location: &Path, + options: GetOptions, + ) -> Result { + // Fetch metadata *and* enforce any conditional options (if_match, + // if_none_match, if_modified_since, if_unmodified_since, version) + // by delegating to the inner store with `head = true`. This returns + // an error if any precondition fails. + let head_opts = GetOptions { + head: true, + range: None, + ..options.clone() + }; + let meta = self.inner.get_opts(location, head_opts).await?.meta; + let file_size = meta.size; + + // Resolve the requested byte range + let range = match &options.range { + Some(r) => { + r.as_range(file_size) + .map_err(|e| object_store::Error::Generic { + store: "IoUringObjectStore", + source: Box::new(e), + })? + } + None => 0..file_size, + }; + + if range.start == range.end { + // Empty range — return an empty stream + let stream = futures::stream::once(async { Ok(Bytes::new()) }).boxed(); + return Ok(GetResult { + payload: GetResultPayload::Stream(stream), + meta, + range: range.clone(), + attributes: Attributes::new(), + }); + } + + let fs_path = self.resolve_path(location)?; + let (tx, rx) = tokio::sync::oneshot::channel(); + + self.uring_sender + .send(uring::IoCommand::ReadRanges { + path: fs_path, + ranges: vec![range.clone()], + response: tx, + }) + .map_err(|_| object_store::Error::Generic { + store: "IoUringObjectStore", + source: "io-uring worker thread is gone".into(), + })?; + + let mut results = rx.await.map_err(|_| object_store::Error::Generic { + store: "IoUringObjectStore", + source: "io-uring response channel dropped".into(), + })??; + + let bytes = results.remove(0); + let stream = futures::stream::once(async { Ok(bytes) }).boxed(); + + Ok(GetResult { + payload: GetResultPayload::Stream(stream), + meta, + range: range.clone(), + attributes: Attributes::new(), + }) + } + + async fn get_ranges_uring( + &self, + location: &Path, + ranges: &[Range], + ) -> Result> { + if ranges.is_empty() { + return Ok(vec![]); + } + + // Defensive: reject ranges with start > end (producers should never + // hand these in, but the io_uring path would compute a bogus buffer + // length via unsigned wrap-around and potentially OOM). + for r in ranges { + if r.start > r.end { + return Err(object_store::Error::Generic { + store: "IoUringObjectStore", + source: format!("invalid range: start {} > end {}", r.start, r.end) + .into(), + }); + } + } + + let fs_path = self.resolve_path(location)?; + let (tx, rx) = tokio::sync::oneshot::channel(); + + self.uring_sender + .send(uring::IoCommand::ReadRanges { + path: fs_path, + ranges: ranges.to_vec(), + response: tx, + }) + .map_err(|_| object_store::Error::Generic { + store: "IoUringObjectStore", + source: "io-uring worker thread is gone".into(), + })?; + + rx.await.map_err(|_| object_store::Error::Generic { + store: "IoUringObjectStore", + source: "io-uring response channel dropped".into(), + })? + } +} + +// ============================================================ +// ObjectStore trait implementation +// ============================================================ + +#[async_trait] +impl ObjectStore for IoUringObjectStore { + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: PutOptions, + ) -> Result { + self.inner.put_opts(location, payload, opts).await + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOptions, + ) -> Result> { + self.inner.put_multipart_opts(location, opts).await + } + + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { + // Head-only requests don't need io_uring + if options.head { + return self.inner.get_opts(location, options).await; + } + + #[cfg(target_os = "linux")] + { + return self.get_opts_uring(location, options).await; + } + + #[cfg(not(target_os = "linux"))] + { + self.inner.get_opts(location, options).await + } + } + + async fn get_ranges( + &self, + location: &Path, + ranges: &[Range], + ) -> Result> { + #[cfg(target_os = "linux")] + { + return self.get_ranges_uring(location, ranges).await; + } + + #[cfg(not(target_os = "linux"))] + { + self.inner.get_ranges(location, ranges).await + } + } + + fn delete_stream( + &self, + locations: BoxStream<'static, Result>, + ) -> BoxStream<'static, Result> { + self.inner.delete_stream(locations) + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { + self.inner.list(prefix) + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { + self.inner.list_with_delimiter(prefix).await + } + + async fn copy_opts( + &self, + from: &Path, + to: &Path, + options: CopyOptions, + ) -> Result<()> { + self.inner.copy_opts(from, to, options).await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use object_store::ObjectStoreExt; + + /// Probe whether `io_uring_setup(2)` succeeds in the current sandbox. + /// Tests that actually drive the io_uring read path are skipped when + /// the syscall is blocked (seccomp profiles on many CI runners) — this + /// is test infrastructure, not a runtime fallback. + #[cfg(target_os = "linux")] + fn io_uring_available() -> bool { + io_uring::IoUring::new(2).is_ok() + } + #[cfg(not(target_os = "linux"))] + fn io_uring_available() -> bool { + true + } + + macro_rules! require_io_uring { + () => { + if !io_uring_available() { + eprintln!("skipping: io_uring_setup is blocked in this environment"); + return; + } + }; + } + + #[tokio::test] + async fn test_put_and_get() { + require_io_uring!(); + let dir = tempfile::tempdir().unwrap(); + let store = IoUringObjectStore::new_with_root(dir.path().to_path_buf()).unwrap(); + + let path = Path::from("test/data.txt"); + let payload = PutPayload::from_static(b"hello io_uring"); + store.put(&path, payload).await.unwrap(); + + let result = store.get(&path).await.unwrap(); + let bytes = result.bytes().await.unwrap(); + assert_eq!(bytes.as_ref(), b"hello io_uring"); + } + + #[tokio::test] + async fn test_get_range() { + require_io_uring!(); + let dir = tempfile::tempdir().unwrap(); + let store = IoUringObjectStore::new_with_root(dir.path().to_path_buf()).unwrap(); + + let path = Path::from("test/range.txt"); + let payload = PutPayload::from_static(b"0123456789"); + store.put(&path, payload).await.unwrap(); + + let bytes = store.get_range(&path, 2..5).await.unwrap(); + assert_eq!(bytes.as_ref(), b"234"); + } + + #[tokio::test] + async fn test_get_ranges() { + require_io_uring!(); + let dir = tempfile::tempdir().unwrap(); + let store = IoUringObjectStore::new_with_root(dir.path().to_path_buf()).unwrap(); + + let path = Path::from("test/ranges.txt"); + let payload = PutPayload::from_static(b"0123456789"); + store.put(&path, payload).await.unwrap(); + + let ranges = vec![0..3, 5..8]; + let results = store.get_ranges(&path, &ranges).await.unwrap(); + assert_eq!(results.len(), 2); + assert_eq!(results[0].as_ref(), b"012"); + assert_eq!(results[1].as_ref(), b"567"); + } + + #[tokio::test] + async fn path_with_special_characters_round_trips() { + // Regression test for the path-resolution mismatch: the previous + // `self.root.join(location.as_ref())` concatenated the percent-encoded + // path, which didn't match the file that `LocalFileSystem` actually + // wrote. + require_io_uring!(); + let dir = tempfile::tempdir().unwrap(); + let store = IoUringObjectStore::new_with_root(dir.path().to_path_buf()).unwrap(); + + let path = Path::from("a b/c d/file.txt"); + store + .put(&path, PutPayload::from_static(b"ok")) + .await + .unwrap(); + + let bytes = store.get(&path).await.unwrap().bytes().await.unwrap(); + assert_eq!(bytes.as_ref(), b"ok"); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn many_concurrent_requests_are_pipelined() { + // Verifies that multiple `get_ranges` calls in flight simultaneously + // don't deadlock and all return the expected data — the previous + // single-request-at-a-time worker used to serialize them. + require_io_uring!(); + let dir = tempfile::tempdir().unwrap(); + let store = Arc::new( + IoUringObjectStore::new_with_root(dir.path().to_path_buf()).unwrap(), + ); + + let n_files: usize = 16; + let payload_len: usize = 64 * 1024; // 64 KiB per file + for i in 0..n_files { + let path = Path::from(format!("f{i}.bin")); + let buf = vec![i as u8; payload_len]; + store.put(&path, PutPayload::from(buf)).await.unwrap(); + } + + let mut tasks = futures::stream::FuturesUnordered::new(); + for i in 0..n_files { + let store = Arc::clone(&store); + tasks.push(async move { + let path = Path::from(format!("f{i}.bin")); + let ranges = (0..8) + .map(|k| { + let start = (k * 1024) as u64; + start..start + 512 + }) + .collect::>(); + let got = store.get_ranges(&path, &ranges).await.unwrap(); + assert_eq!(got.len(), 8); + for g in got { + assert_eq!(g.len(), 512); + assert!(g.iter().all(|b| *b == i as u8)); + } + }); + } + use futures::StreamExt as _; + while tasks.next().await.is_some() {} + } + + #[test] + fn invalid_prefix_returns_error_without_panicking() { + let err = + IoUringObjectStore::new_with_root("/definitely/does/not/exist/abcxyz".into()); + assert!(err.is_err(), "expected error for missing prefix"); + } + + #[tokio::test] + async fn test_head() { + require_io_uring!(); + let dir = tempfile::tempdir().unwrap(); + let store = IoUringObjectStore::new_with_root(dir.path().to_path_buf()).unwrap(); + + let path = Path::from("test/head.txt"); + let payload = PutPayload::from_static(b"hello"); + store.put(&path, payload).await.unwrap(); + + let meta = store.head(&path).await.unwrap(); + assert_eq!(meta.size, 5); + } + + #[tokio::test] + async fn test_list() { + use futures::TryStreamExt; + + require_io_uring!(); + let dir = tempfile::tempdir().unwrap(); + let store = IoUringObjectStore::new_with_root(dir.path().to_path_buf()).unwrap(); + + let path1 = Path::from("prefix/a.txt"); + let path2 = Path::from("prefix/b.txt"); + store + .put(&path1, PutPayload::from_static(b"a")) + .await + .unwrap(); + store + .put(&path2, PutPayload::from_static(b"b")) + .await + .unwrap(); + + let prefix = Path::from("prefix"); + let entries: Vec<_> = store.list(Some(&prefix)).try_collect().await.unwrap(); + assert_eq!(entries.len(), 2); + } + + #[tokio::test] + async fn test_empty_ranges() { + require_io_uring!(); + let dir = tempfile::tempdir().unwrap(); + let store = IoUringObjectStore::new_with_root(dir.path().to_path_buf()).unwrap(); + + let path = Path::from("test/empty.txt"); + let payload = PutPayload::from_static(b"data"); + store.put(&path, payload).await.unwrap(); + + let results = store.get_ranges(&path, &[]).await.unwrap(); + assert!(results.is_empty()); + } +} diff --git a/datafusion/object-store-iouring/src/uring.rs b/datafusion/object-store-iouring/src/uring.rs new file mode 100644 index 0000000000000..85daa7d4ae32c --- /dev/null +++ b/datafusion/object-store-iouring/src/uring.rs @@ -0,0 +1,441 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Dedicated io_uring worker thread. +//! +//! A single worker thread drains an unbounded mpsc of [`IoCommand`]s and +//! pipelines every request's reads through one shared [`IoUring`]. SQE +//! `user_data` packs `(slot_id << 32) | range_idx` so each completion finds +//! its owning request and range. Partial reads re-enqueue a follow-up SQE +//! for the remainder; the per-request [`File`] stays alive until every +//! completion for that slot has been drained. + +use std::collections::VecDeque; +use std::ops::Range; +use std::os::unix::io::AsRawFd; +use std::path::PathBuf; + +use bytes::Bytes; +use io_uring::{IoUring, opcode, types}; +use object_store::Result; +use tokio::sync::{mpsc, oneshot}; + +/// Number of submission-queue entries. 256 comfortably absorbs typical +/// Parquet column-chunk batches without spilling to the backlog. +const RING_ENTRIES: u32 = 256; + +/// Command sent from the async ObjectStore methods to the io_uring thread. +pub(crate) enum IoCommand { + ReadRanges { + path: PathBuf, + ranges: Vec>, + response: oneshot::Sender>>, + }, +} + +/// Per-request state living in the slab while SQEs are in flight. +/// +/// The `buffers[i]` [`Vec`] grows as completions arrive — `len()` is the +/// bytes filled so far, `capacity() >= target_len[i]` is the allocation. +/// This avoids the zero-fill cost of `vec![0u8; n]` without touching +/// uninitialized memory from safe code. +struct PendingRequest { + /// Kept alive so the raw fd stored in in-flight SQEs stays valid. + _file: std::fs::File, + fd: i32, + ranges: Vec>, + buffers: Vec>, + target_len: Vec, + /// Ranges that still need more bytes read. + ranges_remaining: u32, + /// SQEs still owed — either in the backlog or in flight. A slot may + /// only be freed when this hits zero; otherwise a late CQE could land + /// on a reallocated slot and corrupt an unrelated request. + sqes_outstanding: u32, + /// `None` after the response has been sent (success or error). + response: Option>>>, + errored: bool, +} + +#[inline] +fn pack_user_data(slot: u32, range_idx: u32) -> u64 { + ((slot as u64) << 32) | (range_idx as u64) +} + +#[inline] +fn unpack_user_data(user_data: u64) -> (u32, u32) { + ((user_data >> 32) as u32, user_data as u32) +} + +/// Slab of active requests. `slots[i]` is `Some` iff slot `i` is live. +struct Slab { + slots: Vec>, + free: Vec, + in_flight: usize, +} + +impl Slab { + fn new() -> Self { + Self { + slots: Vec::new(), + free: Vec::new(), + in_flight: 0, + } + } + + fn alloc(&mut self, req: PendingRequest) -> u32 { + self.in_flight += 1; + if let Some(id) = self.free.pop() { + self.slots[id as usize] = Some(req); + id + } else { + let id = self.slots.len() as u32; + self.slots.push(Some(req)); + id + } + } + + fn free(&mut self, id: u32) { + self.slots[id as usize] = None; + self.free.push(id); + self.in_flight -= 1; + } + + #[inline] + fn get_mut(&mut self, id: u32) -> Option<&mut PendingRequest> { + self.slots.get_mut(id as usize).and_then(|s| s.as_mut()) + } +} + +/// Main loop for the io_uring worker thread. +/// +/// Runs until the command channel is closed and all in-flight requests are +/// drained. +pub(crate) fn run_uring_loop(mut rx: mpsc::UnboundedReceiver) { + let mut ring = match IoUring::new(RING_ENTRIES) { + Ok(ring) => ring, + Err(e) => { + log::error!("Failed to create io_uring instance: {e}"); + drain_and_error(&mut rx, &e.to_string()); + return; + } + }; + + let mut slab = Slab::new(); + // SQEs waiting for room in the ring. `(slot, range_idx)`. + let mut backlog: VecDeque<(u32, u32)> = VecDeque::new(); + let mut rx_closed = false; + + loop { + // 1) Pull every command currently waiting. When nothing is in flight + // the thread blocks here so the worker idles without spinning. + while !rx_closed { + let cmd = if slab.in_flight == 0 && backlog.is_empty() { + match rx.blocking_recv() { + Some(cmd) => cmd, + None => { + rx_closed = true; + break; + } + } + } else { + match rx.try_recv() { + Ok(cmd) => cmd, + Err(mpsc::error::TryRecvError::Empty) => break, + Err(mpsc::error::TryRecvError::Disconnected) => { + rx_closed = true; + break; + } + } + }; + accept_command(cmd, &mut slab, &mut backlog); + } + + // 2) Push as many backlog SQEs into the SQ as will fit. + push_backlog(&mut ring, &mut slab, &mut backlog); + + if slab.in_flight == 0 { + if rx_closed { + break; + } + continue; + } + + // 3) Single syscall that both submits the queued SQEs and waits for + // at least one completion. + if let Err(e) = ring.submit_and_wait(1) { + fail_all(&mut slab, &format!("io_uring submit failed: {e}")); + continue; + } + + // 4) Drain every available CQE; short reads re-queue a follow-up SQE. + drain_completions(&mut ring, &mut slab, &mut backlog); + } +} + +/// Respond to every queued command with `reason` after init failure. +fn drain_and_error(rx: &mut mpsc::UnboundedReceiver, reason: &str) { + while let Some(cmd) = rx.blocking_recv() { + let IoCommand::ReadRanges { response, .. } = cmd; + let _ = response.send(Err(object_store::Error::Generic { + store: "IoUringObjectStore", + source: format!("io_uring init failed: {reason}").into(), + })); + } +} + +/// Allocate a slab slot for the incoming command and queue one SQE per range. +/// Errors (missing file, 32-bit overflow) are reported immediately without +/// occupying a slot. +fn accept_command(cmd: IoCommand, slab: &mut Slab, backlog: &mut VecDeque<(u32, u32)>) { + let IoCommand::ReadRanges { + path, + ranges, + response, + } = cmd; + + if ranges.is_empty() { + let _ = response.send(Ok(Vec::new())); + return; + } + + let file = match std::fs::File::open(&path) { + Ok(f) => f, + Err(e) => { + let err = if e.kind() == std::io::ErrorKind::NotFound { + object_store::Error::NotFound { + path: path.display().to_string(), + source: e.into(), + } + } else { + object_store::Error::Generic { + store: "IoUringObjectStore", + source: e.into(), + } + }; + let _ = response.send(Err(err)); + return; + } + }; + + let n = ranges.len(); + let mut buffers: Vec> = Vec::with_capacity(n); + let mut target_len: Vec = Vec::with_capacity(n); + for r in &ranges { + let len = r.end.saturating_sub(r.start); + let Ok(len) = usize::try_from(len) else { + let _ = response.send(Err(object_store::Error::Generic { + store: "IoUringObjectStore", + source: format!("range length {len} exceeds usize").into(), + })); + return; + }; + // Preallocate the full range without zero-filling. Each buffer + // grows via `set_len` as kernel completions arrive, so the Vec + // always reflects exactly the bytes initialized so far. + buffers.push(Vec::with_capacity(len)); + target_len.push(len); + } + + let fd = file.as_raw_fd(); + let n_u32 = n as u32; + let slot = slab.alloc(PendingRequest { + _file: file, + fd, + ranges, + buffers, + target_len, + ranges_remaining: n_u32, + sqes_outstanding: n_u32, + response: Some(response), + errored: false, + }); + + backlog.reserve(n); + for idx in 0..n_u32 { + backlog.push_back((slot, idx)); + } +} + +/// Push backlog SQEs into the submission queue until either the backlog or +/// the SQ is exhausted. +fn push_backlog(ring: &mut IoUring, slab: &mut Slab, backlog: &mut VecDeque<(u32, u32)>) { + if backlog.is_empty() { + return; + } + + // SAFETY: every `buffers[idx]` lives inside the slab slot, which is + // not freed until the matching completion is processed — so the raw + // pointer and length remain valid for the lifetime of every SQE. + unsafe { + let mut sq = ring.submission(); + while let Some(&(slot, range_idx)) = backlog.front() { + let Some(req) = slab.get_mut(slot) else { + // Slot was freed early (error path) — drop the stale SQE. + backlog.pop_front(); + continue; + }; + let idx = range_idx as usize; + let filled = req.buffers[idx].len(); + let remaining = req.target_len[idx] - filled; + debug_assert!(remaining > 0, "queued SQE for already-full range"); + + let ptr = req.buffers[idx].as_mut_ptr().add(filled); + let entry = opcode::Read::new(types::Fd(req.fd), ptr, remaining as u32) + .offset(req.ranges[idx].start + filled as u64) + .build() + .user_data(pack_user_data(slot, range_idx)); + + if sq.push(&entry).is_err() { + break; + } + backlog.pop_front(); + } + sq.sync(); + } +} + +/// Consume every available CQE and route it back to its owning request. +/// Short reads re-queue a follow-up SQE onto the backlog. +fn drain_completions( + ring: &mut IoUring, + slab: &mut Slab, + backlog: &mut VecDeque<(u32, u32)>, +) { + let cq = ring.completion(); + for cqe in cq { + let (slot, range_idx) = unpack_user_data(cqe.user_data()); + let idx = range_idx as usize; + + let Some(req) = slab.get_mut(slot) else { + // Slot freed — the request already terminated and every + // outstanding SQE was accounted for. + continue; + }; + + debug_assert!(req.sqes_outstanding > 0); + req.sqes_outstanding -= 1; + + if req.errored || req.response.is_none() { + maybe_free(slab, slot); + continue; + } + + if idx >= req.buffers.len() { + let err = object_store::Error::Generic { + store: "IoUringObjectStore", + source: format!( + "io_uring cqe with invalid user_data ({slot}, {range_idx})" + ) + .into(), + }; + finish_with_error(slab, slot, err); + continue; + } + + let ret = cqe.result(); + if ret < 0 { + let err = object_store::Error::Generic { + store: "IoUringObjectStore", + source: std::io::Error::from_raw_os_error(-ret).into(), + }; + finish_with_error(slab, slot, err); + continue; + } + + let bytes_read = ret as usize; + if bytes_read == 0 { + let range = req.ranges[idx].clone(); + let err = object_store::Error::Generic { + store: "IoUringObjectStore", + source: format!("unexpected EOF reading {}..{}", range.start, range.end) + .into(), + }; + finish_with_error(slab, slot, err); + continue; + } + + // SAFETY: the kernel just wrote `bytes_read` bytes starting at + // `buf.as_mut_ptr().add(buf.len())` (see `push_backlog`), so + // advancing `len` by that amount exposes only initialized bytes. + // `capacity == target_len[idx] >= new_len` since we never queue + // more than `target_len - filled` for a single SQE. + let buf = &mut req.buffers[idx]; + let new_len = buf.len() + bytes_read; + debug_assert!(new_len <= req.target_len[idx]); + unsafe { + buf.set_len(new_len); + } + + if new_len < req.target_len[idx] { + // Partial read — queue a follow-up for the remainder. + backlog.push_back((slot, range_idx)); + req.sqes_outstanding += 1; + continue; + } + + req.ranges_remaining -= 1; + if req.ranges_remaining == 0 { + finish_ok(slab, slot); + } + } +} + +fn finish_ok(slab: &mut Slab, slot: u32) { + let Some(req) = slab.slots[slot as usize].as_mut() else { + return; + }; + let buffers = std::mem::take(&mut req.buffers); + if let Some(tx) = req.response.take() { + let bytes: Vec = buffers.into_iter().map(Bytes::from).collect(); + let _ = tx.send(Ok(bytes)); + } + maybe_free(slab, slot); +} + +fn finish_with_error(slab: &mut Slab, slot: u32, err: object_store::Error) { + let Some(req) = slab.slots[slot as usize].as_mut() else { + return; + }; + req.errored = true; + if let Some(tx) = req.response.take() { + let _ = tx.send(Err(err)); + } + maybe_free(slab, slot); +} + +/// Free the slot iff no SQEs remain in the backlog or in flight. +fn maybe_free(slab: &mut Slab, slot: u32) { + let Some(req) = slab.slots[slot as usize].as_ref() else { + return; + }; + if req.response.is_none() && req.sqes_outstanding == 0 { + slab.free(slot); + } +} + +fn fail_all(slab: &mut Slab, msg: &str) { + for slot in 0..slab.slots.len() { + if slab.slots[slot].is_some() { + let err = object_store::Error::Generic { + store: "IoUringObjectStore", + source: msg.to_string().into(), + }; + finish_with_error(slab, slot as u32, err); + } + } +}