perf: Skip RowFilter when all predicate columns are in the projection #20417
Open
darmie wants to merge 8 commits into apache:main from darmie:fix-parquet-filter-pushdown
Commits (all by darmie):
b42ef35 Skip RowFilter pushdown when filter columns are already projected
ee954ad Add test for batch filter path and simplify stream wrapping
d7ff890 Merge branch 'main' into fix-parquet-filter-pushdown
77f6315 Refine batch filter guard: count only static conjuncts
ee47492 Tighten batch filter guard to exact column match
21d1686 Per-conjunct RowFilter demotion in build_row_filter()
700abfb Add cfg guards for force_hash_collisions in hash_utils
fd21f30 Keep conjuncts in RowFilter when filter cols are not projected
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,9 +27,9 @@ use arrow::array::{RecordBatch, RecordBatchOptions}; | |
| use arrow::datatypes::DataType; | ||
| use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; | ||
| use datafusion_physical_expr::projection::ProjectionExprs; | ||
| use datafusion_physical_expr::utils::reassign_expr_columns; | ||
| use datafusion_physical_expr::utils::{conjunction_opt, reassign_expr_columns}; | ||
| use datafusion_physical_expr_adapter::replace_columns_with_literals; | ||
| use std::collections::HashMap; | ||
| use std::collections::{HashMap, HashSet}; | ||
| use std::pin::Pin; | ||
| use std::sync::Arc; | ||
| use std::task::{Context, Poll}; | ||
|
|
@@ -46,6 +46,7 @@ use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; | |
| use datafusion_physical_expr_common::physical_expr::{ | ||
| PhysicalExpr, is_dynamic_physical_expr, | ||
| }; | ||
| use datafusion_physical_plan::filter::batch_filter; | ||
| use datafusion_physical_plan::metrics::{ | ||
| Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, PruningMetrics, | ||
| }; | ||
|
|
@@ -459,27 +460,37 @@ impl FileOpener for ParquetOpener { | |
| // `row_filter` for details. | ||
| // --------------------------------------------------------------------- | ||
|
|
||
| // Filter pushdown: evaluate predicates during scan | ||
| if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() { | ||
| let row_filter = row_filter::build_row_filter( | ||
| // Filter pushdown: evaluate predicates during scan. | ||
| // | ||
| // Each conjunct is evaluated individually inside | ||
| // `build_row_filter`: conjuncts whose required columns leave | ||
| // extra projected columns unread benefit from late | ||
| // materialization and stay in the RowFilter; conjuncts that | ||
| // reference all projected columns are demoted to batch-level | ||
| // filtering to avoid the overhead of the RowFilter machinery. | ||
| let batch_filter_predicate = if let Some(predicate) = | ||
| pushdown_filters.then_some(predicate).flatten() | ||
| { | ||
| let projection_col_indices: HashSet<usize> = | ||
| projection.column_indices().into_iter().collect(); | ||
|
|
||
| let (row_filter, demoted) = row_filter::build_row_filter( | ||
| &predicate, | ||
| &physical_file_schema, | ||
| builder.metadata(), | ||
| reorder_predicates, | ||
| &file_metrics, | ||
| ); | ||
| &projection_col_indices, | ||
| )?; | ||
|
|
||
| match row_filter { | ||
| Ok(Some(filter)) => { | ||
| builder = builder.with_row_filter(filter); | ||
| } | ||
| Ok(None) => {} | ||
| Err(e) => { | ||
| debug!( | ||
| "Ignoring error building row filter for '{predicate:?}': {e}" | ||
| ); | ||
| } | ||
| }; | ||
| if let Some(filter) = row_filter { | ||
| builder = builder.with_row_filter(filter); | ||
| } | ||
|
|
||
| // Combine demoted conjuncts into a single batch filter | ||
| conjunction_opt(demoted) | ||
| } else { | ||
| None | ||
| }; | ||
| if force_filter_selections { | ||
| builder = | ||
|
|
@@ -627,6 +638,12 @@ impl FileOpener for ParquetOpener { | |
| let projection = projection | ||
| .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?; | ||
|
|
||
| // Also remap the batch filter predicate to the stream schema | ||
| let batch_filter_predicate = batch_filter_predicate | ||
| .map(|pred| reassign_expr_columns(pred, &stream_schema)) | ||
| .transpose()?; | ||
| let has_batch_filter = batch_filter_predicate.is_some(); | ||
|
|
||
| let projector = projection.make_projector(&stream_schema)?; | ||
|
|
||
| let stream = stream.map_err(DataFusionError::from).map(move |b| { | ||
|
|
@@ -636,6 +653,10 @@ impl FileOpener for ParquetOpener { | |
| &predicate_cache_inner_records, | ||
| &predicate_cache_records, | ||
| ); | ||
| // Apply batch-level filter when RowFilter pushdown was skipped | ||
| if let Some(ref filter_pred) = batch_filter_predicate { | ||
| b = batch_filter(&b, filter_pred)?; | ||
| } | ||
| b = projector.project_batch(&b)?; | ||
| if replace_schema { | ||
| // Ensure the output batch has the expected schema. | ||
|
|
@@ -664,6 +685,18 @@ impl FileOpener for ParquetOpener { | |
| // ---------------------------------------------------------------------- | ||
| // Step: wrap the stream so a dynamic filter can stop the file scan early | ||
| // ---------------------------------------------------------------------- | ||
|
|
||
| // When batch-level filtering is active (RowFilter pushdown was | ||
| // skipped), filter out empty batches that result from the predicate | ||
| // removing all rows in a decoded batch. | ||
| let stream = if has_batch_filter { | ||
| stream | ||
| .try_filter(|batch| std::future::ready(batch.num_rows() > 0)) | ||
| .boxed() | ||
| } else { | ||
| stream.boxed() | ||
| }; | ||
|
|
||
| if let Some(file_pruner) = file_pruner { | ||
| Ok(EarlyStoppingStream::new( | ||
| stream, | ||
|
|
@@ -672,7 +705,7 @@ impl FileOpener for ParquetOpener { | |
| ) | ||
| .boxed()) | ||
| } else { | ||
| Ok(stream.boxed()) | ||
| Ok(stream) | ||
| } | ||
| })) | ||
| } | ||
|
|
@@ -1025,10 +1058,10 @@ mod test { | |
| stats::Precision, | ||
| }; | ||
| use datafusion_datasource::{PartitionedFile, TableSchema, file_stream::FileOpener}; | ||
| use datafusion_expr::{col, lit}; | ||
| use datafusion_expr::{Operator, col, lit}; | ||
| use datafusion_physical_expr::{ | ||
| PhysicalExpr, | ||
| expressions::{Column, DynamicFilterPhysicalExpr, Literal}, | ||
| expressions::{BinaryExpr, Column, DynamicFilterPhysicalExpr, Literal}, | ||
| planner::logical2physical, | ||
| projection::ProjectionExprs, | ||
| }; | ||
|
|
@@ -2004,4 +2037,169 @@ mod test { | |
| "Reverse scan with non-contiguous row groups should correctly map RowSelection" | ||
| ); | ||
| } | ||
|
|
||
| /// Per-conjunct RowFilter demotion: when a conjunct's required columns | ||
| /// cover all projected columns, it provides no column-decode savings | ||
| /// and is demoted to batch-level filtering. Conjuncts with extra | ||
| /// projected columns stay in the RowFilter for late materialization. | ||
| #[tokio::test] | ||
| async fn test_skip_row_filter_when_filter_cols_subset_of_projection() { | ||
| let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>; | ||
|
|
||
| // 4 rows: a=[1,2,2,4], b=[10,20,30,40] | ||
| let batch = record_batch!( | ||
| ("a", Int32, vec![Some(1), Some(2), Some(2), Some(4)]), | ||
| ("b", Int32, vec![Some(10), Some(20), Some(30), Some(40)]) | ||
| ) | ||
| .unwrap(); | ||
| let data_size = | ||
| write_parquet(Arc::clone(&store), "test.parquet", batch.clone()).await; | ||
| let schema = batch.schema(); | ||
| let file = PartitionedFile::new( | ||
| "test.parquet".to_string(), | ||
| u64::try_from(data_size).unwrap(), | ||
| ); | ||
|
|
||
| // Case 1: filter_cols == projection_cols → batch filter path | ||
| // Filter: a = 2, Projection: [a] | ||
| // Conjunct cols = {0}, projection = {0} → no extra cols to skip | ||
| // decoding → demoted to batch filter. | ||
| let expr = col("a").eq(lit(2)); | ||
| let predicate = logical2physical(&expr, &schema); | ||
| let opener = ParquetOpenerBuilder::new() | ||
| .with_store(Arc::clone(&store)) | ||
| .with_schema(Arc::clone(&schema)) | ||
| .with_projection_indices(&[0]) | ||
| .with_predicate(predicate) | ||
| .with_pushdown_filters(true) | ||
| .with_reorder_filters(true) | ||
| .build(); | ||
| let stream = opener.open(file.clone()).unwrap().await.unwrap(); | ||
|
Contributor
I feel like this test should also assert something about the predicates more directly (it asserts the number of rows that come out, rather than the fact that the filter is pushed down).
||
| let (num_batches, num_rows) = count_batches_and_rows(stream).await; | ||
| assert_eq!(num_rows, 2, "batch filter should return 2 matching rows"); | ||
| assert_eq!(num_batches, 1); | ||
|
|
||
| // Case 1b: filter_cols ⊂ projection_cols → RowFilter path | ||
| // Filter: a = 2, Projection: [a, b] | ||
| // Conjunct cols = {0}, projection = {0, 1} → extra col b → RowFilter | ||
| // skips decoding column b for non-matching rows. | ||
| let expr = col("a").eq(lit(2)); | ||
| let predicate = logical2physical(&expr, &schema); | ||
| let opener = ParquetOpenerBuilder::new() | ||
| .with_store(Arc::clone(&store)) | ||
| .with_schema(Arc::clone(&schema)) | ||
| .with_projection_indices(&[0, 1]) | ||
| .with_predicate(predicate) | ||
| .with_pushdown_filters(true) | ||
| .with_reorder_filters(true) | ||
| .build(); | ||
| let stream = opener.open(file.clone()).unwrap().await.unwrap(); | ||
| let (num_batches, num_rows) = count_batches_and_rows(stream).await; | ||
| assert_eq!(num_rows, 2, "RowFilter should return 2 matching rows"); | ||
| assert_eq!(num_batches, 1); | ||
|
|
||
| // Case 2: filter_cols ⊄ projection_cols → RowFilter path | ||
| // Filter: b = 20, Projection: [a] (only column a) | ||
| // Conjunct cols = {1}, projection = {0} → extra col a → RowFilter | ||
| let expr = col("b").eq(lit(20)); | ||
| let predicate = logical2physical(&expr, &schema); | ||
| let opener = ParquetOpenerBuilder::new() | ||
| .with_store(Arc::clone(&store)) | ||
| .with_schema(Arc::clone(&schema)) | ||
| .with_projection_indices(&[0]) | ||
| .with_predicate(predicate) | ||
| .with_pushdown_filters(true) | ||
| .with_reorder_filters(true) | ||
| .build(); | ||
| let stream = opener.open(file.clone()).unwrap().await.unwrap(); | ||
| let values = collect_int32_values(stream).await; | ||
| assert_eq!( | ||
| values, | ||
| vec![2], | ||
| "RowFilter should return correct filtered values" | ||
| ); | ||
|
|
||
| // Case 3: no matches → 0 rows via batch filter | ||
| // Filter: a = 99, Projection: [a] | ||
| // Conjunct cols = {0}, projection = {0} → no extra cols → batch filter | ||
| let expr = col("a").eq(lit(99)); | ||
| let predicate = logical2physical(&expr, &schema); | ||
| let opener = ParquetOpenerBuilder::new() | ||
| .with_store(Arc::clone(&store)) | ||
| .with_schema(Arc::clone(&schema)) | ||
| .with_projection_indices(&[0]) | ||
| .with_predicate(predicate) | ||
| .with_pushdown_filters(true) | ||
| .with_reorder_filters(true) | ||
| .build(); | ||
| let stream = opener.open(file.clone()).unwrap().await.unwrap(); | ||
| let (num_batches, num_rows) = count_batches_and_rows(stream).await; | ||
| assert_eq!(num_rows, 0, "no rows should match"); | ||
| assert_eq!(num_batches, 0, "empty batches should be filtered out"); | ||
|
|
||
| // Case 4: verify correct values in batch filter path | ||
| // Filter: a = 2, Projection: [a] | ||
| let expr = col("a").eq(lit(2)); | ||
| let predicate = logical2physical(&expr, &schema); | ||
| let opener = ParquetOpenerBuilder::new() | ||
| .with_store(Arc::clone(&store)) | ||
| .with_schema(Arc::clone(&schema)) | ||
| .with_projection_indices(&[0]) | ||
| .with_predicate(predicate) | ||
| .with_pushdown_filters(true) | ||
| .with_reorder_filters(true) | ||
| .build(); | ||
| let stream = opener.open(file.clone()).unwrap().await.unwrap(); | ||
| let values = collect_int32_values(stream).await; | ||
| assert_eq!( | ||
| values, | ||
| vec![2, 2], | ||
| "batch filter should return correct values" | ||
| ); | ||
|
|
||
| // Case 5: multi-conjunct predicate → RowFilter path | ||
| // Filter: a = 2 AND b = 20, Projection: [a, b] | ||
| // Per-conjunct: `a = 2` has extra col b, `b = 20` has extra col a | ||
| // → both kept in RowFilter for incremental evaluation. | ||
| let expr = col("a").eq(lit(2)).and(col("b").eq(lit(20))); | ||
| let predicate = logical2physical(&expr, &schema); | ||
| let opener = ParquetOpenerBuilder::new() | ||
| .with_store(Arc::clone(&store)) | ||
| .with_schema(Arc::clone(&schema)) | ||
| .with_projection_indices(&[0, 1]) | ||
| .with_predicate(predicate) | ||
| .with_pushdown_filters(true) | ||
| .with_reorder_filters(true) | ||
| .build(); | ||
| let stream = opener.open(file.clone()).unwrap().await.unwrap(); | ||
| let (num_batches, num_rows) = count_batches_and_rows(stream).await; | ||
| assert_eq!(num_rows, 1, "multi-conjunct RowFilter should return 1 row"); | ||
| assert_eq!(num_batches, 1); | ||
|
|
||
| // Case 6: single static conjunct + dynamic filter → batch filter path | ||
| // Simulates TopK: `WHERE a = 2 ORDER BY a LIMIT N` | ||
| // Predicate: `a = 2 AND <dynamic>(a < 3)`, Projection: [a] | ||
| // Per-conjunct: both conjuncts reference only col a = projection | ||
| // → no extra cols → all demoted to batch filter. | ||
| let static_expr = logical2physical(&col("a").eq(lit(2)), &schema); | ||
| let dynamic_expr = | ||
| make_dynamic_expr(logical2physical(&col("a").lt(lit(3)), &schema)); | ||
| let combined: Arc<dyn PhysicalExpr> = | ||
| Arc::new(BinaryExpr::new(static_expr, Operator::And, dynamic_expr)); | ||
| let opener = ParquetOpenerBuilder::new() | ||
| .with_store(Arc::clone(&store)) | ||
| .with_schema(Arc::clone(&schema)) | ||
| .with_projection_indices(&[0]) | ||
| .with_predicate(combined) | ||
| .with_pushdown_filters(true) | ||
| .with_reorder_filters(true) | ||
| .build(); | ||
| let stream = opener.open(file).unwrap().await.unwrap(); | ||
| let (num_batches, num_rows) = count_batches_and_rows(stream).await; | ||
| assert_eq!( | ||
| num_rows, 2, | ||
| "single static conjunct + dynamic filter should use batch filter" | ||
| ); | ||
| assert_eq!(num_batches, 1); | ||
| } | ||
| } | ||
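The per-conjunct demotion rule exercised by the test above can be sketched in isolation. This is a minimal illustration, not the PR's `build_row_filter`: `Conjunct`, `conjunct`, and `split_conjuncts` are hypothetical names, and the rule used here (demote only when a conjunct's columns exactly equal the projected columns) is an assumption that happens to agree with Cases 1 through 6; the real condition may differ.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for one conjunct of the predicate: a label plus
/// the file-schema column indices it reads. Not a DataFusion type.
#[derive(Debug, Clone, PartialEq)]
struct Conjunct {
    label: &'static str,
    columns: HashSet<usize>,
}

/// Convenience constructor (hypothetical) taking column indices as a slice.
fn conjunct(label: &'static str, columns: &[usize]) -> Conjunct {
    Conjunct {
        label,
        columns: columns.iter().copied().collect(),
    }
}

/// Split conjuncts into `(kept_in_row_filter, demoted_to_batch_filter)`.
///
/// Assumed rule: a conjunct is demoted only when its column set equals the
/// projected column set, so evaluating it early cannot skip decoding any
/// other projected column. Every other conjunct stays in the RowFilter.
fn split_conjuncts(
    conjuncts: Vec<Conjunct>,
    projection: &HashSet<usize>,
) -> (Vec<Conjunct>, Vec<Conjunct>) {
    // `partition` puts items for which the closure returns true in the
    // first collection (kept) and the rest in the second (demoted).
    conjuncts
        .into_iter()
        .partition(|c| &c.columns != projection)
}
```

With a split like this exposed to the test, the assertions could target which conjuncts land in each group rather than only the row counts that come out of the stream.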
If we are deciding which filters to push down based on the projection and filter columns, is the ParquetOpener the right place? I wonder if we should move the determination earlier (for example, maybe not bother trying to push down filters at all?)
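One way to picture the earlier determination suggested here is a check that runs before any file is opened and reports whether pushdown could help at all. The sketch below is purely illustrative: `pushdown_worthwhile` is a hypothetical helper, not a DataFusion API, and it assumes the same exact-match demotion rule that the test cases in this PR are consistent with.

```rust
use std::collections::HashSet;

/// Hypothetical planning-time check: returns true when at least one
/// conjunct would stay in the RowFilter, i.e. its column set differs from
/// the projected column set. If it returns false, every conjunct would be
/// demoted to a batch-level filter, so pushdown could be skipped entirely.
fn pushdown_worthwhile(
    conjunct_columns: &[HashSet<usize>],
    projection: &HashSet<usize>,
) -> bool {
    conjunct_columns.iter().any(|cols| cols != projection)
}
```

For `a = 2` with projection `[a]` this returns false, while `b = 20` with projection `[a]` returns true. Whether such a check belongs in the planner or in the opener is the open question raised in this comment.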