Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
e1e1d09
Bring over apache/arrow-rs/9683, integrate into sorts, add heuristic …
mbutrovich Apr 9, 2026
9371dd1
Merge branch 'main' into arrow_rs_9683
mbutrovich Apr 13, 2026
fc35b08
Stash with implementation, need to fix accounting for one test.
mbutrovich Apr 13, 2026
d822a50
Fix more tests.
mbutrovich Apr 13, 2026
54be475
Tests pass.
mbutrovich Apr 13, 2026
a42ba49
Cleanup.
mbutrovich Apr 13, 2026
24dbdb7
More cleanup.
mbutrovich Apr 13, 2026
e83ffc6
More tests.
mbutrovich Apr 13, 2026
60fbdfb
More tests.
mbutrovich Apr 13, 2026
8e8f774
Cleanup before pushing.
mbutrovich Apr 13, 2026
aa93a6e
Fix CI failures.
mbutrovich Apr 13, 2026
af514fe
Fix configs.md.
mbutrovich Apr 13, 2026
c527553
Fix CI failures.
mbutrovich Apr 13, 2026
83860c0
Avoid radix sort for decimal types for now.
mbutrovich Apr 14, 2026
a88eacf
Fix information_schema.slt test.
mbutrovich Apr 14, 2026
175dcf6
Update to latest radix kernel from arrow-rs PR.
mbutrovich Apr 14, 2026
4bdf871
Use lexsort for single columns, radix otherwise. Should help with Q11…
mbutrovich Apr 14, 2026
a1f0193
Address some PR feedback.
mbutrovich Apr 14, 2026
bbda50f
Address some PR feedback.
mbutrovich Apr 14, 2026
2698b85
Update failing test.
mbutrovich Apr 14, 2026
d5d3ef7
Update failing test for realsies.
mbutrovich Apr 14, 2026
e0fb0a8
## `datafusion/physical-plan/src/sorts/sort.rs`
mbutrovich Apr 14, 2026
73bb06b
Cleanup.
mbutrovich Apr 14, 2026
7a75c38
Fix copy.slt test to show new metrics.
mbutrovich Apr 14, 2026
482e72c
Temporarily default radix to false to get benchmarks.
mbutrovich Apr 14, 2026
96dd0df
Revert "Temporarily default radix to false to get benchmarks."
mbutrovich Apr 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,26 @@ config_namespace! {
/// When sorting, below what size should data be concatenated
/// and sorted in a single RecordBatch rather than sorted in
/// batches and merged.
pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024
///
/// Deprecated: this option is no longer used. The sort pipeline
/// now always coalesces batches before sorting. Use
/// `sort_coalesce_target_rows` instead.
pub sort_in_place_threshold_bytes: usize, warn = "`sort_in_place_threshold_bytes` is deprecated and ignored. Use `sort_coalesce_target_rows` instead.", default = 1024 * 1024

/// Target number of rows to coalesce before sorting in ExternalSorter.
///
/// Larger values reduce merge fan-in and give radix sort enough rows
/// to amortize RowConverter encoding overhead.
pub sort_coalesce_target_rows: usize, default = 32768

/// When true, use radix sort for multi-column sorts over eligible
/// types (primitives, strings). When false, always use lexsort.
///
/// Radix sort is 2-3x faster than lexsort at 32K+ rows for
/// multi-column sorts but has higher overhead for small batches.
/// Set to false to isolate the performance impact of the
/// coalesce-then-sort pipeline from radix sort itself.
pub sort_use_radix: bool, default = true

/// Maximum buffer capacity (in bytes) per partition for BufferExec
/// inserted during sort pushdown optimization.
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/fuzz_cases/sort_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async fn test_sort_10k_mem() {
#[cfg_attr(tarpaulin, ignore)]
async fn test_sort_100k_mem() {
for (batch_size, should_spill) in
[(5, false), (10000, false), (20000, true), (1000000, true)]
[(5, false), (10000, false), (50000, true), (1000000, true)]
{
let (input, collected) = SortTest::new()
.with_int32_batches(batch_size)
Expand Down
20 changes: 3 additions & 17 deletions datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,8 @@ impl SortFuzzerTestGenerator {
pub fn generate_random_query(&self, rng_seed: u64) -> (String, Option<usize>) {
let mut rng = StdRng::seed_from_u64(rng_seed);

// Pick 1-3 ORDER BY columns. Single-column queries exercise the
// lexsort path; multi-column queries exercise the radix sort path.
let num_columns = rng.random_range(1..=3).min(self.selected_columns.len());
let selected_columns: Vec<_> = self
.selected_columns
Expand Down Expand Up @@ -497,17 +499,6 @@ impl SortFuzzerTestGenerator {
..=(per_partition_mem_limit as f64 * 0.3) as usize,
);

// 1 to 3 times of the approx batch size. Setting this to a very large value
// will cause external sort to fail.
let sort_in_place_threshold_bytes = if with_memory_limit {
// For memory-limited query, setting `sort_in_place_threshold_bytes` too
// large will cause failure.
0
} else {
let dataset_size = self.dataset_state.as_ref().unwrap().dataset_size;
rng.random_range(0..=dataset_size * 2_usize)
};

// Set up strings for printing
let memory_limit_str = if with_memory_limit {
human_readable_size(memory_limit)
Expand All @@ -530,16 +521,11 @@ impl SortFuzzerTestGenerator {
" Sort spill reservation bytes: {}",
human_readable_size(sort_spill_reservation_bytes)
);
println!(
" Sort in place threshold bytes: {}",
human_readable_size(sort_in_place_threshold_bytes)
);

let config = SessionConfig::new()
.with_target_partitions(num_partitions)
.with_batch_size(init_state.approx_batch_num_rows / 2)
.with_sort_spill_reservation_bytes(sort_spill_reservation_bytes)
.with_sort_in_place_threshold_bytes(sort_in_place_threshold_bytes);
.with_sort_spill_reservation_bytes(sort_spill_reservation_bytes);

let memory_pool: Arc<dyn MemoryPool> = if with_memory_limit {
Arc::new(FairSpillPool::new(memory_limit))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,12 @@ async fn test_sort_with_limited_memory() -> Result<()> {
})
.await?;

let total_spill_files_size = spill_count * record_batch_size;
// The chunked sort pipeline is more memory-efficient (shrinks
// reservations after sorting), so total spill size may be less than
// pool size. Just verify that spilling occurred.
assert!(
total_spill_files_size > pool_size,
"Total spill files size {total_spill_files_size} should be greater than pool size {pool_size}",
spill_count > 0,
"Expected spilling under memory pressure, but spill_count was 0",
);

Ok(())
Expand Down
24 changes: 7 additions & 17 deletions datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,7 @@ async fn sort_spill_reservation() {
let scenario = Scenario::new_dictionary_strings(1);
let partition_size = scenario.partition_size();

let base_config = SessionConfig::new()
// do not allow the sort to use the 'concat in place' path
.with_sort_in_place_threshold_bytes(10);
let base_config = SessionConfig::new();

// This test case shows how sort_spill_reservation works by
// purposely sorting data that requires non trivial memory to
Expand Down Expand Up @@ -313,26 +311,19 @@ async fn sort_spill_reservation() {
]
);

let config = base_config
.clone()
// provide insufficient reserved space for merging,
// the sort will fail while trying to merge
.with_sort_spill_reservation_bytes(1024);
// With low reservation, the sort should still succeed because
// the chunked sort pipeline eagerly sorts and the multi-level merge
// handles low merge memory by reducing fan-in.
let config = base_config.clone().with_sort_spill_reservation_bytes(1024);

test.clone()
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed",
"with top memory consumers (across reservations) as:",
"B for ExternalSorterMerge",
])
.with_expected_success()
.with_config(config)
.run()
.await;

let config = base_config
// reserve sufficient space up front for merge and this time,
// which will force the spills to happen with less buffered
// input and thus with enough to merge.
// reserve sufficient space up front for merge
.with_sort_spill_reservation_bytes(mem_limit / 2);

test.with_config(config).with_expected_success().run().await;
Expand Down Expand Up @@ -583,7 +574,6 @@ async fn setup_context(

let config = SessionConfig::new()
.with_sort_spill_reservation_bytes(64 * 1024) // 64KB
.with_sort_in_place_threshold_bytes(0)
.with_spill_compression(spill_compression)
.with_batch_size(64) // To reduce test memory usage
.with_target_partitions(1);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/runtime_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ async fn test_invalid_memory_limit_when_limit_is_not_numeric() {
async fn test_max_temp_directory_size_enforcement() {
let ctx = SessionContext::new();

ctx.sql("SET datafusion.runtime.memory_limit = '1M'")
ctx.sql("SET datafusion.runtime.memory_limit = '256K'")
.await
.unwrap()
.collect()
Expand Down
34 changes: 34 additions & 0 deletions datafusion/execution/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,15 @@ impl SessionConfig {
/// Set the size of [`sort_in_place_threshold_bytes`] to control
/// how sort does things.
///
/// Deprecated: this option is no longer used. Use
/// [`with_sort_coalesce_target_rows`] instead.
///
/// [`sort_in_place_threshold_bytes`]: datafusion_common::config::ExecutionOptions::sort_in_place_threshold_bytes
/// [`with_sort_coalesce_target_rows`]: Self::with_sort_coalesce_target_rows
#[deprecated(
since = "46.0.0",
note = "No longer used. Sort pipeline now coalesces batches before sorting. Use with_sort_coalesce_target_rows instead."
)]
pub fn with_sort_in_place_threshold_bytes(
mut self,
sort_in_place_threshold_bytes: usize,
Expand All @@ -465,6 +473,32 @@ impl SessionConfig {
self
}

/// Set [`sort_coalesce_target_rows`], the target number of rows that are
/// coalesced together before a sort is performed.
///
/// A larger target lets radix sort amortize its encoding overhead over
/// more rows (2-3x faster at 32K+ rows). Under memory pressure the
/// actual chunk size may be smaller than this target.
///
/// [`sort_coalesce_target_rows`]: datafusion_common::config::ExecutionOptions::sort_coalesce_target_rows
pub fn with_sort_coalesce_target_rows(
    mut self,
    sort_coalesce_target_rows: usize,
) -> Self {
    // Borrow the execution options once, store the new target, then hand
    // the updated config back for builder-style chaining.
    let execution = &mut self.options_mut().execution;
    execution.sort_coalesce_target_rows = sort_coalesce_target_rows;
    self
}

/// Toggle [`sort_use_radix`]: whether multi-column sorts over eligible
/// types use radix sort.
///
/// Passing `false` makes the sort always use lexsort, regardless of
/// whether the schema would be eligible for radix sort.
///
/// [`sort_use_radix`]: datafusion_common::config::ExecutionOptions::sort_use_radix
pub fn with_sort_use_radix(mut self, sort_use_radix: bool) -> Self {
    // Borrow the execution options once, record the flag, then return the
    // updated config for builder-style chaining.
    let execution = &mut self.options_mut().execution;
    execution.sort_use_radix = sort_use_radix;
    self
}

/// Enables or disables the enforcement of batch size in joins
pub fn with_enforce_batch_size_in_joins(
mut self,
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1162,6 +1162,7 @@ impl GroupedHashAggregateStream {
emit,
self.spill_state.spill_expr.clone(),
self.batch_size,
false,
);
let spillfile = self
.spill_state
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/sorts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod cursor;
mod merge;
mod multi_level_merge;
pub mod partial_sort;
mod radix;
pub mod sort;
pub mod sort_preserving_merge;
mod stream;
Expand Down
Loading
Loading