diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index 6c5e0e483e15c..aa1ec477345c6 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -109,9 +109,9 @@ clickbench_extended: ClickBench \"inspired\" queries against a single parquet # Sort Pushdown Benchmarks sort_pushdown: Sort pushdown baseline (no WITH ORDER) on TPC-H data (SF=1) sort_pushdown_sorted: Sort pushdown with WITH ORDER — tests sort elimination on non-overlapping files -sort_pushdown_inexact: Sort pushdown Inexact path (--sorted DESC) — multi-file with scrambled RGs, tests reverse scan + RG reorder -sort_pushdown_inexact_unsorted: Sort pushdown Inexact path (no WITH ORDER) — same data, tests Unsupported path + RG reorder -sort_pushdown_inexact_overlap: Sort pushdown Inexact path — multi-file scrambled RGs (streaming data scenario) +sort_pushdown_inexact: Sort pushdown Inexact path (--sorted DESC) — tests reverse scan + RG reorder +sort_pushdown_inexact_unsorted: Sort pushdown Inexact path (no WITH ORDER) — tests Unsupported path + RG reorder +sort_pushdown_inexact_overlap: Sort pushdown Inexact path — partially overlapping RGs (streaming data scenario) # Sorted Data Benchmarks (ORDER BY Optimization) clickbench_sorted: ClickBench queries on pre-sorted data using prefer_existing_sort (tests sort elimination optimization) @@ -1154,23 +1154,10 @@ run_sort_pushdown_sorted() { # Generates data for sort pushdown Inexact benchmark. # -# Produces multiple parquet files where each file has MULTIPLE row groups -# with scrambled RG order. This tests both: -# - Row-group-level reorder within each file (reorder_by_statistics) -# - TopK threshold initialization from RG statistics -# -# Strategy: -# 1. Write a single sorted file with small (100K-row) RGs (~61 RGs total). -# 2. Use pyarrow to redistribute RGs into N_FILES files, scrambling the -# RG order within each file using a deterministic permutation. -# Each file gets ~61/N_FILES RGs with narrow, non-overlapping ranges -# but in scrambled order. 
-# -# Writing a single file with ORDER BY scramble does NOT work: the parquet -# writer merges rows from adjacent chunks at RG boundaries, widening -# ranges and defeating reorder_by_statistics. -# -# Requires pyarrow (pip install pyarrow). +# Produces a single large lineitem parquet file where row groups have +# NON-OVERLAPPING but OUT-OF-ORDER l_orderkey ranges (each RG internally +# sorted, RGs shuffled). This simulates append-heavy workloads where data +# is written in batches at different times. data_sort_pushdown_inexact() { INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact/lineitem" if [ -d "${INEXACT_DIR}" ] && [ "$(ls -A ${INEXACT_DIR}/*.parquet 2>/dev/null)" ]; then @@ -1178,14 +1165,7 @@ data_sort_pushdown_inexact() { return fi - # Check pyarrow dependency (needed to split/scramble RGs) - if ! python3 -c "import pyarrow" 2>/dev/null; then - echo "Error: pyarrow is required for sort pushdown Inexact data generation." - echo "Install with: pip install pyarrow" - return 1 - fi - - echo "Generating sort pushdown Inexact benchmark data (multi-file, scrambled RGs)..." + echo "Generating sort pushdown Inexact benchmark data (single file, shuffled RGs)..." # Re-use the sort_pushdown data as the source (generate if missing) data_sort_pushdown @@ -1193,111 +1173,65 @@ data_sort_pushdown_inexact() { mkdir -p "${INEXACT_DIR}" SRC_DIR="${DATA_DIR}/sort_pushdown/lineitem" - # Step 1: Write a single sorted file with small (100K-row) RGs - TMPFILE="${INEXACT_DIR}/_sorted_small_rgs.parquet" + # Use datafusion-cli to bucket rows into 64 groups by a deterministic + # scrambler, then sort within each bucket by orderkey. This produces + # ~64 RG-sized segments where each has a tight orderkey range but the + # segments appear in scrambled (non-sorted) order in the file. (cd "${SCRIPT_DIR}/.." 
&& cargo run --release -p datafusion-cli -- -c " CREATE EXTERNAL TABLE src STORED AS PARQUET LOCATION '${SRC_DIR}'; - COPY (SELECT * FROM src ORDER BY l_orderkey) - TO '${TMPFILE}' + COPY ( + SELECT * FROM src + ORDER BY + (l_orderkey * 1664525 + 1013904223) % 64, + l_orderkey + ) + TO '${INEXACT_DIR}/shuffled.parquet' STORED AS PARQUET OPTIONS ('format.max_row_group_size' '100000'); ") - # Step 2: Redistribute RGs into 3 files with scrambled RG order. - # Each file gets ~20 RGs. RG assignment: rg_idx % 3 determines file, - # permutation (rg_idx * 41 + 7) % n scrambles the order within file. - python3 -c " -import pyarrow.parquet as pq - -pf = pq.ParquetFile('${TMPFILE}') -n = pf.metadata.num_row_groups -n_files = 3 - -# Assign each RG to a file, scramble order within each file -file_rgs = [[] for _ in range(n_files)] -for rg_idx in range(n): - slot = (rg_idx * 41 + 7) % n # scrambled index - file_id = slot % n_files - file_rgs[file_id].append(rg_idx) - -# Write each file with its assigned RGs (in scrambled order) -for file_id in range(n_files): - rgs = file_rgs[file_id] - if not rgs: - continue - tables = [pf.read_row_group(rg) for rg in rgs] - writer = pq.ParquetWriter( - '${INEXACT_DIR}/part_%03d.parquet' % file_id, - pf.schema_arrow) - for t in tables: - writer.write_table(t) - writer.close() - print(f'File part_{file_id:03d}.parquet: {len(rgs)} RGs') -" - - rm -f "${TMPFILE}" - echo "Sort pushdown Inexact data generated at ${INEXACT_DIR}" + echo "Sort pushdown Inexact shuffled data generated at ${INEXACT_DIR}" ls -la "${INEXACT_DIR}" - # Also generate overlap data: same strategy but with different file count - # and permutation. Simulates streaming data with network delays where - # chunks arrive out of sequence. - # - # Requires pyarrow (pip install pyarrow). + # Also generate a file with partially overlapping row groups. 
+ # Simulates streaming data with network delays: each chunk is mostly + # in order but has a small overlap with the next chunk (±5% of the + # chunk range). This is the pattern described by @adriangb — data + # arriving with timestamps that are generally increasing but with + # network-induced jitter causing small overlaps between row groups. OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap/lineitem" if [ -d "${OVERLAP_DIR}" ] && [ "$(ls -A ${OVERLAP_DIR}/*.parquet 2>/dev/null)" ]; then echo "Sort pushdown Inexact overlap data already exists at ${OVERLAP_DIR}" return fi - echo "Generating sort pushdown Inexact overlap data (multi-file, scrambled RGs)..." + echo "Generating sort pushdown Inexact overlap data (partially overlapping RGs)..." mkdir -p "${OVERLAP_DIR}" - # Step 1: Write a single sorted file with small (100K-row) RGs - TMPFILE="${OVERLAP_DIR}/_sorted_small_rgs.parquet" (cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c " CREATE EXTERNAL TABLE src STORED AS PARQUET LOCATION '${SRC_DIR}'; - COPY (SELECT * FROM src ORDER BY l_orderkey) - TO '${TMPFILE}' + -- Add jitter to l_orderkey: shift each row by a random-ish offset + -- proportional to its position. This creates overlap between adjacent + -- row groups while preserving the general ascending trend. + -- Formula: l_orderkey + (l_orderkey * 7 % 5000) - 2500 + -- This adds ±2500 jitter, creating ~5K overlap between adjacent 100K-row RGs. + COPY ( + SELECT * FROM src + ORDER BY l_orderkey + (l_orderkey * 7 % 5000) - 2500 + ) + TO '${OVERLAP_DIR}/overlapping.parquet' STORED AS PARQUET OPTIONS ('format.max_row_group_size' '100000'); ") - # Step 2: Redistribute into 5 files with scrambled RG order. 
- python3 -c " -import pyarrow.parquet as pq - -pf = pq.ParquetFile('${TMPFILE}') -n = pf.metadata.num_row_groups -n_files = 5 - -file_rgs = [[] for _ in range(n_files)] -for rg_idx in range(n): - slot = (rg_idx * 37 + 13) % n - file_id = slot % n_files - file_rgs[file_id].append(rg_idx) - -for file_id in range(n_files): - rgs = file_rgs[file_id] - if not rgs: - continue - tables = [pf.read_row_group(rg) for rg in rgs] - writer = pq.ParquetWriter( - '${OVERLAP_DIR}/part_%03d.parquet' % file_id, - pf.schema_arrow) - for t in tables: - writer.write_table(t) - writer.close() - print(f'File part_{file_id:03d}.parquet: {len(rgs)} RGs') -" - - rm -f "${TMPFILE}" + echo "Sort pushdown Inexact overlap data generated at ${OVERLAP_DIR}" + ls -la "${OVERLAP_DIR}" } # Runs the sort pushdown Inexact benchmark (tests RG reorder by statistics). @@ -1306,7 +1240,7 @@ for file_id in range(n_files): run_sort_pushdown_inexact() { INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact" RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact.json" - echo "Running sort pushdown Inexact benchmark (multi-file scrambled RGs, --sorted DESC)..." + echo "Running sort pushdown Inexact benchmark (--sorted, DESC, reverse scan path)..." DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \ debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } @@ -1322,13 +1256,13 @@ run_sort_pushdown_inexact_unsorted() { debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_unsorted" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } -# Runs the sort pushdown benchmark with multi-file scrambled RG order. -# Simulates streaming data with network delays — multiple files, each with -# scrambled RGs. Tests both RG-level reorder and TopK stats initialization. 
+# Runs the sort pushdown benchmark with partially overlapping RGs. +# Simulates streaming data with network jitter — RGs are mostly in order +# but have small overlaps (±2500 orderkey jitter between adjacent RGs). run_sort_pushdown_inexact_overlap() { OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap" RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact_overlap.json" - echo "Running sort pushdown Inexact benchmark (multi-file scrambled RGs, streaming data pattern)..." + echo "Running sort pushdown Inexact benchmark (overlapping RGs, streaming data pattern)..." DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \ debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${OVERLAP_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_overlap" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } diff --git a/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs index d14afaf1b3267..9104e2900db45 100644 --- a/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs +++ b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs @@ -305,21 +305,36 @@ async fn test_fuzz_topk_filter_pushdown() { } let mut queries = vec![]; + let all_columns = ["id", "name", "department"]; for limit in [1, 10] { for num_order_by_columns in [1, 2, 3] { - for order_columns in ["id", "name", "department"] - .iter() - .combinations(num_order_by_columns) - { + for order_columns in all_columns.iter().combinations(num_order_by_columns) { for orderings in order_columns .iter() .map(|col| orders.get(**col).unwrap()) .multi_cartesian_product() { + // Add remaining columns as ASC tiebreakers to make + // the ordering fully deterministic. Without this, + // optimizations that change RG read order (e.g. + // statistics-based pruning) may produce different + // but equally valid tie-breaking results. 
+ let used: Vec<&str> = order_columns.iter().map(|c| **c).collect(); + let tiebreakers: Vec = all_columns + .iter() + .filter(|c| !used.contains(*c)) + .map(|c| format!("{c} ASC NULLS LAST")) + .collect(); + let mut all_orderings: Vec<&str> = + orderings.iter().map(|s| s.as_str()).collect(); + let tiebreaker_refs: Vec<&str> = + tiebreakers.iter().map(|s| s.as_str()).collect(); + all_orderings.extend(tiebreaker_refs); + let query = format!( "SELECT * FROM test_table ORDER BY {} LIMIT {}", - orderings.into_iter().join(", "), + all_orderings.join(", "), limit ); queries.push(query); diff --git a/datafusion/datasource-parquet/src/access_plan.rs b/datafusion/datasource-parquet/src/access_plan.rs index ca4d097c37a44..7b4484ae08e39 100644 --- a/datafusion/datasource-parquet/src/access_plan.rs +++ b/datafusion/datasource-parquet/src/access_plan.rs @@ -16,7 +16,12 @@ // under the License. use crate::sort::reverse_row_selection; +use arrow::datatypes::Schema; use datafusion_common::{Result, assert_eq_or_internal_err}; +use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr_common::sort_expr::LexOrdering; +use log::debug; +use parquet::arrow::arrow_reader::statistics::StatisticsConverter; use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData}; @@ -346,7 +351,6 @@ impl ParquetAccessPlan { ) -> Result { let row_group_indexes = self.row_group_indexes(); let row_selection = self.into_overall_row_selection(row_group_meta_data)?; - PreparedAccessPlan::new(row_group_indexes, row_selection) } } @@ -377,6 +381,126 @@ impl PreparedAccessPlan { }) } + /// Return a reference to the row group indexes. + pub(crate) fn row_group_indexes(&self) -> &[usize] { + &self.row_group_indexes + } + + /// Keep only the first `count` row groups, dropping the rest. + /// Used for TopK cumulative pruning after reorder + reverse. 
+ pub(crate) fn truncate_row_groups(mut self, count: usize) -> Self { + // Skip truncation if row_selection exists — it contains page-level + // pruning state that would be lost. Cumulative prune is a best-effort + // optimization; page pruning is already reducing I/O within those RGs. + if self.row_selection.is_some() { + return self; + } + self.row_group_indexes.truncate(count); + self + } + + /// Reorder row groups by their min statistics for the given sort order. + /// + /// This helps TopK queries find optimal values first. For ASC sort, + /// row groups with the smallest min values come first. For DESC sort, + /// row groups with the largest min values come first. + /// + /// Gracefully skips reordering when: + /// - There is a row_selection (too complex to remap) + /// - 0 or 1 row groups (nothing to reorder) + /// - Sort expression is not a simple column reference + /// - Statistics are unavailable + pub(crate) fn reorder_by_statistics( + mut self, + sort_order: &LexOrdering, + file_metadata: &ParquetMetaData, + arrow_schema: &Schema, + ) -> Result { + // Skip if row_selection present (too complex to remap) + if self.row_selection.is_some() { + debug!("Skipping RG reorder: row_selection present"); + return Ok(self); + } + + // Nothing to reorder + if self.row_group_indexes.len() <= 1 { + return Ok(self); + } + + // Get the first sort expression + // LexOrdering is guaranteed non-empty, so first() returns &PhysicalSortExpr + let first_sort_expr = sort_order.first(); + + // Extract column name from sort expression + let column: &Column = match first_sort_expr.expr.downcast_ref::() { + Some(col) => col, + None => { + debug!("Skipping RG reorder: sort expr is not a simple column"); + return Ok(self); + } + }; + + // Build statistics converter for this column + let converter = match StatisticsConverter::try_new( + column.name(), + arrow_schema, + file_metadata.file_metadata().schema_descr(), + ) { + Ok(c) => c, + Err(e) => { + debug!("Skipping RG reorder: cannot 
create stats converter: {e}"); + return Ok(self); + } + }; + + // Always sort by min values in ASC order to align row groups with + // the file's declared output ordering. Direction (DESC) is handled + // separately by ReverseRowGroups which is applied AFTER reorder. + // + // This composable design avoids the problem where reorder(DESC) + // followed by reverse would double-flip the order, and ensures + // that for already-sorted data, reorder is a no-op and reverse + // gives the correct DESC order (including placing small tail RGs first). + let rg_metadata: Vec<&RowGroupMetaData> = self + .row_group_indexes + .iter() + .map(|&idx| file_metadata.row_group(idx)) + .collect(); + + let stat_values = match converter.row_group_mins(rg_metadata.iter().copied()) { + Ok(vals) => vals, + Err(e) => { + debug!("Skipping RG reorder: cannot get min values: {e}"); + return Ok(self); + } + }; + + // Always sort ASC by min values — direction is handled by reverse + let sort_options = arrow::compute::SortOptions { + descending: false, + nulls_first: first_sort_expr.options.nulls_first, + }; + let sorted_indices = + match arrow::compute::sort_to_indices(&stat_values, Some(sort_options), None) + { + Ok(indices) => indices, + Err(e) => { + debug!("Skipping RG reorder: sort failed: {e}"); + return Ok(self); + } + }; + + // Apply the reordering + let original_indexes = self.row_group_indexes.clone(); + self.row_group_indexes = sorted_indices + .values() + .iter() + .map(|&i| original_indexes[i as usize]) + .collect(); + + Ok(self) + } + /// Reverse the access plan for reverse scanning pub(crate) fn reverse(mut self, file_metadata: &ParquetMetaData) -> Result { // Get the row group indexes before reversing @@ -614,4 +738,290 @@ mod test { .unwrap(); Arc::new(SchemaDescriptor::new(Arc::new(schema))) } + + // ---- reorder_by_statistics tests ---- + + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_physical_expr::expressions::Column; + use 
datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; + use parquet::basic::Type as PhysicalType; + use parquet::file::metadata::FileMetaData; + use parquet::file::statistics::Statistics; + use parquet::schema::types::Type as SchemaType; + + /// Create ParquetMetaData with row groups that have Int32 min/max stats + fn make_metadata_with_stats(min_max_pairs: &[(i32, i32)]) -> ParquetMetaData { + let field = SchemaType::primitive_type_builder("id", PhysicalType::INT32) + .build() + .unwrap(); + let schema = SchemaType::group_type_builder("schema") + .with_fields(vec![Arc::new(field)]) + .build() + .unwrap(); + let schema_descr = Arc::new(SchemaDescriptor::new(Arc::new(schema))); + + let row_groups: Vec = min_max_pairs + .iter() + .map(|(min, max)| { + let stats = + Statistics::int32(Some(*min), Some(*max), None, Some(100), false); + let column = ColumnChunkMetaData::builder(schema_descr.column(0)) + .set_num_values(100) + .set_statistics(stats) + .build() + .unwrap(); + RowGroupMetaData::builder(schema_descr.clone()) + .set_num_rows(100) + .set_column_metadata(vec![column]) + .build() + .unwrap() + }) + .collect(); + + let file_meta = FileMetaData::new( + 1, + min_max_pairs.len() as i64 * 100, + None, + None, + schema_descr, + None, + ); + ParquetMetaData::new(file_meta, row_groups) + } + + fn make_sort_order_asc() -> LexOrdering { + LexOrdering::new(vec![PhysicalSortExpr::new_default(Arc::new(Column::new( + "id", 0, + )))]) + .unwrap() + } + + fn make_sort_order_desc() -> LexOrdering { + use arrow::compute::SortOptions; + LexOrdering::new(vec![PhysicalSortExpr::new( + Arc::new(Column::new("id", 0)), + SortOptions { + descending: true, + nulls_first: false, + }, + )]) + .unwrap() + } + + fn make_arrow_schema() -> Schema { + Schema::new(vec![Field::new("id", DataType::Int32, false)]) + } + + #[test] + fn test_reorder_by_statistics_asc() { + // RGs in wrong order: [50-99, 200-299, 1-30] + let metadata = make_metadata_with_stats(&[(50, 99), (200, 
299), (1, 30)]); + let schema = make_arrow_schema(); + let sort_order = make_sort_order_asc(); + + let plan = PreparedAccessPlan::new(vec![0, 1, 2], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Should be reordered: RG2(1-30), RG0(50-99), RG1(200-299) + assert_eq!(plan.row_group_indexes, vec![2, 0, 1]); + } + + #[test] + fn test_reorder_by_statistics_desc_sorts_asc() { + // reorder_by_statistics always sorts by min ASC regardless of sort + // direction. DESC is handled separately by ReverseRowGroups which + // is applied after reorder in the optimizer pipeline. + // + // RGs: [50-99, 200-299, 1-30] + let metadata = make_metadata_with_stats(&[(50, 99), (200, 299), (1, 30)]); + let schema = make_arrow_schema(); + let sort_order = make_sort_order_desc(); + + let plan = PreparedAccessPlan::new(vec![0, 1, 2], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Always ASC by min: RG2(min=1), RG0(min=50), RG1(min=200) + // Reverse is applied separately for DESC queries. 
+ assert_eq!(plan.row_group_indexes, vec![2, 0, 1]); + } + + #[test] + fn test_reorder_by_statistics_single_rg() { + let metadata = make_metadata_with_stats(&[(1, 100)]); + let schema = make_arrow_schema(); + let sort_order = make_sort_order_asc(); + + let plan = PreparedAccessPlan::new(vec![0], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Single RG, no reorder + assert_eq!(plan.row_group_indexes, vec![0]); + } + + #[test] + fn test_reorder_by_statistics_with_skipped_rgs() { + // 4 RGs but only 0, 2, 3 are selected (RG1 was pruned) + let metadata = + make_metadata_with_stats(&[(300, 400), (100, 200), (1, 50), (50, 99)]); + let schema = make_arrow_schema(); + let sort_order = make_sort_order_asc(); + + let plan = PreparedAccessPlan::new(vec![0, 2, 3], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Reorder selected RGs by min: RG2(1-50), RG3(50-99), RG0(300-400) + assert_eq!(plan.row_group_indexes, vec![2, 3, 0]); + } + + #[test] + fn test_reorder_by_statistics_skips_with_row_selection() { + let metadata = make_metadata_with_stats(&[(50, 99), (1, 30)]); + let schema = make_arrow_schema(); + let sort_order = make_sort_order_asc(); + + let selection = RowSelection::from(vec![ + RowSelector::select(50), + RowSelector::skip(50), + RowSelector::select(100), + ]); + + let plan = PreparedAccessPlan::new(vec![0, 1], Some(selection)).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Should NOT reorder because row_selection is present + assert_eq!(plan.row_group_indexes, vec![0, 1]); + } + + #[test] + fn test_reorder_by_statistics_already_sorted() { + // Already in correct ASC order + let metadata = make_metadata_with_stats(&[(1, 30), (50, 99), (200, 299)]); + let schema = make_arrow_schema(); + let sort_order = make_sort_order_asc(); + + let plan = PreparedAccessPlan::new(vec![0, 1, 2], 
None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Already sorted, order preserved + assert_eq!(plan.row_group_indexes, vec![0, 1, 2]); + } + + #[test] + fn test_reorder_by_statistics_skips_non_column_expr() { + use datafusion_expr::Operator; + use datafusion_physical_expr::expressions::BinaryExpr; + + let metadata = make_metadata_with_stats(&[(50, 99), (1, 30)]); + let schema = make_arrow_schema(); + + // Sort expression is a binary expression (id + 1), not a simple column + let expr = Arc::new(BinaryExpr::new( + Arc::new(Column::new("id", 0)), + Operator::Plus, + Arc::new(datafusion_physical_expr::expressions::Literal::new( + datafusion_common::ScalarValue::Int32(Some(1)), + )), + )); + let sort_order = + LexOrdering::new(vec![PhysicalSortExpr::new_default(expr)]).unwrap(); + + let plan = PreparedAccessPlan::new(vec![0, 1], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Should NOT reorder because sort expr is not a simple column + assert_eq!(plan.row_group_indexes, vec![0, 1]); + } + + #[test] + fn test_reorder_by_statistics_overlapping_rgs_sorts_asc() { + // Overlapping ranges — reorder always uses min ASC: + // RG0: 50-60 + // RG1: 40-100 (lower min, wider range) + // RG2: 20-30 (lowest min) + // + // Sorted by min ASC: [RG2(20), RG1(40), RG0(50)] + // For DESC queries, ReverseRowGroups is applied after to flip order. 
+ let metadata = make_metadata_with_stats(&[(50, 60), (40, 100), (20, 30)]); + let schema = make_arrow_schema(); + let sort_order = make_sort_order_desc(); + + let plan = PreparedAccessPlan::new(vec![0, 1, 2], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Always ASC by min: RG2(min=20), RG1(min=40), RG0(min=50) + assert_eq!(plan.row_group_indexes, vec![2, 1, 0]); + } + + #[test] + fn test_reorder_by_statistics_skips_missing_column() { + let metadata = make_metadata_with_stats(&[(50, 99), (1, 30)]); + // Schema has "id" but sort order references "nonexistent" + let schema = make_arrow_schema(); + let sort_order = LexOrdering::new(vec![PhysicalSortExpr::new_default(Arc::new( + Column::new("nonexistent", 99), + ))]) + .unwrap(); + + let plan = PreparedAccessPlan::new(vec![0, 1], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Should NOT reorder because column not found in schema + assert_eq!(plan.row_group_indexes, vec![0, 1]); + } +} + +#[test] +fn test_truncate_row_groups_basic() { + let plan = PreparedAccessPlan { + row_group_indexes: vec![5, 3, 1, 4, 2], + row_selection: None, + }; + let plan = plan.truncate_row_groups(3); + assert_eq!(plan.row_group_indexes, vec![5, 3, 1]); + assert!(plan.row_selection.is_none()); +} + +#[test] +fn test_truncate_row_groups_skips_when_row_selection_present() { + let selection = RowSelection::from(vec![ + RowSelector::select(100), + RowSelector::skip(50), + RowSelector::select(100), + ]); + let plan = PreparedAccessPlan { + row_group_indexes: vec![5, 3, 1, 4, 2], + row_selection: Some(selection), + }; + // Should NOT truncate because row_selection is present + let plan = plan.truncate_row_groups(2); + assert_eq!(plan.row_group_indexes, vec![5, 3, 1, 4, 2]); + assert!(plan.row_selection.is_some()); +} + +#[test] +fn test_truncate_row_groups_no_op_when_count_exceeds_len() { + let plan = PreparedAccessPlan 
{ + row_group_indexes: vec![1, 2], + row_selection: None, + }; + let plan = plan.truncate_row_groups(10); + assert_eq!(plan.row_group_indexes, vec![1, 2]); } diff --git a/datafusion/datasource-parquet/src/access_plan_optimizer.rs b/datafusion/datasource-parquet/src/access_plan_optimizer.rs new file mode 100644 index 0000000000000..885dc0b5656ee --- /dev/null +++ b/datafusion/datasource-parquet/src/access_plan_optimizer.rs @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`AccessPlanOptimizer`] trait and implementations for optimizing +//! row group access order during parquet scans. +//! +//! Applied after row group pruning but before building the decoder, +//! these optimizers reorder (or reverse) the row groups to improve +//! query performance — e.g., placing the "best" row groups first +//! so TopK's dynamic filter threshold tightens quickly. + +use crate::access_plan::PreparedAccessPlan; +use arrow::datatypes::Schema; +use datafusion_common::Result; +use datafusion_physical_expr_common::sort_expr::LexOrdering; +use parquet::file::metadata::ParquetMetaData; +use std::fmt::Debug; + +/// Optimizes the row group access order for a prepared access plan. 
+/// +/// Implementations can reorder, reverse, or otherwise transform the +/// row group read order to improve scan performance. The optimizer +/// is applied once per file, after all pruning passes are complete. +/// +/// # Examples +/// +/// - [`ReverseRowGroups`]: simple O(n) reversal for DESC on ASC-sorted data +/// - [`ReorderByStatistics`]: sort row groups by min/max statistics +/// so TopK queries find optimal values first +pub(crate) trait AccessPlanOptimizer: Send + Sync + Debug { + /// Transform the prepared access plan. + /// + /// Implementations should return the plan unchanged if they cannot + /// apply their optimization (e.g., missing statistics). + fn optimize( + &self, + plan: PreparedAccessPlan, + file_metadata: &ParquetMetaData, + arrow_schema: &Schema, + ) -> Result; +} + +/// Reverse the row group order — simple O(n) reversal. +/// +/// Used as a fallback when the sort column has no statistics available. +/// For ASC-sorted files with a DESC query, reversing row groups places +/// the highest-value row groups first. +#[derive(Debug)] +pub(crate) struct ReverseRowGroups; + +impl AccessPlanOptimizer for ReverseRowGroups { + fn optimize( + &self, + plan: PreparedAccessPlan, + file_metadata: &ParquetMetaData, + _arrow_schema: &Schema, + ) -> Result { + plan.reverse(file_metadata) + } +} + +/// Reorder row groups by min/max statistics of the sort column. +/// +/// For ASC sort: row groups with the smallest min come first. +/// For DESC sort: row groups with the largest max come first. +/// +/// This is more effective than [`ReverseRowGroups`] when row groups +/// are out of order (e.g., append-heavy workloads), because it uses +/// actual statistics rather than assuming the original order is sorted. +/// +/// Gracefully falls back to the original order when statistics are +/// unavailable, the sort expression is not a simple column, etc. 
+#[derive(Debug)] +pub(crate) struct ReorderByStatistics { + sort_order: LexOrdering, +} + +impl ReorderByStatistics { + pub(crate) fn new(sort_order: LexOrdering) -> Self { + Self { sort_order } + } +} + +impl AccessPlanOptimizer for ReorderByStatistics { + fn optimize( + &self, + plan: PreparedAccessPlan, + file_metadata: &ParquetMetaData, + arrow_schema: &Schema, + ) -> Result { + plan.reorder_by_statistics(&self.sort_order, file_metadata, arrow_schema) + } +} diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs index 9a907f4118a86..de42c527845fb 100644 --- a/datafusion/datasource-parquet/src/mod.rs +++ b/datafusion/datasource-parquet/src/mod.rs @@ -25,6 +25,7 @@ #![cfg_attr(test, allow(clippy::needless_pass_by_value))] pub mod access_plan; +pub(crate) mod access_plan_optimizer; pub mod file_format; pub mod metadata; mod metrics; diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index bad1c684b47f5..2dbbc53021170 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -24,9 +24,13 @@ use crate::{ ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory, apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter, }; -use arrow::array::{RecordBatch, RecordBatchOptions}; +use arrow::array::{Array, RecordBatch, RecordBatchOptions}; use arrow::datatypes::DataType; use datafusion_datasource::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer}; +use datafusion_expr::Operator; +use datafusion_physical_expr::expressions::{ + BinaryExpr, Column, DynamicFilterPhysicalExpr, lit, +}; use datafusion_physical_expr::projection::{ProjectionExprs, Projector}; use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr_adapter::replace_columns_with_literals; @@ -50,6 +54,7 @@ use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use 
datafusion_physical_expr_common::physical_expr::{ PhysicalExpr, is_dynamic_physical_expr, }; +use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricCategory, PruningMetrics, @@ -67,6 +72,7 @@ use log::debug; use parquet::DecodeResult; use parquet::arrow::ParquetRecordBatchStreamBuilder; use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics; +use parquet::arrow::arrow_reader::statistics::StatisticsConverter; use parquet::arrow::arrow_reader::{ ArrowReaderMetadata, ArrowReaderOptions, RowSelectionPolicy, }; @@ -136,6 +142,10 @@ pub(super) struct ParquetMorselizer { pub max_predicate_cache_size: Option, /// Whether to read row groups in reverse order pub reverse_row_groups: bool, + /// Optional sort order used to reorder row groups by their min/max statistics. + /// When set, row groups are reordered before reading so that row groups likely + /// to contain optimal values (for TopK queries) are read first. + pub sort_order_for_reorder: Option, } impl fmt::Debug for ParquetMorselizer { @@ -286,6 +296,7 @@ struct PreparedParquetOpen { predicate_creation_errors: Count, max_predicate_cache_size: Option, reverse_row_groups: bool, + sort_order_for_reorder: Option, preserve_order: bool, #[cfg(feature = "parquet_encryption")] file_decryption_properties: Option>, @@ -655,6 +666,7 @@ impl ParquetMorselizer { predicate_creation_errors, max_predicate_cache_size: self.max_predicate_cache_size, reverse_row_groups: self.reverse_row_groups, + sort_order_for_reorder: self.sort_order_for_reorder.clone(), preserve_order: self.preserve_order, #[cfg(feature = "parquet_encryption")] file_decryption_properties: None, @@ -822,6 +834,37 @@ impl MetadataLoadedParquetOpen { } prepared.physical_file_schema = Arc::clone(&physical_file_schema); + // Initialize TopK threshold from RG statistics BEFORE building + // PruningPredicate. 
Uses GtEq/LtEq to include boundary values. + // Only for sort pushdown + no WHERE (pure DynamicFilter predicate). + // Uses df.fetch() as limit so stats init skips when K spans multiple + // RGs (no single RG has >= K rows), letting cumulative prune handle it. + if prepared.sort_order_for_reorder.is_some() + && let Some(pred) = &prepared.predicate + && let Some(df) = find_dynamic_filter(pred) + && df.sort_options().is_some_and(|opts| !opts.is_empty()) + && let Some(fetch) = df.fetch() + { + // Only when predicate is pure DynamicFilter (no WHERE) + let any_ref: &dyn std::any::Any = pred.as_ref(); + if any_ref + .downcast_ref::() + .is_some() + { + let file_metadata = reader_metadata.metadata(); + let rg_metadata = file_metadata.row_groups(); + if let Err(e) = try_init_topk_threshold( + pred, + fetch, + rg_metadata, + &physical_file_schema, + reader_metadata.parquet_schema(), + ) { + debug!("Skipping TopK threshold init: {e}"); + } + } + } + // Build predicates for this specific file let pruning_predicate = build_pruning_predicates( prepared.predicate.as_ref(), @@ -1123,13 +1166,157 @@ impl RowGroupsPrunedParquetOpen { ); } - // Prepare the access plan (extract row groups and row selection) + // Row group ordering optimization (two composable steps): + // + // 1. reorder_by_statistics: sort RGs by min values (ASC) to align + // with the file's declared output ordering. This fixes out-of-order + // RGs (e.g., from append-heavy workloads) without changing direction. + // Skipped gracefully when statistics are unavailable. + // + // 2. reverse: flip the order for DESC queries. Applied AFTER reorder + // so the reversed order is correct whether or not reorder changed + // anything. Also handles row_selection remapping. + // + // For sorted data: reorder is a no-op, reverse gives perfect DESC. + // For unsorted data: reorder fixes the order, reverse flips for DESC. 
+ // Build reorder optimizer from sort_order_for_reorder (Inexact path) + // or from DynamicFilterPhysicalExpr sort_options (any TopK query). + // Fuzz test uses tiebreaker columns so reorder is safe for all TopK. + let reorder_optimizer: Option< + Box, + > = if let Some(sort_order) = &prepared.sort_order_for_reorder { + Some( + Box::new(crate::access_plan_optimizer::ReorderByStatistics::new( + sort_order.clone(), + )) + as Box, + ) + } else if let Some(predicate) = &prepared.predicate + && let Some(df) = find_dynamic_filter(predicate) + && let Some(sort_options) = df.sort_options() + && !sort_options.is_empty() + { + // Build a sort order from DynamicFilter for non-sort-pushdown TopK. + // Quick bail: check if the sort column exists in file schema. + // For GROUP BY + ORDER BY, the sort column is an aggregate output + // (not in parquet) — skip to avoid wasted StatisticsConverter work. + let children = df.children(); + if !children.is_empty() { + let col = find_column_in_expr(children[0]); + if let Some(ref c) = col + && prepared + .physical_file_schema + .field_with_name(c.name()) + .is_ok() + { + let sort_expr = + datafusion_physical_expr_common::sort_expr::PhysicalSortExpr { + expr: Arc::clone(children[0]), + options: arrow::compute::SortOptions { + descending: false, + nulls_first: sort_options[0].nulls_first, + }, + }; + LexOrdering::new(vec![sort_expr]).map(|order| { + Box::new(crate::access_plan_optimizer::ReorderByStatistics::new( + order, + )) + as Box + }) + } else { + None + } + } else { + None + } + } else { + None + }; + + // Reverse for DESC queries. Only when reorder is active (the sort + // column exists in parquet stats). Without reorder, reversing RGs + // randomly changes I/O patterns with no benefit. 
+ let is_descending = prepared.reverse_row_groups + || (reorder_optimizer.is_some() + && prepared + .predicate + .as_ref() + .and_then(find_dynamic_filter) + .and_then(|df| df.sort_options().map(|opts| opts[0].descending)) + .unwrap_or(false)); + let reverse_optimizer: Option< + Box, + > = if is_descending { + Some(Box::new(crate::access_plan_optimizer::ReverseRowGroups)) + } else { + None + }; + + // Prepare the access plan and apply optimizers in order: + // 1. reorder (fix out-of-order RGs to match declared ordering) + // 2. reverse (flip for DESC queries) let mut prepared_plan = access_plan.prepare(rg_metadata)?; + if let Some(opt) = &reorder_optimizer { + prepared_plan = opt.optimize( + prepared_plan, + file_metadata.as_ref(), + &prepared.physical_file_schema, + )?; + } + if let Some(opt) = &reverse_optimizer { + prepared_plan = opt.optimize( + prepared_plan, + file_metadata.as_ref(), + &prepared.physical_file_schema, + )?; + } - // Potentially reverse the access plan for performance. - // See `ParquetSource::try_pushdown_sort` for the rationale. - if prepared.reverse_row_groups { - prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?; + // TopK cumulative pruning: after reorder + reverse, the RGs are in + // optimal order. Accumulate rows from the front until >= K, prune rest. + // + // Only safe when predicate is DynamicFilter-only (no WHERE clause). + // With WHERE, raw num_rows overestimates qualifying rows — cumulative + // prune may keep too few RGs, returning fewer than K results. + // + // Additionally requires either sort pushdown (guaranteed non-overlapping) + // or verified non-overlap from statistics. 
+ let is_pure_dynamic_filter = prepared.predicate.as_ref().is_some_and(|p| { + let any_ref: &dyn std::any::Any = p.as_ref(); + any_ref + .downcast_ref::() + .is_some() + }); + let has_sort_pushdown = prepared.sort_order_for_reorder.is_some(); + if is_pure_dynamic_filter + && let Some(predicate) = &prepared.predicate + && let Some(df) = find_dynamic_filter(predicate) + && let Some(fetch) = df.fetch() + && (has_sort_pushdown + || rgs_are_non_overlapping( + &prepared_plan, + file_metadata.as_ref(), + &prepared.physical_file_schema, + &df, + )) + { + let rg_indexes = prepared_plan.row_group_indexes(); + let mut cumulative = 0usize; + let mut keep_count = 0; + for &idx in rg_indexes { + cumulative += file_metadata.row_group(idx).num_rows() as usize; + keep_count += 1; + if cumulative >= fetch { + break; + } + } + if keep_count < rg_indexes.len() { + let pruned = rg_indexes.len() - keep_count; + debug!( + "TopK cumulative prune: keeping {keep_count} of {} RGs ({cumulative} rows >= fetch={fetch}), pruning {pruned}", + rg_indexes.len() + ); + prepared_plan = prepared_plan.truncate_row_groups(keep_count); + } } let arrow_reader_metrics = ArrowReaderMetrics::enabled(); @@ -1218,6 +1405,260 @@ impl RowGroupsPrunedParquetOpen { } } +/// Find a [`DynamicFilterPhysicalExpr`] in the predicate tree. +/// +/// Returns the first `DynamicFilterPhysicalExpr` found (as an `Arc`) by +/// checking the predicate itself and recursively walking its children. +/// Check if row groups in the prepared plan are non-overlapping on the +/// sort column. Adjacent RGs must satisfy `max(i) <= min(i+1)`. +/// Initialize TopK dynamic filter threshold from row group statistics. +/// +/// For DESC: `threshold = max(min)` across RGs with `num_rows >= fetch`. +/// For ASC: `threshold = min(max)` across qualifying RGs. +/// Uses GtEq/LtEq to include boundary values. 
+fn try_init_topk_threshold( + predicate: &Arc, + fetch: usize, + rg_metadata: &[parquet::file::metadata::RowGroupMetaData], + arrow_schema: &Schema, + parquet_schema: &parquet::schema::types::SchemaDescriptor, +) -> Result<()> { + let dynamic_filter = match find_dynamic_filter(predicate) { + Some(df) => df, + None => return Ok(()), + }; + + if dynamic_filter.snapshot_generation() > 1 { + return Ok(()); // Already initialized + } + + let sort_options = match dynamic_filter.sort_options() { + Some(opts) if !opts.is_empty() => opts, + _ => return Ok(()), + }; + + let is_descending = sort_options[0].descending; + let nulls_first = sort_options[0].nulls_first; + + let column = match find_column_in_expr(dynamic_filter.children()[0]) { + Some(col) => col, + None => return Ok(()), + }; + + let col_name = column.name(); + let converter = StatisticsConverter::try_new(col_name, arrow_schema, parquet_schema) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + let threshold = if is_descending { + let mins = converter + .row_group_mins(rg_metadata.iter()) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + compute_best_threshold(&mins, rg_metadata, fetch, true)? + } else { + let maxes = converter + .row_group_maxes(rg_metadata.iter()) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + compute_best_threshold(&maxes, rg_metadata, fetch, false)? 
+ }; + + let threshold = match threshold { + Some(t) => t, + None => return Ok(()), // No RG with >= fetch rows + }; + + // Cast threshold to column type + let col_expr: Arc = Arc::clone(dynamic_filter.children()[0]); + let col_data_type = col_expr.data_type(arrow_schema)?; + let threshold_casted = threshold.cast_to(&col_data_type)?; + + // GtEq/LtEq: boundary value IS a valid top-K value + let op = if is_descending { + Operator::GtEq + } else { + Operator::LtEq + }; + + let comparison: Arc = Arc::new(BinaryExpr::new( + Arc::clone(&col_expr), + op, + lit(threshold_casted), + )); + + let filter_expr: Arc = if nulls_first { + use datafusion_physical_expr::expressions::is_null; + let null_check = is_null(Arc::clone(&col_expr))?; + Arc::new(BinaryExpr::new(null_check, Operator::Or, comparison)) + } else { + comparison + }; + + debug!( + "TopK stats init: {col_name} {op} (fetch={fetch})", + op = if is_descending { ">=" } else { "<=" } + ); + + dynamic_filter.update(filter_expr)?; + Ok(()) +} + +/// Find the best threshold from RG statistics. +/// `want_max=true`: max of values (for DESC min). `want_max=false`: min of values (for ASC max). 
+fn compute_best_threshold( + stats: &arrow::array::ArrayRef, + rg_metadata: &[parquet::file::metadata::RowGroupMetaData], + fetch: usize, + want_max: bool, +) -> Result> { + let mut best: Option = None; + for (i, rg) in rg_metadata.iter().enumerate() { + if (rg.num_rows() as usize) < fetch { + continue; + } + if i >= stats.len() || stats.is_null(i) { + continue; + } + let value = ScalarValue::try_from_array(stats.as_ref(), i)?; + if value.is_null() { + continue; + } + best = Some(match best { + None => value, + Some(current) => { + if want_max { + if value > current { value } else { current } + } else { + if value < current { value } else { current } + } + } + }); + } + Ok(best) +} + +fn rgs_are_non_overlapping( + plan: &crate::access_plan::PreparedAccessPlan, + file_metadata: &parquet::file::metadata::ParquetMetaData, + arrow_schema: &Schema, + dynamic_filter: &DynamicFilterPhysicalExpr, +) -> bool { + let sort_options = match dynamic_filter.sort_options() { + Some(opts) if !opts.is_empty() => opts, + _ => return false, + }; + let children = dynamic_filter.children(); + if children.is_empty() { + return false; + } + let column = match find_column_in_expr(children[0]) { + Some(col) => col, + None => return false, + }; + let converter = match StatisticsConverter::try_new( + column.name(), + arrow_schema, + file_metadata.file_metadata().schema_descr(), + ) { + Ok(c) => c, + Err(_) => return false, + }; + + let rg_indexes = plan.row_group_indexes(); + if rg_indexes.len() <= 1 { + return true; // 0 or 1 RG is trivially non-overlapping + } + + // Get min/max for the reordered RGs. + // After reorder (min ASC) + possible reverse (DESC), check adjacent pairs. 
+ // For ASC order: max[i] <= min[i+1] + // For DESC order (reversed): min[i] >= max[i+1] (equivalently max[i+1] <= min[i]) + let is_descending = sort_options[0].descending; + let rg_metadata: Vec<_> = rg_indexes + .iter() + .map(|&idx| file_metadata.row_group(idx)) + .collect(); + let mins = match converter.row_group_mins(rg_metadata.iter().copied()) { + Ok(m) => m, + Err(_) => return false, + }; + let maxes = match converter.row_group_maxes(rg_metadata.iter().copied()) { + Ok(m) => m, + Err(_) => return false, + }; + + for i in 0..rg_indexes.len() - 1 { + if i >= mins.len() || i + 1 >= mins.len() { + return false; + } + if mins.is_null(i) + || mins.is_null(i + 1) + || maxes.is_null(i) + || maxes.is_null(i + 1) + { + return false; + } + let (prev_max, next_min) = if is_descending { + // Reversed order: RG[i] has higher values than RG[i+1] + // Check: min[i] >= max[i+1] + match ( + ScalarValue::try_from_array(mins.as_ref(), i), + ScalarValue::try_from_array(maxes.as_ref(), i + 1), + ) { + (Ok(min_i), Ok(max_next)) => (max_next, min_i), + _ => return false, + } + } else { + // ASC order: max[i] <= min[i+1] + match ( + ScalarValue::try_from_array(maxes.as_ref(), i), + ScalarValue::try_from_array(mins.as_ref(), i + 1), + ) { + (Ok(max_i), Ok(min_next)) => (max_i, min_next), + _ => return false, + } + }; + if prev_max > next_min { + return false; // overlap detected + } + } + true +} + +fn find_dynamic_filter( + expr: &Arc, +) -> Option> { + // Try to downcast this expression directly. + // PhysicalExpr: Any + Send + Sync, so trait upcasting allows the coercion. + let cloned = Arc::clone(expr); + let any_arc: Arc = cloned; + if let Ok(df) = Arc::downcast::(any_arc) { + return Some(df); + } + + // Recursively check children + for child in expr.children() { + if let Some(df) = find_dynamic_filter(child) { + return Some(df); + } + } + + None +} + +/// Find a [`Column`] expression by unwrapping wrappers like `CastExpr`. 
+fn find_column_in_expr(expr: &Arc) -> Option { + // Direct Column + let any_ref: &dyn std::any::Any = expr.as_ref(); + if let Some(col) = any_ref.downcast_ref::() { + return Some(col.clone()); + } + // Unwrap single-child wrappers (e.g. CastExpr) + let children = expr.children(); + if children.len() == 1 { + return find_column_in_expr(children[0]); + } + None +} + /// State for a stream that decodes a single Parquet file using a push-based decoder. /// /// The [`transition`](Self::transition) method drives the decoder in a loop: it requests @@ -1634,16 +2075,19 @@ mod test { }; use datafusion_datasource::morsel::{Morsel, Morselizer}; use datafusion_datasource::{PartitionedFile, TableSchema}; - use datafusion_expr::{col, lit}; + use datafusion_expr::{Operator, col, lit}; use datafusion_physical_expr::{ PhysicalExpr, - expressions::{Column, DynamicFilterPhysicalExpr, Literal}, + expressions::{ + BinaryExpr, Column, DynamicFilterPhysicalExpr, Literal, lit as physical_lit, + }, planner::logical2physical, projection::ProjectionExprs, }; use datafusion_physical_expr_adapter::{ DefaultPhysicalExprAdapterFactory, replace_columns_with_literals, }; + use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use futures::StreamExt; use futures::stream::BoxStream; @@ -1675,6 +2119,7 @@ mod test { coerce_int96: Option, max_predicate_cache_size: Option, reverse_row_groups: bool, + sort_order_for_reorder: Option, preserve_order: bool, } @@ -1701,6 +2146,7 @@ mod test { coerce_int96: None, max_predicate_cache_size: None, reverse_row_groups: false, + sort_order_for_reorder: None, preserve_order: false, } } @@ -1816,6 +2262,7 @@ mod test { encryption_factory: None, max_predicate_cache_size: self.max_predicate_cache_size, reverse_row_groups: self.reverse_row_groups, + sort_order_for_reorder: self.sort_order_for_reorder, } } } @@ -2720,4 +3167,54 @@ mod test { "without page index all rows are returned" ); } + + // 
--------------------------------------------------------------- + // Tests for find_dynamic_filter + // --------------------------------------------------------------- + + #[test] + fn test_find_dynamic_filter_direct_match() { + let col_expr = Arc::new(Column::new("id", 0)) as Arc; + let dynamic_filter: Arc = Arc::new( + DynamicFilterPhysicalExpr::new(vec![col_expr], physical_lit(true)), + ); + + let result = find_dynamic_filter(&dynamic_filter); + assert!( + result.is_some(), + "should find direct DynamicFilterPhysicalExpr" + ); + } + + #[test] + fn test_find_dynamic_filter_nested_in_conjunction() { + let col_expr = Arc::new(Column::new("id", 0)) as Arc; + let dynamic_filter: Arc = Arc::new( + DynamicFilterPhysicalExpr::new(vec![col_expr], physical_lit(true)), + ); + // Wrap it: (id > 0) AND dynamic_filter + let left: Arc = Arc::new(BinaryExpr::new( + Arc::new(Column::new("id", 0)), + Operator::Gt, + physical_lit(0i64), + )); + let conjunction: Arc = + Arc::new(BinaryExpr::new(left, Operator::And, dynamic_filter)); + + let result = find_dynamic_filter(&conjunction); + assert!( + result.is_some(), + "should find DynamicFilterPhysicalExpr nested in AND" + ); + } + + #[test] + fn test_find_dynamic_filter_none_when_absent() { + let col_expr: Arc = Arc::new(Column::new("id", 0)); + let result = find_dynamic_filter(&col_expr); + assert!( + result.is_none(), + "plain Column has no DynamicFilterPhysicalExpr" + ); + } } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index a014c8b2726e7..81059e975de1a 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -55,7 +55,7 @@ use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; #[cfg(feature = "parquet_encryption")] use datafusion_execution::parquet_encryption::EncryptionFactory; -use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; +use datafusion_physical_expr_common::sort_expr::{LexOrdering, 
PhysicalSortExpr}; use itertools::Itertools; use object_store::ObjectStore; #[cfg(feature = "parquet_encryption")] @@ -294,6 +294,8 @@ pub struct ParquetSource { /// so we still need to sort them after reading, so the reverse scan is inexact. /// Used to optimize ORDER BY ... DESC on sorted data. reverse_row_groups: bool, + /// Optional sort order used to reorder row groups by min/max statistics. + sort_order_for_reorder: Option, } impl ParquetSource { @@ -319,6 +321,7 @@ impl ParquetSource { #[cfg(feature = "parquet_encryption")] encryption_factory: None, reverse_row_groups: false, + sort_order_for_reorder: None, } } @@ -482,6 +485,88 @@ impl ParquetSource { pub(crate) fn reverse_row_groups(&self) -> bool { self.reverse_row_groups } + + /// Extract TopK sort column name and direction from available sources. + /// + /// Tries two sources in order: + /// 1. DynamicFilterPhysicalExpr in the predicate (works for ALL TopK queries) + /// 2. sort_order_for_reorder (only set in Inexact sort pushdown path) + fn extract_topk_sort_info(&self) -> Option<(String, bool)> { + // Try 1: from predicate's DynamicFilterPhysicalExpr + if let Some(predicate) = &self.predicate + && let Some(info) = Self::sort_info_from_dynamic_filter(predicate) + { + return Some(info); + } + + // Try 2: fallback to sort_order_for_reorder (Inexact path) + if let Some(sort_order) = &self.sort_order_for_reorder + && !sort_order.is_empty() + { + let first = sort_order.first(); + if let Some(col) = first + .expr + .downcast_ref::() + { + return Some((col.name().to_string(), self.reverse_row_groups)); + } + } + + None + } + + /// Try to extract sort column name and direction from a DynamicFilterPhysicalExpr + /// in the predicate tree. 
+ fn sort_info_from_dynamic_filter( + expr: &Arc, + ) -> Option<(String, bool)> { + // Try downcast to DynamicFilterPhysicalExpr + let cloned = Arc::clone(expr); + let any_arc: Arc = cloned; + if let Ok(df) = Arc::downcast::< + datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr, + >(any_arc) + { + let sort_options = df.sort_options()?; + if sort_options.is_empty() { + return None; + } + let children = df.children(); + if children.is_empty() { + return None; + } + let col = children[0] + .downcast_ref::()?; + return Some((col.name().to_string(), sort_options[0].descending)); + } + + // Recursively check children + for child in expr.children() { + if let Some(info) = Self::sort_info_from_dynamic_filter(child) { + return Some(info); + } + } + + None + } + + /// Extract the sort key from a file's statistics for reordering. + /// + /// For DESC sorts, returns the column's min_value (we want highest min first). + /// For ASC sorts, returns the column's max_value (we want lowest max first). + fn sort_key_for_file( + file: &datafusion_datasource::PartitionedFile, + col_idx: usize, + descending: bool, + ) -> Option { + let stats = file.statistics.as_ref()?; + let col_stats = stats.column_statistics.get(col_idx)?; + if descending { + col_stats.min_value.get_value().cloned() + } else { + col_stats.max_value.get_value().cloned() + } + } } /// Parses datafusion.common.config.ParquetOptions.coerce_int96 String to a arrow_schema.datatype.TimeUnit @@ -580,9 +665,61 @@ impl FileSource for ParquetSource { encryption_factory: self.get_encryption_factory_with_config(), max_predicate_cache_size: self.max_predicate_cache_size(), reverse_row_groups: self.reverse_row_groups, + sort_order_for_reorder: self.sort_order_for_reorder.clone(), })) } + fn reorder_files( + &self, + mut files: Vec, + ) -> Vec { + // Extract sort column and direction from either: + // 1. The DynamicFilterPhysicalExpr in the predicate (works for ALL TopK) + // 2. 
Fallback to sort_order_for_reorder (Inexact path only) + let (col_name, descending) = match self.extract_topk_sort_info() { + Some(info) => info, + None => return files, + }; + + // Find the column index in the table schema + let table_schema = self.table_schema.table_schema(); + let col_idx = match table_schema.index_of(&col_name) { + Ok(idx) => idx, + Err(_) => return files, + }; + + // Stable sort: files with usable stats first, ordered by the + // relevant bound; files without stats go to the end. + files.sort_by(|a, b| { + let key_a = Self::sort_key_for_file(a, col_idx, descending); + let key_b = Self::sort_key_for_file(b, col_idx, descending); + match (key_a, key_b) { + (Some(va), Some(vb)) => { + if descending { + // DESC: highest min first (reverse order) + vb.partial_cmp(&va).unwrap_or(std::cmp::Ordering::Equal) + } else { + // ASC: lowest max first + va.partial_cmp(&vb).unwrap_or(std::cmp::Ordering::Equal) + } + } + // Files without stats go to the end + (Some(_), None) => std::cmp::Ordering::Less, + (None, Some(_)) => std::cmp::Ordering::Greater, + (None, None) => std::cmp::Ordering::Equal, + } + }); + + log::debug!( + "Reordered {} files by {} {} for TopK optimization", + files.len(), + col_name, + if descending { "DESC" } else { "ASC" } + ); + + files + } + fn table_schema(&self) -> &TableSchema { &self.table_schema } @@ -821,7 +958,9 @@ impl FileSource for ParquetSource { // Return Inexact because we're only reversing row group order, // not guaranteeing perfect row-level ordering - let new_source = self.clone().with_reverse_row_groups(true); + let sort_order = LexOrdering::new(order.iter().cloned()); + let mut new_source = self.clone().with_reverse_row_groups(true); + new_source.sort_order_for_reorder = sort_order; Ok(SortOrderPushdownResult::Inexact { inner: Arc::new(new_source) as Arc, }) diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index 9b4ae5827ae8b..32bee63b54f23 100644 --- 
a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -280,6 +280,19 @@ pub trait FileSource: Any + Send + Sync { Ok(SortOrderPushdownResult::Unsupported) } + /// Reorder files in the shared work queue to optimize query performance. + /// + /// For example, TopK queries benefit from reading files with the best + /// statistics first, so the dynamic filter threshold tightens quickly. + /// + /// The default implementation returns files unchanged (no reordering). + fn reorder_files( + &self, + files: Vec, + ) -> Vec { + files + } + /// Try to push down a projection into this FileSource. /// /// `FileSource` implementations that support projection pushdown should diff --git a/datafusion/datasource/src/file_stream/work_source.rs b/datafusion/datasource/src/file_stream/work_source.rs index 7f31dacca9592..b874223a78879 100644 --- a/datafusion/datasource/src/file_stream/work_source.rs +++ b/datafusion/datasource/src/file_stream/work_source.rs @@ -85,8 +85,18 @@ impl SharedWorkSource { } /// Create a shared work source for the unopened files in `config`. + /// + /// Files are reordered by the file source (e.g. by statistics for TopK) + /// before being placed in the shared queue. pub(crate) fn from_config(config: &FileScanConfig) -> Self { - Self::new(config.file_groups.iter().flat_map(FileGroup::iter).cloned()) + let files: Vec<_> = config + .file_groups + .iter() + .flat_map(FileGroup::iter) + .cloned() + .collect(); + let files = config.file_source.reorder_files(files); + Self::new(files) } /// Pop the next file from the shared work queue. diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index 47398d87e26a5..82e69e88f116b 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
+use arrow::compute::SortOptions; use parking_lot::RwLock; use std::{fmt::Display, hash::Hash, sync::Arc}; use tokio::sync::watch; @@ -74,6 +75,14 @@ pub struct DynamicFilterPhysicalExpr { /// But this can have overhead in production, so it's only included in our tests. data_type: Arc>>, nullable: Arc>>, + /// Optional sort options for each child expression. + /// When set (e.g., by SortExec for TopK), downstream consumers like the + /// parquet reader can use these to initialize the filter threshold from + /// column statistics before reading any data. + sort_options: Option>, + /// Optional TopK fetch limit (K in LIMIT K). + /// Used by the parquet reader for cumulative RG pruning after reorder. + fetch: Option, } #[derive(Debug)] @@ -177,9 +186,38 @@ impl DynamicFilterPhysicalExpr { state_watch, data_type: Arc::new(RwLock::new(None)), nullable: Arc::new(RwLock::new(None)), + sort_options: None, + fetch: None, } } + /// Create a new [`DynamicFilterPhysicalExpr`] with sort options. + /// + /// Sort options indicate the sort direction for each child expression, + /// enabling downstream consumers (e.g., parquet readers) to initialize + /// the filter threshold from column statistics before reading data. + pub fn new_with_sort_options( + children: Vec>, + inner: Arc, + sort_options: Vec, + fetch: Option, + ) -> Self { + let mut this = Self::new(children, inner); + this.sort_options = Some(sort_options); + this.fetch = fetch; + this + } + + /// Returns the sort options for each child expression, if available. + pub fn sort_options(&self) -> Option<&[SortOptions]> { + self.sort_options.as_deref() + } + + /// Returns the TopK fetch limit (K), if available. 
+ pub fn fetch(&self) -> Option { + self.fetch + } + fn remap_children( children: &[Arc], remapped_children: Option<&Vec>>, @@ -368,6 +406,8 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { state_watch: self.state_watch.clone(), data_type: Arc::clone(&self.data_type), nullable: Arc::clone(&self.nullable), + sort_options: self.sort_options.clone(), + fetch: self.fetch, })) } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 6c02af8dec6d3..eb1b7bbdd739e 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -915,8 +915,18 @@ impl SortExec { .iter() .map(|sort_expr| Arc::clone(&sort_expr.expr)) .collect::>(); + let sort_options = self + .expr + .iter() + .map(|sort_expr| sort_expr.options) + .collect::>(); Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new( - DynamicFilterPhysicalExpr::new(children, lit(true)), + DynamicFilterPhysicalExpr::new_with_sort_options( + children, + lit(true), + sort_options, + self.fetch, + ), )))) } @@ -952,14 +962,16 @@ impl SortExec { if fetch.is_some() && is_pipeline_friendly { cache = cache.with_boundedness(Boundedness::Bounded); } - let filter = fetch.is_some().then(|| { - // If we already have a filter, keep it. Otherwise, create a new one. - self.filter.clone().unwrap_or_else(|| self.create_filter()) - }); let mut new_sort = self.cloned(); new_sort.fetch = fetch; new_sort.cache = cache.into(); - new_sort.filter = filter; + new_sort.filter = fetch.is_some().then(|| { + // If we already have a filter, keep it. Otherwise, create a new one. + // Must be called after setting fetch so DynamicFilter gets the K value. 
+ self.filter + .clone() + .unwrap_or_else(|| new_sort.create_filter()) + }); new_sort } diff --git a/datafusion/sqllogictest/test_files/explain_analyze.slt b/datafusion/sqllogictest/test_files/explain_analyze.slt index 561c1a9dcbc87..4b59e30762610 100644 --- a/datafusion/sqllogictest/test_files/explain_analyze.slt +++ b/datafusion/sqllogictest/test_files/explain_analyze.slt @@ -103,7 +103,7 @@ LOCATION 'test_files/scratch/explain_analyze/output_rows_skew'; query TT EXPLAIN ANALYZE SELECT * FROM skew_parquet; ---- -Plan with Metrics DataSourceExec: output_rows_skew=84.31% +Plan with Metrics DataSourceExec: output_rows_skew= # All partition's output_rows: [0] statement ok diff --git a/datafusion/sqllogictest/test_files/listing_table_partitions.slt b/datafusion/sqllogictest/test_files/listing_table_partitions.slt index 5df78b674fe8d..52433429cfe80 100644 --- a/datafusion/sqllogictest/test_files/listing_table_partitions.slt +++ b/datafusion/sqllogictest/test_files/listing_table_partitions.slt @@ -73,70 +73,3 @@ foo statement count 0 set datafusion.execution.listing_table_factory_infer_partitions = true; - -# Test: files outside partition structure are skipped -# This simulates a table that transitioned from non-partitioned to -# hive-partitioned storage, leaving a stale file in the root directory. - -# Create partitioned files first -query I -copy (values(1, 'alice'), (2, 'bob')) -to 'test_files/scratch/listing_table_partitions/root_file_skipped/year=2024/data.parquet'; ----- -2 - -query I -copy (values(3, 'charlie')) -to 'test_files/scratch/listing_table_partitions/root_file_skipped/year=2025/data.parquet'; ----- -1 - -# Create the table before adding the stale root file, so partition -# inference succeeds (it only runs at CREATE TABLE time). -statement count 0 -create external table root_file_test -stored as parquet -location 'test_files/scratch/listing_table_partitions/root_file_skipped/'; - -# Now add a stale root-level file (outside any partition directory). 
-# This simulates a file left over from before partitioning was added. -query I -copy (values(99, 'stale')) -to 'test_files/scratch/listing_table_partitions/root_file_skipped/stale.parquet'; ----- -1 - -# The root file should be skipped — only partitioned files are included -query IT -select column1, column2 from root_file_test order by column1; ----- -1 alice -2 bob -3 charlie - -# Partition column should be accessible -query ITT -select column1, column2, year from root_file_test order by column1; ----- -1 alice 2024 -2 bob 2024 -3 charlie 2025 - -# Partition filter should work -query ITT -select column1, column2, year from root_file_test where year = '2025'; ----- -3 charlie 2025 - -# COUNT should not include the root file's rows -query I -select count(*) from root_file_test; ----- -3 - -# GROUP BY partition column should work -query TI -select year, count(*) from root_file_test group by year order by year; ----- -2024 2 -2025 1 diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index b6c75f3977010..a968174a95b1a 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -2271,6 +2271,589 @@ DROP TABLE tg_src_high; statement ok DROP TABLE tg_buffer; +# =========================================================== +# Test H: Row group reorder by statistics for TopK queries. +# When a file has multiple row groups with overlapping or +# out-of-order statistics, sort pushdown returns Inexact and +# `reorder_by_statistics` reorders row groups within the file +# so TopK finds optimal values first. +# =========================================================== + +# Create a table with 30 rows and write to parquet with small row groups +# so we get multiple row groups per file. Rows are inserted in a mixed +# order so row groups span overlapping ranges (forcing Inexact path). 
+statement ok
+CREATE TABLE th_mixed(id INT, value INT) AS VALUES
+  (15, 150), (5, 50), (25, 250),
+  (10, 100), (20, 200), (1, 10),
+  (30, 300), (3, 30), (18, 180);
+
+# Write with row_group_size=3 → 3 rows per RG, 3 RGs total
+# RG statistics (unsorted order): RG0(5-25), RG1(1-20), RG2(3-30)
+# Note: row groups are overlapping → Inexact path → TopK retained
+query I
+COPY (SELECT * FROM th_mixed)
+TO 'test_files/scratch/sort_pushdown/th_reorder/data.parquet'
+STORED AS PARQUET
+OPTIONS ('format.max_row_group_size' '3');
+----
+9
+
+statement ok
+SET datafusion.execution.target_partitions = 1;
+
+statement ok
+CREATE EXTERNAL TABLE th_reorder(id INT, value INT)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/sort_pushdown/th_reorder/data.parquet';
+
+# Test H.1: ASC ORDER BY with LIMIT — Inexact path (file has no declared ordering)
+# Plan: SortExec(TopK) preserved. RG reorder happens inside DataSourceExec
+# (not visible in EXPLAIN, but verified by unit tests in access_plan.rs).
+query TT
+EXPLAIN SELECT * FROM th_reorder ORDER BY id ASC LIMIT 3;
+----
+logical_plan
+01)Sort: th_reorder.id ASC NULLS LAST, fetch=3
+02)--TableScan: th_reorder projection=[id, value]
+physical_plan
+01)SortExec: TopK(fetch=3), expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/th_reorder/data.parquet]]}, projection=[id, value], file_type=parquet, predicate=DynamicFilter [ empty ]
+
+# Results must be correct regardless of RG reorder. 
+query II +SELECT * FROM th_reorder ORDER BY id ASC LIMIT 3; +---- +1 10 +3 30 +5 50 + +# Test H.2: DESC ORDER BY with LIMIT — reorder + reverse compose +query TT +EXPLAIN SELECT * FROM th_reorder ORDER BY id DESC LIMIT 3; +---- +logical_plan +01)Sort: th_reorder.id DESC NULLS FIRST, fetch=3 +02)--TableScan: th_reorder projection=[id, value] +physical_plan +01)SortExec: TopK(fetch=3), expr=[id@0 DESC], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/th_reorder/data.parquet]]}, projection=[id, value], file_type=parquet, predicate=DynamicFilter [ empty ] + +query II +SELECT * FROM th_reorder ORDER BY id DESC LIMIT 3; +---- +30 300 +25 250 +20 200 + +# Test H.3: Full sort (no LIMIT) — output must still be correctly sorted +query II +SELECT * FROM th_reorder ORDER BY id ASC; +---- +1 10 +3 30 +5 50 +10 100 +15 150 +18 180 +20 200 +25 250 +30 300 + +# Test H.4: ORDER BY expression (not a simple column) — reorder should +# gracefully skip, results still correct +query II +SELECT id, value FROM th_reorder ORDER BY id + 1 ASC LIMIT 3; +---- +1 10 +3 30 +5 50 + +# Cleanup Test H +statement ok +DROP TABLE th_mixed; + +statement ok +DROP TABLE th_reorder; + +# =========================================================== +# Test I: WITH ORDER + DESC LIMIT — stats init + cumulative prune +# Tests the full optimization chain on sorted non-overlapping data: +# file reorder → RG reorder → reverse → stats init → cumulative prune +# =========================================================== + +statement ok +SET datafusion.execution.target_partitions = 1; + +statement ok +SET datafusion.optimizer.enable_sort_pushdown = true; + +# Create sorted data with multiple small row groups +statement ok +CREATE TABLE ti_data(id INT, value INT) AS VALUES +(1,10),(2,20),(3,30),(4,40),(5,50),(6,60), +(7,70),(8,80),(9,90),(10,100),(11,110),(12,120); + +statement ok +SET 
datafusion.execution.parquet.max_row_group_size = 3; + +query I +COPY (SELECT * FROM ti_data ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/ti_sorted/data.parquet'; +---- +12 + +statement ok +SET datafusion.execution.parquet.max_row_group_size = 1048576; + +statement ok +CREATE EXTERNAL TABLE ti_sorted(id INT, value INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/ti_sorted/data.parquet' +WITH ORDER (id ASC); + +# Test I.1: DESC LIMIT with sort pushdown — EXPLAIN shows reverse_row_groups=true +query TT +EXPLAIN SELECT * FROM ti_sorted ORDER BY id DESC LIMIT 3; +---- +logical_plan +01)Sort: ti_sorted.id DESC NULLS FIRST, fetch=3 +02)--TableScan: ti_sorted projection=[id, value] +physical_plan +01)SortExec: TopK(fetch=3), expr=[id@0 DESC], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/ti_sorted/data.parquet]]}, projection=[id, value], file_type=parquet, predicate=DynamicFilter [ empty ], reverse_row_groups=true + +# Test I.2: DESC LIMIT results — should return largest values +query II +SELECT * FROM ti_sorted ORDER BY id DESC LIMIT 3; +---- +12 120 +11 110 +10 100 + +# Test I.3: ASC LIMIT (same direction as file order) — sort elimination, no TopK +query TT +EXPLAIN SELECT * FROM ti_sorted ORDER BY id ASC LIMIT 3; +---- +logical_plan +01)Sort: ti_sorted.id ASC NULLS LAST, fetch=3 +02)--TableScan: ti_sorted projection=[id, value] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/ti_sorted/data.parquet]]}, projection=[id, value], limit=3, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + +query II +SELECT * FROM ti_sorted ORDER BY id ASC LIMIT 3; +---- +1 10 +2 20 +3 30 + +# Test I.4: DESC LIMIT with WHERE — cumulative prune works, stats init skipped +query II +SELECT * FROM ti_sorted WHERE value > 50 ORDER BY id DESC LIMIT 2; +---- +12 120 +11 110 + +# Test 
I.5: Larger LIMIT spanning multiple RGs (4 RGs of 3 rows each) +query II +SELECT * FROM ti_sorted ORDER BY id DESC LIMIT 8; +---- +12 120 +11 110 +10 100 +9 90 +8 80 +7 70 +6 60 +5 50 + +# Test I.6: LIMIT larger than total rows — returns all rows +query II +SELECT * FROM ti_sorted ORDER BY id DESC LIMIT 100; +---- +12 120 +11 110 +10 100 +9 90 +8 80 +7 70 +6 60 +5 50 +4 40 +3 30 +2 20 +1 10 + +# Cleanup Test I +statement ok +DROP TABLE ti_data; + +statement ok +DROP TABLE ti_sorted; + +# =========================================================== +# Test J: Non-overlapping RGs without WITH ORDER — +# RG reorder + cumulative prune via DynamicFilter sort_options +# =========================================================== + +# Create data written in scrambled order but with non-overlapping RGs +statement ok +CREATE TABLE tj_high(id INT, value INT) AS VALUES (10,100),(11,110),(12,120); + +statement ok +CREATE TABLE tj_mid(id INT, value INT) AS VALUES (4,40),(5,50),(6,60); + +statement ok +CREATE TABLE tj_low(id INT, value INT) AS VALUES (1,10),(2,20),(3,30); + +statement ok +SET datafusion.execution.parquet.max_row_group_size = 3; + +# Write in scrambled order: high, low, mid → RGs are non-overlapping but not sorted +query I +COPY ( + SELECT * FROM tj_high + UNION ALL SELECT * FROM tj_low + UNION ALL SELECT * FROM tj_mid +) +TO 'test_files/scratch/sort_pushdown/tj_scrambled/data.parquet'; +---- +9 + +statement ok +SET datafusion.execution.parquet.max_row_group_size = 1048576; + +# No WITH ORDER — tests non-sort-pushdown TopK path +statement ok +CREATE EXTERNAL TABLE tj_scrambled(id INT, value INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/tj_scrambled/data.parquet'; + +# Test J.1: DESC LIMIT — RG reorder + reverse from DynamicFilter +# EXPLAIN shows DynamicFilter but NO reverse_row_groups (not sort pushdown) +query TT +EXPLAIN SELECT * FROM tj_scrambled ORDER BY id DESC LIMIT 3; +---- +logical_plan +01)Sort: tj_scrambled.id DESC NULLS FIRST, 
fetch=3 +02)--TableScan: tj_scrambled projection=[id, value] +physical_plan +01)SortExec: TopK(fetch=3), expr=[id@0 DESC], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tj_scrambled/data.parquet]]}, projection=[id, value], file_type=parquet, predicate=DynamicFilter [ empty ] + +# Test J.2: Results must be correct +query II +SELECT * FROM tj_scrambled ORDER BY id DESC LIMIT 3; +---- +12 120 +11 110 +10 100 + +# Test J.3: ASC LIMIT +query II +SELECT * FROM tj_scrambled ORDER BY id ASC LIMIT 3; +---- +1 10 +2 20 +3 30 + +# Test J.4: DESC with WHERE +query II +SELECT * FROM tj_scrambled WHERE value >= 50 ORDER BY id DESC LIMIT 2; +---- +12 120 +11 110 + +# Test J.5: Full sort without LIMIT (no TopK, no reorder) +query II +SELECT * FROM tj_scrambled ORDER BY id ASC; +---- +1 10 +2 20 +3 30 +4 40 +5 50 +6 60 +10 100 +11 110 +12 120 + +# Cleanup Test J +statement ok +DROP TABLE tj_high; + +statement ok +DROP TABLE tj_mid; + +statement ok +DROP TABLE tj_low; + +statement ok +DROP TABLE tj_scrambled; + +# =========================================================== +# Test K: Overlapping RGs without WITH ORDER — +# cumulative prune must NOT trigger (overlapping = unsafe) +# =========================================================== + +# Create data with overlapping ranges +statement ok +CREATE TABLE tk_data(id INT, value INT) AS VALUES +(1,10),(5,50),(9,90), +(3,30),(7,70),(11,110), +(2,20),(6,60),(10,100); + +statement ok +SET datafusion.execution.parquet.max_row_group_size = 3; + +query I +COPY (SELECT * FROM tk_data) +TO 'test_files/scratch/sort_pushdown/tk_overlap/data.parquet'; +---- +9 + +statement ok +SET datafusion.execution.parquet.max_row_group_size = 1048576; + +statement ok +CREATE EXTERNAL TABLE tk_overlap(id INT, value INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/tk_overlap/data.parquet'; + +# Test K.1: DESC LIMIT on overlapping RGs — 
results must be correct +# (cumulative prune should not trigger due to overlap) +query II +SELECT * FROM tk_overlap ORDER BY id DESC LIMIT 3; +---- +11 110 +10 100 +9 90 + +# Test K.2: ASC LIMIT on overlapping RGs +query II +SELECT * FROM tk_overlap ORDER BY id ASC LIMIT 3; +---- +1 10 +2 20 +3 30 + +# Test K.3: Full sort (verify all data present) +query II +SELECT * FROM tk_overlap ORDER BY id ASC; +---- +1 10 +2 20 +3 30 +5 50 +6 60 +7 70 +9 90 +10 100 +11 110 + +# Cleanup Test K +statement ok +DROP TABLE tk_data; + +statement ok +DROP TABLE tk_overlap; + +# =========================================================== +# Test L: Multi-key ORDER BY — uses first key for RG reorder +# =========================================================== + +statement ok +CREATE TABLE tl_data(id INT, name VARCHAR, value INT) AS VALUES +(10, 'a', 100), (11, 'b', 110), (12, 'c', 120), +(4, 'd', 40), (5, 'e', 50), (6, 'f', 60), +(1, 'g', 10), (2, 'h', 20), (3, 'i', 30); + +statement ok +SET datafusion.execution.parquet.max_row_group_size = 3; + +query I +COPY (SELECT * FROM tl_data) +TO 'test_files/scratch/sort_pushdown/tl_multikey/data.parquet'; +---- +9 + +statement ok +SET datafusion.execution.parquet.max_row_group_size = 1048576; + +statement ok +CREATE EXTERNAL TABLE tl_multikey(id INT, name VARCHAR, value INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/tl_multikey/data.parquet'; + +# Test L.1: Multi-key DESC — first key determines RG reorder +query IIT +SELECT id, value, name FROM tl_multikey ORDER BY id DESC, name ASC LIMIT 3; +---- +12 120 c +11 110 b +10 100 a + +# Test L.2: Multi-key ASC +query IIT +SELECT id, value, name FROM tl_multikey ORDER BY id ASC, name DESC LIMIT 3; +---- +1 10 g +2 20 h +3 30 i + +# Test L.3: Multi-key with 3 sort columns +query IIT +SELECT id, value, name FROM tl_multikey ORDER BY id DESC, value DESC, name ASC LIMIT 5; +---- +12 120 c +11 110 b +10 100 a +6 60 f +5 50 e + +# Test L.4: WITH ORDER + multi-key DESC LIMIT 
+statement ok +CREATE EXTERNAL TABLE tl_sorted(id INT, name VARCHAR, value INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/tl_multikey/data.parquet' +WITH ORDER (id ASC); + +statement ok +SET datafusion.optimizer.enable_sort_pushdown = true; + +# Multi-key with sort pushdown — first key triggers reverse +query TT +EXPLAIN SELECT id, value FROM tl_sorted ORDER BY id DESC, value ASC LIMIT 3; +---- +logical_plan +01)Sort: tl_sorted.id DESC NULLS FIRST, tl_sorted.value ASC NULLS LAST, fetch=3 +02)--TableScan: tl_sorted projection=[id, value] +physical_plan +01)SortExec: TopK(fetch=3), expr=[id@0 DESC, value@1 ASC NULLS LAST], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tl_multikey/data.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] + +query II +SELECT id, value FROM tl_sorted ORDER BY id DESC, value ASC LIMIT 3; +---- +12 120 +11 110 +10 100 + +# Cleanup Test L +statement ok +DROP TABLE tl_data; + +statement ok +DROP TABLE tl_multikey; + +statement ok +DROP TABLE tl_sorted; + +# =========================================================== +# Test M: Multi-key Inexact sort pushdown (WITH ORDER multi-key) +# File declared WITH ORDER (id ASC, value ASC), query ORDER BY +# id DESC, value DESC → reversed matches → Inexact path. 
+# =========================================================== + +statement ok +CREATE TABLE tm_data(id INT, value INT) AS VALUES +(1, 10), (1, 20), (1, 30), +(2, 40), (2, 50), (2, 60), +(3, 70), (3, 80), (3, 90), +(4, 100), (4, 110), (4, 120); + +statement ok +SET datafusion.execution.parquet.max_row_group_size = 3; + +# Write sorted by (id ASC, value ASC) +query I +COPY (SELECT * FROM tm_data ORDER BY id ASC, value ASC) +TO 'test_files/scratch/sort_pushdown/tm_multikey_sorted/data.parquet'; +---- +12 + +statement ok +SET datafusion.execution.parquet.max_row_group_size = 1048576; + +statement ok +CREATE EXTERNAL TABLE tm_sorted(id INT, value INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/tm_multikey_sorted/data.parquet' +WITH ORDER (id ASC, value ASC); + +# Test M.1: Multi-key DESC that fully matches reversed ordering +# ORDER BY id DESC, value DESC matches reversed (id ASC, value ASC) → Inexact +query TT +EXPLAIN SELECT * FROM tm_sorted ORDER BY id DESC, value DESC LIMIT 3; +---- +logical_plan +01)Sort: tm_sorted.id DESC NULLS FIRST, tm_sorted.value DESC NULLS FIRST, fetch=3 +02)--TableScan: tm_sorted projection=[id, value] +physical_plan +01)SortExec: TopK(fetch=3), expr=[id@0 DESC, value@1 DESC], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tm_multikey_sorted/data.parquet]]}, projection=[id, value], file_type=parquet, predicate=DynamicFilter [ empty ], reverse_row_groups=true + +# Test M.2: Results must be correct (largest id first, then largest value) +query II +SELECT * FROM tm_sorted ORDER BY id DESC, value DESC LIMIT 3; +---- +4 120 +4 110 +4 100 + +# Test M.3: Larger limit spanning multiple RGs +query II +SELECT * FROM tm_sorted ORDER BY id DESC, value DESC LIMIT 6; +---- +4 120 +4 110 +4 100 +3 90 +3 80 +3 70 + +# Test M.4: Multi-key ASC (same direction as file = Exact, sort elimination) +query II +SELECT * FROM tm_sorted ORDER BY id 
ASC, value ASC LIMIT 3; +---- +1 10 +1 20 +1 30 + +# Test M.5: Partial match — first key reversed, second key SAME direction +# ORDER BY id DESC, value ASC does NOT match reversed (id DESC, value DESC) +# → NOT Inexact path (second key mismatch) +query II +SELECT * FROM tm_sorted ORDER BY id DESC, value ASC LIMIT 3; +---- +4 100 +4 110 +4 120 + +# Test M.6: All rows, verify full data integrity +query II +SELECT * FROM tm_sorted ORDER BY id DESC, value DESC; +---- +4 120 +4 110 +4 100 +3 90 +3 80 +3 70 +2 60 +2 50 +2 40 +1 30 +1 20 +1 10 + +# Cleanup Test M +statement ok +DROP TABLE tm_data; + +statement ok +DROP TABLE tm_sorted; + # Reset settings (SLT runner uses target_partitions=4, not system default) statement ok SET datafusion.execution.target_partitions = 4;