(Test) Advanced adaptive filter selectivity evaluation #20363
adriangb wants to merge 14 commits into apache:main
Conversation
run benchmark tpcds

run benchmark clickbench_partitioned

run benchmark tpch

🤖 Benchmark completed Details
show benchmark queue

🤖 Hi @Dandandan, you asked to view the benchmark queue (#20363 (comment)).

show benchmark queue

🤖 Hi @Dandandan, you asked to view the benchmark queue (#20363 (comment)).
Hm, it seems stuck again.

FYI @alamb

@Dandandan this is mostly vibe coded; I'm only 50% confident it even makes sense without reviewing the code, FWIW.
Force-push: e0240af to 09cdb0b
show benchmark queue

🤖 Hi @adriangb, you asked to view the benchmark queue (#20363 (comment)).
Wonder if I'm infinite-looping it or something :(

Yes, I think previously it got stuck during infinite loops / extremely long-running tasks.

My bad, I'll try to add a PR to have timeouts and a cancel command.
show benchmark queue

🤖 Hi @adriangb, you asked to view the benchmark queue (#20363 (comment)).

run benchmark tpch

🤖 Hi @Dandandan, you asked to view the benchmark queue (#20363 (comment)).
It does seem to have gotten stuck again. I'm working on a system that can run benchmarks in parallel and won't get borked like this. I think it's almost ready.
@Dandandan I deleted your comment to stop the spam. I honestly don't know what's wrong with this PR or the runner. I think I should close it and open a new one.

Thanks... Perhaps it has to do with the comment / commit window?
I think this PR gets OOM-killed -- I'll remove it.

Thanks. Sorry for triggering it again. It's hard to debug what's going on, and I'm surprised this PR causes an OOM when others don't, but it does seem to be especially problematic. Wonder if it has a memory leak or something.

For context: I tried to run the ClickBench benchmark on a Claude Cloud runner (21 GB RAM / 16 cores); there it also got stuck (without any changes).
…tion

Replace the single `RwLock<SelectivityTrackerInner>` guarding all state with two independent locks:

- `filter_stats: RwLock<HashMap<FilterId, Mutex<SelectivityStats>>>` -- the hot `update()` path takes a shared read lock, then a per-filter `Mutex`. Different filters never contend; same-filter contention is ~100ns on the cheap inner `Mutex`.
- `inner: Mutex<SelectivityTrackerInner>` -- the cold `partition_filters()` path (once per file open) takes this for state-machine transitions. `update()` never touches it.

Measured on ClickBench Q10 (24 threads, 100 partitioned files):

- `partition_filters()` lock acquire: 313µs avg → ~120ns (2600x faster)
- `update()` waits >10µs: 265 calls → 2-4 calls (99% reduction)
- Total cumulative lock wait: ~50ms → <0.1ms

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
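The two-lock layout above can be sketched in plain std Rust. This is an illustrative reconstruction, not the PR's code: the stats fields and method bodies are invented for the demo; only the lock structure mirrors the commit message.

```rust
use std::collections::HashMap;
use std::sync::{Mutex, RwLock};

type FilterId = usize;

// Invented demo stats; the real SelectivityStats tracks Welford state.
#[derive(Default, Debug)]
struct SelectivityStats {
    batches: u64,
    rows_in: u64,
    rows_out: u64,
}

#[derive(Default)]
struct SelectivityTrackerInner {
    // Cold state-machine data, touched once per file open.
    generation: u64,
}

#[derive(Default)]
struct SelectivityTracker {
    // Hot path: shared read lock + per-filter Mutex, so different
    // filters never contend with each other.
    filter_stats: RwLock<HashMap<FilterId, Mutex<SelectivityStats>>>,
    // Cold path: taken only by partition_filters(), never by update().
    inner: Mutex<SelectivityTrackerInner>,
}

impl SelectivityTracker {
    fn update(&self, id: FilterId, rows_in: u64, rows_out: u64) {
        // Fast path: read lock on the map, Mutex on this filter only.
        if let Some(slot) = self.filter_stats.read().unwrap().get(&id) {
            let mut s = slot.lock().unwrap();
            s.batches += 1;
            s.rows_in += rows_in;
            s.rows_out += rows_out;
            return;
        }
        // Slow path (first sighting of this filter): write lock to insert.
        let mut map = self.filter_stats.write().unwrap();
        let mut s = map.entry(id).or_default().lock().unwrap();
        s.batches += 1;
        s.rows_in += rows_in;
        s.rows_out += rows_out;
    }

    fn partition_filters(&self) -> u64 {
        // Cold path: takes the state-machine lock; update() never blocks on it.
        let mut inner = self.inner.lock().unwrap();
        inner.generation += 1;
        inner.generation
    }
}
```

Because `update()` only ever takes the map's read lock plus one filter's `Mutex`, concurrent updates to different filters proceed without any shared contention, which is the point of the split.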
Which issue does this PR close?
Related to filter pushdown performance optimization work.
Rationale for this change
Currently, when `pushdown_filters = true`, DataFusion pushes all filter predicates into the Parquet reader as row-level filters (`ArrowPredicate`s) unconditionally. This is suboptimal because:

- The `reorder_filters` heuristic was static. It used compressed column size as a proxy for cost and sorted filters by that metric, but never measured actual runtime selectivity or evaluation cost. It could not adapt to data skew or runtime conditions.
- Dynamic filters (e.g. from `HashJoinExec`) cannot be dropped even when they provide no benefit. Without a way to mark filters as optional, the system was forced to always evaluate them.

This PR introduces an adaptive filter selectivity tracking system that observes filter behavior at runtime and makes data-driven decisions about whether each filter should be pushed down as a row-level predicate or applied post-scan.
What changes are included in this PR?

1. New module: `selectivity.rs` (1,554 lines)

The core of this PR. Introduces `SelectivityTracker`, a shared, lock-guarded structure that:

- Maintains a per-filter state machine (`New -> RowFilter | PostScan -> (promoted/demoted/dropped)`), with transitions based on:
  - a byte-ratio heuristic (`filter_bytes / projection_bytes`) to cheaply decide whether a new filter starts as a row filter or post-scan filter;
  - measured effectiveness compared against `filter_pushdown_min_bytes_per_sec`.
- Filters wrapped in `OptionalFilterPhysicalExpr` can be dropped entirely when ineffective.
- Tracks predicate generations via `snapshot_generation()`, resetting statistics when a filter's predicate changes (e.g., when a `DynamicFilterPhysicalExpr` from a hash join updates its value set).

Key types:

- `SelectivityTracker` -- cross-file tracker shared by all `ParquetOpener` instances
- `TrackerConfig` -- immutable configuration (built from `ParquetOptions`)
- `SelectivityStats` -- per-filter Welford statistics with confidence interval methods
- `FilterState` -- `RowFilter | PostScan | Dropped` enum
- `PartitionedFilters` -- output of `partition_filters()`, consumed by the opener
- `FilterId` -- stable `usize` identifier assigned by `ParquetSource::with_predicate`
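A minimal sketch of the Welford running statistics plus confidence interval that the description attributes to `SelectivityStats`. The struct and method names here are illustrative, not the PR's; only the algorithm and the role of the z value (cf. `filter_confidence_z`) come from the text.

```rust
/// Running mean/variance via Welford's algorithm over per-batch
/// selectivity samples (illustrative sketch, not DataFusion's struct).
#[derive(Default, Debug)]
struct Welford {
    n: u64,
    mean: f64,
    m2: f64, // sum of squared deviations from the running mean
}

impl Welford {
    fn push(&mut self, x: f64) {
        self.n += 1;
        let delta = x - self.mean;
        self.mean += delta / self.n as f64;
        // Uses the *updated* mean, per Welford's recurrence.
        self.m2 += delta * (x - self.mean);
    }

    /// Sample variance (Bessel-corrected); 0 until two samples exist.
    fn variance(&self) -> f64 {
        if self.n < 2 { 0.0 } else { self.m2 / (self.n - 1) as f64 }
    }

    /// Two-sided interval on the mean: mean ± z * s / sqrt(n).
    /// `z` plays the role of the new `filter_confidence_z` option.
    fn confidence_interval(&self, z: f64) -> (f64, f64) {
        let half = z * (self.variance() / self.n.max(1) as f64).sqrt();
        (self.mean - half, self.mean + half)
    }
}
```

Welford's recurrence lets the tracker fold in one sample per batch in O(1) with no stored history, which is why it suits a hot per-batch `update()` path.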
2. New wrapper: `OptionalFilterPhysicalExpr` (in `physical_expr_common`)

A transparent `PhysicalExpr` wrapper that marks a filter as optional -- droppable without affecting query correctness. All `PhysicalExpr` trait methods delegate to the inner expression. The selectivity tracker detects this via `downcast_ref::<OptionalFilterPhysicalExpr>()` and can drop the filter entirely when it is ineffective, rather than demoting it to post-scan.

`HashJoinExec` now wraps its dynamic join filters in `OptionalFilterPhysicalExpr` before pushing them down. This is why plan output now shows `Optional(DynamicFilter [...])` instead of `DynamicFilter [...]`.
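The wrapper-plus-downcast pattern can be illustrated with a toy trait standing in for `PhysicalExpr` (std only; aside from the pattern itself, none of these names are from the PR):

```rust
use std::any::Any;
use std::sync::Arc;

// Toy stand-in for PhysicalExpr: a boolean predicate over one value.
trait Expr: Any {
    fn evaluate(&self, value: i64) -> bool;
    fn as_any(&self) -> &dyn Any;
}

// A concrete "real" filter.
struct GtZero;
impl Expr for GtZero {
    fn evaluate(&self, value: i64) -> bool { value > 0 }
    fn as_any(&self) -> &dyn Any { self }
}

/// Transparent wrapper marking a filter as droppable: every trait
/// method delegates to the inner expression, so wrapping never changes
/// results -- only how the planner is allowed to treat the filter.
struct OptionalFilter {
    inner: Arc<dyn Expr>,
}
impl Expr for OptionalFilter {
    fn evaluate(&self, value: i64) -> bool { self.inner.evaluate(value) }
    fn as_any(&self) -> &dyn Any { self }
}

/// Tracker-side check: only wrapped filters may be dropped entirely,
/// detected via downcast (mirroring the PR's
/// downcast_ref::<OptionalFilterPhysicalExpr>() check).
fn is_droppable(expr: &Arc<dyn Expr>) -> bool {
    expr.as_any().downcast_ref::<OptionalFilter>().is_some()
}
```

The downcast is the whole contract: any filter not wrapped stays mandatory, so wrapping is an explicit opt-in by the producing operator (here, the hash join).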
3. Removal of the `reorder_filters` config option

The old static `reorder_filters` boolean and its associated heuristic (sort by `required_bytes`, then `can_use_index`) are removed entirely. The adaptive system subsumes this:

- `FilterCandidate` no longer stores `required_bytes` or `can_use_index` fields.
- The `size_of_columns()` and `columns_sorted()` helper functions in `row_filter.rs` are removed.
- Filter ordering is now decided by `SelectivityTracker::partition_filters()` based on measured effectiveness or the byte-ratio fallback.
4. Three new configuration options (in `ParquetOptions`)

- `filter_pushdown_min_bytes_per_sec` -- `0.0` = all promoted, `INFINITY` = none promoted (feature disabled).
- `filter_collecting_byte_ratio_threshold`
- `filter_confidence_z`
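Given the documented extremes (`0.0` = all promoted, `INFINITY` = none), the promotion test presumably reduces to a bytes-saved-per-second comparison. A hypothetical sketch (the function name and inputs are invented, not the PR's API):

```rust
/// A filter earns row-level placement if the bytes it saves per second
/// of evaluation time clears the configured threshold.
/// min_bytes_per_sec == 0.0  -> every filter qualifies;
/// min_bytes_per_sec == INF  -> none does (feature disabled),
/// matching the option's documented extremes.
fn promote_to_row_filter(bytes_saved: f64, eval_secs: f64, min_bytes_per_sec: f64) -> bool {
    if eval_secs <= 0.0 {
        // No measured cost yet: only the "always promote" setting applies.
        return min_bytes_per_sec == 0.0;
    }
    bytes_saved / eval_secs >= min_bytes_per_sec
}
```

The default of 52428800 (50 MiB/s, per the user-facing section below) would then mean a row filter must save I/O at least as fast as a modest disk read to stay promoted.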
5. Changes to `ParquetOpener` / `opener.rs`

- Receives `Vec<(FilterId, Arc<dyn PhysicalExpr>)>` instead of a single combined `Arc<dyn PhysicalExpr>`.
- Calls `selectivity_tracker.partition_filters()` to split filters into row-level vs. post-scan.
- Row-level filters go to `build_row_filter()` (updated signature).
- Adds `apply_post_scan_filters_with_stats()`, a new function that evaluates each filter individually, reports per-filter timing and selectivity back to the tracker, and combines results into a single boolean mask.
- The `limit` is only applied to the Parquet reader when there are no post-scan filters (otherwise limiting would cut off rows before the filter could find matches).
- A new `filter_apply_time` metric tracks post-scan filter evaluation time.
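The evaluate-each-filter-then-combine idea behind `apply_post_scan_filters_with_stats()` can be sketched with `Vec<bool>` standing in for Arrow's boolean mask (a hypothetical simplification, not the real signature):

```rust
/// Evaluate each post-scan filter on its own so per-filter selectivity
/// can be observed, then AND the masks into one. Stats are returned
/// here instead of sent to a tracker, to keep the sketch self-contained.
fn apply_post_scan_filters(
    rows: &[i64],
    filters: &[(usize, fn(i64) -> bool)],
) -> (Vec<bool>, Vec<(usize, f64)>) {
    let mut mask = vec![true; rows.len()];
    let mut selectivities = Vec::new();
    for (filter_id, pred) in filters {
        // Per-filter selectivity: fraction of rows this filter keeps,
        // measured independently of the other filters.
        let passed = rows.iter().filter(|r| pred(**r)).count();
        selectivities.push((*filter_id, passed as f64 / rows.len().max(1) as f64));
        // Fold this filter into the combined mask.
        for (m, r) in mask.iter_mut().zip(rows) {
            *m = *m && pred(*r);
        }
    }
    (mask, selectivities)
}
```

Evaluating filters separately costs slightly more than evaluating one fused conjunction, but it is what makes per-filter promotion/demotion decisions possible.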
6. Changes to `ParquetSource` / `source.rs`

- The predicate field changes from `Option<Arc<dyn PhysicalExpr>>` to `Option<Vec<(FilterId, Arc<dyn PhysicalExpr>)>>`.
- `with_predicate()` now splits the predicate into conjuncts and assigns stable `FilterId`s (indices).
- A `SelectivityTracker` is stored as a shared `Arc` on `ParquetSource` and passed to all openers.
- `with_table_parquet_options()` now builds a fresh `SelectivityTracker` from the three new config values.
- The `with_reorder_filters()` and `reorder_filters()` methods are removed.
7. Changes to `build_row_filter()` / `row_filter.rs`

- Takes `Vec<(FilterId, Arc<dyn PhysicalExpr>)>` plus `&Arc<SelectivityTracker>` instead of `&Arc<dyn PhysicalExpr>` plus `reorder_predicates: bool`.
- Returns `RowFilterWithMetrics` (a new struct) containing both the `RowFilter` and any unbuildable filters that must be applied post-scan.
- `DatafusionArrowPredicate` now carries a `FilterId` and `Arc<SelectivityTracker>`, reporting per-batch evaluation metrics back to the tracker after each `evaluate()` call.
- No reordering happens in `build_row_filter` -- filters arrive pre-ordered by the tracker.
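The per-batch reporting described for `DatafusionArrowPredicate` amounts to timing each `evaluate()` call and handing (filter id, rows in/out, elapsed) to the shared tracker. A std-only sketch, with a `Vec` behind a `Mutex` standing in for the tracker and all names invented:

```rust
use std::sync::{Arc, Mutex};
use std::time::Instant;

// One record per evaluate() call: (filter_id, rows_in, rows_out, nanos).
type Report = (usize, usize, usize, u128);

struct TimedPredicate {
    filter_id: usize,
    pred: fn(i64) -> bool,
    reports: Arc<Mutex<Vec<Report>>>, // stand-in for the shared tracker
}

impl TimedPredicate {
    /// Evaluate one batch, then report timing and selectivity --
    /// mirroring the per-batch reporting the PR describes.
    fn evaluate(&self, batch: &[i64]) -> Vec<bool> {
        let start = Instant::now();
        let mask: Vec<bool> = batch.iter().map(|r| (self.pred)(*r)).collect();
        let rows_out = mask.iter().filter(|m| **m).count();
        self.reports.lock().unwrap().push((
            self.filter_id,
            batch.len(),
            rows_out,
            start.elapsed().as_nanos(),
        ));
        mask
    }
}
```

Because the reports carry both row counts and elapsed time, the tracker can derive selectivity and evaluation cost from the same stream of records.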
8. Changes to `HashJoinExec`

- Dynamic join filters are wrapped in `OptionalFilterPhysicalExpr` before being pushed down.
- The pushdown path unwraps `OptionalFilterPhysicalExpr` to find the inner `DynamicFilterPhysicalExpr`.

9. Protobuf schema updates
- The `reorder_filters` field (tag 6) is marked as `reserved` in `datafusion_common.proto`.
- New fields: `filter_pushdown_min_bytes_per_sec` (tag 35), `filter_collecting_byte_ratio_threshold` (tag 40), `filter_confidence_z` (tag 41).
- Updated `pbjson.rs`, `prost.rs`, `from_proto`, `to_proto`, and `file_formats.rs`.

10. Test and benchmark updates
- References to `reorder_filters` are removed from tests and benchmarks.
- Tests set `filter_pushdown_min_bytes_per_sec = 0.0` to preserve deterministic behavior (all filters always pushed down).
- Plan snapshots are updated from `DynamicFilter [...]` to `Optional(DynamicFilter [...])`.
- New unit tests in `selectivity.rs` cover: effectiveness calculation, Welford's algorithm, confidence intervals, state machine transitions (initial placement, promotion, demotion, dropping), dynamic filter generation tracking, filter ordering, and integration lifecycle tests.
- A changed result in `explain_analyze.rs` (`output_rows=8` -> `output_rows=5`), due to the adaptive system now placing some filters as post-scan that were previously row-level, causing slight row count differences in EXPLAIN ANALYZE output.

Are these changes tested?
Yes:

- Existing `pushdown_filters` and filter pushdown SLT tests pass (with `filter_pushdown_min_bytes_per_sec = 0.0` to force all filters to row-level for deterministic behavior).
- New unit tests in `selectivity.rs` (~450 lines of tests) cover the `SelectivityStats` calculator, the `TrackerConfig` builder, state machine transitions (initial placement, promotion, demotion, dropping, reset on generation change), filter ordering, and full promotion/demotion lifecycle integration tests.
- Plan snapshot tests are updated for the `Optional(...)` wrapper on dynamic filters.
- `dynamic_filter_pushdown_config.slt`, `information_schema.slt`, `preserve_file_partitioning.slt`, `projection_pushdown.slt`, `push_down_filter.slt`, and `repartition_subset_satisfaction.slt` are updated.
- `benchmarks/results.txt` shows TPC-H (13 faster, 6 slower, 3 unchanged), TPC-DS (33 faster, 31 slower, 35 unchanged, with a notable 24x improvement on Q64), and ClickBench (18 faster, 12 slower, 13 unchanged) results.

Are there any user-facing changes?
Yes:

The `reorder_filters` config option is removed. This is a breaking change: users who `SET datafusion.execution.parquet.reorder_filters = true` will get an error. The adaptive system replaces this functionality automatically.
Three new config options are added under `datafusion.execution.parquet`:

- `filter_pushdown_min_bytes_per_sec` (default: 52428800)
- `filter_collecting_byte_ratio_threshold` (default: 0.15)
- `filter_confidence_z` (default: 2.0)
The default behavior when `pushdown_filters = true` changes. Previously, all filters were unconditionally pushed into the Parquet reader. Now, the adaptive system decides per-filter based on byte-ratio thresholds and runtime effectiveness measurements. To restore the old behavior of pushing all filters unconditionally, set `filter_pushdown_min_bytes_per_sec = 0.0`.
EXPLAIN plan output changes: dynamic join filters now display as `Optional(DynamicFilter [...])` instead of `DynamicFilter [...]`, reflecting their new optional wrapper.
The deprecated `predicate()` method's signature changes: `ParquetSource::predicate()` now returns `Option<Arc<dyn PhysicalExpr>>` (owned) instead of `Option<&Arc<dyn PhysicalExpr>>` (a reference). This method was already deprecated in favor of `filter()`.