From 1b1b5865df935db830c4c6785e882b2e78731bc6 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 3 Apr 2026 13:42:21 -0400 Subject: [PATCH 1/5] Add Dynamic work scheduling in FileStream --- Cargo.lock | 1 + datafusion/datasource/Cargo.toml | 1 + .../datasource/src/file_scan_config/mod.rs | 136 +++- .../datasource/src/file_stream/builder.rs | 21 +- datafusion/datasource/src/file_stream/mod.rs | 593 ++++++++++++++++-- .../datasource/src/file_stream/scan_state.rs | 24 +- .../datasource/src/file_stream/work_source.rs | 102 +++ datafusion/datasource/src/morsel/mocks.rs | 12 +- datafusion/datasource/src/source.rs | 92 ++- datafusion/datasource/src/test_util.rs | 12 +- 10 files changed, 920 insertions(+), 74 deletions(-) create mode 100644 datafusion/datasource/src/file_stream/work_source.rs diff --git a/Cargo.lock b/Cargo.lock index a67cffe08a5ca..3fea730f283ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1969,6 +1969,7 @@ dependencies = [ "liblzma", "log", "object_store", + "parking_lot", "rand 0.9.2", "tempfile", "tokio", diff --git a/datafusion/datasource/Cargo.toml b/datafusion/datasource/Cargo.toml index 4027521658977..40e2271f45205 100644 --- a/datafusion/datasource/Cargo.toml +++ b/datafusion/datasource/Cargo.toml @@ -64,6 +64,7 @@ itertools = { workspace = true } liblzma = { workspace = true, optional = true } log = { workspace = true } object_store = { workspace = true } +parking_lot = { workspace = true } rand = { workspace = true } tempfile = { workspace = true, optional = true } tokio = { workspace = true } diff --git a/datafusion/datasource/src/file_scan_config/mod.rs b/datafusion/datasource/src/file_scan_config/mod.rs index f4e4e0a4dec0d..e75476f6ee551 100644 --- a/datafusion/datasource/src/file_scan_config/mod.rs +++ b/datafusion/datasource/src/file_scan_config/mod.rs @@ -24,7 +24,8 @@ use crate::file_groups::FileGroup; use crate::{ PartitionedFile, display::FileGroupsDisplay, file::FileSource, file_compression_type::FileCompressionType, 
file_stream::FileStreamBuilder, - source::DataSource, statistics::MinMaxStatistics, + file_stream::work_source::SharedWorkSource, source::DataSource, + statistics::MinMaxStatistics, }; use arrow::datatypes::FieldRef; use arrow::datatypes::{DataType, Schema, SchemaRef}; @@ -38,6 +39,7 @@ use datafusion_execution::{ }; use datafusion_expr::Operator; +use crate::source::OpenArgs; use datafusion_physical_expr::expressions::{BinaryExpr, Column}; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr::utils::reassign_expr_columns; @@ -580,6 +582,15 @@ impl DataSource for FileScanConfig { partition: usize, context: Arc, ) -> Result { + self.open_with_args(OpenArgs::new(partition, context)) + } + + fn open_with_args(&self, args: OpenArgs) -> Result { + let OpenArgs { + partition, + context, + sibling_state, + } = args; let object_store = context.runtime_env().object_store(&self.object_store_url)?; let batch_size = self .batch_size @@ -589,8 +600,17 @@ impl DataSource for FileScanConfig { let morselizer = source.create_morselizer(object_store, self, partition)?; + // Extract the shared work source from the sibling state if it exists. + // This allows multiple sibling streams to steal work from a single + // shared queue of unopened files. + let shared_work_source = sibling_state + .as_ref() + .and_then(|state| state.downcast_ref::()) + .cloned(); + let stream = FileStreamBuilder::new(self) .with_partition(partition) + .with_shared_work_source(shared_work_source) .with_morselizer(morselizer) .with_metrics(source.metrics()) .build()?; @@ -991,6 +1011,20 @@ impl DataSource for FileScanConfig { // Delegate to the file source self.file_source.apply_expressions(f) } + + /// Create any shared state that should be passed between sibling streams + /// during one execution. 
+ /// + /// This returns `None` when sibling streams must not share work, such as + /// when file order must be preserved or the file groups define the output + /// partitioning needed for the rest of the plan + fn create_sibling_state(&self) -> Option> { + if self.preserve_order || self.partitioned_by_file_group { + return None; + } + + Some(Arc::new(SharedWorkSource::from_config(self)) as Arc) + } } impl FileScanConfig { @@ -1368,19 +1402,33 @@ mod tests { use super::*; use crate::TableSchema; + use crate::source::DataSourceExec; use crate::test_util::col; use crate::{ generate_test_files, test_util::MockSource, tests::aggr_test_schema, verify_sort_integrity, }; + use arrow::array::{Int32Array, RecordBatch}; use arrow::datatypes::Field; use datafusion_common::ColumnStatistics; use datafusion_common::stats::Precision; + use datafusion_common::tree_node::TreeNodeRecursion; + use datafusion_common::{Result, assert_batches_eq, internal_err}; + use datafusion_execution::TaskContext; use datafusion_expr::SortExpr; + use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::create_physical_sort_expr; use datafusion_physical_expr::expressions::Literal; use datafusion_physical_expr::projection::ProjectionExpr; + use datafusion_physical_expr::projection::ProjectionExprs; + use datafusion_physical_plan::ExecutionPlan; + use datafusion_physical_plan::execution_plan::collect; + use futures::FutureExt as _; + use futures::StreamExt as _; + use futures::stream; + use object_store::ObjectStore; + use std::fmt::Debug; #[derive(Clone)] struct InexactSortPushdownSource { @@ -1400,7 +1448,7 @@ mod tests { impl FileSource for InexactSortPushdownSource { fn create_file_opener( &self, - _object_store: Arc, + _object_store: Arc, _base_config: &FileScanConfig, _partition: usize, ) -> Result> { @@ -2288,6 +2336,88 @@ mod tests { assert_eq!(partition_stats.total_byte_size, Precision::Exact(800)); } + /// Regression test for reusing a `DataSourceExec` after its 
execution-local + /// shared work queue has been drained. + /// + /// This test uses a single file group with two files so the scan creates a + /// shared unopened-file queue. Executing after `reset_state` must recreate + /// the shared queue and return the same rows again. + #[tokio::test] + async fn reset_state_recreates_shared_work_source() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Int32, + false, + )])); + let file_source = Arc::new( + MockSource::new(Arc::clone(&schema)) + .with_file_opener(Arc::new(ResetStateTestFileOpener { schema })), + ); + + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_group(FileGroup::new(vec![ + PartitionedFile::new("file1.parquet", 100), + PartitionedFile::new("file2.parquet", 100), + ])) + .build(); + + let exec: Arc = DataSourceExec::from_data_source(config); + let task_ctx = Arc::new(TaskContext::default()); + + // Running the same scan after resetting the state, should + // produce the same answer. 
+ let first_run = collect(Arc::clone(&exec), Arc::clone(&task_ctx)).await?; + let reset_exec = exec.reset_state()?; + let second_run = collect(reset_exec, task_ctx).await?; + + let expected = [ + "+-------+", + "| value |", + "+-------+", + "| 1 |", + "| 2 |", + "+-------+", + ]; + assert_batches_eq!(expected, &first_run); + assert_batches_eq!(expected, &second_run); + + Ok(()) + } + + /// Test-only `FileOpener` that turns file names like `file1.parquet` into a + /// single-batch stream containing that numeric value + #[derive(Debug)] + struct ResetStateTestFileOpener { + schema: SchemaRef, + } + + impl crate::file_stream::FileOpener for ResetStateTestFileOpener { + fn open( + &self, + file: PartitionedFile, + ) -> Result { + let value = file + .object_meta + .location + .as_ref() + .trim_start_matches("file") + .trim_end_matches(".parquet") + .parse::() + .expect("invalid test file name"); + let schema = Arc::clone(&self.schema); + Ok(async move { + let batch = RecordBatch::try_new( + schema, + vec![Arc::new(Int32Array::from(vec![value]))], + ) + .expect("test batch should be valid"); + Ok(stream::iter(vec![Ok(batch)]).boxed()) + } + .boxed()) + } + } + #[test] fn test_output_partitioning_not_partitioned_by_file_group() { let file_schema = aggr_test_schema(); @@ -2476,7 +2606,7 @@ mod tests { impl FileSource for ExactSortPushdownSource { fn create_file_opener( &self, - _object_store: Arc, + _object_store: Arc, _base_config: &FileScanConfig, _partition: usize, ) -> Result> { diff --git a/datafusion/datasource/src/file_stream/builder.rs b/datafusion/datasource/src/file_stream/builder.rs index efe9c39ce3b38..7034e902550a9 100644 --- a/datafusion/datasource/src/file_stream/builder.rs +++ b/datafusion/datasource/src/file_stream/builder.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use crate::file_scan_config::FileScanConfig; use crate::file_stream::scan_state::ScanState; +use crate::file_stream::work_source::{SharedWorkSource, WorkSource}; use 
crate::morsel::{FileOpenerMorselizer, Morselizer}; use datafusion_common::{Result, internal_err}; use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; @@ -33,10 +34,11 @@ pub struct FileStreamBuilder<'a> { morselizer: Option>, metrics: Option<&'a ExecutionPlanMetricsSet>, on_error: OnError, + shared_work_source: Option, } impl<'a> FileStreamBuilder<'a> { - /// Create a new builder. + /// Create a new builder for [`FileStream`]. pub fn new(config: &'a FileScanConfig) -> Self { Self { config, @@ -44,6 +46,7 @@ impl<'a> FileStreamBuilder<'a> { morselizer: None, metrics: None, on_error: OnError::Fail, + shared_work_source: None, } } @@ -81,6 +84,15 @@ impl<'a> FileStreamBuilder<'a> { self } + /// Configure the [`SharedWorkSource`] for sibling work stealing. + pub(crate) fn with_shared_work_source( + mut self, + shared_work_source: Option, + ) -> Self { + self.shared_work_source = shared_work_source; + self + } + /// Build the configured [`FileStream`]. pub fn build(self) -> Result { let Self { @@ -89,6 +101,7 @@ impl<'a> FileStreamBuilder<'a> { morselizer, metrics, on_error, + shared_work_source, } = self; let Some(partition) = partition else { @@ -106,10 +119,14 @@ impl<'a> FileStreamBuilder<'a> { "FileStreamBuilder invalid partition index: {partition}" ); }; + let work_source = match shared_work_source { + Some(shared) => WorkSource::Shared(shared), + None => WorkSource::Local(file_group.into_inner().into()), + }; let file_stream_metrics = FileStreamMetrics::new(metrics, partition); let scan_state = Box::new(ScanState::new( - file_group.into_inner(), + work_source, config.limit, morselizer, on_error, diff --git a/datafusion/datasource/src/file_stream/mod.rs b/datafusion/datasource/src/file_stream/mod.rs index ff71f16023080..b9aabacf64c11 100644 --- a/datafusion/datasource/src/file_stream/mod.rs +++ b/datafusion/datasource/src/file_stream/mod.rs @@ -24,6 +24,7 @@ mod builder; mod metrics; mod scan_state; +pub(crate) mod work_source; use 
std::pin::Pin; use std::sync::Arc; @@ -175,6 +176,7 @@ mod tests { IoFutureId, MockMorselizer, MockPlanBuilder, MockPlanner, MorselId, PendingPlannerBuilder, PollsToResolve, }; + use crate::source::DataSource; use crate::tests::make_partition; use crate::{PartitionedFile, TableSchema}; use arrow::array::{AsArray, RecordBatch}; @@ -184,14 +186,22 @@ mod tests { use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use futures::{FutureExt as _, StreamExt as _}; + use std::collections::{BTreeMap, VecDeque}; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; - use crate::file_stream::{FileOpenFuture, FileOpener, FileStreamBuilder, OnError}; + use crate::file_stream::{ + FileOpenFuture, FileOpener, FileStream, FileStreamBuilder, OnError, + work_source::SharedWorkSource, + }; use crate::test_util::MockSource; use datafusion_common::{assert_batches_eq, exec_err, internal_err}; + /// Test identifier for one `FileStream` partition. 
+ #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] + struct PartitionId(usize); + /// Test `FileOpener` which will simulate errors during file opening or scanning #[derive(Default)] struct TestOpener { @@ -758,8 +768,8 @@ mod tests { async fn morsel_two_ios_one_batch() -> Result<()> { let test = FileStreamMorselTest::new().with_file( MockPlanner::builder("file1.parquet") - .add_plan(PendingPlannerBuilder::new(IoFutureId(1)).build()) - .add_plan(PendingPlannerBuilder::new(IoFutureId(2)).build()) + .add_plan(PendingPlannerBuilder::new(IoFutureId(1))) + .add_plan(PendingPlannerBuilder::new(IoFutureId(2))) .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 42)) .return_none(), ); @@ -871,8 +881,7 @@ mod tests { async fn morsel_ready_child_planner() -> Result<()> { let child_planner = MockPlanner::builder("child planner") .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 42)) - .return_none() - .build(); + .return_none(); let test = FileStreamMorselTest::new().with_file( MockPlanner::builder("file1.parquet") @@ -1001,11 +1010,265 @@ mod tests { Ok(()) } - /// Tests how FileStream opens and processes files. 
+ /// Return a morsel test with two partitions: + /// Partition 0: file1, file2, file3 + /// Partition 1: file4 + /// + /// Partition 1 has only 1 file but it polled first 4 times + fn two_partition_morsel_test() -> FileStreamMorselTest { + FileStreamMorselTest::new() + // Partition 0 has three files + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file1.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 101)) + .return_none(), + ) + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file2.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 102)) + .return_none(), + ) + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file3.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(12), 103)) + .return_none(), + ) + // Partition 1 has only one file, but is polled first + .with_file_in_partition( + PartitionId(1), + MockPlanner::builder("file4.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(13), 201)) + .return_none(), + ) + .with_reads(vec![ + PartitionId(1), + PartitionId(1), + PartitionId(1), + PartitionId(1), + PartitionId(1), + ]) + } + + /// Verifies that an idle sibling stream can steal shared files from + /// another stream once it exhausts its own local work. + #[tokio::test] + async fn morsel_shared_files_can_be_stolen() -> Result<()> { + let test = two_partition_morsel_test().with_file_stream_events(false); + + // Partition 0 starts with 3 files, but Partition 1 is polled first. + // Since Partition is polled first, it will run all the files even those + // that were assigned to Partition 0. 
+ insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Partition 0 ----- + Done + ----- Partition 1 ----- + Batch: 101 + Batch: 102 + Batch: 103 + Batch: 201 + Done + ----- File Stream Events ----- + (omitted due to with_file_stream_events(false)) + "); + + Ok(()) + } + + /// Verifies that a stream that must preserve order keeps its files local + /// and therefore cannot steal from a sibling shared queue. + #[tokio::test] + async fn morsel_preserve_order_keeps_files_local() -> Result<()> { + // same fixture as `morsel_shared_files_can_be_stolen` but marked as + // preserve-order + let test = two_partition_morsel_test() + .with_preserve_order(true) + .with_file_stream_events(false); + + // Even though Partition 1 is polled first, it cannot steal files + // from partition 0. The three files originally assigned to Partition 0 + // must be evaluated by Partition 0. + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Partition 0 ----- + Batch: 101 + Batch: 102 + Batch: 103 + Done + ----- Partition 1 ----- + Batch: 201 + Done + ----- File Stream Events ----- + (omitted due to with_file_stream_events(false)) + "); + + Ok(()) + } + + /// Verifies that `partitioned_by_file_group` disables shared work stealing. + #[tokio::test] + async fn morsel_partitioned_by_file_group_keeps_files_local() -> Result<()> { + // same fixture as `morsel_shared_files_can_be_stolen` but marked as + // preserve-partitioned + let test = two_partition_morsel_test() + .with_partitioned_by_file_group(true) + .with_file_stream_events(false); + + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Partition 0 ----- + Batch: 101 + Batch: 102 + Batch: 103 + Done + ----- Partition 1 ----- + Batch: 201 + Done + ----- File Stream Events ----- + (omitted due to with_file_stream_events(false)) + "); + + Ok(()) + } + + /// Verifies that an empty sibling can immediately steal shared files when + /// it is polled before the stream that originally owned them. 
+ #[tokio::test] + async fn morsel_empty_sibling_can_steal() -> Result<()> { + let test = FileStreamMorselTest::new() + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file1.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 101)) + .return_none(), + ) + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file2.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 102)) + .return_none(), + ) + // Poll the empty sibling first so it steals both files. + .with_reads(vec![PartitionId(1), PartitionId(1), PartitionId(1)]) + .with_file_stream_events(false); + + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Partition 0 ----- + Done + ----- Partition 1 ----- + Batch: 101 + Batch: 102 + Done + ----- File Stream Events ----- + (omitted due to with_file_stream_events(false)) + "); + + Ok(()) + } + + /// Ensures that if a sibling is built and polled + /// before another sibling has been built and contributed its files to the + /// shared queue, the first sibling does not finish prematurely. + #[tokio::test] + async fn morsel_empty_sibling_can_finish_before_shared_work_exists() -> Result<()> { + let test = FileStreamMorselTest::new() + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file1.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 101)) + .return_none(), + ) + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file2.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 102)) + .return_none(), + ) + // Build streams lazily so partition 1 can poll the shared queue + // before partition 0 has contributed its files. Once partition 0 + // is built, a later poll of partition 1 can still steal one of + // them from the shared queue. 
+ .with_build_streams_on_first_read(true) + .with_reads(vec![PartitionId(1), PartitionId(0), PartitionId(1)]) + .with_file_stream_events(false); + + // Partition 1 polls too early once, then later steals one file after + // partition 0 has populated the shared queue. + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Partition 0 ----- + Batch: 102 + Done + ----- Partition 1 ----- + Batch: 101 + Done + ----- File Stream Events ----- + (omitted due to with_file_stream_events(false)) + "); + + Ok(()) + } + + /// Verifies that one fast sibling can drain shared files that originated + /// in more than one other partition. + #[tokio::test] + async fn morsel_one_sibling_can_drain_multiple_siblings() -> Result<()> { + let test = FileStreamMorselTest::new() + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file1.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 101)) + .return_none(), + ) + // Partition 1 has two files + .with_file_in_partition( + PartitionId(1), + MockPlanner::builder("file2.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 102)) + .return_none(), + ) + .with_file_in_partition( + PartitionId(1), + MockPlanner::builder("file3.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(12), 103)) + .return_none(), + ) + // Partition 2 starts empty but is polled first, so it should drain + // the shared queue across both sibling partitions. + .with_reads(vec![ + PartitionId(2), + PartitionId(2), + PartitionId(1), + PartitionId(2), + ]) + .with_file_stream_events(false); + + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Partition 0 ----- + Done + ----- Partition 1 ----- + Batch: 103 + Done + ----- Partition 2 ----- + Batch: 101 + Batch: 102 + Done + ----- File Stream Events ----- + (omitted due to with_file_stream_events(false)) + "); + + Ok(()) + } + + /// Tests how one or more `FileStream`s consume morselized file work. 
#[derive(Clone)] struct FileStreamMorselTest { morselizer: MockMorselizer, - file_names: Vec, + partition_files: BTreeMap>, + preserve_order: bool, + partitioned_by_file_group: bool, + file_stream_events: bool, + build_streams_on_first_read: bool, + reads: Vec, limit: Option, } @@ -1014,75 +1277,238 @@ mod tests { fn new() -> Self { Self { morselizer: MockMorselizer::new(), - file_names: vec![], + partition_files: BTreeMap::new(), + preserve_order: false, + partitioned_by_file_group: false, + file_stream_events: true, + build_streams_on_first_read: false, + reads: vec![], limit: None, } } - /// Adds one file and its root planner to the test input. - fn with_file(mut self, planner: impl Into) -> Self { + /// Adds one file and its root planner to partition 0. + fn with_file(self, planner: impl Into) -> Self { + self.with_file_in_partition(PartitionId(0), planner) + } + + /// Adds one file and its root planner to the specified input partition. + fn with_file_in_partition( + mut self, + partition: PartitionId, + planner: impl Into, + ) -> Self { let planner = planner.into(); - self.file_names.push(planner.file_path().to_string()); - self.morselizer = self.morselizer.with_file(planner); + let file_path = planner.file_path().to_string(); + self.morselizer = self.morselizer.with_planner(planner); + self.partition_files + .entry(partition) + .or_default() + .push(file_path); self } - /// Sets a global output limit for the stream. + /// Marks the stream (and all partitions) to preserve the specified file + /// order. + fn with_preserve_order(mut self, preserve_order: bool) -> Self { + self.preserve_order = preserve_order; + self + } + + /// Marks the test scan as pre-partitioned by file group, which should + /// force each stream to keep its own files local. 
+ fn with_partitioned_by_file_group( + mut self, + partitioned_by_file_group: bool, + ) -> Self { + self.partitioned_by_file_group = partitioned_by_file_group; + self + } + + /// Controls whether scheduler events are included in the snapshot. + /// + /// When disabled, `run()` still includes the event section header but + /// replaces the trace with a fixed placeholder so tests can focus only + /// on the output batches. + fn with_file_stream_events(mut self, file_stream_events: bool) -> Self { + self.file_stream_events = file_stream_events; + self + } + + /// Controls whether streams are all built up front or lazily on their + /// first read. + /// + /// The default builds all streams before polling begins, which matches + /// normal execution. Tests may enable lazy creation to model races + /// where one sibling polls before another has contributed its files to + /// the shared queue. + fn with_build_streams_on_first_read( + mut self, + build_streams_on_first_read: bool, + ) -> Self { + self.build_streams_on_first_read = build_streams_on_first_read; + self + } + + /// Sets the partition polling order. + /// + /// `run()` polls these partitions in the listed order first. After + /// those explicit reads are exhausted, it falls back to round + /// robin across all configured partitions, skipping any streams that + /// have already finished. + /// + /// This makes early scheduling decisions explicit in a test + /// while avoiding a fully scripted poll trace for the remainder. + fn with_reads(mut self, reads: Vec) -> Self { + self.reads = reads; + self + } + + /// Sets a global output limit for all streams created by this test. fn with_limit(mut self, limit: usize) -> Self { self.limit = Some(limit); self } - /// Runs the test returns combined output and scheduler trace text as a String. + /// Runs the test and returns combined stream output and scheduler + /// trace text. 
async fn run(self) -> Result { let observer = self.morselizer.observer().clone(); observer.clear(); - let config = self.test_config(); let metrics_set = ExecutionPlanMetricsSet::new(); - let mut stream = FileStreamBuilder::new(&config) - .with_partition(0) - .with_morselizer(Box::new(self.morselizer)) - .with_metrics(&metrics_set) - .build()?; + let partition_count = self.num_partitions(); - let mut stream_contents = Vec::new(); - while let Some(result) = stream.next().await { - match result { - Ok(batch) => { - let col = batch.column(0).as_primitive::(); - let batch_id = col.value(0); - stream_contents.push(format!("Batch: {batch_id}")); - } - Err(e) => { - // Pull the actual message for external errors rather than - // relying on DataFusionError formatting, which changes - // if backtraces are enabled, etc - let message = if let DataFusionError::External(generic) = e { - generic.to_string() - } else { - e.to_string() - }; - stream_contents.push(format!("Error: {message}")); - } + let mut partitions = (0..partition_count) + .map(|_| PartitionState::new()) + .collect::>(); + + let mut build_order = Vec::new(); + for partition in self.reads.iter().map(|partition| partition.0) { + if !build_order.contains(&partition) { + build_order.push(partition); + } + } + for partition in 0..partition_count { + if !build_order.contains(&partition) { + build_order.push(partition); } } - stream_contents.push("Done".to_string()); - Ok(format!( - "----- Output Stream -----\n{}\n----- File Stream Events -----\n{}", - stream_contents.join("\n"), + let config = self.test_config(); + // `DataSourceExec::execute` creates one execution-local shared + // state object via `create_sibling_state()` and then passes it + // to `open_with_sibling_state(...)`. These tests build + // `FileStream`s directly, bypassing `DataSourceExec`, so they must + // perform the same setup explicitly when exercising sibling-stream + // work stealing. 
+ let shared_work_source = config.create_sibling_state().and_then(|state| { + state.as_ref().downcast_ref::().cloned() + }); + if !self.build_streams_on_first_read { + for partition in build_order { + let stream = FileStreamBuilder::new(&config) + .with_partition(partition) + .with_shared_work_source(shared_work_source.clone()) + .with_morselizer(Box::new(self.morselizer.clone())) + .with_metrics(&metrics_set) + .build()?; + partitions[partition].set_stream(stream); + } + } + + let mut initial_reads: VecDeque<_> = self.reads.into(); + let mut next_round_robin = 0; + + while !initial_reads.is_empty() + || partitions.iter().any(PartitionState::is_active) + { + let partition = if let Some(partition) = initial_reads.pop_front() { + partition.0 + } else { + let partition = next_round_robin; + next_round_robin = (next_round_robin + 1) % partition_count.max(1); + partition + }; + + let partition_state = &mut partitions[partition]; + + if self.build_streams_on_first_read && !partition_state.built { + let stream = FileStreamBuilder::new(&config) + .with_partition(partition) + .with_shared_work_source(shared_work_source.clone()) + .with_morselizer(Box::new(self.morselizer.clone())) + .with_metrics(&metrics_set) + .build()?; + partition_state.set_stream(stream); + } + + let Some(stream) = partition_state.stream.as_mut() else { + continue; + }; + + match stream.next().await { + Some(result) => partition_state.push_output(format_result(result)), + None => partition_state.finish(), + } + } + + let output_text = if partition_count == 1 { + format!( + "----- Output Stream -----\n{}", + partitions[0].output.join("\n") + ) + } else { + partitions + .into_iter() + .enumerate() + .map(|(partition, state)| { + format!( + "----- Partition {} -----\n{}", + partition, + state.output.join("\n") + ) + }) + .collect::>() + .join("\n") + }; + + let file_stream_events = if self.file_stream_events { observer.format_events() + } else { + "(omitted due to 
with_file_stream_events(false))".to_string() + }; + + Ok(format!( + "{output_text}\n----- File Stream Events -----\n{file_stream_events}", )) } - /// Builds the `FileScanConfig` for the configured mock file set. + /// Returns the number of configured partitions, including empty ones + /// that appear only in the explicit read schedule. + fn num_partitions(&self) -> usize { + self.partition_files + .keys() + .map(|partition| partition.0 + 1) + .chain(self.reads.iter().map(|partition| partition.0 + 1)) + .max() + .unwrap_or(1) + } + + /// Builds a `FileScanConfig` covering every configured partition. fn test_config(&self) -> FileScanConfig { - let file_group = self - .file_names - .iter() - .map(|name| PartitionedFile::new(name, 10)) - .collect(); + let file_groups = (0..self.num_partitions()) + .map(|partition| { + self.partition_files + .get(&PartitionId(partition)) + .into_iter() + .flat_map(|files| files.iter()) + .map(|name| PartitionedFile::new(name, 10)) + .collect::>() + .into() + }) + .collect::>(); + let table_schema = TableSchema::new( Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])), vec![], @@ -1091,9 +1517,76 @@ mod tests { ObjectStoreUrl::parse("test:///").unwrap(), Arc::new(MockSource::new(table_schema)), ) - .with_file_group(file_group) + .with_file_groups(file_groups) .with_limit(self.limit) + .with_preserve_order(self.preserve_order) + .with_partitioned_by_file_group(self.partitioned_by_file_group) .build() } } + + /// Formats one stream poll result into a stable snapshot line. + fn format_result(result: Result) -> String { + match result { + Ok(batch) => { + let col = batch.column(0).as_primitive::(); + let batch_id = col.value(0); + format!("Batch: {batch_id}") + } + Err(e) => { + // Pull the actual message for external errors rather than + // relying on DataFusionError formatting, which changes if + // backtraces are enabled, etc. 
+ let message = if let DataFusionError::External(generic) = e { + generic.to_string() + } else { + e.to_string() + }; + format!("Error: {message}") + } + } + } + + /// Test-only state for one stream partition in [`FileStreamMorselTest`]. + struct PartitionState { + /// Whether the `FileStream` for this partition has been built yet. + built: bool, + /// The live stream, if this partition has not finished yet. + stream: Option, + /// Snapshot lines produced by this partition. + output: Vec, + } + + impl PartitionState { + /// Create an unbuilt partition with no output yet. + fn new() -> Self { + Self { + built: false, + stream: None, + output: vec![], + } + } + + /// Returns true if this partition might still produce output. + fn is_active(&self) -> bool { + !self.built || self.stream.is_some() + } + + /// Records that this partition's stream has been built. + fn set_stream(&mut self, stream: FileStream) { + self.stream = Some(stream); + self.built = true; + } + + /// Records one formatted output line for this partition. + fn push_output(&mut self, line: String) { + self.output.push(line); + } + + /// Marks this partition as finished. 
+ fn finish(&mut self) { + self.push_output("Done".to_string()); + self.stream = None; + } + } } diff --git a/datafusion/datasource/src/file_stream/scan_state.rs b/datafusion/datasource/src/file_stream/scan_state.rs index 025164c29c8f2..fdae1bcf7e074 100644 --- a/datafusion/datasource/src/file_stream/scan_state.rs +++ b/datafusion/datasource/src/file_stream/scan_state.rs @@ -19,7 +19,6 @@ use datafusion_common::internal_datafusion_err; use std::collections::VecDeque; use std::task::{Context, Poll}; -use crate::PartitionedFile; use crate::morsel::{Morsel, MorselPlanner, Morselizer, PendingMorselPlanner}; use arrow::record_batch::RecordBatch; use datafusion_common::{DataFusionError, Result}; @@ -27,6 +26,7 @@ use datafusion_physical_plan::metrics::ScopedTimerGuard; use futures::stream::BoxStream; use futures::{FutureExt as _, StreamExt as _}; +use super::work_source::WorkSource; use super::{FileStreamMetrics, OnError}; /// State [`FileStreamState::Scan`]. @@ -45,7 +45,7 @@ use super::{FileStreamMetrics, OnError}; /// # State Transitions /// /// ```text -/// file_iter +/// work_source /// | /// v /// morselizer.plan_file(file) @@ -62,8 +62,8 @@ use super::{FileStreamMetrics, OnError}; /// /// [`FileStreamState::Scan`]: super::FileStreamState::Scan pub(super) struct ScanState { - /// Files that still need to be planned. - file_iter: VecDeque, + /// Unopened files that still need to be planned for this stream. + work_source: WorkSource, /// Remaining row limit, if any. remain: Option, /// The morselizer used to plan files. @@ -76,7 +76,10 @@ pub(super) struct ScanState { ready_morsels: VecDeque>, /// The active reader, if any. reader: Option>>, - /// The currently outstanding I/O, if any. + /// The single planner currently blocked on I/O, if any. + /// + /// Once the I/O completes, yields the next planner and is pushed back + /// onto `ready_planners`. pending_planner: Option, /// Metrics for the active scan queues. 
metrics: FileStreamMetrics, @@ -84,15 +87,14 @@ pub(super) struct ScanState { impl ScanState { pub(super) fn new( - file_iter: impl Into>, + work_source: WorkSource, remain: Option, morselizer: Box, on_error: OnError, metrics: FileStreamMetrics, ) -> Self { - let file_iter = file_iter.into(); Self { - file_iter, + work_source, remain, morselizer, on_error, @@ -170,7 +172,7 @@ impl ScanState { (batch, false) } else { let batch = batch.slice(0, *remain); - let done = 1 + self.file_iter.len(); + let done = 1 + self.work_source.len(); self.metrics.files_processed.add(done); *remain = 0; (batch, true) @@ -263,8 +265,8 @@ impl ScanState { }; } - // No outstanding work remains, so morselize the next unopened file. - let part_file = match self.file_iter.pop_front() { + // No outstanding work remains, so begin planning the next unopened file. + let part_file = match self.work_source.pop_front() { Some(part_file) => part_file, None => return ScanAndReturn::Done(None), }; diff --git a/datafusion/datasource/src/file_stream/work_source.rs b/datafusion/datasource/src/file_stream/work_source.rs new file mode 100644 index 0000000000000..1dcb6082c47c7 --- /dev/null +++ b/datafusion/datasource/src/file_stream/work_source.rs @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::VecDeque;
+use std::sync::Arc;
+
+use crate::PartitionedFile;
+use crate::file_groups::FileGroup;
+use crate::file_scan_config::FileScanConfig;
+use parking_lot::Mutex;
+
+/// Source of work for `ScanState`.
+///
+/// Streams that may share work across siblings use [`WorkSource::Shared`],
+/// while streams that cannot share work (e.g. because they must preserve file
+/// order) use [`WorkSource::Local`].
+#[derive(Debug, Clone)]
+pub(super) enum WorkSource {
+    /// Files this stream will plan locally without sharing them.
+    Local(VecDeque),
+    /// Files shared with sibling streams.
+    Shared(SharedWorkSource),
+}
+
+impl WorkSource {
+    /// Pop the next file to plan from this work source.
+    pub(super) fn pop_front(&mut self) -> Option {
+        match self {
+            Self::Local(files) => files.pop_front(),
+            Self::Shared(shared) => shared.pop_front(),
+        }
+    }
+
+    /// Return the number of files that are still waiting to be planned.
+    pub(super) fn len(&self) -> usize {
+        match self {
+            Self::Local(files) => files.len(),
+            Self::Shared(shared) => shared.len(),
+        }
+    }
+}
+
+/// Shared source of work for sibling `FileStream`s
+///
+/// The queue is created once per execution and shared by all reorderable
+/// sibling streams for that execution. Whichever stream becomes idle first may
+/// take the next unopened file from the front of the queue.
+///
+/// It uses a [`Mutex`] internally to provide thread-safe access
+/// to the shared file queue.
+#[derive(Debug, Clone)]
+pub(crate) struct SharedWorkSource {
+    inner: Arc,
+}
+
+#[derive(Debug, Default)]
+pub(super) struct SharedWorkSourceInner {
+    files: Mutex>,
+}
+
+impl SharedWorkSource {
+    /// Create a shared work source containing the provided unopened files.
+ pub(crate) fn new(files: impl IntoIterator) -> Self { + let files = files.into_iter().collect(); + Self { + inner: Arc::new(SharedWorkSourceInner { + files: Mutex::new(files), + }), + } + } + + /// Create a shared work source for the unopened files in `config`. + pub(crate) fn from_config(config: &FileScanConfig) -> Self { + Self::new(config.file_groups.iter().flat_map(FileGroup::iter).cloned()) + } + + /// Pop the next file from the shared work queue. + /// + /// Returns `None` if the queue is empty + fn pop_front(&self) -> Option { + self.inner.files.lock().pop_front() + } + + /// Return the number of files still waiting in the shared queue. + fn len(&self) -> usize { + self.inner.files.lock().len() + } +} diff --git a/datafusion/datasource/src/morsel/mocks.rs b/datafusion/datasource/src/morsel/mocks.rs index cd1fa3732ffea..ceb0e720691a7 100644 --- a/datafusion/datasource/src/morsel/mocks.rs +++ b/datafusion/datasource/src/morsel/mocks.rs @@ -295,8 +295,11 @@ impl MockPlanBuilder { } /// Add a ready child planner - pub(crate) fn with_ready_planner(self, ready_planners: MockPlanner) -> Self { - self.with_ready_planners(vec![ready_planners]) + pub(crate) fn with_ready_planner( + self, + ready_planner: impl Into, + ) -> Self { + self.with_ready_planners(vec![ready_planner.into()]) } /// Add ready child planners produced by this planning step. @@ -430,8 +433,9 @@ impl MockMorselizer { &self.observer } - /// Associates a file path with the planner spec used to open it. 
- pub(crate) fn with_file(mut self, planner: MockPlanner) -> Self { + /// Specify the return planner for the specified file_path + pub(crate) fn with_planner(mut self, planner: impl Into) -> Self { + let planner = planner.into(); self.files.insert(planner.file_path.clone(), planner); self } diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 81e15d0a2a092..afed439788463 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -20,7 +20,7 @@ use std::any::Any; use std::fmt; use std::fmt::{Debug, Formatter}; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_plan::execution_plan::{ @@ -123,12 +123,23 @@ use datafusion_physical_plan::filter_pushdown::{ /// └─────────────────────┘ /// ``` pub trait DataSource: Send + Sync + Debug { + /// Open the specified output partition and return its stream of + /// [`RecordBatch`]es. + /// + /// This should be used by data sources that do not need any sibling + /// coordination. Data sources that want to use per-execution shared state + /// (for example, to reorder work across partitions at runtime) should + /// implement [`Self::open_with_args`] instead. + /// + /// [`RecordBatch`]: arrow::record_batch::RecordBatch fn open( &self, partition: usize, context: Arc, ) -> Result; + fn as_any(&self) -> &dyn Any; + /// Format this source for display in explain plans fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result; @@ -246,6 +257,55 @@ pub trait DataSource: Send + Sync + Debug { ) -> Option> { None } + + /// Create per execution state to share across sibling instances of this + /// data source during one execution. + /// + /// Returns `None` (the default) if this data source has + /// no sibling-shared execution state. + fn create_sibling_state(&self) -> Option> { + None + } + + /// Open a partition using optional sibling-shared execution state. 
+ /// + /// The default implementation ignores the additional state and delegates to + /// [`Self::open`]. + fn open_with_args(&self, args: OpenArgs) -> Result { + self.open(args.partition, args.context) + } +} + +/// Arguments for [`DataSource::open_with_args`] +#[derive(Debug, Clone)] +pub struct OpenArgs { + /// Which partition to open + pub partition: usize, + /// The task context for execution + pub context: Arc, + /// Optional sibling-shared execution state, see + /// [`DataSource::create_sibling_state`] for details. + pub sibling_state: Option>, +} + +impl OpenArgs { + /// Create a new OpenArgs with required arguments + pub fn new(partition: usize, context: Arc) -> Self { + Self { + partition, + context, + sibling_state: None, + } + } + + /// Set sibling shared state + pub fn with_shared_state( + mut self, + sibling_state: Option>, + ) -> Self { + self.sibling_state = sibling_state; + self + } } /// [`ExecutionPlan`] that reads one or more files @@ -266,6 +326,12 @@ pub struct DataSourceExec { data_source: Arc, /// Cached plan properties such as sort order cache: Arc, + /// Per executon state shared across partitions of this plan. + /// + /// Created by [`DataSource::create_sibling_state`] + /// and then passed to + /// [`DataSource::open_with_args`]. 
+ execution_state: Arc>>>, } impl DisplayAs for DataSourceExec { @@ -339,8 +405,15 @@ impl ExecutionPlan for DataSourceExec { partition: usize, context: Arc, ) -> Result { - let stream = self.data_source.open(partition, Arc::clone(&context))?; + let shared_state = self + .execution_state + .get_or_init(|| self.data_source.create_sibling_state()) + .clone(); + let args = OpenArgs::new(partition, Arc::clone(&context)) + .with_shared_state(shared_state); + let stream = self.data_source.open_with_args(args)?; let batch_size = context.session_config().batch_size(); + log::debug!( "Batch splitting enabled for partition {partition}: batch_size={batch_size}" ); @@ -377,8 +450,13 @@ impl ExecutionPlan for DataSourceExec { fn with_fetch(&self, limit: Option) -> Option> { let data_source = self.data_source.with_fetch(limit)?; let cache = Arc::clone(&self.cache); + let execution_state = Arc::new(OnceLock::new()); - Some(Arc::new(Self { data_source, cache })) + Some(Arc::new(Self { + data_source, + cache, + execution_state, + })) } fn fetch(&self) -> Option { @@ -471,6 +549,12 @@ impl ExecutionPlan for DataSourceExec { as Arc }) } + + fn reset_state(self: Arc) -> Result> { + let mut new_exec = Arc::unwrap_or_clone(self); + new_exec.execution_state = Arc::new(OnceLock::new()); + Ok(Arc::new(new_exec)) + } } impl DataSourceExec { @@ -484,6 +568,7 @@ impl DataSourceExec { Self { data_source, cache: Arc::new(cache), + execution_state: Arc::new(OnceLock::new()), } } @@ -495,6 +580,7 @@ impl DataSourceExec { pub fn with_data_source(mut self, data_source: Arc) -> Self { self.cache = Arc::new(Self::compute_properties(&data_source)); self.data_source = data_source; + self.execution_state = Arc::new(OnceLock::new()); self } diff --git a/datafusion/datasource/src/test_util.rs b/datafusion/datasource/src/test_util.rs index 3a9e78943b07b..5ce0f1419d11d 100644 --- a/datafusion/datasource/src/test_util.rs +++ b/datafusion/datasource/src/test_util.rs @@ -34,6 +34,7 @@ pub(crate) struct 
MockSource { filter: Option>, table_schema: crate::table_schema::TableSchema, projection: crate::projection::SplitProjection, + file_opener: Option>, } impl Default for MockSource { @@ -45,6 +46,7 @@ impl Default for MockSource { filter: None, projection: crate::projection::SplitProjection::unprojected(&table_schema), table_schema, + file_opener: None, } } } @@ -57,6 +59,7 @@ impl MockSource { filter: None, projection: crate::projection::SplitProjection::unprojected(&table_schema), table_schema, + file_opener: None, } } @@ -64,6 +67,11 @@ impl MockSource { self.filter = Some(filter); self } + + pub fn with_file_opener(mut self, file_opener: Arc) -> Self { + self.file_opener = Some(file_opener); + self + } } impl FileSource for MockSource { @@ -73,7 +81,9 @@ impl FileSource for MockSource { _base_config: &FileScanConfig, _partition: usize, ) -> Result> { - unimplemented!() + self.file_opener.clone().ok_or_else(|| { + datafusion_common::internal_datafusion_err!("MockSource missing FileOpener") + }) } fn as_any(&self) -> &dyn std::any::Any { From 5287210ff44c2a1d1372c6badc641659511fe8c2 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 14 Apr 2026 12:56:48 -0400 Subject: [PATCH 2/5] fix: typos --- datafusion/datasource/src/source.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index afed439788463..4bdf7bf795f41 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -326,7 +326,7 @@ pub struct DataSourceExec { data_source: Arc, /// Cached plan properties such as sort order cache: Arc, - /// Per executon state shared across partitions of this plan. + /// Per execution state shared across partitions of this plan. 
/// /// Created by [`DataSource::create_sibling_state`] /// and then passed to From 4cefb3e7d90c6324b32f7ad73e56d063017bfc01 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 15 Apr 2026 15:14:57 -0400 Subject: [PATCH 3/5] Update datafusion/datasource/src/file_stream/mod.rs Co-authored-by: Oleks V --- datafusion/datasource/src/file_stream/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/datasource/src/file_stream/mod.rs b/datafusion/datasource/src/file_stream/mod.rs index b9aabacf64c11..34b92ee5338b9 100644 --- a/datafusion/datasource/src/file_stream/mod.rs +++ b/datafusion/datasource/src/file_stream/mod.rs @@ -1059,7 +1059,7 @@ mod tests { let test = two_partition_morsel_test().with_file_stream_events(false); // Partition 0 starts with 3 files, but Partition 1 is polled first. - // Since Partition is polled first, it will run all the files even those + // Since Partition 1 is polled first, it will run all the files even those // that were assigned to Partition 0. insta::assert_snapshot!(test.run().await.unwrap(), @r" ----- Partition 0 ----- From cd793122b0dfb4b6f376c9fbd074f91f4c0bedab Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 16 Apr 2026 14:05:45 -0400 Subject: [PATCH 4/5] Add test for file opened/closed metrics --- datafusion/datasource/src/file_stream/mod.rs | 84 ++++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/datafusion/datasource/src/file_stream/mod.rs b/datafusion/datasource/src/file_stream/mod.rs index 34b92ee5338b9..e277690cff810 100644 --- a/datafusion/datasource/src/file_stream/mod.rs +++ b/datafusion/datasource/src/file_stream/mod.rs @@ -1208,6 +1208,72 @@ mod tests { Ok(()) } + /// Verifies that a sibling hitting its limit does not count shared files + /// left in the queue as already processed by that stream. 
+ #[tokio::test] + async fn morsel_shared_limit_does_not_double_count_files_processed() -> Result<()> { + let test = two_partition_morsel_test(); + let unlimited_config = test.test_config(); + let limited_config = test.clone().with_limit(1).test_config(); + let shared_work_source = limited_config + .create_sibling_state() + .and_then(|state| state.as_ref().downcast_ref::().cloned()) + .expect("shared work source"); + let limited_metrics = ExecutionPlanMetricsSet::new(); + let unlimited_metrics = ExecutionPlanMetricsSet::new(); + + let limited_stream = FileStreamBuilder::new(&limited_config) + .with_partition(1) + .with_shared_work_source(Some(shared_work_source.clone())) + .with_morselizer(Box::new(test.morselizer.clone())) + .with_metrics(&limited_metrics) + .build()?; + + let unlimited_stream = FileStreamBuilder::new(&unlimited_config) + .with_partition(0) + .with_shared_work_source(Some(shared_work_source)) + .with_morselizer(Box::new(test.morselizer)) + .with_metrics(&unlimited_metrics) + .build()?; + + let limited_output = drain_stream_output(limited_stream).await?; + let unlimited_output = drain_stream_output(unlimited_stream).await?; + + insta::assert_snapshot!(format!( + "----- Limited Stream -----\n{limited_output}\n----- Unlimited Stream -----\n{unlimited_output}" + ), @r" + ----- Limited Stream ----- + Batch: 101 + ----- Unlimited Stream ----- + Batch: 102 + Batch: 103 + Batch: 201 + "); + + assert_eq!( + metric_count(&limited_metrics, "files_opened"), + 1, + "the limited stream should only open the file that produced its output" + ); + assert_eq!( + metric_count(&limited_metrics, "files_processed"), + 1, + "the limited stream should only mark its own file as processed" + ); + assert_eq!( + metric_count(&unlimited_metrics, "files_opened"), + 3, + "the draining stream should open the remaining shared files" + ); + assert_eq!( + metric_count(&unlimited_metrics, "files_processed"), + 3, + "the draining stream should process exactly the files it opened" + ); 
+ + Ok(()) + } + /// Verifies that one fast sibling can drain shared files that originated /// in more than one other partition. #[tokio::test] @@ -1547,6 +1613,24 @@ mod tests { } } + async fn drain_stream_output(stream: FileStream) -> Result { + let output = stream + .collect::>() + .await + .into_iter() + .map(|result| result.map(|batch| format_result(Ok(batch)))) + .collect::>>()?; + Ok(output.join("\n")) + } + + fn metric_count(metrics: &ExecutionPlanMetricsSet, name: &str) -> usize { + metrics + .clone_inner() + .sum_by_name(name) + .unwrap_or_else(|| panic!("missing metric: {name}")) + .as_usize() + } + /// Test-only state for one stream partition in [`FileStreamMorselTest`]. struct PartitionState { /// Whether the `FileStream` for this partition has been built yet. From c58a9a8746a7d78abd6c14562a847cc1ef2907f5 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 16 Apr 2026 14:07:30 -0400 Subject: [PATCH 5/5] properly account for limit --- datafusion/datasource/src/file_stream/scan_state.rs | 2 +- datafusion/datasource/src/file_stream/work_source.rs | 12 ++++-------- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/datafusion/datasource/src/file_stream/scan_state.rs b/datafusion/datasource/src/file_stream/scan_state.rs index fdae1bcf7e074..21125cd08896c 100644 --- a/datafusion/datasource/src/file_stream/scan_state.rs +++ b/datafusion/datasource/src/file_stream/scan_state.rs @@ -172,7 +172,7 @@ impl ScanState { (batch, false) } else { let batch = batch.slice(0, *remain); - let done = 1 + self.work_source.len(); + let done = 1 + self.work_source.skipped_on_limit(); self.metrics.files_processed.add(done); *remain = 0; (batch, true) diff --git a/datafusion/datasource/src/file_stream/work_source.rs b/datafusion/datasource/src/file_stream/work_source.rs index 1dcb6082c47c7..7f31dacca9592 100644 --- a/datafusion/datasource/src/file_stream/work_source.rs +++ b/datafusion/datasource/src/file_stream/work_source.rs @@ -45,11 +45,12 @@ impl WorkSource { } 
} - /// Return the number of files that are still waiting to be planned. - pub(super) fn len(&self) -> usize { + /// Return how many queued files should be counted as already processed + /// when this stream stops early after hitting a global limit. + pub(super) fn skipped_on_limit(&self) -> usize { match self { Self::Local(files) => files.len(), - Self::Shared(shared) => shared.len(), + Self::Shared(_) => 0, } } } @@ -94,9 +95,4 @@ impl SharedWorkSource { fn pop_front(&self) -> Option { self.inner.files.lock().pop_front() } - - /// Return the number of files still waiting in the shared queue. - fn len(&self) -> usize { - self.inner.files.lock().len() - } }