diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index bad1c684b47f5..16d314a2065f3 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -24,7 +24,7 @@ use crate::{ ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory, apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter, }; -use arrow::array::{RecordBatch, RecordBatchOptions}; +use arrow::array::{Array, RecordBatch, RecordBatchOptions}; use arrow::datatypes::DataType; use datafusion_datasource::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer}; use datafusion_physical_expr::projection::{ProjectionExprs, Projector}; @@ -45,6 +45,10 @@ use datafusion_common::{ ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, exec_err, }; use datafusion_datasource::{PartitionedFile, TableSchema}; +use datafusion_expr::Operator; +use datafusion_physical_expr::expressions::{ + BinaryExpr, Column, DynamicFilterPhysicalExpr, lit, +}; use datafusion_physical_expr::simplifier::PhysicalExprSimplifier; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::{ @@ -67,6 +71,7 @@ use log::debug; use parquet::DecodeResult; use parquet::arrow::ParquetRecordBatchStreamBuilder; use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics; +use parquet::arrow::arrow_reader::statistics::StatisticsConverter; use parquet::arrow::arrow_reader::{ ArrowReaderMetadata, ArrowReaderOptions, RowSelectionPolicy, }; @@ -1075,6 +1080,23 @@ impl RowGroupsPrunedParquetOpen { let file_metadata = Arc::clone(reader_metadata.metadata()); let rg_metadata = file_metadata.row_groups(); + // Initialize TopK dynamic filter from row group statistics. + // This sets an initial threshold before any data is read, so that + // subsequent row filtering can benefit immediately. 
+        // Sort direction is read from the DynamicFilterPhysicalExpr's
+        // sort_options (set by SortExec for TopK queries).
+        if let (Some(predicate), Some(limit)) = (&prepared.predicate, prepared.limit)
+            && let Err(e) = try_init_topk_threshold(
+                predicate,
+                limit,
+                rg_metadata,
+                &prepared.physical_file_schema,
+                reader_metadata.parquet_schema(),
+            )
+        {
+            debug!("Skipping TopK threshold initialization from statistics: {e}");
+        }
+
         // Filter pushdown: evaluate predicates during scan
         let row_filter = if let Some(predicate) = prepared
             .pushdown_filters
@@ -1218,6 +1240,193 @@ impl RowGroupsPrunedParquetOpen {
     }
 }
 
+/// Attempt to initialize a TopK dynamic filter threshold from row group statistics.
+///
+/// Before any parquet data is read, this function scans the row group min/max
+/// statistics to compute an initial threshold for the TopK dynamic filter.
+/// By setting this threshold early, subsequent row group pruning and row-level
+/// filtering can benefit immediately.
+///
+/// **Algorithm (single-column sort only):**
+///
+/// For `ORDER BY col DESC LIMIT K`:
+/// - For each row group where `num_rows >= K`: the min value of that RG is a
+///   lower bound on the K-th largest value.
+/// - `threshold = max(min)` across all qualifying row groups.
+/// - Filter: `col > threshold`
+///
+/// For `ORDER BY col ASC LIMIT K`:
+/// - For each row group where `num_rows >= K`: the max value is an upper bound
+///   on the K-th smallest value.
+/// - `threshold = min(max)` across qualifying row groups.
+/// - Filter: `col < threshold`
+fn try_init_topk_threshold(
+    predicate: &Arc<dyn PhysicalExpr>,
+    limit: usize,
+    rg_metadata: &[parquet::file::metadata::RowGroupMetaData],
+    arrow_schema: &Schema,
+    parquet_schema: &parquet::schema::types::SchemaDescriptor,
+) -> Result<()> {
+    // Find the DynamicFilterPhysicalExpr in the predicate tree.
+    let dynamic_filter = match find_dynamic_filter(predicate) {
+        Some(df) => df,
+        None => return Ok(()), // No dynamic filter found, nothing to do
+    };
+
+    // Read sort options from the dynamic filter (set by SortExec for TopK).
+    let sort_options = match dynamic_filter.sort_options() {
+        Some(opts) => opts,
+        None => return Ok(()), // No sort options, cannot determine direction
+    };
+
+    // Only handle single-column sort for now.
+    if sort_options.len() != 1 {
+        debug!(
+            "Skipping TopK threshold initialization: expected 1 sort column, found {}",
+            sort_options.len()
+        );
+        return Ok(());
+    }
+
+    let is_descending = sort_options[0].descending;
+
+    // The child must be a Column expression so we can look up statistics.
+    let children = dynamic_filter.children();
+    let col_expr: Arc<dyn PhysicalExpr> = Arc::clone(children[0]);
+    let col_any: &dyn std::any::Any = col_expr.as_ref();
+    let column = col_any.downcast_ref::<Column>().ok_or_else(|| {
+        DataFusionError::Internal(
+            "TopK threshold init: sort child is not a Column expression".to_string(),
+        )
+    })?;
+
+    let col_name = column.name();
+
+    // Build a statistics converter for the sort column.
+    let converter = StatisticsConverter::try_new(col_name, arrow_schema, parquet_schema)
+        .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+    // Compute the threshold.
+    let threshold = if is_descending {
+        // DESC: threshold = max(min) across RGs with num_rows >= limit
+        let mins = converter
+            .row_group_mins(rg_metadata.iter())
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+        compute_best_threshold_from_stats(&mins, rg_metadata, limit, true)?
+    } else {
+        // ASC: threshold = min(max) across RGs with num_rows >= limit
+        let maxes = converter
+            .row_group_maxes(rg_metadata.iter())
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+        compute_best_threshold_from_stats(&maxes, rg_metadata, limit, false)?
+    };
+
+    let threshold = match threshold {
+        Some(t) => t,
+        None => {
+            debug!("No qualifying row groups for TopK threshold initialization");
+            return Ok(());
+        }
+    };
+
+    // Build the filter expression: col > threshold (DESC) or col < threshold (ASC)
+    let op = if is_descending {
+        Operator::Gt
+    } else {
+        Operator::Lt
+    };
+
+    let filter_expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
+        Arc::clone(&col_expr),
+        op,
+        lit(threshold.clone()),
+    ));
+
+    debug!(
+        "Initializing TopK dynamic filter from statistics: {} {} {}",
+        col_name,
+        if is_descending { ">" } else { "<" },
+        threshold
+    );
+
+    dynamic_filter.update(filter_expr)?;
+
+    Ok(())
+}
+
+/// Find a [`DynamicFilterPhysicalExpr`] in the predicate tree.
+///
+/// Returns the first `DynamicFilterPhysicalExpr` found (as an `Arc`) by
+/// checking the predicate itself and recursively walking its children.
+fn find_dynamic_filter(
+    expr: &Arc<dyn PhysicalExpr>,
+) -> Option<Arc<DynamicFilterPhysicalExpr>> {
+    // Try to downcast this expression directly.
+    // PhysicalExpr: Any + Send + Sync, so trait upcasting allows the coercion.
+    let cloned = Arc::clone(expr);
+    let any_arc: Arc<dyn std::any::Any + Send + Sync> = cloned;
+    if let Ok(df) = Arc::downcast::<DynamicFilterPhysicalExpr>(any_arc) {
+        return Some(df);
+    }
+
+    // Recursively check children
+    for child in expr.children() {
+        if let Some(df) = find_dynamic_filter(child) {
+            return Some(df);
+        }
+    }
+
+    None
+}
+
+/// Compute the best threshold from row group statistics.
+///
+/// For `want_max = true` (DESC): finds the maximum value from the stats array
+/// across row groups with `num_rows >= limit`.
+///
+/// For `want_max = false` (ASC): finds the minimum value from the stats array
+/// across row groups with `num_rows >= limit`.
+fn compute_best_threshold_from_stats(
+    stats: &arrow::array::ArrayRef,
+    rg_metadata: &[parquet::file::metadata::RowGroupMetaData],
+    limit: usize,
+    want_max: bool,
+) -> Result<Option<ScalarValue>> {
+    let mut best: Option<ScalarValue> = None;
+
+    for (i, rg) in rg_metadata.iter().enumerate() {
+        // Only consider row groups with enough rows
+        if (rg.num_rows() as usize) < limit {
+            continue;
+        }
+
+        // Skip null statistics
+        if i >= stats.len() || stats.is_null(i) {
+            continue;
+        }
+
+        let value = ScalarValue::try_from_array(stats.as_ref(), i)?;
+        if value.is_null() {
+            continue;
+        }
+
+        best = Some(match best {
+            None => value,
+            Some(current) => {
+                if want_max {
+                    // Keep the maximum
                    if value > current { value } else { current }
+                } else {
+                    // Keep the minimum
+                    if value < current { value } else { current }
+                }
+            }
+        });
+    }
+
+    Ok(best)
+}
+
 /// State for a stream that decodes a single Parquet file using a push-based decoder.
 ///
 /// The [`transition`](Self::transition) method drives the decoder in a loop: it requests
