Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 73 additions & 39 deletions benchmarks/bench.sh
Original file line number Diff line number Diff line change
Expand Up @@ -1154,84 +1154,118 @@ run_sort_pushdown_sorted() {

# Generates data for sort pushdown Inexact benchmark.
#
# Produces a single large lineitem parquet file where row groups have
# NON-OVERLAPPING but OUT-OF-ORDER l_orderkey ranges (each RG internally
# sorted, RGs shuffled). This simulates append-heavy workloads where data
# Produces multiple parquet files where each file has a narrow, non-overlapping
# l_orderkey range (internally sorted), but files appear in scrambled
# alphabetical order. This simulates append-heavy workloads where data
# is written in batches at different times.
#
# Strategy: write a single sorted file with small (100K-row) RGs, then
# use pyarrow to split into per-RG files with scrambled names.
# Writing a single file with ORDER BY scramble does NOT work: the parquet
# writer merges rows from adjacent chunks at RG boundaries, widening
# ranges and defeating reorder_by_statistics.
#
# Requires pyarrow for splitting (pip install pyarrow).
data_sort_pushdown_inexact() {
INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact/lineitem"
if [ -d "${INEXACT_DIR}" ] && [ "$(ls -A ${INEXACT_DIR}/*.parquet 2>/dev/null)" ]; then
echo "Sort pushdown Inexact data already exists at ${INEXACT_DIR}"
return
fi

echo "Generating sort pushdown Inexact benchmark data (single file, shuffled RGs)..."
echo "Generating sort pushdown Inexact benchmark data (scrambled file order)..."

# Re-use the sort_pushdown data as the source (generate if missing)
data_sort_pushdown

mkdir -p "${INEXACT_DIR}"
SRC_DIR="${DATA_DIR}/sort_pushdown/lineitem"

# Use datafusion-cli to bucket rows into 64 groups by a deterministic
# scrambler, then sort within each bucket by orderkey. This produces
# ~64 RG-sized segments where each has a tight orderkey range but the
# segments appear in scrambled (non-sorted) order in the file.
# Step 1: Write a single sorted file with small (100K-row) RGs
TMPFILE="${INEXACT_DIR}/_sorted_small_rgs.parquet"
(cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c "
CREATE EXTERNAL TABLE src
STORED AS PARQUET
LOCATION '${SRC_DIR}';

COPY (
SELECT * FROM src
ORDER BY
(l_orderkey * 1664525 + 1013904223) % 64,
l_orderkey
)
TO '${INEXACT_DIR}/shuffled.parquet'
COPY (SELECT * FROM src ORDER BY l_orderkey)
TO '${TMPFILE}'
STORED AS PARQUET
OPTIONS ('format.max_row_group_size' '100000');
")

echo "Sort pushdown Inexact shuffled data generated at ${INEXACT_DIR}"
# Step 2: Split into per-RG files with scrambled names via pyarrow.
# Uses a different permutation than the overlap benchmark so the two
# datasets exercise different file orderings.
python3 -c "
import pyarrow.parquet as pq
pf = pq.ParquetFile('${TMPFILE}')
n = pf.metadata.num_row_groups
for rg_idx in range(n):
slot = (rg_idx * 41 + 7) % n
table = pf.read_row_group(rg_idx)
pq.write_table(table, '${INEXACT_DIR}/chunk_%03d.parquet' % slot)
print(f'Split {n} RGs into scrambled files')
"

rm -f "${TMPFILE}"
echo "Sort pushdown Inexact data generated at ${INEXACT_DIR}"
ls -la "${INEXACT_DIR}"

# Also generate a file with partially overlapping row groups.
# Simulates streaming data with network delays: each chunk is mostly
# in order but has a small overlap with the next chunk (±5% of the
# chunk range). This is the pattern described by @adriangb — data
# arriving with timestamps that are generally increasing but with
# network-induced jitter causing small overlaps between row groups.
# Also generate files with scrambled file order.
# Simulates streaming data with network delays: chunks arrive out
# of sequence. This is the pattern described by @adriangb — data
# arriving with timestamps that are generally increasing but
# network-induced delays cause chunks to arrive out of order.
#
# Strategy:
# 1. Write a sorted parquet file with small RGs (100K rows each),
# producing ~61 RGs with narrow, non-overlapping l_orderkey ranges.
# 2. Split into individual files (one per RG) and rename using a
# deterministic permutation so alphabetical file order !=
# l_orderkey order. reorder_by_statistics fixes this.
#
# Writing a single file with ORDER BY and jitter/scrambling does NOT
# work: the parquet writer merges rows from adjacent chunks into the
# same RG at chunk boundaries, widening RG ranges and defeating
# reorder. Splitting into separate files avoids this.
#
# Requires pyarrow for splitting (pip install pyarrow).
OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap/lineitem"
if [ -d "${OVERLAP_DIR}" ] && [ "$(ls -A ${OVERLAP_DIR}/*.parquet 2>/dev/null)" ]; then
echo "Sort pushdown Inexact overlap data already exists at ${OVERLAP_DIR}"
return
fi

echo "Generating sort pushdown Inexact overlap data (partially overlapping RGs)..."
echo "Generating sort pushdown Inexact overlap data (scrambled file order)..."
mkdir -p "${OVERLAP_DIR}"

# Step 1: Write a single sorted file with small (100K-row) RGs
TMPFILE="${OVERLAP_DIR}/_sorted_small_rgs.parquet"
(cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c "
CREATE EXTERNAL TABLE src
STORED AS PARQUET
LOCATION '${SRC_DIR}';

-- Add jitter to l_orderkey: shift each row by a random-ish offset
-- proportional to its position. This creates overlap between adjacent
-- row groups while preserving the general ascending trend.
-- Formula: l_orderkey + (l_orderkey * 7 % 5000) - 2500
-- This adds ±2500 jitter, creating ~5K overlap between adjacent 100K-row RGs.
COPY (
SELECT * FROM src
ORDER BY l_orderkey + (l_orderkey * 7 % 5000) - 2500
)
TO '${OVERLAP_DIR}/overlapping.parquet'
COPY (SELECT * FROM src ORDER BY l_orderkey)
TO '${TMPFILE}'
STORED AS PARQUET
OPTIONS ('format.max_row_group_size' '100000');
")

echo "Sort pushdown Inexact overlap data generated at ${OVERLAP_DIR}"
ls -la "${OVERLAP_DIR}"
# Step 2: Split into per-RG files with scrambled names via pyarrow
python3 -c "
import pyarrow.parquet as pq
pf = pq.ParquetFile('${TMPFILE}')
n = pf.metadata.num_row_groups
for rg_idx in range(n):
slot = (rg_idx * 37 + 13) % n
table = pf.read_row_group(rg_idx)
pq.write_table(table, '${OVERLAP_DIR}/chunk_%03d.parquet' % slot)
print(f'Split {n} RGs into scrambled files')
"

rm -f "${TMPFILE}"
}

# Runs the sort pushdown Inexact benchmark (tests RG reorder by statistics).
Expand All @@ -1256,13 +1290,13 @@ run_sort_pushdown_inexact_unsorted() {
debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_unsorted" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}

# Runs the sort pushdown benchmark with partially overlapping RGs.
# Simulates streaming data with network jitter — RGs are mostly in order
# but have small overlaps (±2500 orderkey jitter between adjacent RGs).
# Runs the sort pushdown benchmark with scrambled file order.
# Simulates streaming data with network delays — files have narrow ranges
# but appear in scrambled order. Tests reorder_by_statistics effectiveness.
run_sort_pushdown_inexact_overlap() {
OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap"
RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact_overlap.json"
echo "Running sort pushdown Inexact benchmark (overlapping RGs, streaming data pattern)..."
echo "Running sort pushdown Inexact benchmark (scrambled file order, streaming data pattern)..."
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \
debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${OVERLAP_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_overlap" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}
Expand Down
Loading