Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ use datafusion_comet_proto::{
use datafusion_comet_spark_expr::{
ArrayInsert, Avg, AvgDecimal, Cast, CheckOverflow, Correlation, Covariance, CreateNamedStruct,
DecimalRescaleCheckOverflow, GetArrayStructFields, GetStructField, IfExpr, ListExtract,
NormalizeNaNAndZero, SparkCastOptions, Stddev, SumDecimal, ToJson, UnboundColumn, Variance,
Mode, NormalizeNaNAndZero, SparkCastOptions, Stddev, SumDecimal, ToJson, UnboundColumn, Variance,
WideDecimalBinaryExpr, WideDecimalOp,
};
use itertools::Itertools;
Expand Down Expand Up @@ -2011,6 +2011,14 @@ impl PhysicalPlanner {
.build()
.map_err(|e| ExecutionError::DataFusionError(e.to_string()))
}
AggExprStruct::Mode(expr) => {
let child = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&schema))?;
let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap());

let func = AggregateUDF::new_from_impl(Mode::new("mode", datatype));

Self::create_aggr_func_expr("mode", schema, vec![child], func)
}
AggExprStruct::Sum(expr) => {
let child = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&schema))?;
let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap());
Expand Down
6 changes: 6 additions & 0 deletions native/proto/src/proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ message AggExpr {
Stddev stddev = 14;
Correlation correlation = 15;
BloomFilterAgg bloomFilterAgg = 16;
Mode mode = 17;
}

// Optional filter expression for SQL FILTER (WHERE ...) clause.
Expand Down Expand Up @@ -248,6 +249,11 @@ message BloomFilterAgg {
DataType datatype = 4;
}

message Mode {
Expr child = 1;
DataType datatype = 2;
}

enum EvalMode {
LEGACY = 0;
TRY = 1;
Expand Down
2 changes: 2 additions & 0 deletions native/spark-expr/src/agg_funcs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod avg;
mod avg_decimal;
mod correlation;
mod covariance;
mod mode;
mod stddev;
mod sum_decimal;
mod sum_int;
Expand All @@ -28,6 +29,7 @@ pub use avg::Avg;
pub use avg_decimal::AvgDecimal;
pub use correlation::Correlation;
pub use covariance::Covariance;
pub use mode::Mode;
pub use stddev::Stddev;
pub use sum_decimal::SumDecimal;
pub use sum_int::SumInteger;
Expand Down
Loading
Loading