@@ -2720,4 +2929,404 @@ mod test {
             "without page index all rows are returned"
         );
     }
+
+    // ---------------------------------------------------------------
+    // Helper: build RowGroupMetaData with a given num_rows
+    // ---------------------------------------------------------------
+
+    /// Create a minimal `SchemaDescriptor` with a single Int64 column named `id`.
+    fn make_int64_schema_descr() -> parquet::schema::types::SchemaDescPtr {
+        use parquet::basic::Type as PhysicalType;
+        use parquet::schema::types::Type as SchemaType;
+
+        let field = SchemaType::primitive_type_builder("id", PhysicalType::INT64)
+            .build()
+            .unwrap();
+        let schema = SchemaType::group_type_builder("schema")
+            .with_fields(vec![Arc::new(field)])
+            .build()
+            .unwrap();
+        Arc::new(parquet::schema::types::SchemaDescriptor::new(Arc::new(
+            schema,
+        )))
+    }
+
+    /// Build a vector of `RowGroupMetaData`, one per entry in `row_counts`.
+    fn make_rg_metadata(
+        row_counts: &[i64],
+    ) -> Vec<parquet::file::metadata::RowGroupMetaData> {
+        let schema_descr = make_int64_schema_descr();
+        row_counts
+            .iter()
+            .map(|&num_rows| {
+                let column = parquet::file::metadata::ColumnChunkMetaData::builder(
+                    schema_descr.column(0),
+                )
+                .set_num_values(num_rows)
+                .build()
+                .unwrap();
+                parquet::file::metadata::RowGroupMetaData::builder(schema_descr.clone())
+                    .set_num_rows(num_rows)
+                    .set_column_metadata(vec![column])
+                    .build()
+                    .unwrap()
+            })
+            .collect()
+    }
+
+    // ---------------------------------------------------------------
+    // Tests for compute_best_threshold_from_stats
+    // ---------------------------------------------------------------
+
+    #[test]
+    fn test_compute_threshold_desc_picks_max_of_mins() {
+        use arrow::array::{ArrayRef, Int64Array};
+
+        let stats: ArrayRef =
+            Arc::new(Int64Array::from(vec![Some(10), Some(50), Some(30)]));
+        let rg_meta = make_rg_metadata(&[100_000, 100_000, 100_000]);
+
+        let result =
+            compute_best_threshold_from_stats(&stats, &rg_meta, 100, true).unwrap();
+        assert_eq!(result, Some(ScalarValue::Int64(Some(50))));
+    }
+
+    #[test]
+    fn test_compute_threshold_asc_picks_min_of_maxes() {
+        use arrow::array::{ArrayRef, Int64Array};
+
+        let stats: ArrayRef =
+            Arc::new(Int64Array::from(vec![Some(100), Some(50), Some(80)]));
+        let rg_meta = make_rg_metadata(&[100_000, 100_000, 100_000]);
+
+        let result =
+            compute_best_threshold_from_stats(&stats, &rg_meta, 100, false).unwrap();
+        assert_eq!(result, Some(ScalarValue::Int64(Some(50))));
+    }
+
+    #[test]
+    fn test_compute_threshold_skips_small_rgs() {
+        use arrow::array::{ArrayRef, Int64Array};
+
+        // RG 0: 100K rows, value=10
+        // RG 1: 50 rows (< limit=100), value=999 — should be skipped
+        // RG 2: 100K rows, value=30
+        let stats: ArrayRef =
+            Arc::new(Int64Array::from(vec![Some(10), Some(999), Some(30)]));
+        let rg_meta = make_rg_metadata(&[100_000, 50, 100_000]);
+
+        let result =
+            compute_best_threshold_from_stats(&stats, &rg_meta, 100, true).unwrap();
+        // want_max=true => max(10, 30) = 30 (skipping the 999 from the small RG)
+        assert_eq!(result, Some(ScalarValue::Int64(Some(30))));
+    }
+
+    #[test]
+    fn test_compute_threshold_skips_null_stats() {
+        use arrow::array::{ArrayRef, Int64Array};
+
+        let stats: ArrayRef = Arc::new(Int64Array::from(vec![Some(10), None, Some(30)]));
+        let rg_meta = make_rg_metadata(&[100_000, 100_000, 100_000]);
+
+        let result =
+            compute_best_threshold_from_stats(&stats, &rg_meta, 100, true).unwrap();
+        // Null entry is skipped, max(10, 30) = 30
+        assert_eq!(result, Some(ScalarValue::Int64(Some(30))));
+    }
+
+    #[test]
+    fn test_compute_threshold_all_rgs_too_small() {
+        use arrow::array::{ArrayRef, Int64Array};
+
+        let stats: ArrayRef = Arc::new(Int64Array::from(vec![Some(10), Some(50)]));
+        let rg_meta = make_rg_metadata(&[5, 10]); // all < limit=100
+
+        let result =
+            compute_best_threshold_from_stats(&stats, &rg_meta, 100, true).unwrap();
+        assert_eq!(result, None);
+    }
+
+    #[test]
+    fn test_compute_threshold_empty_metadata() {
+        use arrow::array::{ArrayRef, Int64Array};
+
+        let stats: ArrayRef = Arc::new(Int64Array::from(Vec::<Option<i64>>::new()));
+        let rg_meta: Vec<parquet::file::metadata::RowGroupMetaData> = vec![];
+
+        let result =
+            compute_best_threshold_from_stats(&stats, &rg_meta, 100, true).unwrap();
+        assert_eq!(result, None);
+    }
+
+    // ---------------------------------------------------------------
+    // Tests for find_dynamic_filter
+    // ---------------------------------------------------------------
+
+    #[test]
+    fn test_find_dynamic_filter_direct_match() {
+        let col_expr = Arc::new(Column::new("id", 0)) as Arc<dyn PhysicalExpr>;
+        let dynamic_filter: Arc<dyn PhysicalExpr> = Arc::new(
+            DynamicFilterPhysicalExpr::new(vec![col_expr], super::lit(true)),
+        );
+
+        let result = find_dynamic_filter(&dynamic_filter);
+        assert!(
+            result.is_some(),
+            "should find direct DynamicFilterPhysicalExpr"
+        );
+    }
+
+    #[test]
+    fn test_find_dynamic_filter_nested_in_conjunction() {
+        let col_expr = Arc::new(Column::new("id", 0)) as Arc<dyn PhysicalExpr>;
+        let dynamic_filter: Arc<dyn PhysicalExpr> = Arc::new(
+            DynamicFilterPhysicalExpr::new(vec![col_expr], super::lit(true)),
+        );
+        // Wrap it: (id > 0) AND dynamic_filter
+        let left: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("id", 0)),
+            Operator::Gt,
+            super::lit(0i64),
+        ));
+        let conjunction: Arc<dyn PhysicalExpr> =
+            Arc::new(BinaryExpr::new(left, Operator::And, dynamic_filter));
+
+        let result = find_dynamic_filter(&conjunction);
+        assert!(
+            result.is_some(),
+            "should find DynamicFilterPhysicalExpr nested in AND"
+        );
+    }
+
+    #[test]
+    fn test_find_dynamic_filter_none_when_absent() {
+        let col_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("id", 0));
+        let result = find_dynamic_filter(&col_expr);
+        assert!(
+            result.is_none(),
+            "plain Column has no DynamicFilterPhysicalExpr"
+        );
+    }
+
+    // ---------------------------------------------------------------
+    // Tests for try_init_topk_threshold
+    // ---------------------------------------------------------------
+
+    /// Write a small parquet file and return its metadata so we can build
+    /// realistic `RowGroupMetaData` with actual statistics.
+    fn make_parquet_metadata_with_stats(
+        row_counts: &[Vec<i64>],
+    ) -> (
+        Vec<parquet::file::metadata::RowGroupMetaData>,
+        Schema,
+        parquet::schema::types::SchemaDescPtr,
+    ) {
+        use parquet::arrow::ArrowWriter;
+        use parquet::file::reader::FileReader;
+        use parquet::file::serialized_reader::SerializedFileReader;
+
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
+
+        // Write each row_counts entry as a separate row group
+        let mut buf = Vec::new();
+        let mut writer =
+            ArrowWriter::try_new(&mut buf, Arc::clone(&schema), None).unwrap();
+        for values in row_counts {
+            let batch = RecordBatch::try_new(
+                Arc::clone(&schema),
+                vec![Arc::new(arrow::array::Int64Array::from(values.clone()))],
+            )
+            .unwrap();
+            writer.write(&batch).unwrap();
+            writer.flush().unwrap(); // flush after each batch -> separate RG
+        }
+        writer.close().unwrap();
+
+        // Read back metadata
+        let reader = SerializedFileReader::new(bytes::Bytes::from(buf)).unwrap();
+        let file_metadata = reader.metadata();
+        let rg_metadata: Vec<_> = file_metadata.row_groups().to_vec();
+        let parquet_schema = file_metadata.file_metadata().schema_descr_ptr();
+        (rg_metadata, schema.as_ref().clone(), parquet_schema)
+    }
+
+    #[test]
+    fn test_try_init_topk_threshold_desc() {
+        // Two RGs:
+        //   RG0: values [1..=200]   => min=1,   max=200
+        //   RG1: values [100..=300] => min=100, max=300
+        // DESC sort, limit=10: threshold = max(min) = max(1, 100) = 100
+        // Filter should be: id > 100
+        let rg0_values: Vec<i64> = (1..=200).collect();
+        let rg1_values: Vec<i64> = (100..=300).collect();
+        let (rg_metadata, arrow_schema, parquet_schema) =
+            make_parquet_metadata_with_stats(&[rg0_values, rg1_values]);
+
+        let col_expr = Arc::new(Column::new("id", 0)) as Arc<dyn PhysicalExpr>;
+        let desc_opts = arrow::compute::SortOptions {
+            descending: true,
+            nulls_first: true,
+        };
+        let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new_with_sort_options(
+            vec![Arc::clone(&col_expr)],
+            super::lit(true),
+            vec![desc_opts],
+        ));
+        let predicate: Arc<dyn PhysicalExpr> = dynamic_filter.clone();
+
+        try_init_topk_threshold(
+            &predicate,
+            10,
+            &rg_metadata,
+            &arrow_schema,
+            &parquet_schema,
+        )
+        .unwrap();
+
+        // Verify the dynamic filter was updated
+        let current = dynamic_filter.current().unwrap();
+        let display = format!("{current}");
+        assert!(
+            display.contains(">"),
+            "DESC should produce Gt operator, got: {display}"
+        );
+        assert!(
+            display.contains("100"),
+            "threshold should be 100 (max of mins), got: {display}"
+        );
+    }
+
+    #[test]
+    fn test_try_init_topk_threshold_asc() {
+        // Two RGs:
+        //   RG0: values [1..=200]   => min=1,   max=200
+        //   RG1: values [100..=300] => min=100, max=300
+        // ASC sort, limit=10: threshold = min(max) = min(200, 300) = 200
+        // Filter should be: id < 200
+        let rg0_values: Vec<i64> = (1..=200).collect();
+        let rg1_values: Vec<i64> = (100..=300).collect();
+        let (rg_metadata, arrow_schema, parquet_schema) =
+            make_parquet_metadata_with_stats(&[rg0_values, rg1_values]);
+
+        let col_expr = Arc::new(Column::new("id", 0)) as Arc<dyn PhysicalExpr>;
+        let asc_opts = arrow::compute::SortOptions {
+            descending: false,
+            nulls_first: false,
+        };
+        let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new_with_sort_options(
+            vec![Arc::clone(&col_expr)],
+            super::lit(true),
+            vec![asc_opts],
+        ));
+        let predicate: Arc<dyn PhysicalExpr> = dynamic_filter.clone();
+
+        try_init_topk_threshold(
+            &predicate,
+            10,
+            &rg_metadata,
+            &arrow_schema,
+            &parquet_schema,
+        )
+        .unwrap();
+
+        let current = dynamic_filter.current().unwrap();
+        let display = format!("{current}");
+        assert!(
+            display.contains("<"),
+            "ASC should produce Lt operator, got: {display}"
+        );
+        assert!(
+            display.contains("200"),
+            "threshold should be 200 (min of maxes), got: {display}"
+        );
+    }
+
+    #[test]
+    fn test_try_init_topk_threshold_no_dynamic_filter() {
+        let rg0_values: Vec<i64> = (1..=200).collect();
+        let (rg_metadata, arrow_schema, parquet_schema) =
+            make_parquet_metadata_with_stats(&[rg0_values]);
+
+        // Plain predicate with no DynamicFilterPhysicalExpr
+        let predicate: Arc<dyn PhysicalExpr> = Arc::new(Column::new("id", 0));
+
+        let result = try_init_topk_threshold(
+            &predicate,
+            10,
+            &rg_metadata,
+            &arrow_schema,
+            &parquet_schema,
+        );
+        assert!(result.is_ok(), "should be a no-op when no dynamic filter");
+    }
+
+    #[test]
+    fn test_try_init_topk_threshold_no_sort_options() {
+        let rg0_values: Vec<i64> = (1..=200).collect();
+        let (rg_metadata, arrow_schema, parquet_schema) =
+            make_parquet_metadata_with_stats(&[rg0_values]);
+
+        // DynamicFilterPhysicalExpr WITHOUT sort_options => should be skipped
+        let col_expr = Arc::new(Column::new("id", 0)) as Arc<dyn PhysicalExpr>;
+        let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(
+            vec![col_expr],
+            super::lit(true),
+        ));
+        let predicate: Arc<dyn PhysicalExpr> = dynamic_filter.clone();
+
+        let result = try_init_topk_threshold(
+            &predicate,
+            10,
+            &rg_metadata,
+            &arrow_schema,
+            &parquet_schema,
+        );
+        assert!(
+            result.is_ok(),
+            "no sort_options should be skipped gracefully"
+        );
+
+        let current = dynamic_filter.current().unwrap();
+        let display = format!("{current}");
+        assert_eq!(display, "true", "filter should remain unchanged: {display}");
+    }
+
+    #[test]
+    fn test_try_init_topk_threshold_multi_column_skipped() {
+        let rg0_values: Vec<i64> = (1..=200).collect();
+        let (rg_metadata, arrow_schema, parquet_schema) =
+            make_parquet_metadata_with_stats(&[rg0_values]);
+
+        // DynamicFilterPhysicalExpr with 2 sort columns => should be skipped
+        let col1 = Arc::new(Column::new("id", 0)) as Arc<dyn PhysicalExpr>;
+        let col2 = Arc::new(Column::new("id", 0)) as Arc<dyn PhysicalExpr>;
+        let opts = arrow::compute::SortOptions {
+            descending: true,
+            nulls_first: true,
+        };
+        let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new_with_sort_options(
+            vec![col1, col2],
+            super::lit(true),
+            vec![opts, opts],
+        ));
+        let predicate: Arc<dyn PhysicalExpr> = dynamic_filter.clone();
+
+        let result = try_init_topk_threshold(
+            &predicate,
+            10,
+            &rg_metadata,
+            &arrow_schema,
+            &parquet_schema,
+        );
+        assert!(
+            result.is_ok(),
+            "multi-column sort should be skipped gracefully"
+        );
+
+        // Filter should still be the original `super::lit(true)`
+        let current = dynamic_filter.current().unwrap();
+        let display = format!("{current}");
+        assert_eq!(display, "true", "filter should remain unchanged: {display}");
+    }
 }
diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
index 47398d87e26a5..5671426f50a15 100644
--- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs
+++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use arrow::compute::SortOptions;
 use parking_lot::RwLock;
 use std::{fmt::Display, hash::Hash, sync::Arc};
 use tokio::sync::watch;
@@ -74,6 +75,11 @@ pub struct DynamicFilterPhysicalExpr {
     /// But this can have overhead in production, so it's only included in our tests.
     data_type: Arc<RwLock<Option<DataType>>>,
     nullable: Arc<RwLock<Option<bool>>>,
+    /// Optional sort options for each child expression.
+    /// When set (e.g., by SortExec for TopK), downstream consumers like the
+    /// parquet reader can use these to initialize the filter threshold from
+    /// column statistics before reading any data.
+    sort_options: Option<Vec<SortOptions>>,
 }
 
 #[derive(Debug)]
@@ -177,9 +183,30 @@ impl DynamicFilterPhysicalExpr {
             state_watch,
             data_type: Arc::new(RwLock::new(None)),
             nullable: Arc::new(RwLock::new(None)),
+            sort_options: None,
         }
     }
 
+    /// Create a new [`DynamicFilterPhysicalExpr`] with sort options.
+    ///
+    /// Sort options indicate the sort direction for each child expression,
+    /// enabling downstream consumers (e.g., parquet readers) to initialize
+    /// the filter threshold from column statistics before reading data.
+    pub fn new_with_sort_options(
+        children: Vec<Arc<dyn PhysicalExpr>>,
+        inner: Arc<dyn PhysicalExpr>,
+        sort_options: Vec<SortOptions>,
+    ) -> Self {
+        let mut this = Self::new(children, inner);
+        this.sort_options = Some(sort_options);
+        this
+    }
+
+    /// Returns the sort options for each child expression, if available.
+    pub fn sort_options(&self) -> Option<&[SortOptions]> {
+        self.sort_options.as_deref()
+    }
+
     fn remap_children(
         children: &[Arc<dyn PhysicalExpr>],
         remapped_children: Option<&Vec<Arc<dyn PhysicalExpr>>>,
@@ -368,6 +395,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
             state_watch: self.state_watch.clone(),
             data_type: Arc::clone(&self.data_type),
             nullable: Arc::clone(&self.nullable),
+            sort_options: self.sort_options.clone(),
         }))
     }
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 6c02af8dec6d3..89fb558789d36 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -915,8 +915,17 @@ impl SortExec {
             .iter()
             .map(|sort_expr| Arc::clone(&sort_expr.expr))
             .collect::<Vec<_>>();
+        let sort_options = self
+            .expr
+            .iter()
+            .map(|sort_expr| sort_expr.options)
+            .collect::<Vec<_>>();
         Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
-            DynamicFilterPhysicalExpr::new(children, lit(true)),
+            DynamicFilterPhysicalExpr::new_with_sort_options(
+                children,
+                lit(true),
+                sort_options,
+            ),
         ))))
     }
diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt
index b6c75f3977010..78a69481a6363 100644
--- a/datafusion/sqllogictest/test_files/sort_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt
@@ -2271,6 +2271,145 @@ DROP TABLE tg_src_high;
 
 statement ok
 DROP TABLE tg_buffer;
 
+# ============================================================================
+# Test H: TopK threshold initialization from row group statistics
+# ============================================================================
+# When TopK's dynamic filter is pushed to parquet, we can initialize
+# its threshold from row group statistics before reading any data.
+# This test verifies correctness with scrambled multi-file data.
+ +statement ok +SET datafusion.execution.target_partitions = 1; + +statement ok +SET datafusion.execution.parquet.pushdown_filters = true; + +statement ok +SET datafusion.optimizer.enable_sort_pushdown = true; + +statement ok +SET datafusion.execution.parquet.max_row_group_size = 5; + +# Create 3 files with non-overlapping but scrambled id ranges: +# file_a.parquet: ids 7-10 (highest range, alphabetically first) +# file_b.parquet: ids 1-3 (lowest range) +# file_c.parquet: ids 4-6 (middle range) +# Alphabetical order: a(7-10), b(1-3), c(4-6) — NOT sorted by id. +# TopK DESC should read file_a first (highest min) after reorder. + +statement ok +CREATE TABLE th_high(id INT, value INT) AS VALUES +(7, 700), (8, 800), (9, 900), (10, 1000); + +statement ok +CREATE TABLE th_low(id INT, value INT) AS VALUES +(1, 100), (2, 200), (3, 300); + +statement ok +CREATE TABLE th_mid(id INT, value INT) AS VALUES +(4, 400), (5, 500), (6, 600); + +query I +COPY (SELECT * FROM th_high ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/topk_stats/file_a.parquet'; +---- +4 + +query I +COPY (SELECT * FROM th_low ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/topk_stats/file_b.parquet'; +---- +3 + +query I +COPY (SELECT * FROM th_mid ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/topk_stats/file_c.parquet'; +---- +3 + +statement ok +SET datafusion.execution.parquet.max_row_group_size = 1048576; + +statement ok +CREATE EXTERNAL TABLE th_scrambled(id INT, value INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/topk_stats/' +WITH ORDER (id ASC); + +# Test H.1: DESC LIMIT should return correct results despite scrambled file order +query II +SELECT * FROM th_scrambled ORDER BY id DESC LIMIT 3; +---- +10 1000 +9 900 +8 800 + +# Test H.2: ASC LIMIT should also work correctly +query II +SELECT * FROM th_scrambled ORDER BY id ASC LIMIT 3; +---- +1 100 +2 200 +3 300 + +# Test H.3: Larger LIMIT spanning multiple files +query II +SELECT * FROM th_scrambled ORDER 
BY id DESC LIMIT 7; +---- +10 1000 +9 900 +8 800 +7 700 +6 600 +5 500 +4 400 + +# Test H.4: DESC with EXPLAIN to verify dynamic filter is pushed down +query TT +EXPLAIN SELECT * FROM th_scrambled ORDER BY id DESC LIMIT 3; +---- +logical_plan +01)Sort: th_scrambled.id DESC NULLS FIRST, fetch=3 +02)--TableScan: th_scrambled projection=[id, value] +physical_plan +01)SortExec: TopK(fetch=3), expr=[id@0 DESC], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/topk_stats/file_a.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/topk_stats/file_c.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/topk_stats/file_b.parquet]]}, projection=[id, value], file_type=parquet, predicate=DynamicFilter [ empty ] + +# Test H.5: With multiple partitions — TopK filter is shared across partitions +statement ok +SET datafusion.execution.target_partitions = 4; + +query II +SELECT * FROM th_scrambled ORDER BY id DESC LIMIT 5; +---- +10 1000 +9 900 +8 800 +7 700 +6 600 + +query II +SELECT * FROM th_scrambled ORDER BY id ASC LIMIT 5; +---- +1 100 +2 200 +3 300 +4 400 +5 500 + +# Cleanup Test H +statement ok +DROP TABLE th_high; + +statement ok +DROP TABLE th_low; + +statement ok +DROP TABLE th_mid; + +statement ok +DROP TABLE th_scrambled; + # Reset settings (SLT runner uses target_partitions=4, not system default) statement ok SET datafusion.execution.target_partitions = 4; @@ -2280,3 +2419,6 @@ SET datafusion.execution.collect_statistics = true; statement ok SET datafusion.optimizer.enable_sort_pushdown = true; + +statement ok +SET datafusion.execution.parquet.pushdown_filters = false;