Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/source/user-guide/latest/expressions.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ Comet supports using the following aggregate functions within window contexts wi
| ArrayAppend | Yes | |
| ArrayCompact | No | |
| ArrayContains | Yes | |
| ArrayDistinct | No | Behaves differently from Spark: Comet first sorts the elements and then removes duplicates, while Spark preserves the original order. |
| ArrayDistinct | Yes | |
| ArrayExcept | No | |
| ArrayFilter | Yes | Only supports case where function is `IsNotNull` |
| ArrayInsert | No | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
classOf[ArrayAppend] -> CometArrayAppend,
classOf[ArrayCompact] -> CometArrayCompact,
classOf[ArrayContains] -> CometArrayContains,
classOf[ArrayDistinct] -> CometArrayDistinct,
classOf[ArrayDistinct] -> CometScalarFunction("array_distinct"),
classOf[ArrayExcept] -> CometArrayExcept,
classOf[ArrayFilter] -> CometArrayFilter,
classOf[ArrayInsert] -> CometArrayInsert,
Expand Down
19 changes: 1 addition & 18 deletions spark/src/main/scala/org/apache/comet/serde/arrays.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package org.apache.comet.serde

import scala.annotation.tailrec

import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayContains, ArrayDistinct, ArrayExcept, ArrayFilter, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayMax, ArrayMin, ArrayRemove, ArrayRepeat, ArraysOverlap, ArrayUnion, Attribute, CreateArray, ElementAt, Expression, Flatten, GetArrayItem, IsNotNull, Literal, Reverse, Size, SortArray}
import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayContains, ArrayExcept, ArrayFilter, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayMax, ArrayMin, ArrayRemove, ArrayRepeat, ArraysOverlap, ArrayUnion, Attribute, CreateArray, ElementAt, Expression, Flatten, GetArrayItem, IsNotNull, Literal, Reverse, Size, SortArray}
import org.apache.spark.sql.catalyst.util.GenericArrayData
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -120,23 +120,6 @@ object CometArrayContains extends CometExpressionSerde[ArrayContains] {
}
}

/**
 * Serde for Spark's `ArrayDistinct` expression, emitted as the `array_distinct` scalar
 * function.
 *
 * The expression is reported as incompatible; the support-level message states that output
 * elements are sorted rather than kept in insertion order.
 */
object CometArrayDistinct extends CometExpressionSerde[ArrayDistinct] {

  /** Always reports `Incompatible`, carrying the reason as a human-readable note. */
  override def getSupportLevel(expr: ArrayDistinct): SupportLevel = {
    val reason = "Output elements are sorted rather than preserving insertion order"
    Incompatible(Some(reason))
  }

  /**
   * Serializes the first child of `expr` and wraps it in an `array_distinct` scalar function
   * call.
   *
   * @param expr the `ArrayDistinct` expression to convert
   * @param inputs attributes passed through to `exprToProto` for the child expression
   * @param binding flag passed through to `exprToProto` for the child expression
   * @return the result of `optExprWithInfo` applied to the built scalar function and `expr`
   */
  override def convert(
      expr: ArrayDistinct,
      inputs: Seq[Attribute],
      binding: Boolean): Option[ExprOuterClass.Expr] = {
    val childProto = exprToProto(expr.children.head, inputs, binding)
    optExprWithInfo(scalarFunctionExprToProto("array_distinct", childProto), expr)
  }
}

