feat: initialize TopK dynamic filter threshold from parquet statistics #21712
zhuqi-lucas wants to merge 2 commits into apache:main
Before reading any parquet data, scan row group min/max statistics to compute an initial threshold for TopK's dynamic filter. This allows row-level filtering to benefit immediately from the first file opened, rather than waiting until TopK processes enough rows to build a threshold organically.

Algorithm (single-column sort):
- DESC LIMIT K: threshold = max(min) across RGs with num_rows >= K
  Filter: col > threshold
- ASC LIMIT K: threshold = min(max) across RGs with num_rows >= K
  Filter: col < threshold

Sort direction is read from sort_options on DynamicFilterPhysicalExpr, which is now set by SortExec::create_filter() for TopK queries. This makes the optimization work for ALL TopK queries on parquet, not just those with sort pushdown.

The DynamicFilterPhysicalExpr is shared across all partitions, so each file's threshold update is visible to subsequent files globally.

Graceful fallback: skips initialization when sort_options is absent, statistics are unavailable, the column is not found, or the sort is multi-column.
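The single-column rule above can be illustrated with a minimal Rust sketch. It is not the code from this PR: `RowGroupStats`, `SortDir`, and `initial_threshold` are hypothetical names, the column is assumed to be a non-null `i64`, and missing statistics are modelled as `None`.

```rust
/// Sort direction of the single ORDER BY column (hypothetical type).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SortDir {
    Asc,
    Desc,
}

/// Hypothetical, simplified view of one row group's statistics.
/// Assumption: the sort column is an i64 without nulls.
#[derive(Clone, Debug)]
pub struct RowGroupStats {
    pub num_rows: usize,
    /// `None` models unavailable statistics.
    pub min: Option<i64>,
    pub max: Option<i64>,
}

/// Returns an initial threshold for `LIMIT k`, or `None` when no row group
/// qualifies (the "graceful fallback" case).
pub fn initial_threshold(rgs: &[RowGroupStats], k: usize, dir: SortDir) -> Option<i64> {
    // Only a row group that holds at least K rows on its own can bound the
    // K-th value of the final result.
    let qualifying = rgs.iter().filter(|rg| rg.num_rows >= k);
    match dir {
        // DESC: a qualifying group has >= K rows that are all >= its min,
        // so the largest such min is the tightest lower bound: max(min).
        SortDir::Desc => qualifying.filter_map(|rg| rg.min).max(),
        // ASC: symmetric, the smallest max is the tightest upper bound: min(max).
        SortDir::Asc => qualifying.filter_map(|rg| rg.max).min(),
    }
}
```

In this sketch, row groups with fewer than K rows or without statistics simply do not contribute, and an empty input yields `None`, which corresponds to skipping initialization.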
run benchmarks

🤖 Benchmark running (GKE): comparing feat/topk-stats-init (fbbaf61) to 29f1acd (merge-base) using clickbench_partitioned
🤖 Benchmark running (GKE): comparing feat/topk-stats-init (fbbaf61) to 29f1acd (merge-base) using tpcds
🤖 Benchmark running (GKE): comparing feat/topk-stats-init (fbbaf61) to 29f1acd (merge-base) using tpch
🤖 Benchmark completed (GKE): tpcds, base (merge-base) vs. branch
🤖 Benchmark completed (GKE): clickbench_partitioned, base (merge-base) vs. branch

run benchmark sort_pushdown_inexact

🤖 Benchmark running (GKE): comparing feat/topk-stats-init (fbbaf61) to 29f1acd (merge-base) using sort_pushdown_inexact
🤖 Benchmark completed (GKE): sort_pushdown_inexact, base (merge-base) vs. branch
Which issue does this PR close?
Closes #21691
Rationale for this change
TopK's dynamic filter starts as lit(true) (no filtering) and only tightens after processing enough rows to fill the heap. This means the first few row groups are never pruned by the dynamic filter. For queries like ORDER BY col DESC LIMIT 100 on parquet files with good statistics, we can compute a tight initial threshold before reading any data.
What changes are included in this PR?
Core implementation (datafusion/datasource-parquet/src/opener.rs):
Three new functions:
- try_init_topk_threshold(): main function that finds the dynamic filter in the predicate, scans RG statistics, computes the best threshold, and calls update() on the DynamicFilterPhysicalExpr
- find_dynamic_filter(): recursively walks the predicate tree to find a DynamicFilterPhysicalExpr
- compute_best_threshold_from_stats(): iterates over row groups and finds the optimal threshold
Algorithm (single-column sort):
- DESC LIMIT K: threshold = max(min) across RGs where num_rows >= K, filter: col > threshold
- ASC LIMIT K: threshold = min(max) across RGs where num_rows >= K, filter: col < threshold
Called in build_stream() after metadata is loaded but before row filter construction, so the updated threshold is used by the row filter for the current file and by RG pruning for subsequent files.
The DynamicFilterPhysicalExpr is shared across all partitions, so each file's threshold update is globally visible.
Graceful fallback: skips initialization when statistics are unavailable, the column is not found, the sort is multi-column, or no RGs qualify.
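As a rough illustration of the recursive predicate walk and of why an update made while opening one file is visible to others, here is a toy Rust sketch. `Pred`, `DynFilter`, and `find_dynamic` are hypothetical stand-ins, not DataFusion types; the tighten-only behavior of `update` (for a DESC-style lower bound) is a choice made for this sketch, not something confirmed by the PR.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for a dynamic filter whose threshold can be
/// updated in place while several scans hold a reference to it.
#[derive(Debug, Default)]
pub struct DynFilter {
    threshold: RwLock<Option<i64>>,
}

impl DynFilter {
    /// Sketch assumption: treat the threshold as a DESC-style lower bound
    /// and only ever tighten it (raise it), never loosen it.
    pub fn update(&self, candidate: i64) {
        let mut guard = self.threshold.write().unwrap();
        let tighter = match *guard {
            Some(current) => candidate > current,
            None => true,
        };
        if tighter {
            *guard = Some(candidate);
        }
    }

    /// Current threshold; `None` plays the role of "no filtering yet".
    pub fn current(&self) -> Option<i64> {
        *self.threshold.read().unwrap()
    }
}

/// Toy predicate tree (hypothetical; real predicates have many more nodes).
#[derive(Debug)]
pub enum Pred {
    Literal(bool),
    And(Box<Pred>, Box<Pred>),
    Dynamic(Arc<DynFilter>),
}

/// Depth-first walk that returns the first dynamic filter in the tree.
pub fn find_dynamic(pred: &Pred) -> Option<Arc<DynFilter>> {
    match pred {
        Pred::Dynamic(filter) => Some(Arc::clone(filter)),
        Pred::And(left, right) => find_dynamic(left).or_else(|| find_dynamic(right)),
        Pred::Literal(_) => None,
    }
}
```

Because every predicate holds an `Arc` to the same `DynFilter`, a threshold written through one file's predicate is observed when another file's predicate reads it, which mirrors the "globally visible" behavior described above.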
Are these changes tested?
13 unit tests in opener.rs:
- compute_best_threshold_from_stats: 6 tests (DESC/ASC, skip small RGs, skip nulls, all too small, empty)
- find_dynamic_filter: 3 tests (direct match, nested in conjunction, absent)
- try_init_topk_threshold: 4 tests (DESC, ASC, no dynamic filter, multi-column skip)
SLT tests in sort_pushdown.slt (Test H series).
Are there any user-facing changes?
No. This is a transparent optimization — same results, potentially faster TopK queries on parquet data with statistics.
Future work
DynamicFilterPhysicalExpr to support all TopK queries, not just the Inexact sort pushdown path