Skip to content
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ members = [
"benchmarks",
"datafusion/macros",
"datafusion/doc",
"datafusion/object-store-iouring",
]
exclude = ["dev/depcheck"]
resolver = "2"
Expand Down Expand Up @@ -140,6 +141,7 @@ datafusion-functions-table = { path = "datafusion/functions-table", version = "5
datafusion-functions-window = { path = "datafusion/functions-window", version = "53.0.0" }
datafusion-functions-window-common = { path = "datafusion/functions-window-common", version = "53.0.0" }
datafusion-macros = { path = "datafusion/macros", version = "53.0.0" }
datafusion-object-store-iouring = { path = "datafusion/object-store-iouring", version = "53.0.0" }
datafusion-optimizer = { path = "datafusion/optimizer", version = "53.0.0", default-features = false }
datafusion-physical-expr = { path = "datafusion/physical-expr", version = "53.0.0", default-features = false }
datafusion-physical-expr-adapter = { path = "datafusion/physical-expr-adapter", version = "53.0.0", default-features = false }
Expand All @@ -153,7 +155,6 @@ datafusion-session = { path = "datafusion/session", version = "53.0.0" }
datafusion-spark = { path = "datafusion/spark", version = "53.0.0" }
datafusion-sql = { path = "datafusion/sql", version = "53.0.0" }
datafusion-substrait = { path = "datafusion/substrait", version = "53.0.0" }

doc-comment = "0.3"
env_logger = "0.11"
flate2 = "1.1.9"
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ default = [
"parquet",
"recursive_protection",
"sql",
"io-uring",
]
encoding_expressions = ["datafusion-functions/encoding_expressions"]
io-uring = ["datafusion-execution/io-uring"]
# Used for testing ONLY: causes all values to hash to the same value (test for collisions)
force_hash_collisions = ["datafusion-physical-plan/force_hash_collisions", "datafusion-common/force_hash_collisions"]
math_expressions = ["datafusion-functions/math_expressions"]
Expand Down
4 changes: 3 additions & 1 deletion datafusion/execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ workspace = true
name = "datafusion_execution"

[features]
default = ["sql"]
default = ["sql", "io-uring"]

parquet_encryption = [
"parquet/encryption",
Expand All @@ -50,6 +50,7 @@ arrow_buffer_pool = [
"arrow-buffer/pool",
]
sql = []
io-uring = ["datafusion-object-store-iouring"]

[dependencies]
arrow = { workspace = true }
Expand All @@ -58,6 +59,7 @@ async-trait = { workspace = true }
dashmap = { workspace = true }
datafusion-common = { workspace = true, default-features = false }
datafusion-expr = { workspace = true, default-features = false }
datafusion-object-store-iouring = { workspace = true, optional = true }
datafusion-physical-expr-common = { workspace = true, default-features = false }
futures = { workspace = true }
log = { workspace = true }
Expand Down
27 changes: 23 additions & 4 deletions datafusion/execution/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use datafusion_common::{
DataFusionError, Result, exec_err, internal_datafusion_err, not_impl_err,
};
use object_store::ObjectStore;
#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(not(target_arch = "wasm32"), not(feature = "io-uring")))]
use object_store::local::LocalFileSystem;
use std::sync::Arc;
use url::Url;
Expand Down Expand Up @@ -205,11 +205,30 @@ impl Default for DefaultObjectStoreRegistry {
}

impl DefaultObjectStoreRegistry {
/// Creates a registry with a local filesystem object store registered to
/// handle `file://` paths.
///
/// When the `io-uring` feature is enabled, the registered store is an
/// `IoUringObjectStore`, which uses io_uring for batched local file reads.
/// Otherwise it is an `object_store::local::LocalFileSystem`.
///
/// # Panics
///
/// Panics if the `io-uring` feature is enabled but the io_uring worker
/// thread cannot be created — this typically indicates a misconfigured
/// sandbox / seccomp profile and is a deployment bug rather than
/// something to silently paper over.
#[cfg(not(target_arch = "wasm32"))]
pub fn new() -> Self {
    // Exactly one of the two `local` bindings below is compiled in,
    // selected by the `io-uring` feature.
    #[cfg(feature = "io-uring")]
    let local: Arc<dyn ObjectStore> = Arc::new(
        datafusion_object_store_iouring::IoUringObjectStore::new()
            .expect("failed to initialize IoUringObjectStore"),
    );

    #[cfg(not(feature = "io-uring"))]
    let local: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());

    // Register the chosen store once; `LocalFileSystem` is not imported when
    // `io-uring` is enabled, so it must only be named in the branch above.
    let object_stores: DashMap<String, Arc<dyn ObjectStore>> = DashMap::new();
    object_stores.insert("file://".to_string(), local);
    Self { object_stores }
}

Expand All @@ -223,7 +242,7 @@ impl DefaultObjectStoreRegistry {

///
/// Stores are registered based on the scheme, host and port of the provided URL
/// with a local filesystem store automatically registered for `file://` (if the
/// target arch is not `wasm32`).
///
/// For example:
Expand Down
50 changes: 50 additions & 0 deletions datafusion/object-store-iouring/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Manifest for the `datafusion-object-store-iouring` crate.
# Version, edition, homepage, repository, license, authors and rust-version
# are all inherited from the workspace rather than set here.
[package]
name = "datafusion-object-store-iouring"
description = "io-uring based ObjectStore for DataFusion local file I/O"
keywords = ["arrow", "query", "sql", "io-uring"]
readme = "README.md"
version = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
repository = { workspace = true }
license = { workspace = true }
authors = { workspace = true }
rust-version = { workspace = true }

# Use the lint configuration defined at the workspace root.
[lints]
workspace = true

[lib]
name = "datafusion_object_store_iouring"

# Dependencies required on every target.
# NOTE(review): the `fs` feature presumably enables object_store's local
# filesystem support — confirm against the object_store feature list.
[dependencies]
async-trait = { workspace = true }
bytes = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
object_store = { workspace = true, features = ["fs"] }
tokio = { workspace = true, features = ["sync"] }

# The `io-uring` crate is only pulled in when building for Linux; on other
# targets this crate is compiled without it.
[target.'cfg(target_os = "linux")'.dependencies]
io-uring = "0.7"

# Test-only dependencies. tokio additionally gets the `macros` and
# `rt-multi-thread` features here, on top of `sync` above.
[dev-dependencies]
tempfile = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
Loading
Loading