Background
We now have several complementary optimizations for TopK queries on parquet:
- Dynamic work scheduling (#21351, merged): sibling `FileStream` partitions share a work queue and steal files from each other, ensuring no CPU sits idle.
- Row-group (RG) reorder within a file (#21580): read the most promising RG first, based on column statistics.
When combining #21351 + #21580 (tested in #21731), the RG reorder shows minimal additional improvement over dynamic scheduling alone. With multiple partitions reading files in parallel, one partition quickly finds good values "by luck" and updates the shared dynamic filter, making precise intra-file RG ordering less impactful.
Problem
The shared work queue in #21351 (`SharedWorkSource`) uses the original file order: files are placed in the queue in whatever order they appear in `file_groups`. This means:
- The first file picked by the first partition may have a poor value range.
- Threshold convergence depends on which partition happens to read a good file first.
- With many files of varying ranges, an "unlucky first pick" wastes parallel capacity.
Proposed optimization
Sort files in the shared work queue by column statistics before any reading begins. For `ORDER BY col DESC LIMIT K`: put the file with the highest min value first. For `ASC`: lowest max first.
This ensures:
- The very first file read is the globally optimal one: a tight threshold from the first RG.
- All other partitions immediately benefit from the shared dynamic filter.
- Subsequent files are already ordered by quality: if the first file's threshold prunes most RGs, the second-best file is next in line.
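The effect of the ordering can be illustrated with a toy model (not the DataFusion API): each file carries min/max statistics for the sort column, and for `ORDER BY col DESC LIMIT 1` any file whose max falls below the running threshold can be skipped without I/O. The `FileStats` struct and `pruned_count` helper below are hypothetical stand-ins:

```rust
// Toy model of file-level pruning for `ORDER BY col DESC LIMIT 1`.
// After fully scanning a file, the threshold is the largest value seen so
// far; a later file whose max <= threshold contributes nothing and is skipped.

#[derive(Clone, Copy, Debug)]
struct FileStats {
    min: i64,
    max: i64,
}

/// Scan files in the given order; return how many are pruned (skipped
/// without I/O) for a DESC LIMIT 1 query.
fn pruned_count(files: &[FileStats]) -> usize {
    let mut threshold = i64::MIN; // best value found so far
    let mut pruned = 0;
    for f in files {
        if f.max <= threshold {
            pruned += 1; // whole file below threshold: skip
        } else {
            // "Read" the file: the best value it can contribute is its max.
            threshold = threshold.max(f.max);
        }
    }
    pruned
}

fn main() {
    let mut files = vec![
        FileStats { min: 0, max: 10 }, // poor range, happens to be first
        FileStats { min: 5, max: 40 },
        FileStats { min: 20, max: 80 },
        FileStats { min: 90, max: 120 }, // globally best file, last in line
    ];

    // Original order: the threshold ramps up slowly, nothing is pruned.
    assert_eq!(pruned_count(&files), 0);

    // Proposed order for DESC: highest min first. The best file is read
    // first, so its max (120) immediately prunes every remaining file.
    files.sort_by_key(|f| std::cmp::Reverse(f.min));
    assert_eq!(pruned_count(&files), 3);
}
```

The same four files go from zero pruned to three pruned purely by changing the order in which they enter the queue.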
Full optimization chain
Global file reorder (best file first in shared queue)
→ TopK stats init (threshold from RG stats before I/O)
→ RG reorder within file (best RG first)
→ Dynamic scheduling (idle partitions steal work)
→ Dynamic filter pruning (skip RGs/files below threshold)
Each layer builds on the previous: global file ordering ensures the optimal starting point, stats init avoids wasting I/O on the first file, RG reorder optimizes within-file order, dynamic scheduling keeps all CPUs busy, and the dynamic filter propagates the threshold globally.
Implementation sketch
In `SharedWorkSource::from_config()` (or a new constructor):
- Get the sort column and direction from the `FileScanConfig`'s output ordering or from the `DynamicFilterPhysicalExpr`'s `sort_options`.
- For files that have `PartitionedFile.statistics` with min/max for the sort column:
  - DESC: sort files by `column_statistics[sort_col].min_value` descending (highest min first).
  - ASC: sort files by `column_statistics[sort_col].max_value` ascending (lowest max first).
- Files without statistics go to the end of the queue.
- When `preserve_order` is true, skip reordering (correctness requirement).
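A minimal sketch of the reorder step, using simplified stand-in types (`QueuedFile`, `ColStats`, `reorder_for_topk` are hypothetical, not DataFusion's real `PartitionedFile`/statistics types, which wrap min/max in richer precision-aware values):

```rust
// Hypothetical sketch: reorder the shared work queue for a TopK query.
// Files with statistics sort by the key described above; files without
// statistics keep their relative order at the end of the queue.

#[derive(Clone, Copy, Debug, PartialEq)]
struct ColStats {
    min: i64,
    max: i64,
}

#[derive(Clone, Debug, PartialEq)]
struct QueuedFile {
    path: String,
    /// min/max of the sort column, if known from file metadata.
    stats: Option<ColStats>,
}

fn reorder_for_topk(files: &mut [QueuedFile], descending: bool, preserve_order: bool) {
    if preserve_order {
        return; // correctness requirement: keep the original file order
    }
    // `sort_by_key` is stable: files without stats compare equal and keep
    // their relative order, after all files that have stats (false < true).
    files.sort_by_key(|f| match f.stats {
        // DESC LIMIT K: highest min first, i.e. sort by -min ascending.
        Some(s) if descending => (false, -s.min),
        // ASC LIMIT K: lowest max first.
        Some(s) => (false, s.max),
        // No stats: push to the end of the queue.
        None => (true, 0),
    });
}

fn main() {
    let mut q = vec![
        QueuedFile { path: "a.parquet".into(), stats: Some(ColStats { min: 5, max: 40 }) },
        QueuedFile { path: "b.parquet".into(), stats: None },
        QueuedFile { path: "c.parquet".into(), stats: Some(ColStats { min: 90, max: 120 }) },
    ];
    reorder_for_topk(&mut q, /* descending = */ true, /* preserve_order = */ false);
    let order: Vec<&str> = q.iter().map(|f| f.path.as_str()).collect();
    assert_eq!(order, ["c.parquet", "a.parquet", "b.parquet"]);
}
```

Using a stable sort means the "no statistics" tail preserves the original `file_groups` order, so behavior degrades gracefully for files whose footers lack min/max for the sort column.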
Expected impact
- Small data (SF=1): moderate improvement; fewer files to iterate before finding the optimal threshold.
- Large data (SF=10/100): significant improvement; with hundreds of files, reading the best file first instead of a random one determines whether the bulk (on the order of 90%) of subsequent files are pruned immediately, or only after several files have been read.
Related