object CometSortArray extends CometExpressionSerde[SortArray] {
private def containsFloatingPoint(dt: DataType): Boolean = {
dt match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
-- specific language governing permissions and limitations
-- under the License.

-- ConfigMatrix: parquet.enable.dictionary=false,true

-- ===== INT arrays =====

statement
Expand All @@ -34,23 +32,23 @@ INSERT INTO test_array_distinct_int VALUES
(array(0, -1, -1, 0, 1))

-- column argument
query spark_answer_only
query
SELECT array_distinct(arr) FROM test_array_distinct_int

-- literal arguments
query spark_answer_only
query
SELECT array_distinct(array(1, 2, 2, 3, 3))

-- all NULLs
query spark_answer_only
query
SELECT array_distinct(array(CAST(NULL AS INT), CAST(NULL AS INT)))

-- NULL input
query spark_answer_only
query
SELECT array_distinct(CAST(NULL AS array<int>))

-- boundary values
query spark_answer_only
query
SELECT array_distinct(array(-2147483648, 2147483647, -2147483648, 2147483647, 0))

-- ===== LONG arrays =====
Expand All @@ -65,11 +63,11 @@ INSERT INTO test_array_distinct_long VALUES
(array(NULL, 1, NULL, 2)),
(array(-9223372036854775808, 9223372036854775807, -9223372036854775808))

query spark_answer_only
query
SELECT array_distinct(arr) FROM test_array_distinct_long

-- boundary values
query spark_answer_only
query
SELECT array_distinct(array(CAST(-9223372036854775808 AS BIGINT), CAST(9223372036854775807 AS BIGINT), CAST(-9223372036854775808 AS BIGINT)))

-- ===== STRING arrays =====
Expand All @@ -86,11 +84,11 @@ INSERT INTO test_array_distinct_string VALUES
(array('', '', NULL, '')),
(array('hello', 'world', 'hello'))

query spark_answer_only
query
SELECT array_distinct(arr) FROM test_array_distinct_string

-- empty string and NULL distinction
query spark_answer_only
query
SELECT array_distinct(array('', NULL, '', NULL, 'a'))

-- ===== BOOLEAN arrays =====
Expand All @@ -105,7 +103,7 @@ INSERT INTO test_array_distinct_bool VALUES
(NULL),
(array(NULL, true, NULL, false))

query spark_answer_only
query
SELECT array_distinct(arr) FROM test_array_distinct_bool

-- ===== DOUBLE arrays =====
Expand All @@ -119,23 +117,23 @@ INSERT INTO test_array_distinct_double VALUES
(NULL),
(array(NULL, 1.0, NULL, 2.0))

query spark_answer_only
query
SELECT array_distinct(arr) FROM test_array_distinct_double

-- NaN deduplication
query spark_answer_only
query
SELECT array_distinct(array(CAST('NaN' AS DOUBLE), CAST('NaN' AS DOUBLE), 1.0, 1.0))

-- NaN with NULL
query spark_answer_only
query
SELECT array_distinct(array(CAST('NaN' AS DOUBLE), NULL, CAST('NaN' AS DOUBLE), NULL, 1.0))

-- Infinity
query spark_answer_only
query
SELECT array_distinct(array(CAST('Infinity' AS DOUBLE), CAST('-Infinity' AS DOUBLE), CAST('Infinity' AS DOUBLE), 0.0))

-- negative zero
query spark_answer_only
query
SELECT array_distinct(array(0.0, -0.0, 1.0))

-- ===== FLOAT arrays =====
Expand All @@ -149,11 +147,11 @@ INSERT INTO test_array_distinct_float VALUES
(NULL),
(array(CAST(NULL AS FLOAT), CAST(1.0 AS FLOAT), CAST(NULL AS FLOAT)))

query spark_answer_only
query
SELECT array_distinct(arr) FROM test_array_distinct_float

-- Float NaN deduplication
query spark_answer_only
query
SELECT array_distinct(array(CAST('NaN' AS FLOAT), CAST('NaN' AS FLOAT), CAST(1.0 AS FLOAT)))

-- ===== DECIMAL arrays =====
Expand All @@ -167,13 +165,13 @@ INSERT INTO test_array_distinct_decimal VALUES
(NULL),
(array(NULL, 1.10, NULL, 1.10))

query spark_answer_only
query
SELECT array_distinct(arr) FROM test_array_distinct_decimal

-- ===== Nested array (array of arrays) =====

query spark_answer_only
query
SELECT array_distinct(array(array(1, 2), array(3, 4), array(1, 2), array(3, 4)))

query spark_answer_only
query
SELECT array_distinct(array(array(1, 2), CAST(NULL AS array<int>), array(1, 2), CAST(NULL AS array<int>)))
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.util.Random

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.CometTestBase
import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayDistinct, ArrayExcept, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayRepeat, ArraysOverlap, ArrayUnion}
import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayExcept, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayRepeat, ArraysOverlap, ArrayUnion}
import org.apache.spark.sql.catalyst.expressions.{ArrayContains, ArrayRemove}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.functions._
Expand Down Expand Up @@ -403,29 +403,23 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
}

test("array_distinct") {
withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[ArrayDistinct]) -> "true") {
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
withTempView("t1") {
val path = new Path(dir.toURI.toString, "test.parquet")
makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled, n = 10000)
spark.read.parquet(path.toString).createOrReplaceTempView("t1")
// The result needs to be in ascending order for checkSparkAnswerAndOperator to pass
// because datafusion array_distinct sorts the elements and then removes the duplicates
checkSparkAnswerAndOperator(
spark.sql("SELECT array_distinct(array(_2, _2, _3, _4, _4)) FROM t1"))
checkSparkAnswerAndOperator(
spark.sql("SELECT array_distinct((CASE WHEN _2 =_3 THEN array(_4) END)) FROM t1"))
checkSparkAnswerAndOperator(spark.sql(
"SELECT array_distinct((CASE WHEN _2 =_3 THEN array(_2, _2, _4, _4, _5) END)) FROM t1"))
// NULL needs to be the first element for checkSparkAnswerAndOperator to pass because
// datafusion array_distinct sorts the elements and then removes the duplicates
checkSparkAnswerAndOperator(
spark.sql(
"SELECT array_distinct(array(CAST(NULL AS INT), _2, _2, _3, _4, _4)) FROM t1"))
checkSparkAnswerAndOperator(spark.sql(
"SELECT array_distinct(array(CAST(NULL AS INT), CAST(NULL AS INT), _2, _2, _3, _4, _4)) FROM t1"))
}
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
withTempView("t1") {
val path = new Path(dir.toURI.toString, "test.parquet")
makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled, n = 10000)
spark.read.parquet(path.toString).createOrReplaceTempView("t1")
checkSparkAnswerAndOperator(
spark.sql("SELECT array_distinct(array(_3, _2, _4, _2, _4)) FROM t1"))
checkSparkAnswerAndOperator(
spark.sql("SELECT array_distinct((CASE WHEN _2 =_3 THEN array(_4) END)) FROM t1"))
checkSparkAnswerAndOperator(spark.sql(
"SELECT array_distinct((CASE WHEN _2 =_3 THEN array(_2, _2, _4, _4, _5) END)) FROM t1"))
checkSparkAnswerAndOperator(
spark.sql(
"SELECT array_distinct(array(_2, _2, CAST(NULL AS INT), _3, _4, _4)) FROM t1"))
checkSparkAnswerAndOperator(spark.sql(
"SELECT array_distinct(array(_2, _2, CAST(NULL AS INT), CAST(NULL AS INT), _3, _4, _4)) FROM t1"))
}
}
}
Expand Down
Loading