10 changes: 8 additions & 2 deletions docs/source/contributor-guide/adding_a_new_operator.md
@@ -553,8 +553,14 @@ For operators that run in the JVM:
Example pattern from `CometExecRule.scala`:

```scala
-case s: ShuffleExchangeExec if nativeShuffleSupported(s) =>
-  CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle)
+case s: ShuffleExchangeExec =>
+  CometShuffleExchangeExec.shuffleSupported(s) match {
+    case Some(CometNativeShuffle) =>
+      CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle)
+    case Some(CometColumnarShuffle) =>
+      CometShuffleExchangeExec(s, shuffleType = CometColumnarShuffle)
+    case None => s
+  }
```
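The single-dispatch shape above can be exercised in isolation. The sketch below is a self-contained mock, not the real Comet API: the `shuffleSupported` stub and its parameters are illustrative stand-ins for whatever support checks the real `CometShuffleExchangeExec.shuffleSupported` performs.

```scala
// Minimal mock of the Option-based shuffle dispatch pattern.
sealed trait ShuffleType
case object CometNativeShuffle extends ShuffleType
case object CometColumnarShuffle extends ShuffleType

object ShuffleDispatchSketch {
  // Stand-in for CometShuffleExchangeExec.shuffleSupported: decide once,
  // returning None when neither shuffle implementation applies.
  def shuffleSupported(numPartitions: Int, hasComplexTypes: Boolean): Option[ShuffleType] =
    if (!hasComplexTypes) Some(CometNativeShuffle)
    else if (numPartitions > 1) Some(CometColumnarShuffle)
    else None

  // Stand-in for the rule body: a single match covers all three outcomes.
  def describe(numPartitions: Int, hasComplexTypes: Boolean): String =
    shuffleSupported(numPartitions, hasComplexTypes) match {
      case Some(CometNativeShuffle)   => "native"
      case Some(CometColumnarShuffle) => "columnar"
      case None                       => "spark"
    }
}
```

Keeping the support decision in one Option-returning method means the rule has a single exhaustive match, rather than two guarded `case` arms whose predicates can drift apart.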

## Common Patterns and Helpers
67 changes: 0 additions & 67 deletions spark/src/main/scala/org/apache/comet/CometFallback.scala

This file was deleted.

@@ -200,22 +200,29 @@ object CometSparkSessionExtensions extends Logging {
}

/**
- * Attaches explain information to a TreeNode, rolling up the corresponding information tags
- * from any child nodes. For now, we are using this to attach the reasons why certain Spark
- * operators or expressions are disabled.
+ * Record a fallback reason on a `TreeNode` (a Spark operator or expression) explaining why
+ * Comet cannot accelerate it. Reasons recorded here are surfaced in extended explain output
+ * (see `ExtendedExplainInfo`) and, when `COMET_LOG_FALLBACK_REASONS` is enabled, logged as
+ * warnings. The reasons are also rolled up from child nodes so that the operator that remains
+ * in the Spark plan carries the reasons from its converted-away subtree.
*
+ * Call this in any code path where Comet decides not to convert a given node - serde `convert`
+ * methods returning `None`, unsupported data types, disabled configs, etc. Do not use this for
+ * informational messages that are not fallback reasons: anything tagged here is treated by the
+ * rules as a signal that the node falls back to Spark.
*
* @param node
- *   The node to attach the explain information to. Typically a SparkPlan
+ *   The Spark operator or expression that is falling back to Spark.
* @param info
- *   Information text. Optional, may be null or empty. If not provided, then only information
- *   from child nodes will be included.
+ *   The fallback reason. Optional, may be null or empty - pass empty only when the call is used
+ *   purely to roll up reasons from `exprs`.
* @param exprs
- *   Child nodes. Information attached in these nodes will be be included in the information
- *   attached to @node
+ *   Child nodes whose own fallback reasons should be rolled up into `node`. Pass the
+ *   sub-expressions or child operators whose failure caused `node` to fall back.
* @tparam T
- *   The type of the TreeNode. Typically SparkPlan, AggregateExpression, or Expression
+ *   The type of the TreeNode. Typically `SparkPlan`, `AggregateExpression`, or `Expression`.
* @return
- *   The node with information (if any) attached
+ *   `node` with fallback reasons attached (as a side effect on its tag map).
*/
def withInfo[T <: TreeNode[_]](node: T, info: String, exprs: T*): T = {
// support existing approach of passing in multiple infos in a newline-delimited string
@@ -228,22 +235,24 @@
}
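The record-and-roll-up contract described in the scaladoc can be modelled with a tiny mock. The `MockNode` class below is illustrative only; the real implementation stores reasons in Spark's `TreeNodeTag` map rather than a plain field.

```scala
import scala.collection.mutable

// Illustrative stand-in for a TreeNode carrying an EXTENSION_INFO tag.
final class MockNode(val children: Seq[MockNode] = Nil) {
  val reasons: mutable.Set[String] = mutable.Set.empty
}

object WithInfoSketch {
  // Mirrors the contract of withInfo: record a reason (if non-empty) and
  // roll up reasons already recorded on the given child nodes.
  def withInfo(node: MockNode, info: String, exprs: MockNode*): MockNode = {
    if (info != null && info.nonEmpty) node.reasons += info
    exprs.foreach(child => node.reasons ++= child.reasons)
    node // returned for call-site convenience; the tag mutation is the real effect
  }
}
```

Note that reasons accumulate rather than overwrite: a parent that falls back keeps both its own reason and those of the children it absorbed, which is what extended explain output relies on.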

/**
- * Attaches explain information to a TreeNode, rolling up the corresponding information tags
- * from any child nodes. For now, we are using this to attach the reasons why certain Spark
- * operators or expressions are disabled.
+ * Record one or more fallback reasons on a `TreeNode` and roll up reasons from any child nodes.
+ * This is the set-valued form of [[withInfo]]; see that overload for the full contract.
*
+ * Reasons are accumulated (never overwritten) on the node's `EXTENSION_INFO` tag and are
+ * surfaced in extended explain output. When `COMET_LOG_FALLBACK_REASONS` is enabled, each new
+ * reason is also emitted as a warning.
*
* @param node
- *   The node to attach the explain information to. Typically a SparkPlan
+ *   The Spark operator or expression that is falling back to Spark.
* @param info
- *   Information text. May contain zero or more strings. If not provided, then only information
- *   from child nodes will be included.
+ *   The fallback reasons for this node. May be empty when the call is used purely to roll up
+ *   child reasons.
* @param exprs
- *   Child nodes. Information attached in these nodes will be be included in the information
- *   attached to @node
+ *   Child nodes whose own fallback reasons should be rolled up into `node`.
* @tparam T
- *   The type of the TreeNode. Typically SparkPlan, AggregateExpression, or Expression
+ *   The type of the TreeNode. Typically `SparkPlan`, `AggregateExpression`, or `Expression`.
* @return
- *   The node with information (if any) attached
+ *   `node` with fallback reasons attached (as a side effect on its tag map).
*/
def withInfos[T <: TreeNode[_]](node: T, info: Set[String], exprs: T*): T = {
if (CometConf.COMET_LOG_FALLBACK_REASONS.get()) {
@@ -259,25 +268,27 @@
}

/**
- * Attaches explain information to a TreeNode, rolling up the corresponding information tags
- * from any child nodes
+ * Roll up fallback reasons from `exprs` onto `node` without adding a new reason of its own. Use
+ * this when a parent operator is itself falling back and wants to preserve the reasons recorded
+ * on its child expressions/operators so they appear together in explain output.
*
* @param node
- *   The node to attach the explain information to. Typically a SparkPlan
+ *   The parent operator or expression falling back to Spark.
* @param exprs
- *   Child nodes. Information attached in these nodes will be be included in the information
- *   attached to @node
+ *   Child nodes whose fallback reasons should be aggregated onto `node`.
* @tparam T
- *   The type of the TreeNode. Typically SparkPlan, AggregateExpression, or Expression
+ *   The type of the TreeNode. Typically `SparkPlan`, `AggregateExpression`, or `Expression`.
* @return
- *   The node with information (if any) attached
+ *   `node` with the rolled-up reasons attached (as a side effect on its tag map).
*/
def withInfo[T <: TreeNode[_]](node: T, exprs: T*): T = {
withInfos(node, Set.empty, exprs: _*)
}

/**
- * Checks whether a TreeNode has any explain information attached
+ * True if any fallback reason has been recorded on `node` (via [[withInfo]] / [[withInfos]]).
+ * Callers that need to short-circuit when a prior rule pass has already decided a node falls
+ * back can use this as the sticky signal.
*/
def hasExplainInfo(node: TreeNode[_]): Boolean = {
node.getTagValue(CometExplainInfo.EXTENSION_INFO).exists(_.nonEmpty)
@@ -214,6 +214,15 @@ object CometCoverageStats {
object CometExplainInfo {
val EXTENSION_INFO = new TreeNodeTag[Set[String]]("CometExtensionInfo")

+  /**
+   * Records handler class names whose `getSupportLevel` has already returned `Unsupported` or
+   * `Incompatible` (without `allowIncompat`) on a given operator, so that repeat invocations of
+   * the same handler on the same node during later rule passes can short-circuit without
+   * re-running the check. Orthogonal to [[EXTENSION_INFO]]; keyed per handler so other handlers
+   * on the same node are unaffected.
+   */
+  val FAILED_HANDLERS = new TreeNodeTag[Set[String]]("CometFailedHandlers")

def getActualPlan(node: TreeNode[_]): TreeNode[_] = {
node match {
case p: AdaptiveSparkPlanExec => getActualPlan(p.executedPlan)
42 changes: 30 additions & 12 deletions spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -98,17 +98,18 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
private lazy val showTransformations = CometConf.COMET_EXPLAIN_TRANSFORMATIONS.get()

private def applyCometShuffle(plan: SparkPlan): SparkPlan = {
-    plan.transformUp {
-      case s: ShuffleExchangeExec if CometShuffleExchangeExec.nativeShuffleSupported(s) =>
-        // Switch to use Decimal128 regardless of precision, since Arrow native execution
-        // doesn't support Decimal32 and Decimal64 yet.
-        conf.setConfString(CometConf.COMET_USE_DECIMAL_128.key, "true")
-        CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle)
-
-      case s: ShuffleExchangeExec if CometShuffleExchangeExec.columnarShuffleSupported(s) =>
-        // Columnar shuffle for regular Spark operators (not Comet) and Comet operators
-        // (if configured)
-        CometShuffleExchangeExec(s, shuffleType = CometColumnarShuffle)
+    plan.transformUp { case s: ShuffleExchangeExec =>
+      CometShuffleExchangeExec.shuffleSupported(s) match {
+        case Some(CometNativeShuffle) =>
+          // Switch to use Decimal128 regardless of precision, since Arrow native execution
+          // doesn't support Decimal32 and Decimal64 yet.
+          conf.setConfString(CometConf.COMET_USE_DECIMAL_128.key, "true")
+          CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle)
+        case Some(CometColumnarShuffle) =>
+          CometShuffleExchangeExec(s, shuffleType = CometColumnarShuffle)
+        case None =>
+          s
+      }
}
}

@@ -479,8 +480,13 @@
}
}

/** Convert a Spark plan to a Comet plan using the specified serde handler */
-  private def convertToComet(op: SparkPlan, handler: CometOperatorSerde[_]): Option[SparkPlan] = {
+  private[rules] def convertToComet(
+      op: SparkPlan,
+      handler: CometOperatorSerde[_]): Option[SparkPlan] = {
val serde = handler.asInstanceOf[CometOperatorSerde[SparkPlan]]
+    if (hasFailedHandler(op, handler)) {
+      return None
+    }
if (isOperatorEnabled(serde, op)) {
// For operators that require native children (like writes), check if all data-producing
// children are CometNativeExec. This prevents runtime failures when the native operator
@@ -521,6 +527,7 @@
handler.getSupportLevel(op) match {
case Unsupported(notes) =>
withInfo(op, notes.getOrElse(""))
+          recordFailedHandler(op, handler)
false
case Incompatible(notes) =>
val allowIncompat = CometConf.isOperatorAllowIncompat(opName)
@@ -539,6 +546,7 @@
s"$opName is not fully compatible with Spark$optionalNotes. " +
s"To enable it anyway, set $incompatConf=true. " +
s"${CometConf.COMPAT_GUIDE}.")
+            recordFailedHandler(op, handler)
false
}
case Compatible(notes) =>
@@ -556,6 +564,16 @@
}
}

+  private def hasFailedHandler(op: SparkPlan, handler: CometOperatorSerde[_]): Boolean = {
+    op.getTagValue(CometExplainInfo.FAILED_HANDLERS)
+      .exists(_.contains(handler.getClass.getName))
+  }
+
+  private def recordFailedHandler(op: SparkPlan, handler: CometOperatorSerde[_]): Unit = {
+    val existing = op.getTagValue(CometExplainInfo.FAILED_HANDLERS).getOrElse(Set.empty[String])
+    op.setTagValue(CometExplainInfo.FAILED_HANDLERS, existing + handler.getClass.getName)
+  }

private def shouldApplySparkToColumnar(conf: SQLConf, op: SparkPlan): Boolean = {
// Only consider converting leaf nodes to columnar currently, so that all the following
// operators can have a chance to be converted to columnar. Leaf operators that output