12 changes: 11 additions & 1 deletion datafusion/common/src/config.rs
@@ -555,7 +555,17 @@ config_namespace! {
/// When sorting, below what size should data be concatenated
/// and sorted in a single RecordBatch rather than sorted in
/// batches and merged.
pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024
///
/// Deprecated: this option is no longer used. The sort pipeline
/// now always coalesces batches before sorting. Use
/// `sort_coalesce_target_rows` instead.
pub sort_in_place_threshold_bytes: usize, warn = "`sort_in_place_threshold_bytes` is deprecated and ignored. Use `sort_coalesce_target_rows` instead.", default = 1024 * 1024

/// Target number of rows to coalesce before sorting in ExternalSorter.
///
/// Larger values reduce merge fan-in by producing fewer, larger
/// sorted runs.
pub sort_coalesce_target_rows: usize, default = 32768
Contributor


I wonder if we can make this somewhat adaptive: since we usually load everything into memory, it seems that for very large sets even larger batches would be favorable (e.g. using a 10MiB "scratch space" for coalescing instead of a 32Ki-row target would make sense if our data is 1GiB, and might even be faster?)

Contributor Author


Yeah, I suspect we'd want to do a good sensitivity analysis across different types and batch sizes for lexsort_to_indices (and eventually the radix sort kernel). We might hit a point of diminishing returns and worse cache friendliness if our coalesced batches get too large.

This design also spills from the sorted runs first, so holding more unsorted rows in the coalescer may make it more likely that we trigger spilling.

I'm definitely of the mind that we can and should tune this, but it's unclear what a reasonable default would even be right now. In Comet, where we run TPC-H SF 1000, for example, I suspect we'll want longer sorted runs.
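
The adaptive idea floated in this thread could be sketched as follows; this is purely an assumption for illustration (the function name, bounds, and byte budget are invented, and nothing like this is part of the PR):

```rust
// Derive the row target from a byte budget and the estimated average row
// width, clamped to sane bounds, instead of using a fixed row count.
fn adaptive_coalesce_target(byte_budget: usize, avg_row_bytes: usize) -> usize {
    const MIN_ROWS: usize = 8 * 1024; // never coalesce below this
    const MAX_ROWS: usize = 1 << 20; // cap to stay cache- and spill-friendly
    (byte_budget / avg_row_bytes.max(1)).clamp(MIN_ROWS, MAX_ROWS)
}

fn main() {
    // A 10 MiB scratch budget with ~64-byte rows yields ~160k-row runs,
    // versus the fixed 32768-row default.
    println!("{}", adaptive_coalesce_target(10 * 1024 * 1024, 64));
}
```

The clamp reflects the trade-off discussed above: a floor so tiny budgets still produce useful runs, and a ceiling so huge datasets don't push batches past the point of diminishing returns.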


/// Maximum buffer capacity (in bytes) per partition for BufferExec
/// inserted during sort pushdown optimization.
2 changes: 1 addition & 1 deletion datafusion/core/tests/fuzz_cases/sort_fuzz.rs
@@ -61,7 +61,7 @@ async fn test_sort_10k_mem() {
#[cfg_attr(tarpaulin, ignore)]
async fn test_sort_100k_mem() {
for (batch_size, should_spill) in
[(5, false), (10000, false), (20000, true), (1000000, true)]
[(5, false), (10000, false), (50000, true), (1000000, true)]
{
let (input, collected) = SortTest::new()
.with_int32_batches(batch_size)
19 changes: 2 additions & 17 deletions datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs
@@ -419,6 +419,7 @@ impl SortFuzzerTestGenerator {
pub fn generate_random_query(&self, rng_seed: u64) -> (String, Option<usize>) {
let mut rng = StdRng::seed_from_u64(rng_seed);

// Pick 1-3 ORDER BY columns.
let num_columns = rng.random_range(1..=3).min(self.selected_columns.len());
let selected_columns: Vec<_> = self
.selected_columns
@@ -497,17 +498,6 @@
..=(per_partition_mem_limit as f64 * 0.3) as usize,
);

// 1 to 3 times of the approx batch size. Setting this to a very large nvalue
// will cause external sort to fail.
let sort_in_place_threshold_bytes = if with_memory_limit {
// For memory-limited query, setting `sort_in_place_threshold_bytes` too
// large will cause failure.
0
} else {
let dataset_size = self.dataset_state.as_ref().unwrap().dataset_size;
rng.random_range(0..=dataset_size * 2_usize)
};

// Set up strings for printing
let memory_limit_str = if with_memory_limit {
human_readable_size(memory_limit)
@@ -530,16 +520,11 @@
" Sort spill reservation bytes: {}",
human_readable_size(sort_spill_reservation_bytes)
);
println!(
" Sort in place threshold bytes: {}",
human_readable_size(sort_in_place_threshold_bytes)
);

let config = SessionConfig::new()
.with_target_partitions(num_partitions)
.with_batch_size(init_state.approx_batch_num_rows / 2)
.with_sort_spill_reservation_bytes(sort_spill_reservation_bytes)
.with_sort_in_place_threshold_bytes(sort_in_place_threshold_bytes);
.with_sort_spill_reservation_bytes(sort_spill_reservation_bytes);

let memory_pool: Arc<dyn MemoryPool> = if with_memory_limit {
Arc::new(FairSpillPool::new(memory_limit))
@@ -78,10 +78,12 @@ async fn test_sort_with_limited_memory() -> Result<()> {
})
.await?;

let total_spill_files_size = spill_count * record_batch_size;
// The chunked sort pipeline is more memory-efficient (shrinks
// reservations after sorting), so total spill size may be less than
// pool size. Just verify that spilling occurred.
assert!(
total_spill_files_size > pool_size,
"Total spill files size {total_spill_files_size} should be greater than pool size {pool_size}",
spill_count > 0,
"Expected spilling under memory pressure, but spill_count was 0",
);

Ok(())
24 changes: 7 additions & 17 deletions datafusion/core/tests/memory_limit/mod.rs
@@ -273,9 +273,7 @@ async fn sort_spill_reservation() {
let scenario = Scenario::new_dictionary_strings(1);
let partition_size = scenario.partition_size();

let base_config = SessionConfig::new()
// do not allow the sort to use the 'concat in place' path
.with_sort_in_place_threshold_bytes(10);
let base_config = SessionConfig::new();

// This test case shows how sort_spill_reservation works by
// purposely sorting data that requires non trivial memory to
@@ -313,26 +311,19 @@
]
);

let config = base_config
.clone()
// provide insufficient reserved space for merging,
// the sort will fail while trying to merge
.with_sort_spill_reservation_bytes(1024);
// With low reservation, the sort should still succeed because
// the chunked sort pipeline eagerly sorts and the multi-level merge
// handles low merge memory by reducing fan-in.
let config = base_config.clone().with_sort_spill_reservation_bytes(1024);

test.clone()
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed",
"with top memory consumers (across reservations) as:",
"B for ExternalSorterMerge",
])
.with_expected_success()
.with_config(config)
.run()
.await;

let config = base_config
// reserve sufficient space up front for merge and this time,
// which will force the spills to happen with less buffered
// input and thus with enough to merge.
// reserve sufficient space up front for merge
.with_sort_spill_reservation_bytes(mem_limit / 2);

test.with_config(config).with_expected_success().run().await;
@@ -583,7 +574,6 @@ async fn setup_context(

let config = SessionConfig::new()
.with_sort_spill_reservation_bytes(64 * 1024) // 64KB
.with_sort_in_place_threshold_bytes(0)
.with_spill_compression(spill_compression)
.with_batch_size(64) // To reduce test memory usage
.with_target_partitions(1);
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/runtime_config.rs
@@ -180,7 +180,7 @@ async fn test_invalid_memory_limit_when_limit_is_not_numeric() {
async fn test_max_temp_directory_size_enforcement() {
let ctx = SessionContext::new();

ctx.sql("SET datafusion.runtime.memory_limit = '1M'")
ctx.sql("SET datafusion.runtime.memory_limit = '256K'")
.await
.unwrap()
.collect()
23 changes: 23 additions & 0 deletions datafusion/execution/src/config.rs
@@ -455,7 +455,15 @@ impl SessionConfig {
/// Set the size of [`sort_in_place_threshold_bytes`] to control
/// how sort does things.
///
/// Deprecated: this option is no longer used. Use
/// [`with_sort_coalesce_target_rows`] instead.
///
/// [`sort_in_place_threshold_bytes`]: datafusion_common::config::ExecutionOptions::sort_in_place_threshold_bytes
/// [`with_sort_coalesce_target_rows`]: Self::with_sort_coalesce_target_rows
#[deprecated(
since = "46.0.0",
note = "No longer used. Sort pipeline now coalesces batches before sorting. Use with_sort_coalesce_target_rows instead."
)]
pub fn with_sort_in_place_threshold_bytes(
mut self,
sort_in_place_threshold_bytes: usize,
@@ -465,6 +473,21 @@
self
}

/// Set the target number of rows to coalesce before sorting.
///
/// Larger values reduce merge fan-in by producing fewer, larger
/// sorted runs.
///
/// [`sort_coalesce_target_rows`]: datafusion_common::config::ExecutionOptions::sort_coalesce_target_rows
pub fn with_sort_coalesce_target_rows(
mut self,
sort_coalesce_target_rows: usize,
) -> Self {
self.options_mut().execution.sort_coalesce_target_rows =
sort_coalesce_target_rows;
self
}

/// Enables or disables the enforcement of batch size in joins
pub fn with_enforce_batch_size_in_joins(
mut self,