Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 136 additions & 82 deletions datafusion/core/benches/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,61 +102,104 @@ const NUM_STREAMS: usize = 8;
/// The size of each batch within each stream
const BATCH_SIZE: usize = 1024;

/// Total number of input rows to generate
const INPUT_SIZE: u64 = 100000;
/// Input sizes to benchmark. The small size (100K) exercises the
/// in-memory concat-and-sort path; the large size (1M) exercises
/// the sort-then-merge path with high fan-in.
const INPUT_SIZES: &[(u64, &str)] = &[(100_000, "100k"), (1_000_000, "1M")];

type PartitionedBatches = Vec<Vec<RecordBatch>>;
type StreamGenerator = Box<dyn Fn(bool) -> PartitionedBatches>;

fn criterion_benchmark(c: &mut Criterion) {
let cases: Vec<(&str, &dyn Fn(bool) -> PartitionedBatches)> = vec![
("i64", &i64_streams),
("f64", &f64_streams),
("utf8 low cardinality", &utf8_low_cardinality_streams),
("utf8 high cardinality", &utf8_high_cardinality_streams),
(
"utf8 view low cardinality",
&utf8_view_low_cardinality_streams,
),
(
"utf8 view high cardinality",
&utf8_view_high_cardinality_streams,
),
("utf8 tuple", &utf8_tuple_streams),
("utf8 view tuple", &utf8_view_tuple_streams),
("utf8 dictionary", &dictionary_streams),
("utf8 dictionary tuple", &dictionary_tuple_streams),
("mixed dictionary tuple", &mixed_dictionary_tuple_streams),
("mixed tuple", &mixed_tuple_streams),
(
"mixed tuple with utf8 view",
&mixed_tuple_with_utf8_view_streams,
),
];

for (name, f) in cases {
c.bench_function(&format!("merge sorted {name}"), |b| {
let data = f(true);
let case = BenchCase::merge_sorted(&data);
b.iter(move || case.run())
});

c.bench_function(&format!("sort merge {name}"), |b| {
let data = f(false);
let case = BenchCase::sort_merge(&data);
b.iter(move || case.run())
});

c.bench_function(&format!("sort {name}"), |b| {
let data = f(false);
let case = BenchCase::sort(&data);
b.iter(move || case.run())
});

c.bench_function(&format!("sort partitioned {name}"), |b| {
let data = f(false);
let case = BenchCase::sort_partitioned(&data);
b.iter(move || case.run())
});
for &(input_size, size_label) in INPUT_SIZES {
let cases: Vec<(&str, StreamGenerator)> = vec![
(
"i64",
Box::new(move |sorted| i64_streams(sorted, input_size)),
),
(
"f64",
Box::new(move |sorted| f64_streams(sorted, input_size)),
),
(
"utf8 low cardinality",
Box::new(move |sorted| utf8_low_cardinality_streams(sorted, input_size)),
),
(
"utf8 high cardinality",
Box::new(move |sorted| utf8_high_cardinality_streams(sorted, input_size)),
),
(
"utf8 view low cardinality",
Box::new(move |sorted| {
utf8_view_low_cardinality_streams(sorted, input_size)
}),
),
(
"utf8 view high cardinality",
Box::new(move |sorted| {
utf8_view_high_cardinality_streams(sorted, input_size)
}),
),
(
"utf8 tuple",
Box::new(move |sorted| utf8_tuple_streams(sorted, input_size)),
),
(
"utf8 view tuple",
Box::new(move |sorted| utf8_view_tuple_streams(sorted, input_size)),
),
(
"utf8 dictionary",
Box::new(move |sorted| dictionary_streams(sorted, input_size)),
),
(
"utf8 dictionary tuple",
Box::new(move |sorted| dictionary_tuple_streams(sorted, input_size)),
),
(
"mixed dictionary tuple",
Box::new(move |sorted| {
mixed_dictionary_tuple_streams(sorted, input_size)
}),
),
(
"mixed tuple",
Box::new(move |sorted| mixed_tuple_streams(sorted, input_size)),
),
(
"mixed tuple with utf8 view",
Box::new(move |sorted| {
mixed_tuple_with_utf8_view_streams(sorted, input_size)
}),
),
];

for (name, f) in &cases {
c.bench_function(&format!("merge sorted {name} {size_label}"), |b| {
let data = f(true);
let case = BenchCase::merge_sorted(&data);
b.iter(move || case.run())
});

c.bench_function(&format!("sort merge {name} {size_label}"), |b| {
let data = f(false);
let case = BenchCase::sort_merge(&data);
b.iter(move || case.run())
});

c.bench_function(&format!("sort {name} {size_label}"), |b| {
let data = f(false);
let case = BenchCase::sort(&data);
b.iter(move || case.run())
});

c.bench_function(&format!("sort partitioned {name} {size_label}"), |b| {
let data = f(false);
let case = BenchCase::sort_partitioned(&data);
b.iter(move || case.run())
});
}
}
}

Expand Down Expand Up @@ -279,8 +322,8 @@ fn make_sort_exprs(schema: &Schema) -> LexOrdering {
}

/// Create streams of int64 (where approximately 1/3 values is repeated)
fn i64_streams(sorted: bool) -> PartitionedBatches {
let mut values = DataGenerator::new().i64_values();
fn i64_streams(sorted: bool, input_size: u64) -> PartitionedBatches {
let mut values = DataGenerator::new(input_size).i64_values();
if sorted {
values.sort_unstable();
}
Expand All @@ -293,8 +336,8 @@ fn i64_streams(sorted: bool) -> PartitionedBatches {

/// Create streams of f64 (where approximately 1/3 values are repeated)
/// with the same distribution as i64_streams
fn f64_streams(sorted: bool) -> PartitionedBatches {
let mut values = DataGenerator::new().f64_values();
fn f64_streams(sorted: bool, input_size: u64) -> PartitionedBatches {
let mut values = DataGenerator::new(input_size).f64_values();
if sorted {
values.sort_unstable_by(|a, b| a.total_cmp(b));
}
Expand All @@ -306,8 +349,8 @@ fn f64_streams(sorted: bool) -> PartitionedBatches {
}

/// Create streams of random low cardinality utf8 values
fn utf8_low_cardinality_streams(sorted: bool) -> PartitionedBatches {
let mut values = DataGenerator::new().utf8_low_cardinality_values();
fn utf8_low_cardinality_streams(sorted: bool, input_size: u64) -> PartitionedBatches {
let mut values = DataGenerator::new(input_size).utf8_low_cardinality_values();
if sorted {
values.sort_unstable();
}
Expand All @@ -318,8 +361,11 @@ fn utf8_low_cardinality_streams(sorted: bool) -> PartitionedBatches {
}

/// Create streams of random low cardinality utf8_view values
fn utf8_view_low_cardinality_streams(sorted: bool) -> PartitionedBatches {
let mut values = DataGenerator::new().utf8_low_cardinality_values();
fn utf8_view_low_cardinality_streams(
sorted: bool,
input_size: u64,
) -> PartitionedBatches {
let mut values = DataGenerator::new(input_size).utf8_low_cardinality_values();
if sorted {
values.sort_unstable();
}
Expand All @@ -330,8 +376,11 @@ fn utf8_view_low_cardinality_streams(sorted: bool) -> PartitionedBatches {
}

/// Create streams of high cardinality (~ no duplicates) utf8_view values
fn utf8_view_high_cardinality_streams(sorted: bool) -> PartitionedBatches {
let mut values = DataGenerator::new().utf8_high_cardinality_values();
fn utf8_view_high_cardinality_streams(
sorted: bool,
input_size: u64,
) -> PartitionedBatches {
let mut values = DataGenerator::new(input_size).utf8_high_cardinality_values();
if sorted {
values.sort_unstable();
}
Expand All @@ -342,8 +391,8 @@ fn utf8_view_high_cardinality_streams(sorted: bool) -> PartitionedBatches {
}

/// Create streams of high cardinality (~ no duplicates) utf8 values
fn utf8_high_cardinality_streams(sorted: bool) -> PartitionedBatches {
let mut values = DataGenerator::new().utf8_high_cardinality_values();
fn utf8_high_cardinality_streams(sorted: bool, input_size: u64) -> PartitionedBatches {
let mut values = DataGenerator::new(input_size).utf8_high_cardinality_values();
if sorted {
values.sort_unstable();
}
Expand All @@ -354,8 +403,8 @@ fn utf8_high_cardinality_streams(sorted: bool) -> PartitionedBatches {
}

/// Create a batch of (utf8_low, utf8_low, utf8_high)
fn utf8_tuple_streams(sorted: bool) -> PartitionedBatches {
let mut data_gen = DataGenerator::new();
fn utf8_tuple_streams(sorted: bool, input_size: u64) -> PartitionedBatches {
let mut data_gen = DataGenerator::new(input_size);

// need to sort by the combined key, so combine them together
let mut tuples: Vec<_> = data_gen
Expand Down Expand Up @@ -387,8 +436,8 @@ fn utf8_tuple_streams(sorted: bool) -> PartitionedBatches {
}

/// Create a batch of (utf8_view_low, utf8_view_low, utf8_view_high)
fn utf8_view_tuple_streams(sorted: bool) -> PartitionedBatches {
let mut data_gen = DataGenerator::new();
fn utf8_view_tuple_streams(sorted: bool, input_size: u64) -> PartitionedBatches {
let mut data_gen = DataGenerator::new(input_size);

// need to sort by the combined key, so combine them together
let mut tuples: Vec<_> = data_gen
Expand Down Expand Up @@ -420,8 +469,8 @@ fn utf8_view_tuple_streams(sorted: bool) -> PartitionedBatches {
}

/// Create a batch of (f64, utf8_low, utf8_low, i64)
fn mixed_tuple_streams(sorted: bool) -> PartitionedBatches {
let mut data_gen = DataGenerator::new();
fn mixed_tuple_streams(sorted: bool, input_size: u64) -> PartitionedBatches {
let mut data_gen = DataGenerator::new(input_size);

// need to sort by the combined key, so combine them together
let mut tuples: Vec<_> = data_gen
Expand Down Expand Up @@ -458,8 +507,11 @@ fn mixed_tuple_streams(sorted: bool) -> PartitionedBatches {
}

/// Create a batch of (f64, utf8_view_low, utf8_view_low, i64)
fn mixed_tuple_with_utf8_view_streams(sorted: bool) -> PartitionedBatches {
let mut data_gen = DataGenerator::new();
fn mixed_tuple_with_utf8_view_streams(
sorted: bool,
input_size: u64,
) -> PartitionedBatches {
let mut data_gen = DataGenerator::new(input_size);

// need to sort by the combined key, so combine them together
let mut tuples: Vec<_> = data_gen
Expand Down Expand Up @@ -496,8 +548,8 @@ fn mixed_tuple_with_utf8_view_streams(sorted: bool) -> PartitionedBatches {
}

/// Create a batch of (utf8_dict)
fn dictionary_streams(sorted: bool) -> PartitionedBatches {
let mut data_gen = DataGenerator::new();
fn dictionary_streams(sorted: bool, input_size: u64) -> PartitionedBatches {
let mut data_gen = DataGenerator::new(input_size);
let mut values = data_gen.utf8_low_cardinality_values();
if sorted {
values.sort_unstable();
Expand All @@ -511,8 +563,8 @@ fn dictionary_streams(sorted: bool) -> PartitionedBatches {
}

/// Create a batch of (utf8_dict, utf8_dict, utf8_dict)
fn dictionary_tuple_streams(sorted: bool) -> PartitionedBatches {
let mut data_gen = DataGenerator::new();
fn dictionary_tuple_streams(sorted: bool, input_size: u64) -> PartitionedBatches {
let mut data_gen = DataGenerator::new(input_size);
let mut tuples: Vec<_> = data_gen
.utf8_low_cardinality_values()
.into_iter()
Expand Down Expand Up @@ -542,8 +594,8 @@ fn dictionary_tuple_streams(sorted: bool) -> PartitionedBatches {
}

/// Create a batch of (utf8_dict, utf8_dict, utf8_dict, i64)
fn mixed_dictionary_tuple_streams(sorted: bool) -> PartitionedBatches {
let mut data_gen = DataGenerator::new();
fn mixed_dictionary_tuple_streams(sorted: bool, input_size: u64) -> PartitionedBatches {
let mut data_gen = DataGenerator::new(input_size);
let mut tuples: Vec<_> = data_gen
.utf8_low_cardinality_values()
.into_iter()
Expand Down Expand Up @@ -579,19 +631,21 @@ fn mixed_dictionary_tuple_streams(sorted: bool) -> PartitionedBatches {
/// Encapsulates creating data for this test
struct DataGenerator {
    /// Pseudo-random number generator; seeded with a fixed value (42)
    /// in `new` so every benchmark run generates identical data.
    rng: StdRng,
    /// Number of rows each `*_values` generator method produces
    /// (iterated as `0..self.input_size`).
    input_size: u64,
}

impl DataGenerator {
fn new() -> Self {
fn new(input_size: u64) -> Self {
Self {
rng: StdRng::seed_from_u64(42),
input_size,
}
}

/// Create an array of i64 sorted values (where approximately 1/3 values is repeated)
fn i64_values(&mut self) -> Vec<i64> {
let mut vec: Vec<_> = (0..INPUT_SIZE)
.map(|_| self.rng.random_range(0..INPUT_SIZE as i64))
let mut vec: Vec<_> = (0..self.input_size)
.map(|_| self.rng.random_range(0..self.input_size as i64))
.collect();

vec.sort_unstable();
Expand All @@ -614,7 +668,7 @@ impl DataGenerator {
.collect::<Vec<_>>();

// pick from the 100 strings randomly
let mut input = (0..INPUT_SIZE)
let mut input = (0..self.input_size)
.map(|_| {
let idx = self.rng.random_range(0..strings.len());
let s = Arc::clone(&strings[idx]);
Expand All @@ -629,7 +683,7 @@ impl DataGenerator {
/// Create sorted values of high cardinality (~ no duplicates) utf8 values
fn utf8_high_cardinality_values(&mut self) -> Vec<Option<String>> {
// make random strings
let mut input = (0..INPUT_SIZE)
let mut input = (0..self.input_size)
.map(|_| Some(self.random_string()))
.collect::<Vec<_>>();

Expand Down
Loading