From 76f118d3656e68eafde47957b09b862bc46e1468 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Fri, 17 Apr 2026 19:44:19 -0700 Subject: [PATCH] chore: enable array_distinct --- docs/source/user-guide/latest/expressions.md | 2 +- .../apache/comet/serde/QueryPlanSerde.scala | 2 +- .../scala/org/apache/comet/serde/arrays.scala | 19 +-------- .../expressions/array/array_distinct.sql | 42 +++++++++---------- .../comet/CometArrayExpressionSuite.scala | 42 ++++++++----------- 5 files changed, 41 insertions(+), 66 deletions(-) diff --git a/docs/source/user-guide/latest/expressions.md b/docs/source/user-guide/latest/expressions.md index c4ab531814..11c16ffae0 100644 --- a/docs/source/user-guide/latest/expressions.md +++ b/docs/source/user-guide/latest/expressions.md @@ -241,7 +241,7 @@ Comet supports using the following aggregate functions within window contexts wi | ArrayAppend | Yes | | | ArrayCompact | No | | | ArrayContains | Yes | | -| ArrayDistinct | No | Behaves differently than spark. Comet first sorts then removes duplicates while Spark preserves the original order. 
| +| ArrayDistinct | Yes | | | ArrayExcept | No | | | ArrayFilter | Yes | Only supports case where function is `IsNotNull` | | ArrayInsert | No | | diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 810d9bd7da..56c4437290 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -50,7 +50,7 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[ArrayAppend] -> CometArrayAppend, classOf[ArrayCompact] -> CometArrayCompact, classOf[ArrayContains] -> CometArrayContains, - classOf[ArrayDistinct] -> CometArrayDistinct, + classOf[ArrayDistinct] -> CometScalarFunction("array_distinct"), classOf[ArrayExcept] -> CometArrayExcept, classOf[ArrayFilter] -> CometArrayFilter, classOf[ArrayInsert] -> CometArrayInsert, diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala index e05967958a..75cfe9373b 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala @@ -21,7 +21,7 @@ package org.apache.comet.serde import scala.annotation.tailrec -import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayContains, ArrayDistinct, ArrayExcept, ArrayFilter, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayMax, ArrayMin, ArrayRemove, ArrayRepeat, ArraysOverlap, ArrayUnion, Attribute, CreateArray, ElementAt, Expression, Flatten, GetArrayItem, IsNotNull, Literal, Reverse, Size, SortArray} +import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayContains, ArrayExcept, ArrayFilter, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayMax, ArrayMin, ArrayRemove, ArrayRepeat, ArraysOverlap, ArrayUnion, Attribute, CreateArray, ElementAt, Expression, Flatten, GetArrayItem, IsNotNull, Literal, Reverse, Size, SortArray} import 
org.apache.spark.sql.catalyst.util.GenericArrayData import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -120,23 +120,6 @@ object CometArrayContains extends CometExpressionSerde[ArrayContains] { } } -object CometArrayDistinct extends CometExpressionSerde[ArrayDistinct] { - - override def getSupportLevel(expr: ArrayDistinct): SupportLevel = - Incompatible(Some("Output elements are sorted rather than preserving insertion order")) - - override def convert( - expr: ArrayDistinct, - inputs: Seq[Attribute], - binding: Boolean): Option[ExprOuterClass.Expr] = { - val arrayExprProto = exprToProto(expr.children.head, inputs, binding) - - val arrayDistinctScalarExpr = - scalarFunctionExprToProto("array_distinct", arrayExprProto) - optExprWithInfo(arrayDistinctScalarExpr, expr) - } -} - object CometSortArray extends CometExpressionSerde[SortArray] { private def containsFloatingPoint(dt: DataType): Boolean = { dt match { diff --git a/spark/src/test/resources/sql-tests/expressions/array/array_distinct.sql b/spark/src/test/resources/sql-tests/expressions/array/array_distinct.sql index f9d63df075..a46ff1642a 100644 --- a/spark/src/test/resources/sql-tests/expressions/array/array_distinct.sql +++ b/spark/src/test/resources/sql-tests/expressions/array/array_distinct.sql @@ -15,8 +15,6 @@ -- specific language governing permissions and limitations -- under the License. 
--- ConfigMatrix: parquet.enable.dictionary=false,true - -- ===== INT arrays ===== statement @@ -34,23 +32,23 @@ INSERT INTO test_array_distinct_int VALUES (array(0, -1, -1, 0, 1)) -- column argument -query spark_answer_only +query SELECT array_distinct(arr) FROM test_array_distinct_int -- literal arguments -query spark_answer_only +query SELECT array_distinct(array(1, 2, 2, 3, 3)) -- all NULLs -query spark_answer_only +query SELECT array_distinct(array(CAST(NULL AS INT), CAST(NULL AS INT))) -- NULL input -query spark_answer_only +query SELECT array_distinct(CAST(NULL AS array)) -- boundary values -query spark_answer_only +query SELECT array_distinct(array(-2147483648, 2147483647, -2147483648, 2147483647, 0)) -- ===== LONG arrays ===== @@ -65,11 +63,11 @@ INSERT INTO test_array_distinct_long VALUES (array(NULL, 1, NULL, 2)), (array(-9223372036854775808, 9223372036854775807, -9223372036854775808)) -query spark_answer_only +query SELECT array_distinct(arr) FROM test_array_distinct_long -- boundary values -query spark_answer_only +query SELECT array_distinct(array(CAST(-9223372036854775808 AS BIGINT), CAST(9223372036854775807 AS BIGINT), CAST(-9223372036854775808 AS BIGINT))) -- ===== STRING arrays ===== @@ -86,11 +84,11 @@ INSERT INTO test_array_distinct_string VALUES (array('', '', NULL, '')), (array('hello', 'world', 'hello')) -query spark_answer_only +query SELECT array_distinct(arr) FROM test_array_distinct_string -- empty string and NULL distinction -query spark_answer_only +query SELECT array_distinct(array('', NULL, '', NULL, 'a')) -- ===== BOOLEAN arrays ===== @@ -105,7 +103,7 @@ INSERT INTO test_array_distinct_bool VALUES (NULL), (array(NULL, true, NULL, false)) -query spark_answer_only +query SELECT array_distinct(arr) FROM test_array_distinct_bool -- ===== DOUBLE arrays ===== @@ -119,23 +117,23 @@ INSERT INTO test_array_distinct_double VALUES (NULL), (array(NULL, 1.0, NULL, 2.0)) -query spark_answer_only +query SELECT array_distinct(arr) FROM 
test_array_distinct_double -- NaN deduplication -query spark_answer_only +query SELECT array_distinct(array(CAST('NaN' AS DOUBLE), CAST('NaN' AS DOUBLE), 1.0, 1.0)) -- NaN with NULL -query spark_answer_only +query SELECT array_distinct(array(CAST('NaN' AS DOUBLE), NULL, CAST('NaN' AS DOUBLE), NULL, 1.0)) -- Infinity -query spark_answer_only +query SELECT array_distinct(array(CAST('Infinity' AS DOUBLE), CAST('-Infinity' AS DOUBLE), CAST('Infinity' AS DOUBLE), 0.0)) -- negative zero -query spark_answer_only +query SELECT array_distinct(array(0.0, -0.0, 1.0)) -- ===== FLOAT arrays ===== @@ -149,11 +147,11 @@ INSERT INTO test_array_distinct_float VALUES (NULL), (array(CAST(NULL AS FLOAT), CAST(1.0 AS FLOAT), CAST(NULL AS FLOAT))) -query spark_answer_only +query SELECT array_distinct(arr) FROM test_array_distinct_float -- Float NaN deduplication -query spark_answer_only +query SELECT array_distinct(array(CAST('NaN' AS FLOAT), CAST('NaN' AS FLOAT), CAST(1.0 AS FLOAT))) -- ===== DECIMAL arrays ===== @@ -167,13 +165,13 @@ INSERT INTO test_array_distinct_decimal VALUES (NULL), (array(NULL, 1.10, NULL, 1.10)) -query spark_answer_only +query SELECT array_distinct(arr) FROM test_array_distinct_decimal -- ===== Nested array (array of arrays) ===== -query spark_answer_only +query SELECT array_distinct(array(array(1, 2), array(3, 4), array(1, 2), array(3, 4))) -query spark_answer_only +query SELECT array_distinct(array(array(1, 2), CAST(NULL AS array), array(1, 2), CAST(NULL AS array))) diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index bb519492db..322dc4c4ef 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -23,7 +23,7 @@ import scala.util.Random import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase -import 
org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayDistinct, ArrayExcept, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayRepeat, ArraysOverlap, ArrayUnion} +import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayExcept, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayRepeat, ArraysOverlap, ArrayUnion} import org.apache.spark.sql.catalyst.expressions.{ArrayContains, ArrayRemove} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.functions._ @@ -403,29 +403,23 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp } test("array_distinct") { - withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[ArrayDistinct]) -> "true") { - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - withTempView("t1") { - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled, n = 10000) - spark.read.parquet(path.toString).createOrReplaceTempView("t1") - // The result needs to be in ascending order for checkSparkAnswerAndOperator to pass - // because datafusion array_distinct sorts the elements and then removes the duplicates - checkSparkAnswerAndOperator( - spark.sql("SELECT array_distinct(array(_2, _2, _3, _4, _4)) FROM t1")) - checkSparkAnswerAndOperator( - spark.sql("SELECT array_distinct((CASE WHEN _2 =_3 THEN array(_4) END)) FROM t1")) - checkSparkAnswerAndOperator(spark.sql( - "SELECT array_distinct((CASE WHEN _2 =_3 THEN array(_2, _2, _4, _4, _5) END)) FROM t1")) - // NULL needs to be the first element for checkSparkAnswerAndOperator to pass because - // datafusion array_distinct sorts the elements and then removes the duplicates - checkSparkAnswerAndOperator( - spark.sql( - "SELECT array_distinct(array(CAST(NULL AS INT), _2, _2, _3, _4, _4)) FROM t1")) - checkSparkAnswerAndOperator(spark.sql( - "SELECT array_distinct(array(CAST(NULL AS INT), CAST(NULL AS INT), _2, _2, _3, _4, _4)) FROM t1")) - } + 
Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + withTempView("t1") { + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled, n = 10000) + spark.read.parquet(path.toString).createOrReplaceTempView("t1") + checkSparkAnswerAndOperator( + spark.sql("SELECT array_distinct(array(_3, _2, _4, _2, _4)) FROM t1")) + checkSparkAnswerAndOperator( + spark.sql("SELECT array_distinct((CASE WHEN _2 =_3 THEN array(_4) END)) FROM t1")) + checkSparkAnswerAndOperator(spark.sql( + "SELECT array_distinct((CASE WHEN _2 =_3 THEN array(_2, _2, _4, _4, _5) END)) FROM t1")) + checkSparkAnswerAndOperator( + spark.sql( + "SELECT array_distinct(array(_2, _2, CAST(NULL AS INT), _3, _4, _4)) FROM t1")) + checkSparkAnswerAndOperator(spark.sql( + "SELECT array_distinct(array(_2, _2, CAST(NULL AS INT), CAST(NULL AS INT), _3, _4, _4)) FROM t1")) } } }