Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 86 additions & 1 deletion native/core/src/parquet/parquet_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::execution::jni_api::get_runtime;
use crate::execution::operators::ExecutionError;
use crate::parquet::encryption_support::{CometEncryptionConfig, ENCRYPTION_FACTORY_ID};
use crate::parquet::parquet_read_cached_factory::CachingParquetReaderFactory;
Expand All @@ -23,6 +24,7 @@ use crate::parquet::schema_adapter::SparkPhysicalExprAdapterFactory;
use arrow::datatypes::{Field, SchemaRef};
use datafusion::config::TableParquetOptions;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;
use datafusion::datasource::physical_plan::{
FileGroup, FileScanConfigBuilder, FileSource, ParquetSource,
};
Expand All @@ -37,6 +39,9 @@ use datafusion::prelude::SessionContext;
use datafusion::scalar::ScalarValue;
use datafusion_comet_spark_expr::EvalMode;
use datafusion_datasource::TableSchema;
use object_store::ObjectStore;
use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use parquet::file::metadata::RowGroupMetaData;
use std::collections::HashMap;
use std::sync::Arc;

Expand Down Expand Up @@ -153,7 +158,15 @@ pub(crate) fn init_datasource_exec(
// Use caching reader factory to avoid redundant footer reads across partitions
let store = session_ctx.runtime_env().object_store(&object_store_url)?;
parquet_source = parquet_source
.with_parquet_file_reader_factory(Arc::new(CachingParquetReaderFactory::new(store)));
.with_parquet_file_reader_factory(Arc::new(CachingParquetReaderFactory::new(
Arc::clone(&store),
)));

// Apply midpoint-based row group pruning to match Spark/parquet-mr behavior.
// DataFusion's built-in prune_by_range uses start-offset which can disagree with
// Spark's midpoint-based assignment, causing some tasks to read no data while
// others read too much. We replace the range with an explicit ParquetAccessPlan.
let file_groups = apply_midpoint_row_group_pruning(file_groups, &store)?;

let expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory> = Arc::new(
SparkPhysicalExprAdapterFactory::new(spark_parquet_options, default_values),
Expand Down Expand Up @@ -182,6 +195,78 @@ pub(crate) fn init_datasource_exec(
Ok(data_source_exec)
}

/// Compute the midpoint offset of a row group, matching the algorithm used by
/// Spark (parquet-mr) and parquet-rs to assign row groups to file splits.
///
/// The midpoint is: min(data_page_offset, dictionary_page_offset) + compressed_size / 2
///
/// A row group belongs to a split if its midpoint falls within [split_start, split_end).
fn get_row_group_midpoint(rg: &RowGroupMetaData) -> i64 {
    // parquet-mr anchors a row group at the first column chunk's starting
    // position: the dictionary page offset when present (it precedes the data
    // pages), otherwise the first data page offset.
    let first_col = rg.column(0);
    let data_offset = first_col.data_page_offset();
    let start = first_col
        .dictionary_page_offset()
        .map_or(data_offset, |dict_offset| dict_offset.min(data_offset));
    // Midpoint of the row group's compressed byte span.
    start + rg.compressed_size() / 2
}

/// For each PartitionedFile that has a byte range, read the Parquet footer and compute
/// which row groups belong to this split using the midpoint algorithm (matching Spark/parquet-mr).
/// Replace the byte range with an explicit ParquetAccessPlan so that DataFusion's
/// `prune_by_range` (which uses a different algorithm) is bypassed.
fn apply_midpoint_row_group_pruning(
    file_groups: Vec<Vec<PartitionedFile>>,
    store: &Arc<dyn ObjectStore>,
) -> Result<Vec<Vec<PartitionedFile>>, ExecutionError> {
    // Fast path: nothing to rewrite when no file carries a byte range.
    let any_ranged = file_groups.iter().flatten().any(|f| f.range.is_some());
    if !any_ranged {
        return Ok(file_groups);
    }

    let runtime = get_runtime();

    // Rebuild every group, replacing each ranged file's byte range with an
    // explicit access plan derived from the midpoint algorithm.
    file_groups
        .into_iter()
        .map(|group| {
            group
                .into_iter()
                .map(|mut file| {
                    // Files without a range pass through untouched.
                    let Some(split) = file.range.take() else {
                        return Ok(file);
                    };

                    // Read the Parquet footer synchronously on the shared runtime.
                    let location = file.object_meta.location.clone();
                    let footer = runtime
                        .block_on(async {
                            ParquetObjectReader::new(Arc::clone(store), location)
                                .with_file_size(file.object_meta.size)
                                .get_metadata(None)
                                .await
                        })
                        .map_err(|e| {
                            ExecutionError::GeneralError(format!(
                                "Failed to read Parquet metadata for {}: {e}",
                                file.object_meta.location
                            ))
                        })?;

                    // Start from an all-skipped plan and enable exactly the row
                    // groups whose midpoint falls within [split.start, split.end).
                    let total = footer.num_row_groups();
                    let mut plan = ParquetAccessPlan::new_none(total);
                    for idx in 0..total {
                        let midpoint = get_row_group_midpoint(footer.row_group(idx));
                        if (split.start..split.end).contains(&midpoint) {
                            plan.scan(idx);
                        }
                    }

                    file.extensions = Some(Arc::new(plan));
                    Ok(file)
                })
                .collect()
        })
        .collect()
}

fn get_options(
session_timezone: &str,
case_sensitive: bool,
Expand Down
Loading