[RFC] Add lambda support and array_transform udf #18921

gstvg wants to merge 7 commits into apache:main
# Traversing Expr trees with a schema that includes lambda parameters

The parameters of a lambda aren't present in the schema of the plan they belong to. During tree traversals that use a schema to check expression data types, nullability, and metadata, there must be a way to access a schema that includes those parameters.

Expr tree traversal with the wrong schema:

┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷
╷ ╷
╷ a = Int32 ╷
╷ b = List(List(Int32)) ╷
╷ c = Int32 ╷
╷ ╷
╷ ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ (b, i) -> array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ a = Int32 ╷
╷ b = List(List(Int32)) ╷
╷ c = Int32 ╷
╷ ╷
╷ !! missing "i", incorrect "b" type !! ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ a = Int32 ╷
╷ b = List(List(Int32)) ╷
╷ c = Int32 ╷
╷ ╷
╷ !! missing "i", incorrect "b" type !! ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ b -> b + c + i ╷
╷ ╷
╷ a = Int32 ╷
╷ b = List(List(Int32)) ╷
╷ c = Int32 ╷
╷ ╷
╷ !! missing "i", incorrect "b" type !! ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
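As a point of reference for the intended semantics (plain Rust, not DataFusion code), the example query can be modeled with ordinary closures: the inner lambda captures `c` from the table scope and `i` from the outer lambda, and each lambda's `b` parameter shadows the outer `b`, exactly as in the diagram above.

```rust
/// Toy model of `array_transform(arr, (x, i) -> ...)`: applies `f` to each
/// element together with its index, mirroring the SQL lambda's optional
/// index parameter.
fn array_transform<T, U>(arr: Vec<T>, f: impl Fn(T, i32) -> U) -> Vec<U> {
    arr.into_iter()
        .enumerate()
        .map(|(i, x)| f(x, i as i32))
        .collect()
}
```

For `b = [[1, 2], [3]]` and `c = 10`, `array_transform(b, |b, i| array_transform(b, |b, _| b + c + i))` evaluates to `[[11, 12], [14]]`, with each scope's `b` shadowing the outer one.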
## Option 1. Per-lambda schema with a new set of TreeNode methods: *_with_schema

This PR adds another set of TreeNode-like methods on logical and physical expressions. While traversing expression trees, when these methods find a ScalarUDF with a lambda among its arguments, they use ScalarUDF::lambdas_parameters to create a schema adjusted for each of its arguments, and pass it as an argument to the visiting/transforming function.

```rust
impl Expr {
    pub fn transform_with_schema<
        F: FnMut(Self, &DFSchema) -> Result<Transformed<Self>>,
    >(
        self,
        schema: &DFSchema,
        f: F,
    ) -> Result<Transformed<Self>> {}
}
```

Example usage:

```rust
pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result<(Expr, bool)> {
    let mut has_placeholder = false;
    // Provide the schema as the first argument.
    // The transforming closure receives an adjusted_schema as an argument.
    self.transform_with_schema(schema, |mut expr, adjusted_schema| {
        match &mut expr {
            // Default to assuming the arguments are the same type
            Expr::BinaryExpr(BinaryExpr { left, op: _, right }) => {
                // Use adjusted_schema and not schema: these expressions may contain
                // columns referring to a lambda parameter, whose Field is only
                // available in adjusted_schema, not in schema.
                rewrite_placeholder(left.as_mut(), right.as_ref(), adjusted_schema)?;
                rewrite_placeholder(right.as_mut(), left.as_ref(), adjusted_schema)?;
            }
            ....
```

In order to add the lambda parameters to the schema, we need to take into account a DFSchema property: "Unqualified fields must be unique not only amongst themselves, but also must have a distinct name from any qualified field names." Since lambda parameters are always unqualified, they may conflict with columns of the outer schema, even though those are qualified. To fix this conflict, we can either:

1: Replace the existing column with the lambda parameter at the same index in the schema's vec of fields, so that the indices of the columns to its right don't change. That's the current approach in this PR.

Expr tree traversal with adjusted schema, replacing conflicts:

+------------------------------------------------------------------+
| array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) |
| |
| a = Int32 |
| b = List(List(Int32)) |
| c = Int32 |
+------------------------------------------------------------------+
|
|
v
+------------------------------------------------------------------+
| (b, i) -> array_transform(b, b -> b + c + i) |
| |
| a = Int32 |
| b = List(Int32) ! replaced ! |
| c = Int32 |
| i = Int32 |
+------------------------------------------------------------------+
|
|
v
+------------------------------------------------------------------+
| array_transform(b, b -> b + c + i) |
| |
| a = Int32 |
| b = List(Int32) ! replaced ! |
| c = Int32 |
| i = Int32 |
+------------------------------------------------------------------+
|
|
v
+------------------------------------------------------------------+
| b -> b + c + i |
| |
| a = Int32 |
| b = Int32 ! replaced ! |
| c = Int32 |
| i = Int32 |
+------------------------------------------------------------------+
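A minimal sketch of this replacement strategy, modeling the schema as a `Vec` of `(name, data_type)` string pairs instead of real `DFSchema` fields (an assumption for illustration): a conflicting lambda parameter overwrites the field at the same index, so the indices of all other fields are preserved.

```rust
/// Adjust a toy schema for a lambda's parameters. A parameter that conflicts
/// with an existing field name replaces that field in place (keeping indices
/// stable); otherwise it is appended at the end.
fn with_lambda_params(
    mut fields: Vec<(String, String)>,
    params: &[(&str, &str)],
) -> Vec<(String, String)> {
    for (name, data_type) in params {
        match fields.iter().position(|(n, _)| n == name) {
            Some(pos) => fields[pos] = (name.to_string(), data_type.to_string()),
            None => fields.push((name.to_string(), data_type.to_string())),
        }
    }
    fields
}
```

With the example's outer schema `[a, b, c]` and the outer lambda's parameters `(b, i)`, this reproduces the diagram above: `b` is replaced at index 1 and `i` is appended at index 3.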
2: Rename the shadowed column to a unique, non-conflicting name and add the lambda parameter to the end of the schema's vec of fields. This option allows checking whether a physical column refers to a lambda parameter by checking whether its index is greater than or equal to the number of fields of the outer schema. When this information is available, it eliminates the need to use the with_lambdas_params variations of the TreeNode methods. It's trivial to change the PR to use this.

Expr tree traversal with adjusted schema, renaming conflicts:

┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷
╷ ╷
╷ ╷
╷ a = Int32 ╷
╷ b = List(List(Int32)) ╷
╷ c = Int32 ╷
╷ ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ (b, i) -> array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ ╷
╷ a = Int32 ╷
╷ b_shadowed1 = List(List(Int32)) ╷
╷ c = Int32 ╷
╷ b = List(Int32) ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ ╷
╷ a = Int32 ╷
╷ b_shadowed1 = List(List(Int32)) ╷
╷ c = Int32 ╷
╷ b = List(Int32) ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ b -> b + c + i ╷
╷ ╷
╷ a = Int32 ╷
╷ b_shadowed1 = List(List(Int32)) ╷
╷ c = Int32 ╷
╷ b_shadowed2 = List(Int32) ╷
╷ b = Int32 ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
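The renaming strategy can be sketched with the same toy `(name, data_type)` schema representation (the `_shadowedN` suffix scheme follows the diagram above; the real PR types are not shown here):

```rust
/// Add a lambda parameter to a toy schema. An existing field with the same
/// name is first renamed to a unique `<name>_shadowedN`, then the parameter
/// is appended at the end, so its index is always >= the original field count.
fn add_lambda_param_renaming(fields: &mut Vec<(String, String)>, name: &str, data_type: &str) {
    if let Some(pos) = fields.iter().position(|(n, _)| n == name) {
        // pick the first suffix that doesn't collide with any existing field
        let mut i = 1;
        while fields.iter().any(|(n, _)| *n == format!("{name}_shadowed{i}")) {
            i += 1;
        }
        fields[pos].0 = format!("{name}_shadowed{i}");
    }
    fields.push((name.to_string(), data_type.to_string()));
}
```

Applying it for the outer lambda's `(b, i)` and then the inner lambda's `b` yields `b_shadowed1` and `b_shadowed2`, matching the final schema in the diagram.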
Lambdas are usually evaluated with a different number of rows than the outer scope, as in the example, where array_transform is executed with one row and its lambda with two rows, one for each element of the array. The UDF implementation is responsible for adjusting the captured columns to the number of rows of its parameters, with whatever logic makes sense for it. For array_transform, that's copying the value of the captured column for each element of the arrays.

This adjustment is costly, so it's necessary to give the implementation a way to avoid adjusting uncaptured columns. It's intuitive to just remove the uncaptured columns, but note in the diagram and in the query below that this can change the index of captured columns: the "c" column has index 2 in the outer scope but ends up with index 1 in the inner scopes.

Expr tree traversal with a schema with uncaptured columns removed:

┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷
╷ ╷
╷ ╷
╷ a@0 = Int32 ╷
╷ b@1 = List(List(Int32)) ╷
╷ c@2 = Int32 ╷
╷ ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ (b, i) -> array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ ╷
╷ b@0 = List(Int32) ╷
╷ c@1 = Int32 ╷
╷ i@2 = Int32 ╷
╷ ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ ╷
╷ b@0 = List(Int32) ╷
╷ c@1 = Int32 ╷
╷ i@2 = Int32 ╷
╷ ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ b -> b + c + i ╷
╷ ╷
╷ ╷
╷ b@0 = Int32 ╷
╷ c@1 = Int32 ╷
╷ i@2 = Int32 ╷
╷ ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
select a@0, b@1, c@2, array_transform(b@0, (b@0, i@2) -> array_transform(b@0, b@0 -> b@0 + c@1 + i@2)) from t;

## Option 1a: Nullify uncaptured columns

To keep the indices stable, this PR won't remove uncaptured columns; they are still present in the adjusted schema with their original type during tree traversals with the new _with_schema methods. However, to avoid the costly adjustment, when they are passed to the UDF in invoke_with_args, they are substituted with columns of the Null data type.

Expr execution/evaluation RecordBatch schema, with uncaptured columns substituted by Null columns:

┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷
╷ ╷
╷ ╷
╷ a = Int32 ╷
╷ b = List(List(Int32)) ╷
╷ c = Int32 ╷
╷ ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ (b, i) -> array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ ╷
╷ a = Null ! nullified ! ╷
╷ b = List(Int32) ╷
╷ c = Int32 ╷
╷ i = Int32 ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ ╷
╷ a = Null ! nullified ! ╷
╷ b = List(Int32) ╷
╷ c = Int32 ╷
╷ i = Int32 ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ b -> b + c + i ╷
╷ ╷
╷ ╷
╷ a = Null ! nullified ! ╷
╷ b = Int32 ╷
╷ c = Int32 ╷
╷ i = Int32 ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
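The nullification step can be sketched with the same toy `(name, data_type)` schema pairs (an illustration only, not the PR's actual types): uncaptured fields keep their slot, but their type is replaced with `Null`, so indices stay stable while their data never needs the costly row-count adjustment.

```rust
/// Replace the data type of every field not in `captured` with `Null`,
/// leaving field positions untouched.
fn nullify_uncaptured(fields: &mut Vec<(String, String)>, captured: &[&str]) {
    for (name, data_type) in fields.iter_mut() {
        if !captured.contains(&name.as_str()) {
            *data_type = "Null".to_string();
        }
    }
}
```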
## Option 1b: TreeNode *_with_indices_mapping

To avoid keeping uncaptured columns in the schema and substituting them with null in the batch, it's possible to add another set of TreeNode-like methods on physical expressions that call the visiting/transforming function with a second parameter of type HashMap<usize, usize>, mapping the indices of the current scope to the ones from the outermost scope. This requires that, before calling the visiting/transforming function for a physical lambda expression, its whole subtree be visited to collect all the captured columns and build the indices mapping. Inner lambdas require the process again and can't reuse the work of the outer lambda. This may be costly for lambdas with complex and/or highly nested expressions.

```rust
impl PhysicalExprExt for Arc<dyn PhysicalExpr> {
    pub fn transform_with_indices_mapping<
        F: FnMut(Self, &HashMap<usize, usize>) -> Result<Transformed<Self>>,
    >(
        self,
        mut f: F,
    ) -> Result<Transformed<Self>> {}
}
```

Expr tree traversal with indices_mapping ("c" has index 2 in the root scope but 1 in the others):

┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷
╷ ╷
╷ ╷
╷ indices_mapping = {} ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ (b, i) -> array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ ╷
╷ indices_mapping = { 1 => 2 } ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ ╷
╷ indices_mapping = { 1 => 2 } ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ b -> b + c + i ╷
╷ ╷
╷ ╷
╷ indices_mapping = { 1 => 2 } ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
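Building such a mapping could look like the sketch below, under an assumed toy model of a lambda scope (`ScopeField` is hypothetical, not a type from the PR): each scope field is either a lambda parameter, which has no outer counterpart, or a captured column with a known index in the outermost scope.

```rust
use std::collections::HashMap;

/// Hypothetical model of one field of a lambda's scoped schema.
enum ScopeField {
    Param,
    Captured { outer_index: usize },
}

/// Build the scope-index -> outermost-index mapping. Parameters are absent
/// from the map, so a lookup miss identifies a lambda parameter.
fn build_indices_mapping(scope: &[ScopeField]) -> HashMap<usize, usize> {
    scope
        .iter()
        .enumerate()
        .filter_map(|(i, field)| match field {
            ScopeField::Param => None,
            ScopeField::Captured { outer_index } => Some((i, *outer_index)),
        })
        .collect()
}
```

For the scope `[b (param), c (captured from index 2), i (param)]` this yields `{1 => 2}`, matching the diagram above.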
The code in minimize_join_filter would change to:

```rust
fn minimize_join_filter(expr: Arc<dyn PhysicalExpr>, ...) -> JoinFilter {
    let mut used_columns = HashSet::new();
    expr.apply_with_indices_mapping(|expr, indices_mapping| {
        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
            // This column may be a child of a lambda, where this index refers to
            // the lambda-scoped schema, which won't include uncaptured columns
            // from the plan input, and therefore may differ from the indices of
            // the schema of the input plan. In such cases, indices_mapping
            // contains the mapping to the index of the input plan. If a mapping
            // is not found, the column should refer to a lambda parameter.
            let scoped_index = col.index();
            if let Some(plan_index) = indices_mapping.get(scoped_index) {
                used_columns.insert(plan_index);
            }
        }
        Ok(TreeNodeRecursion::Continue)
    })
    ...
}
```

## Option 2. Create a schema with all parameters from all lambdas for tree traversals

Use a secondary schema containing all parameters from all lambdas. For that, expressions must be transformed, normalizing all lambda parameters and their references with a unique qualifier per lambda, so they can coexist without conflicts in this schema. A qualifier field would be added to the lambda expr:

```rust
pub struct Lambda {
    pub qualifier: Option<String>,
    pub params: Vec<String>,
    pub body: Box<Expr>,
}
```

Schema of the example:

From my understanding of this video, this is similar to what DuckDB does in its binder, although with differences in the evaluation part. I didn't find any other resource for other open source engines with lambda support, like ClickHouse and Spark.

This works well when dealing with plan nodes: during plan creation or schema recomputation, we can normalize their lambdas, create the extended schema, and save it as a plan field, exposing it with a method like "lambda_extended_schema", although with an added cost to plan creation/schema recomputation. The lambda normalization actually requires two passes: a first one to collect any existing lambda qualifiers, to avoid reusing them in the second, normalizing pass.

How the code would look:

```rust
// from
expr.transform_with_schema(plan.schema(), |node, adjusted_schema| ...)
// to
let schema = plan.lambda_extended_schema();
expr.transform(|node| ...)
```

Another example:

```rust
impl LogicalPlan {
    pub fn replace_params_with_values(
        self,
        param_values: &ParamValues,
    ) -> Result<LogicalPlan> {
        self.transform_up_with_subqueries(|plan| {
            // Use plan.lambda_extended_schema(), which contains the lambda
            // parameters, instead of plan.schema(), which won't.
            let lambda_extended_schema = Arc::clone(plan.lambda_extended_schema());
            let name_preserver = NamePreserver::new(&plan);
            plan.map_expressions(|e| {
                // If this expression is a child of a lambda and contains columns
                // referring to its parameters, lambda_extended_schema already
                // contains them.
                let (e, has_placeholder) = e.infer_placeholder_types(&lambda_extended_schema)?;
                ....
```

However, when working with functions/methods that deal directly with expressions, detached from a plan, the expression lambdas may be unnormalized, and the extended schema is unavailable. There are a few public methods/functions like that, infer_placeholder_types for example:

```rust
impl Expr {
    pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result<(Expr, bool)> {
        let mut has_placeholder = false;
        self.transform(|mut expr| ...)
        ...
    }
}
```

It could either:

1: Require that it only be called with normalized expressions, and that the schema argument be the extended schema, returning an error otherwise. This is restrictive and puts strain on users.

2: Allow it to be called with unnormalized expressions: visit the whole expr tree collecting the existing lambda qualifiers to avoid duplicates in the next step, perform a first transformation to guarantee that the expression lambdas are normalized, create the extended schema, and only then perform the second transformation to infer the placeholder types using the extended schema. While it can document that the returned expression is normalized, it's still a regular Expr, which doesn't encode that property in its type. Also, without changing the method signature, it wouldn't even be possible to return the extended schema so that it can be reused in other places. This is costly and doesn't allow reusing that costly work.

Normalized example:

select t.a, t.b, array_transform(t.b, (lambda1.b, lambda1.i) -> array_transform(lambda1.b, lambda2.b -> lambda2.b + t.a + lambda1.i)) from t;

Just like the first option, this also sets uncaptured columns to Null, as well as unavailable/out-of-scope lambda parameters.

Expr tree batch evaluation with a single extended schema:

┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷
╷ ╷
╷ t.a = Int32 ╷
╷ t.b = List(List(Int32)) ╷
╷ t.c = Int32 ╷
╷ lambda1.b = Null ╷
╷ lambda1.i = Null ╷
╷ lambda2.b = Null ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ (b, i) -> array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ t.a = Null ╷
╷ t.b = Null ╷
╷ t.c = Int32 ╷
╷ lambda1.b = List(Int32) ╷
╷ lambda1.i = Int32 ╷
╷ lambda2.b = Null ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ t.a = Null ╷
╷ t.b = Null ╷
╷ t.c = Int32 ╷
╷ lambda1.b = List(Int32) ╷
╷ lambda1.i = Int32 ╷
╷ lambda2.b = Null ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ b -> b + c + i ╷
╷ ╷
╷ t.a = Null ╷
╷ t.b = Null ╷
╷ t.c = Int32 ╷
╷ lambda1.b = Null ╷
╷ lambda1.i = Int32 ╷
╷ lambda2.b = Int32 ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
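The two-pass qualifier normalization described earlier can be sketched as follows; the `lambdaN` naming follows the normalized example, and `existing` stands for the qualifiers collected in the first pass, so the normalizing pass never reuses one.

```rust
use std::collections::HashSet;

/// Return the first `lambdaN` qualifier not already in `existing`,
/// recording it so subsequent calls stay unique.
fn next_qualifier(existing: &mut HashSet<String>) -> String {
    let mut i = 1;
    loop {
        let qualifier = format!("lambda{i}");
        if existing.insert(qualifier.clone()) {
            return qualifier;
        }
        i += 1;
    }
}
```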
With this option, indices are always stable across the whole tree. It also allows checking whether a physical column refers to a lambda parameter by checking whether its index is greater than or equal to the number of fields of the outer schema. When this information is available, it eliminates the need to use the with_lambdas_params variations of the TreeNode methods.

Comparison between options:
# Splitting this into smaller PRs

If this PR is decided to move forward, it will likely be as smaller PRs. In that case, I have already planned a division, shown below. It's not necessary to analyze the graph, as it doesn't help with the discussion of this text; it's included just to show a reasonable granularity of smaller PRs, in case it helps decide whether to move this forward or not. Each rectangular node is a PR. Asymmetrical nodes are a collection of smaller PRs which share the same dependencies and aren't a dependency of any other PR. Green nodes can be opened immediately. Gray nodes contain unmet dependencies. Merged PRs will be colored blue.

graph LR
classDef blue fill:blue
classDef green fill:green
subgraph "SQL" [SQL 84LOC]
SQLP[SQL Parse 65LOC]
SQLU[SQL Unparse 19LOC]
end
LL[Logical Expr::Lambda 100LOC]:::green
LL --> CSE[CSE 50LOC]
P[Plan logical lambda into physical expr 25LOC]
UDF1["Extend ScalarUDF[Impl] 300LOC"]
UDF2[ScalarUDF lambda schema helpers 100LOC]
AM[Non functional array_transform 270LOC]
PSFL[ScalarFunction PhysicalExpr lambda aware 30LOC]
PL --> PSFL
UDF1 --> PSFL
UDF1 --> AM
%%CILP[Column::is_lambda_parameter 6LOC]
subgraph "Expr::*_with_schema"
LTN[Expr::*_with_schema API def 50LOC]:::green
LTNB[make Expr::*_with_schema lambda aware 70LOC]
LTN --> LTNES[Expr Simplifier 100LOC]
LTNA>"
use Expr::*_with_schema in existing code
Type Coercion 110LOC
normalize_col[with_schemas_and_ambiguity_check] 31LOC
SessionState::create_physical_expr 4LOC
Expr::infer_placeholder_type 1LOC
ApplyFunctionRewrites 10LOC
optmize_projections::rewrite_expr 2LOC
SqlToRel::try_process_group_by_unnest 10LOC
sql resolve_columns 5LOC
Example type_coercion_demo 10LOC
"]
end
PL[Physical Lambda Expr 108LOC]:::green
subgraph "PhysicalExpr::*_with_schema"
PTN[PhysicalExpr::*_with_schema API def 25LOC]:::green
PTNA>"
use PhysicalExpr::*_with_schema in ProjectionMapping 32LOC
PhysicalExprSimplifier 20LOC
unwrap_cast_in_comparison 10LOC
AsyncMapper::find_references 1LOC
Example CustomCastsPhysicalExprAdapter 10LOC
"]
PTNB[make PhysicalExpr::*_with_schema lambda aware 160LOC]
end
subgraph capture
CS[Capture support 30LOC]
LCH["List capture helpers 60LOC(4x15)"]
MCH["Merge capture with arguments helper 66LOC(2x33)"]
AMCT[array_transform capture support 20LOC]
end
PSFL --> CS
PTN --> CS
LL --> SQL
LL --> P
UDF2 --> LTNES
UDF2 --> LTNB
UDF2 --> PTNB
LTN --> LTNA
LTN --> LTNB
LL --> LTNB
LTN --> UDF2
LL --> UDF1 --> UDF2
UDF2 --> P
PL --> P
PL --> PTNB
PTN --> PTNB
PTN --> PTNA
AM --> AMCT
CS --> AMCT
LCH --> AMCT
MCH --> AMCT
LL --> LTNLP2
subgraph "Expr::*_with_lambdas_params"
LTNLP --> LTNLP2[make Expr::*_with_lambdas_params lambda aware]
LTNLP[Expr::*_with_lambdas_params API def 50LOC]:::green -->
AAAAA>"
expr_applicable_for_cols 5LOC
Expr::add_column_refs[_counts]/any_column_refs 15LOC
expr_to_columns 10LOC
columnize_expr 15LOC
find_columns_referenced_by_expr 10LOC
find_column_indexes_referenced_by_expr 5LOC
normalize_col 10LOC
normalize_col_with_schemas_and_ambiguity_check 15LOC
replace_col 10LOC
transform_up_with_lambdas_params 10LOC
filter_exprs_evaluation_result_on_empty_batch 10LOC
replace_cols_by_name 10LOC
optimizer::push_down_filter::contain 15LOC
ScalarSubqueryToJoin 40LOC
TableAliasRewriter 20LOC
Example ShreddedJsonRewriter 30LOC
"]
end
PL --> PTNLP2
subgraph "PhysicalExpr::*_with_lambdas_params"
PTNLP --> PTNLP2[make PhysicalExpr::*_with_lambdas_params lambda aware]
PTNLP[PhysicalExpr::*_with_lambdas_params API def 50LOC]:::green -->
BBBBB>"
CustomPhysicalExprAdapter 3LOC
ParquetPushdownChecker 13LOC
add_offset_to_expr 3LOC
update_expr 10LOC
project_ordering 20LOC
with_new_schema 10LOC
collect_columns 10LOC
reassign_expr_columns 10LOC
DefaultPhysicalExprAdapter 40LOC
projection pushdown 50LOC
sort pushdown 40LOC
projection 50LOC
stream_join_utils 40LOC
pruning predicate rewrite_column_expr 5LOC
Example DefaultValuePhysicalExprAdapter 20LOC
"]
end
Thanks a lot @gstvg, apparently a huge amount of work (even if I am not skilled enough to judge). I have been waiting for such functions for a very long time. IMHO, this is a MUST HAVE in datafusion. All serious analytics engines provide this kind of function: DuckDB's implementation is very mature, and Polars seems to have it as well, even if I haven't tried it yet. I really hope that datafusion will jump into this enhancement very quickly, as it is again a big miss. Use cases like list_filter(list, expr), list_map(list, expr) and list_reduce(list, base, expr) will be available soon. Again, thanks a lot for your work, and I cross my fingers for quick progress. |
|
@gstvg This is super exciting! I'll review over the next couple of days. cc @SparkApplicationMaster @andygrove would be great to get your thoughts too 😃 |
|
Thanks @fbx31. This is high priority for me right now. I also hope we can finish this quickly. It's indeed a lot of work, but now I believe it ended up that way because I went too fast in the wrong direction... More in the comment below. Thanks again! |
|
Thanks @shehabgamin. I analyzed the Spark implementation one last time, and realized I could have done it similarly since the beginning... I've already tested it locally and intend to push soon; the diff reduced to ~2K LOC, mostly in new, self-contained code with small changes in existing code and without requiring changes in downstream code. I hope you haven't put time into reviewing it yet, because I believe the Spark approach is much better and also easier/faster to review.

Basically, for planning, we lazily set the Field in the lambda column itself, instead of injecting it into a DFSchema:

```rust
// logical
struct LambdaColumn {
    name: String,
    field: Option<FieldRef>,
}
```

And for evaluation, again, we lazily set the value of a lambda column instead of injecting it into a RecordBatch:

```rust
// physical
struct LambdaColumn {
    name: String,
    field: FieldRef,
    value: Option<ColumnarValue>,
}
```

I think I initially ruled that out because other expressions don't contain their own Field, and this had been discussed before in #12604 * and didn't go forward. The difference is that the other expressions' Fields naturally exist in the plan schema, which I now believe justifies treating lambda columns differently. I will push soon and update the PR description, thanks again!

*This is being discussed again in #18845 |
|
This is an exciting initiative! I skimmed through the PR, and I have some thoughts regarding how this can fit into DataFusion's existing setup. I wonder if we can have
These are just my rough thoughts. Happy to discuss! |
|
Thanks @linhr
Yeah, this is the major challenge in this PR. Currently,
Could you expand this further? It is like |
Suppose we have During physical planning, we could resolve When I worked with I haven't thought about function registry, documentation etc. which we can get for free in the existing |
|
@linhr Thanks, I got it now. I think it's possible to resolve lambda parameters and remove them from the arguments list with |
|
@gstvg any updates on this one? 😇 |
|
@linhr
Unfortunately, trying to resolve lambdas with
I mean, this was the major challenge of the PR in its first implementation, now that we use
Sorry, at first I thought this only meant resolving/embedding the lambda body within the UDF itself and removing it from

To streamline the discussion, I have a few comments for each of these options, if they're applicable:

Omitting the lambda body from TreeNode traversals

At least the following traversals currently use Projection pushdown for column capture, type coercion for correctness, expr simplifier and CSE for performance. Being only 4 internal traversals, it is easy to handle them specially, but since DF is an open system, I believe there should be a way for downstream users to visit/transform lambda expressions. We could document that they need to be specially handled, outside In my first iteration of this PR, the "outdated" sections of the description and of the first comment, I manually checked every

Removing Expr::Lambda variant from Expr enum and its physical counterpart

Removing One example is:

struct BindLambdaVariable<'a> {
variables: HashMap<&'a str, (ArrayRef, usize)>,
//(variable value, number of times it has been shadowed by other lambdas variables in inner scopes in the current position of the tree)
}
impl TreeNodeRewriter for BindLambdaVariable<'_> {
type Node = Arc<dyn PhysicalExpr>;
fn f_down(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
if let Some(lambda_variable) = node.as_any().downcast_ref::<LambdaVariable>() {
if let Some((value, shadows)) = self.variables.get(lambda_variable.name()) {
if *shadows == 0 {
return Ok(Transformed::yes(Arc::new(
lambda_variable.clone().with_value(value.clone()),
)));
}
}
} else if let Some(inner_lambda) = node.as_any().downcast_ref::<LambdaExpr>() {
for param in inner_lambda.params() {
if let Some((_value, shadows)) = self.variables.get_mut(param.as_str()) {
*shadows += 1;
}
}
}
Ok(Transformed::no(node))
}
fn f_up(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
if let Some(inner_lambda) = node.as_any().downcast_ref::<LambdaExpr>() {
for param in inner_lambda.params() {
if let Some((_value, shadows)) = self.variables.get_mut(param.as_str()) {
*shadows -= 1;
}
}
}
Ok(Transformed::no(node))
}
}

Implementation without Physical Lambda on AST

fn bind_lambda_variables(
node: Arc<dyn PhysicalExpr>,
params: &mut HashMap<&str, (ArrayRef, usize)>,
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
node.transform_down(|node| {
if let Some(lambda_variable) = node.as_any().downcast_ref::<LambdaVariable>() {
if let Some((value, shadows)) = params.get(lambda_variable.name()) {
if *shadows == 0 {
return Ok(Transformed::yes(Arc::new(
lambda_variable.clone().with_value(value.clone()),
)));
}
}
} else if let Some(fun) = node.as_any().downcast_ref::<LambdaUDFExpr>() {
let mut transformed = false;
let new_lambdas = fun
.lambdas()
.iter()
.map(|(params_names, body)| {
for param in *params_names {
if let Some((_value, shadows)) = params.get_mut(param.as_str()) {
*shadows += 1;
}
}
let new_body = bind_lambda_variables(Arc::clone(body), params)?;
transformed |= new_body.transformed;
for param in *params_names {
if let Some((_value, shadows)) = params.get_mut(param.as_str()) {
*shadows -= 1;
}
}
Ok((params_names.to_vec(), new_body.data))
})
.collect::<Result<_>>()?;
let new_args = fun
.args()
.iter()
.map(|arg| {
let new_arg = bind_lambda_variables(Arc::clone(arg), params)?;
transformed |= new_arg.transformed;
Ok(new_arg.data)
})
.collect::<Result<_>>()?;
let fun = fun.with_new_children(new_args, new_lambdas);
return Ok(Transformed::new(
Arc::new(fun),
transformed,
TreeNodeRecursion::Stop,
));
}
Ok(Transformed::no(node))
})
}

Note that this transformation always returns a constant

Resolve/embed lambdas on the UDF implementation itself (essentially partition args from lambdas)

By partitioning args and lambdas, we lose positional data, which is necessary to implement Some closed systems without support for UDFs with lambdas use a common positioning for lambdas and can just use them, without having to store positional data nor lambdas and other args in the same ordered list. Always last for spark, duckdb and snowflake:

transform(array(1), v -> v*2) -- spark
list_transform([1, 2, NULL, 3], lambda x: x + 1) -- duckdb
transform([1, 2, 3], a INT -> a * 2) -- snowflake

Always first for clickhouse:

arrayMap(x -> (x + 2), [1, 2, 3])

But I think that datafusion, as an open and extendable system, shouldn't stick to a single convention, as different deployments may want to support one, the other, or both conventions. Furthermore, my interest in lambda functions is to implement union-manipulating udfs, which receive multiple lambdas interleaved with regular arguments. Consider a union_transform:

union_transform(
union_value,
'str', str -> trim(str),
'num', num -> num*2
)

where today the only way is with this (if/when support for

case union_tag(union_value)
when 'str' then union_value('str', trim(union_extract(union_value, 'str')))
when 'num' then union_value('num', union_extract(union_value, 'num') * 2)
end

For such cases, storing positional data or manually implementing logical and physical formatting and This also omits positional info from the implementation. No lambda udf discussed so far, including
union_value,
'str', str -> trim(str),
'num', _num -> 0 -- lambda that returns constant/scalar
)
union_transform(
union_value,
'str', str -> trim(str),
'num', 0 -- shorter syntax, pass constant directly
That syntax does require positional info for evaluation. It's really just an example and I don't plan to support it on my Instead, if we add a new

struct LambdaFunction { invocation: Box<dyn LambdaInvocation> } // invocation includes all args, both lambdas and non-lambdas
Expr::LambdaFunction(LambdaFunction::new(ArrayTransform::new(all_args_including_lambdas)))
// instead of
struct LambdaFunction { func: Arc<dyn LambdaUDF>, args: Vec<Expr> } // func owns only lambdas, and non lambdas are stored on args
Expr::LambdaFunction(LambdaFunction::new(ArrayTransform::new(lambda), args_without_lambdas))

Insights from Spark implementation

I don't have much experience with Spark and Scala, I only implemented a custom data source a few years ago, so if something below seems wrong, it probably is. Spark includes both a HigherOrderFunction trait and a LambdaFunction class, similar to our current Some traversals branch directly on the Lambda Binder (similar to our BindLambdaVariable). Some branch on SessionCatalog. Some branch on CheckAnalysis. It is true that some traversals branch on

match expr {
Expr::ScalarFunction(fun) if fun.args.iter().any(|arg| matches!(arg, Expr::Lambda(_))) => ...,
// with helper
Expr::ScalarFunction(fun) if fun.contains_lambdas() => ...
...
}
impl ScalarFunction {
fn contains_lambdas(&self) -> bool {
self.args.iter().any(|arg| matches!(arg, Expr::Lambda(_)))
}
// Other helpers could be added, like:
fn lambda_args(&self) -> impl Iterator<Item = &LambdaFunction> {
self.args.iter().filter_map(|arg| match arg {
Expr::Lambda(l) => Some(l),
_ => None
})
}
fn non_lambda_args(&self) -> impl Iterator<Item = &Expr> {
self.args.iter().filter(|arg| !matches!(arg, Expr::Lambda(_)))
}
}

case class ArrayTransform(
argument: Expression,
function: Expression)
extends ArrayBasedSimpleHigherOrderFunction with CodegenFallback {
override def dataType: ArrayType = ArrayType(function.dataType, function.nullable)
override protected def bindInternal(
f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArrayTransform = {
val ArrayType(elementType, containsNull) = argument.dataType
function match {
case LambdaFunction(_, arguments, _) if arguments.size == 2 =>
copy(function = f(function, (elementType, containsNull) :: (IntegerType, false) :: Nil))
case _ =>
copy(function = f(function, (elementType, containsNull) :: Nil))
}
}

And it's instantiated like this:

val param = NamedLambdaVariable("x", et, containsNull)
val funcBody = insertMapSortRecursively(param)
ArrayTransform(e, LambdaFunction(funcBody, Seq(param)))

So, regarding resolving lambdas / partitioning lambdas from args: all args are owned by the implementation, and not by Finally, running this script on

val df = spark.sql("SELECT transform(array(1), v -> v*2)")
val transform = df.queryExecution.logical.expressions(0)
transform.foreach { node =>
println(s"${node.nodeName}: ${node.toString}")
}
println(transform.treeString)

In short, if we more or less agree with my points above, adding a new What do you think? Am I missing or misunderstanding something? Thanks! cc @keen85 |
|
Sorry for missing the update on this thread and thanks for the detailed analysis @gstvg! The reasoning makes a lot of sense to me. And it's great to see your investigation on the Spark codebase. I didn't think deeply enough about how the lambda function would need to interact with various parts in DataFusion, such as tree traversal, SQL unparser, and the general ability to preserve argument positions. So the Therefore, feel free to continue the ideas you are pursuing! And let's see if DataFusion maintainers who are familiar with this part of the codebase have anything to add for the approach. |
|
Thanks @gstvg for driving this. I missed this PR and actually was advocating for lambda support in the DataFusion roadmap. Let me explore this approach; at a high level, IMO it makes sense to introduce a LambdaUDF trait so taking as example Then we likely need to Bind It is great to see this effort moving |
|
Thanks @linhr! It mostly applies to having lambdas and args partitioned, omitting the body on Thanks @comphead! The current representation looks somewhat similar to your suggestion, except that it is a The reason why adding If the added code for a

let filter_array = lambda.evaluate(&batch)?;
let ColumnarValue::Array(filter_array) = filter_array else {
return exec_err!("array_filter requires a lambda that returns an array of booleans");
};
let filter_array = as_boolean_array(&filter_array)?;
let filtered = filter(&values, filter_array)?;
for row_index in 0..list_array.len() {
if list_array.is_null(row_index) {
// Handle null arrays by keeping the offset unchanged
offsets.push_length(0);
continue;
}
let start = value_offsets[row_index];
let end = value_offsets[row_index + 1];
let num_true = filter_array
.slice(start.as_usize(), (end - start).as_usize())
.true_count();
offsets.push_length(num_true);
}
let offsets = offsets.finish();
let list_array = GenericListArray::<OffsetSize>::try_new(
Arc::clone(field),
offsets,
filtered,
nulls.cloned(),
)?;
Ok(Arc::new(list_array))

One thing that can be optimized is to use Buffer::count_set_bits_offset instead of
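The per-row offset recomputation in the snippet above (the `true_count()` loop) can be sketched on plain Vecs; `filtered_offsets` below is a hypothetical helper, not code from the PR:

```rust
/// Recompute list offsets after the flattened child values were filtered,
/// mirroring the per-row true-count loop above but on plain slices.
/// `offsets` has rows + 1 entries; `keep` is the boolean filter over the
/// flattened values; `valid[row]` is false for null lists.
fn filtered_offsets(offsets: &[usize], keep: &[bool], valid: &[bool]) -> Vec<usize> {
    let mut out = Vec::with_capacity(offsets.len());
    out.push(0);
    for row in 0..offsets.len() - 1 {
        let len = if valid[row] {
            // count how many elements of this row survive the filter
            keep[offsets[row]..offsets[row + 1]].iter().filter(|k| **k).count()
        } else {
            0 // null lists keep a zero-length slot, like push_length(0)
        };
        out.push(out[row] + len);
    }
    out
}

fn main() {
    // rows: [1, 2, 3], NULL, [4, 5]; keep even elements only
    let offsets = [0, 3, 3, 5];
    let keep = [false, true, false, true, false];
    let valid = [true, false, true];
    assert_eq!(filtered_offsets(&offsets, &keep, &valid), vec![0, 1, 1, 2]);
}
```

In the real implementation the counts come from the boolean array's bitmap, which is where the `count_set_bits`-style optimization mentioned above would apply.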
I believe that by using @rluvaton I see you also were not fond of adding physical exprs to Finally, how would the new

struct LambdaFunction {
args: Vec<Expr>,
fun: Arc<dyn LambdaUDF>,
}
enum Expr {
...
Expr::LambdaFunction(LambdaFunction),
}

It also can be made more like Spark:

trait LambdaInvoker {
...
fn invoke(&self, args: Vec<Expr>) -> Box<dyn LambdaInvocation>;
}
enum Expr {
...
Expr::LambdaFunction(Box<dyn LambdaInvocation>),
}

I believe Thanks! |
|
Thanks a lot for this PR. Here are my thoughts:

Future features that the API should support with no or minimal breaking changes

A couple of things to make sure we support, or have a way to add in the future without breaking changes:
Things that every lambda implementation would need to handle
fn get_list() -> FixedSizeListArray { // FixedSizeListArray is not generic in arrow-rs
FixedSizeListArray::new(
Arc::new(Field::new_list_field(DataType::Int8, false)),
3,
Arc::new(Int8Array::from(vec![
1, 2, 3,
0, 1, 1,
4, 5, 6
])),
Some(NullBuffer::from(&[true, false, true])),
)
}

In every case here, we should answer:
I'm conflicted on whether we should fix case 1 for them, as it is costly and the user might have prior knowledge to avoid it.

Why a new trait
|
|
Thanks @rluvaton and @gstvg, it's nice you mentioned
Right, on high level it could be like Impl So for example and call it from caller built in function |
|
I updated the PR to a
Yeah, I think that 2 is the main point. The previous ScalarUDF-based approach only added a single method to a trait that already contains 20; it didn't require any change for non-lambda udfs (that would have been unacceptable); and compared to the current LambdaUDF-based one, lambda UDFs only required 2 additional lines of code per implementation. But, while I can't think of any ... all my previous counter-arguments could fall apart in the future with a single requirement change. There's an upfront cost in review time and time to merge, but also more room to work in the future
Currently any type is accepted and passed to the implementation to derive the parameters from it, usually the inner values of a list, but I want to work with unions too, for example. But as of now array_transform doesn't handle ListViews (I thought there are no plans to support it overall?). Since the ListView values may contain unreferenced values, should it be compacted, or cast to a regular compacted List? And if so, return the transformed List or cast it to ListView?
Supported: LambdaFunctionArgs.args can hold multiple lambdas

fn invoke_with_args(&self, args: LambdaFunctionArgs) -> Result<ColumnarValue> {
let [list_value, lambda] = take_function_args(self.name(), &args.args)?;
let (ValueOrLambda::Value(list_value), ValueOrLambda::Lambda(lambda)) =
(list_value, lambda)
else {
return exec_err!(...

Since the beginning I worked to support multiple lambdas, to implement union-manipulating functions like this:

union_transform(
union_value,
'str', str -> trim(str),
'num', num -> num*2,
'bool', bool -> NOT bool,
)
CREATE TABLE t as SELECT 1 as n;
query ?
SELECT array_transform([1, 2], (e) -> n) from t;
----
[1, 1]
// use closures so that bind_lambda_variables evaluates only the params that are actually referenced
// avoiding unnecessary computations
let values_param = || Ok(Arc::clone(list_values));
let indices_param = || elements_indices(&list_array);
let binded_body = bind_lambda_variables(
Arc::clone(&lambda.body),
&lambda.params,
&[&values_param, &indices_param],
*After a refactor, bind_lambda_params actually eagerly evaluates all params; I'll fix that
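The lazy-parameter idea in the snippet above can be modeled with plain closures; `bind` below is a hypothetical stand-in for `bind_lambda_variables`, using `Vec<i32>` instead of `ArrayRef`:

```rust
use std::cell::Cell;

/// Bind only the lambda parameters the body actually references; each
/// parameter's value is produced by a closure (thunk), so an unreferenced
/// parameter (like the element index) is never computed.
fn bind<'a>(
    referenced: &[&str],
    params: &[(&str, &'a dyn Fn() -> Vec<i32>)],
) -> Vec<(String, Vec<i32>)> {
    params
        .iter()
        .filter(|(name, _)| referenced.contains(name))
        .map(|(name, thunk)| (name.to_string(), thunk()))
        .collect()
}

fn main() {
    let index_computed = Cell::new(false);
    let values = || vec![1, 2, 3];
    let indices = || {
        index_computed.set(true); // side effect proves (non-)evaluation
        vec![0, 1, 2]
    };
    // the body references only "v", so the index parameter stays unevaluated
    let bound = bind(&["v"], &[("v", &values), ("i", &indices)]);
    assert_eq!(bound.len(), 1);
    assert!(!index_computed.get());
}
```

This mirrors why the real code passes `&values_param` / `&indices_param` as closures rather than eagerly computed arrays.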
SELECT array_transform(t.v, (v1, i) -> array_transform(v1, (v2, j) -> array_transform(v2, v3 -> j)) ) from t;
----
[[[0, 1], [0]], [[0]], [[]]]
return_field_from_args method

pub struct LambdaReturnFieldArgs<'a> {
/// The data types of the arguments to the function
///
/// If argument `i` to the function is a lambda, it will be the field returned by the
/// lambda when executed with the arguments returned from `LambdaUDF::lambdas_parameters`
///
/// For example, with `array_transform([1], v -> v == 5)`
/// this field will be `[Field::new("", DataType::List(DataType::Int32), false), Field::new("", DataType::Boolean, false)]`
pub arg_fields: &'a [ValueOrLambdaField],
/// Is argument `i` to the function a scalar (constant)?
///
/// If the argument `i` is not a scalar, it will be None
///
/// For example, if a function is called like `my_function(column_a, 5)`
/// this field will be `[None, Some(ScalarValue::Int32(Some(5)))]`
pub scalar_arguments: &'a [Option<&'a ScalarValue>],
}
/// A tagged Field indicating whether it correspond to a value or a lambda argument
#[derive(Clone, Debug)]
pub enum ValueOrLambdaField {
/// The Field of a ColumnarValue argument
Value(FieldRef),
/// The Field of the return of the lambda body when evaluated with the parameters from LambdaUDF::lambda_parameters
Lambda(FieldRef),
}
fn return_field_from_args(
&self,
args: datafusion_expr::LambdaReturnFieldArgs,
) -> Result<Arc<Field>> {
let [ValueOrLambdaField::Value(list), ValueOrLambdaField::Lambda(lambda)] =
take_function_args(self.name(), args.arg_fields)?
else {
return exec_err!(
"{} expects a value followed by a lambda, got {:?}",
self.name(),
args
);
};
//TODO: should metadata be copied into the transformed array?
// lambda is the resulting field of executing the lambda body
// with the parameters returned in lambdas_parameters
let field = Arc::new(Field::new(
Field::LIST_FIELD_DEFAULT_NAME,
lambda.data_type().clone(),
lambda.is_nullable(),
));
let return_type = match list.data_type() {
DataType::List(_) => DataType::List(field),
DataType::LargeList(_) => DataType::LargeList(field),
DataType::FixedSizeList(_, size) => DataType::FixedSizeList(field, *size),
_ => unreachable!(),
};
Ok(Arc::new(Field::new("", return_type, list.is_nullable())))
}

lambdas_parameters method

trait LambdaUDF {
/// Returns the parameters that any lambda supports
fn lambdas_parameters(
&self,
args: &[ValueOrLambdaParameter],
) -> Result<Vec<Option<Vec<Field>>>>;
}
pub enum ValueOrLambdaParameter<'a> {
/// A columnar value with the given field
Value(FieldRef),
/// A lambda
Lambda,
}
// array_transform implementation
impl LambdaUDF for ArrayTransform {
fn lambdas_parameters(
&self,
args: &[ValueOrLambdaParameter],
) -> Result<Vec<Option<Vec<Field>>>> {
let [ValueOrLambdaParameter::Value(list), ValueOrLambdaParameter::Lambda] =
args
else {
return exec_err!(
"{} expects a value followed by a lambda, got {:?}",
self.name(),
args
);
};
let (field, index_type) = match list.data_type() {
DataType::List(field) => (field, DataType::Int32),
DataType::LargeList(field) => (field, DataType::Int64),
DataType::FixedSizeList(field, _) => (field, DataType::Int32),
_ => return exec_err!("expected list, got {list}"),
};
// we don't need to omit the index in the case the lambda doesn't specify it, e.g. array_transform([], v -> v*2),
// nor check whether the lambda contains more than two parameters, e.g. array_transform([], (v, i, j) -> v+i+j),
// as datafusion will do that for us
let value = Field::new("value", field.data_type().clone(), field.is_nullable())
.with_metadata(field.metadata().clone());
let index = Field::new("index", index_type, false);
Ok(vec![None, Some(vec![value, index])])
}
}
The current implementation works with only the parameter names, without requiring their datatypes, and IMHO it's easier for users that way, both in the expr_api and in SQL. The expr api could look like this:

array_transform(
col("my_array"),
lambda(
["current_value"], // define the parameters name
lambda_variable("current_value") * lit(2) // the lambda body
)
)

How to parse the clickhouse syntax is, I believe, a question for the future: either check that the types match the ones returned by
Saw your review in #17220 but thought it mattered only for performance and didn't think about fallible expressions. For fixed size lists, I think we can replace null sublists with the first non-null sublist of the array; if the non-null one also fails, then it's a user problem
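That suggested fix could be sketched on plain slices (`patch_null_sublists` is a hypothetical helper; the real code would operate on a FixedSizeListArray's value buffer):

```rust
/// For a fixed-size list layout, overwrite the values behind null sublists
/// with a copy of the first valid sublist, so a fallible lambda body never
/// sees the garbage slots. `values.len()` is rows * size; `valid[row]`
/// marks non-null lists.
fn patch_null_sublists(values: &mut [i8], size: usize, valid: &[bool]) {
    let Some(first_valid) = valid.iter().position(|v| *v) else { return };
    // copy the donor row out first so we can borrow `values` mutably below
    let donor: Vec<i8> = values[first_valid * size..(first_valid + 1) * size].to_vec();
    for (row, ok) in valid.iter().enumerate() {
        if !ok {
            values[row * size..(row + 1) * size].copy_from_slice(&donor);
        }
    }
}

fn main() {
    // the FixedSizeListArray from the earlier comment:
    // [[1,2,3], NULL (garbage 0,1,1), [4,5,6]]
    let mut values = vec![1, 2, 3, 0, 1, 1, 4, 5, 6];
    patch_null_sublists(&mut values, 3, &[true, false, true]);
    assert_eq!(values, vec![1, 2, 3, 1, 2, 3, 4, 5, 6]);
}
```

The transformed output for the patched rows is discarded anyway, since the null buffer still marks those lists as null.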
Also saw on #17220 about this but forgot it, thanks for reminding me, will fix soon
I think the best we can do is to document it, add examples and helper functions, and add a lot of comments in one or a few LambdaUDFs as reference implementations. |
|
Thanks @gstvg I feel we need some structure here. I can try to start this doc by going through this RFC, or if you feel more comfortable you can create it, WDYT? |
|
I had a partial implementation for |
|
You are right @comphead, my first implementation was more complex and I thoroughly documented it, but after simplifying it to the current version, I thought it was easy to grasp and documented it poorly, but I was obviously wrong. Despite not thinking much of my high-level writing skills, I believe that writing the doc is my responsibility (there are a few edge cases and alternative approaches not discussed yet). But I would really appreciate it if you reviewed it; I can push a
It sounds great IMO; there are a lot of people expressing interest in the topic who would love to review as well |
|
@andygrove made another attempt of taming lambdas for Comet in apache/datafusion-comet#3611 |
|
@comphead Very cool, the LambdaVariable I'm using here was inspired by Spark btw, both ended up looking similar. |
LiaCastaneda
left a comment
There was a problem hiding this comment.
👋 We're very interested in this feature and would love to see it land -- thanks for the effort on this epic PR and the detailed DOC.md! Let us know if there's anything we can do to help move it forward 🙇♀️
| Expr::LambdaFunction(expr) => producer.handle_lambda_function(expr, schema), | ||
| Expr::Lambda(expr) => not_impl_err!("Cannot convert {expr:?} to Substrait"), // not yet implemented in substrait-rs | ||
| Expr::LambdaVariable(expr) => not_impl_err!("Cannot convert {expr:?} to Substrait"), // not yet implemented in substrait-rs |
There was a problem hiding this comment.
@benbellick added some in the substrait spec in substrait-io/substrait#889, but might not be released yet. I wonder if the expressions can actually be mapped 1:1
There was a problem hiding this comment.
I will address this comment more fully later. The protos for lambdas do exist in substrait-rs but haven't been released yet. I am looking into how to do that release right now.
There was a problem hiding this comment.
I have done the release. v0.62.3 contains the protos for lambdas.
Considering the size of this PR, I think it is appropriate to leave the handling of substrait lambdas in datafusion for a subsequent PR. But let's definitely make sure that we do this PR with the substrait interaction in mind :)
I will review the relevant parts for that reason. Thanks!
There was a problem hiding this comment.
nice, I agree the substrait consuming /producing can be left as a follow up, should we bump substrait in datafusion in the meantime?
There was a problem hiding this comment.
Thanks @LiaCastaneda for the info and @benbellick for the release
I wonder if the expressions can actually be mapped 1:1
I did a quick check and I believe it's possible, yes.
LambdaFunction produces a regular substrait ScalarFunction, and to consume we just check whether any LambdaUDF exists with the given function name and/or if any arg is a lambda.
To produce a substrait Lambda with the field parameters struct where each field corresponds to a parameter, we use the return of LambdaUDF::lambdas_parameters, and for consuming, we can use default parameters names like p1, p2 etc
To produce a FieldReference/LambdaParameterReference with a correct steps_out and direct_reference.struct_field.field, we keep a HashMap<String, (usize, usize)> of lambda_parameter_name => (steps_out, struct_field_index) that gets updated for every Lambda to be produced: steps_out is reset to 0 for shadowed parameters, the others are incremented by one, and struct_field_index is set according to the parameter's position within the parameters of the lambda it originates from. Consuming into LambdaVariable with its field resolved is similar to SQL planning: keep a HashMap<usize, FieldRef> of direct_reference.struct_field.field => field that gets updated for every LambdaUDF by the return of its lambdas_parameters method
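The steps_out bookkeeping described above could be sketched like this (`enter_lambda` is a hypothetical helper standing in for the per-Lambda update):

```rust
use std::collections::HashMap;

/// On entering each nested lambda while producing Substrait parameter
/// references: bump `steps_out` for every currently visible parameter,
/// then (re)insert the new lambda's own parameters with steps_out = 0,
/// shadowing any outer binding of the same name.
fn enter_lambda(scope: &mut HashMap<String, (usize, usize)>, params: &[&str]) {
    for (steps_out, _) in scope.values_mut() {
        *steps_out += 1;
    }
    for (index, name) in params.iter().enumerate() {
        scope.insert(name.to_string(), (0, index));
    }
}

fn main() {
    // (b, i) -> array_transform(b, b -> ...): the inner lambda shadows "b"
    let mut scope = HashMap::new();
    enter_lambda(&mut scope, &["b", "i"]);
    enter_lambda(&mut scope, &["b"]);
    assert_eq!(scope["b"], (0, 0)); // inner "b": zero lambdas out, field 0
    assert_eq!(scope["i"], (1, 1)); // outer "i": one lambda out, field 1
}
```

A real producer would also need to undo the update when leaving each lambda (restoring the shadowed entry), which is omitted here.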
I'll address your other comments and then implement substrait support on another branch to make sure it is viable
|
|
||
| `LambdaFunction` `Signature` is non functional | ||
|
|
||
| Currently, `LambdaUDF::signature` returns the same `Signature` as `ScalarUDF`, but its `type_signature` field is never used, as most variants of the `TypeSignature` enum aren't applicable to a lambda, and no type coercion is applied to its arguments; that is currently an implementation responsibility. We should either add lambda-compatible variants to the `TypeSignature` enum, create new `LambdaTypeSignature` and `LambdaSignature` types, or support no automatic type coercion at all on lambda functions. |
There was a problem hiding this comment.
I imagine type coercion for lambda functions would only apply to value arguments right?
For example, arr in array_transform(arr, v -> v) if arr is List<Int32> but the function only handles List<Int64>, people might forget to coerce this themselves.
What if we changed the coerce_types signature of LambdaUDF to:
fn coerce_types(&self, args: &[ValueOrLambdaParameter]) -> Result<Vec<Option<DataType>>>
So it coerces value args (returning Some(DataType)) but skips lambda args (returning None)? I think coerce_value_types would be a better name for this
There was a problem hiding this comment.
should we also add an unimplemented note in the substrait_consumer?
| impl ArrayTransform { | ||
| pub fn new() -> Self { | ||
| Self { | ||
| signature: Signature::any(2, Volatility::Immutable), |
There was a problem hiding this comment.
would it be clearer if we had a special signature for lambda functions that has both volatility and parameter names?
pub struct LambdaSignature {
pub volatility: Volatility,
pub parameter_names: Option<Vec<String>>,
}
since this is a different trait from ScalarUDF, and types can't really be defined in a static way.
Closes #14205
This PR adds support for lambdas with column capture and the
array_transform function used to test the lambda implementation. Example usage:

Some comments on code snippets in this doc show what value each struct, variant or field would hold after planning the first example above. Some literals are simplified pseudo code.
3 new
Expr variants are added: LambdaFunction, owning a new trait LambdaUDF, which is like a ScalarFunction/ScalarUDFImpl with support for lambdas; Lambda, for the lambda body and its parameter names; and LambdaVariable, which is like Column but for lambda parameters. The reasoning for not using Column instead comes later in this doc. The initial version of this PR modified ScalarUDF instead of using a new LambdaUDF, and some comments below are about that first version.

Their logical representations:
The example would be planned into a tree like this:
The physical counterparts definition:
Note: For those who primarily want to check if this lambda implementation supports their use case and don't want to spend much time here, it's okay to skip most collapsed blocks, as those serve mostly to help code reviewers, with the exception of
LambdaUDF and the array_transform implementations of LambdaUDF's relevant methods, collapsed due to their size.

Physical planning implementation is trivial:
The added
LambdaUDF trait is almost a clone of ScalarUDFImpl, with the exception of:
- return_field_from_args and invoke_with_args, where now args.args is a list of enums with two variants, Value or Lambda, instead of a list of values
- lambdas_parameters, which returns a Field for each parameter supported for every lambda argument, based on the Fields of the non-lambda arguments
- it lacks return_field and the deprecated ones, is_nullable and display_name.

LambdaUDF
array_transform lambdas_parameters implementation
array_transform return_field_from_args implementation
array_transform invoke_with_args implementation
How relevant LambdaUDF methods would be called and what they would return during planning and evaluation of the example
A pair LambdaUDF/LambdaUDFImpl like ScalarFunction was not used because those exist only to maintain backwards compatibility with the older API #8045
LambdaFunction invocation:
Instead of evaluating all its arguments as ScalarFunction does, LambdaFunction does the following:
- non-lambda arguments are evaluated and passed as a ColumnarValue to LambdaUDF::evaluate as a ValueOrLambda::Value
- for each lambda, LambdaFunction builds a LambdaArg containing the lambda body physical expression and a record batch containing any captured columns, and provides it to LambdaUDF::evaluate as a ValueOrLambda::Lambda. To avoid costly copies of uncaptured columns, we swap them with a NullArray while keeping the number of columns in the batch the same, so captured column indices are kept stable across the whole tree. The recent "Project record batches to avoid filtering unused columns in CASE evaluation" #18329 instead projects out uncaptured columns and rewrites the expr, adjusting column indexes. If that is preferable we can generalize that implementation and use it here too.

LambdaFunction evaluation
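A simplified sketch of the column-swapping idea on toy types (not the actual implementation; `Col` and `mask_uncaptured` are hypothetical stand-ins for arrow arrays and the NullArray swap):

```rust
/// Keep column indices stable: uncaptured columns are replaced with a cheap
/// placeholder instead of being projected out, so Column exprs inside the
/// lambda body keep their original indices into the batch.
#[derive(Clone, Debug, PartialEq)]
enum Col {
    Values(Vec<i32>),
    Placeholder, // stands in for the NullArray swap
}

fn mask_uncaptured(batch: Vec<Col>, captured: &[usize]) -> Vec<Col> {
    batch
        .into_iter()
        .enumerate()
        .map(|(i, col)| if captured.contains(&i) { col } else { Col::Placeholder })
        .collect()
}

fn main() {
    let batch = vec![Col::Values(vec![1]), Col::Values(vec![2]), Col::Values(vec![3])];
    // only column 2 is captured by the lambda body
    let masked = mask_uncaptured(batch, &[2]);
    assert_eq!(masked.len(), 3); // width unchanged => indices stay stable
    assert_eq!(masked[2], Col::Values(vec![3]));
    assert_eq!(masked[0], Col::Placeholder);
}
```

The alternative from #18329 would instead shrink the batch and rewrite the column indices inside the expression tree.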
Why
LambdaVariable and not Column:

Existing tree traversals that operate on columns would break if some column nodes referenced a lambda parameter and not a real column. In the example query, projection pushdown would try to push down the lambda parameter "v", which won't exist in table "t".
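A toy illustration of that breakage (hypothetical simplified Expr, not DataFusion's types):

```rust
/// Toy column-reference collector: if lambda parameters were plain Columns,
/// projection pushdown would collect "v" and try to read it from table "t",
/// where it doesn't exist.
#[derive(Debug)]
enum Expr {
    Column(String),
    Lambda { params: Vec<String>, body: Box<Expr> },
    Call(Vec<Expr>),
}

fn collect_columns(expr: &Expr, out: &mut Vec<String>) {
    match expr {
        Expr::Column(name) => out.push(name.clone()),
        // a naive traversal descends into the lambda body unconditionally,
        // with no notion of which names are bound by the lambda
        Expr::Lambda { body, .. } => collect_columns(body, out),
        Expr::Call(args) => args.iter().for_each(|a| collect_columns(a, out)),
    }
}

fn main() {
    // array_transform(arr, v -> v), with the parameter modeled as a Column
    let expr = Expr::Call(vec![
        Expr::Column("arr".into()),
        Expr::Lambda {
            params: vec!["v".into()],
            body: Box::new(Expr::Column("v".into())),
        },
    ]);
    let mut cols = Vec::new();
    collect_columns(&expr, &mut cols);
    // "v" is wrongly reported as a required table column
    assert_eq!(cols, vec!["arr".to_string(), "v".to_string()]);
}
```

With a dedicated LambdaVariable node, such traversals simply never see a Column for "v", so no special-casing is required in each of them.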
Example of code of another traversal that would break:
Furthermore, the implemention of
ExprSchemable and PhysicalExpr::return_field for Column expects that the schema it receives as an argument contains an entry for its name, which is not the case for lambda parameters.

By including a
FieldRef on LambdaVariable that should be resolved either at construction time, as in the sql planner, or later by an AnalyzerRule, ExprSchemable and PhysicalExpr::return_field simply return its own Field:

LambdaVariable ExprSchemable and PhysicalExpr::return_field implementation
For reference, Spark and Substrait also use a specialized node instead of a regular column
There are also discussions on making every expr own its type: #18845, #12604
Possible fixes discarded due to complexity, requiring downstream changes and implementation size:
How minimize_join_filter would look like:
How minimize_join_filter would look like:
For any given LambdaFunction found during the traversal, a new schema is created for each lambda argument that contains its parameters, returned from LambdaUDF::lambdas_parameters
How it would look:
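The per-lambda schema construction can also be sketched with simplified stand-in types (the `Field` struct and shadowing rule here are illustrative assumptions, not the PR's actual code):

```rust
// Hypothetical stand-in for an Arrow/DataFusion field.
#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: String,
}

/// For one lambda argument, extend the outer schema with that lambda's
/// parameters, shadowing any outer field with the same name.
fn schema_for_lambda(outer: &[Field], params: &[Field]) -> Vec<Field> {
    let mut schema: Vec<Field> = outer
        .iter()
        .filter(|f| params.iter().all(|p| p.name != f.name))
        .cloned()
        .collect();
    schema.extend(params.iter().cloned());
    schema
}

fn main() {
    let outer = vec![
        Field { name: "b".into(), data_type: "List(List(Int32))".into() },
        Field { name: "c".into(), data_type: "Int32".into() },
    ];
    // Parameters of `(b, i) -> ...`: the lambda's "b" shadows the outer "b".
    let params = vec![
        Field { name: "b".into(), data_type: "List(Int32)".into() },
        Field { name: "i".into(), data_type: "Int64".into() }, // index type assumed
    ];
    let inner = schema_for_lambda(&outer, &params);
    let b = inner.iter().find(|f| f.name == "b").unwrap();
    assert_eq!(b.data_type, "List(Int32)"); // shadowed: element type, not list-of-list
    assert!(inner.iter().any(|f| f.name == "i")); // "i" is now visible
    println!("per-lambda schema has shadowed b and parameter i");
}
```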
`LambdaVariable` evaluation, current implementation:

The physical `LambdaVariable` contains an optional `ColumnarValue` that must be bound for each batch before evaluation with the helper function `bind_lambda_variables`, which rewrites the whole lambda body, binding every variable in the tree.

LambdaVariable::evaluate
Unbound:
After binding:
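The bind-then-evaluate cycle can be sketched with a toy expression tree (types, names, and the scalar `i32` value standing in for a `ColumnarValue` are all illustrative assumptions):

```rust
// Hypothetical stand-in for a physical expression tree.
#[derive(Debug, Clone, PartialEq)]
enum PhysExpr {
    // `value` stands in for the optional ColumnarValue.
    LambdaVariable { name: String, value: Option<i32> },
    Add(Box<PhysExpr>, Box<PhysExpr>),
    Literal(i32),
}

/// Rewrite the whole lambda body, binding every matching variable
/// in the tree to its value for the current batch.
fn bind_lambda_variables(expr: PhysExpr, name: &str, value: i32) -> PhysExpr {
    match expr {
        PhysExpr::LambdaVariable { name: n, .. } if n == name => {
            PhysExpr::LambdaVariable { name: n, value: Some(value) }
        }
        PhysExpr::Add(l, r) => PhysExpr::Add(
            Box::new(bind_lambda_variables(*l, name, value)),
            Box::new(bind_lambda_variables(*r, name, value)),
        ),
        other => other,
    }
}

fn evaluate(expr: &PhysExpr) -> Result<i32, String> {
    match expr {
        PhysExpr::LambdaVariable { name, value } => {
            (*value).ok_or_else(|| format!("unbound lambda variable '{name}'"))
        }
        PhysExpr::Add(l, r) => Ok(evaluate(l)? + evaluate(r)?),
        PhysExpr::Literal(v) => Ok(*v),
    }
}

fn main() {
    // body of `v -> v + 1`
    let body = PhysExpr::Add(
        Box::new(PhysExpr::LambdaVariable { name: "v".into(), value: None }),
        Box::new(PhysExpr::Literal(1)),
    );
    assert!(evaluate(&body).is_err()); // unbound: evaluation fails
    let bound = bind_lambda_variables(body, "v", 41);
    assert_eq!(evaluate(&bound), Ok(42)); // bound for this "batch"
    println!("bound evaluation = 42");
}
```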
Alternative:
Make the `LambdaVariable` evaluate its value from the batch passed to `PhysicalExpr::evaluate`, like a regular column. For that, instead of binding the body, the `LambdaUDF` implementation would merge the captured batch of a lambda with the values of its parameters. For the lookup to happen via an index, as with a regular column, the schema used to plan the physical `LambdaVariable` must contain the lambda parameters; this would be the only place during planning where a schema contains those parameters. Otherwise it can only get the value from the batch by name instead of by index.

how physical planning would look
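The batch-merging alternative can be sketched as follows (a toy `Batch` with `Vec<i32>` columns stands in for a real `RecordBatch`; `with_parameter` and `column_by_name` are hypothetical helpers):

```rust
// Hypothetical stand-in for a record batch.
#[derive(Debug, Clone, PartialEq)]
struct Batch {
    names: Vec<String>,
    columns: Vec<Vec<i32>>,
}

impl Batch {
    /// Merge a lambda parameter's values into the captured batch so a
    /// lambda variable can be evaluated as a plain column lookup.
    fn with_parameter(mut self, name: &str, values: Vec<i32>) -> Self {
        self.names.push(name.to_string());
        self.columns.push(values);
        self
    }

    fn column_by_name(&self, name: &str) -> Option<&Vec<i32>> {
        self.names.iter().position(|n| n == name).map(|i| &self.columns[i])
    }
}

fn main() {
    // Columns captured by the lambda body:
    let captured = Batch { names: vec!["c".into()], columns: vec![vec![10, 20]] };
    // Inside the UDF, the element values of parameter "v" for this batch:
    let merged = captured.with_parameter("v", vec![1, 2]);
    assert_eq!(merged.column_by_name("v"), Some(&vec![1, 2]));
    assert_eq!(merged.column_by_name("c"), Some(&vec![10, 20]));
    println!("lambda parameter readable as a regular column");
}
```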
How `ArrayTransform::invoke_with_args` would look:
Why is `LambdaVariable`'s `Field` an `Option`?

So `expr_api` users can construct a `LambdaVariable` just by using its name, without having to set its field. An `AnalyzerRule` will then set the `LambdaVariable` field based on the values returned from `LambdaUDF::lambdas_parameters` of any `LambdaFunction` it finds while traversing down an expr tree. We may include that rule in the default rules list: if the plan/expression tree is transformed by another rule in a way that changes the types of the non-lambda arguments of a lambda function, the types of its lambda parameters may change too, leaving `LambdaVariable` fields out of sync, and the rule would fix that. Or, to avoid increasing planning time, we don't include it by default and instruct `expr_api` users to add it manually if needed.

Why set the `LambdaVariable` field during SQL planning if it's optional and can be set later via an `AnalyzerRule`?

Some parts of SQL planning check the type/nullability of the already planned child expressions of the expr being planned, and would error when doing so on an unresolved `LambdaVariable`. Take as an example this expression: `array_transform([[0, 1]], v -> v[1])`. Planning the `FieldAccess` `v[1]` is handled by the `ExprPlanner` `FieldAccessPlanner`, which checks the datatype of `v`, a lambda variable, whose `ExprSchemable` implementation depends on its field being resolved rather than on the `PlannerContext` schema, requiring the SQL planner to plan `LambdaVariable`s with a resolved field.

FieldAccessPlanner
Therefore we can't plan all arguments in a single pass. We must first plan the non-lambda arguments, collect their types and nullability, and pass them to `LambdaUDF::lambdas_parameters`, which derives the types of the lambda parameters from the types of the non-lambda arguments and returns them to the planner. For each unplanned lambda argument, the planner then creates a new `PlannerContext` via `with_lambda_parameters`, which contains a mapping from lambda parameter names to their types. When planning an `ast::Identifier`, it first checks whether a lambda parameter with the given name exists; if so, it plans it into an `Expr::LambdaVariable` with a resolved field, otherwise into a regular `Expr::Column`.

sql planning
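The identifier-resolution step above can be sketched with a toy `PlannerContext` (the struct, its map of name to type string, and `plan_identifier` are illustrative assumptions, not DataFusion's actual API):

```rust
use std::collections::HashMap;

// Hypothetical stand-in for planned expressions.
#[derive(Debug, PartialEq)]
enum Expr {
    Column(String),
    LambdaVariable { name: String, data_type: String },
}

struct PlannerContext {
    lambda_parameters: HashMap<String, String>, // name -> data type
}

impl PlannerContext {
    /// Derive a child context carrying one lambda argument's parameters,
    /// as returned by something like LambdaUDF::lambdas_parameters.
    fn with_lambda_parameters(&self, params: &[(String, String)]) -> PlannerContext {
        let mut map = self.lambda_parameters.clone();
        map.extend(params.iter().cloned());
        PlannerContext { lambda_parameters: map }
    }

    /// Planning an identifier: lambda parameter first, regular column otherwise.
    fn plan_identifier(&self, ident: &str) -> Expr {
        match self.lambda_parameters.get(ident) {
            Some(dt) => Expr::LambdaVariable { name: ident.into(), data_type: dt.clone() },
            None => Expr::Column(ident.into()),
        }
    }
}

fn main() {
    let outer = PlannerContext { lambda_parameters: HashMap::new() };
    // Planning the body of `v -> v + c`:
    let inner = outer.with_lambda_parameters(&[("v".into(), "Int32".into())]);
    assert_eq!(
        inner.plan_identifier("v"),
        Expr::LambdaVariable { name: "v".into(), data_type: "Int32".into() }
    );
    assert_eq!(inner.plan_identifier("c"), Expr::Column("c".into()));
    println!("v -> LambdaVariable, c -> Column");
}
```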
LambdaFunction `Signature` is non-functional

Currently, `LambdaUDF::signature` returns the same `Signature` as `ScalarUDF`, but its `type_signature` field is never used: most variants of the `TypeSignature` enum aren't applicable to a lambda, and no type coercion is applied to its arguments, which is currently the implementation's responsibility. We should either add lambda-compatible variants to the `TypeSignature` enum, create new `LambdaTypeSignature` and `LambdaSignature` types, or support no automatic type coercion at all for lambda functions.
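One possible shape for the `LambdaTypeSignature`/`LambdaSignature` option floated above; every name and variant here is hypothetical, and no type coercion is attempted — only arity and value-vs-lambda positions are validated:

```rust
// Hypothetical types, not part of the PR.
#[derive(Debug, Clone, PartialEq)]
enum LambdaArgType {
    /// A plain value argument, e.g. the array passed to array_transform.
    Value,
    /// A lambda argument taking the given number of parameters.
    Lambda { num_params: usize },
}

#[derive(Debug, Clone)]
struct LambdaSignature {
    args: Vec<LambdaArgType>,
}

impl LambdaSignature {
    /// Check an argument shape against the signature, with no type
    /// coercion: only arity and lambda positions are validated.
    fn matches(&self, actual: &[LambdaArgType]) -> bool {
        self.args.len() == actual.len()
            && self.args.iter().zip(actual).all(|(expected, got)| expected == got)
    }
}

fn main() {
    // array_transform(array, v -> ...): one value, then a unary lambda.
    let sig = LambdaSignature {
        args: vec![LambdaArgType::Value, LambdaArgType::Lambda { num_params: 1 }],
    };
    let ok = [LambdaArgType::Value, LambdaArgType::Lambda { num_params: 1 }];
    let swapped = [LambdaArgType::Lambda { num_params: 1 }, LambdaArgType::Value];
    assert!(sig.matches(&ok));
    assert!(!sig.matches(&swapped));
    println!("signature checks arity and lambda positions only");
}
```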