Add lambda support and array_transform udf #21679
Conversation
```rust
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
    self.args.iter().collect()
}
```
The function itself should be included in the children, otherwise you can't access part of the expression tree, as discussed here:
Please also add a comment explaining why it is important.
I believe this is about the lambda functions, right? All lambda functions of a given higher-order function are stored in `self.args`.
If this is about the higher-order function itself, it shouldn't be included, in the same way a scalar function doesn't include itself in its children, right?
I see that in the other PR you reviewed, #17220, the lambda functions aren't stored in the higher-order function and are instead resolved in the function implementation. Should we do this here too?
Finally, physical expressions of a concrete higher-order function (not the case here), like the `array_exists` being done in comet datafusion-comet#3611, do store the arg and the lambda function in different properties [1], and thus its `children` method requires what I believe you are asking for [2].
Again, if we should do this here too, please let me know, thanks.
Oh, I missed that `func` is `Arc<dyn HigherOrderUDF>`
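To illustrate the comment requested above, here is a minimal sketch of `children` with the explanatory comment, using toy stand-in types rather than DataFusion's actual `PhysicalExpr` machinery:

```rust
use std::sync::Arc;

// Toy stand-ins for DataFusion's types, for illustration only.
trait PhysicalExpr {
    fn name(&self) -> &str;
}

struct Literal(String);
impl PhysicalExpr for Literal {
    fn name(&self) -> &str {
        &self.0
    }
}

struct HigherOrderFunctionExpr {
    args: Vec<Arc<dyn PhysicalExpr>>,
}

impl HigherOrderFunctionExpr {
    // Lambda bodies are stored in `args` alongside the value arguments,
    // so returning every arg exposes the full expression tree to
    // optimizer rewrites; the function itself is a UDF, not a
    // PhysicalExpr, and is therefore not a child.
    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        self.args.iter().collect()
    }
}

fn main() {
    let args: Vec<Arc<dyn PhysicalExpr>> = vec![
        Arc::new(Literal("array".into())),
        Arc::new(Literal("lambda".into())),
    ];
    let expr = HigherOrderFunctionExpr { args };
    assert_eq!(expr.children().len(), 2);
    assert_eq!(expr.children()[0].name(), "array");
}
```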
```rust
Expr::Lambda(Lambda { params, body }) => {
    if body.any_column_refs() {
        return plan_err!("lambda doesn't support column capture");
    }
    // ...
}
```
Please add a link to the issue that talks about column capture in lambda support.
```rust
// LambdaVariable.field will be made optional as in Expr::Placeholder
// and only LambdaVariable.name used, and field.name ignored,
// so they're not enforced to match for logical expressions
if field.data_type() != schema_field.data_type()
    || field.is_nullable() != schema_field.is_nullable()
    || field.metadata() != schema_field.metadata()
    || field.dict_is_ordered() != schema_field.dict_is_ordered()
```
this can be a source of bugs when adding new properties to `Field`
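One way to make the comparison robust, sketched here with a toy `Field` (the real type is arrow's `Field`, which offers `with_name` for the same trick): rename a clone and compare the whole values, so any property added later is covered automatically by `PartialEq` instead of needing another `||` clause.

```rust
// Toy `Field`, for illustration; the real type is arrow's `Field`.
#[derive(Clone, PartialEq, Debug)]
struct Field {
    name: String,
    data_type: String,
    nullable: bool,
    // Any property added here is automatically covered by `PartialEq`.
}

// Instead of comparing each property one by one (easy to forget when a
// new property is added), rename a clone and compare whole values.
fn equal_ignoring_name(field: &Field, schema_field: &Field) -> bool {
    let mut renamed = field.clone();
    renamed.name = schema_field.name.clone();
    renamed == *schema_field
}

fn main() {
    let a = Field { name: "v".into(), data_type: "Int64".into(), nullable: true };
    let b = Field { name: "x".into(), data_type: "Int64".into(), nullable: true };
    assert!(equal_ignoring_name(&a, &b));

    let c = Field { nullable: false, ..b.clone() };
    assert!(!equal_ignoring_name(&a, &c));
}
```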
Given the PR size, I asked Claude to make an initial review; here are some of the findings:
…erOrderFunctionExpr::try_new
```rust
fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>>;

/// Return reference to higher_order_functions
fn higher_order_functions(&self) -> &HashMap<String, Arc<dyn HigherOrderUDF>>;
```
I wonder whether it would be a good idea to return an empty HashMap by default; that would prevent some broken builds for third-party implementations.
Not against it, but #20312 also added a breaking method. If both PRs get released together, then I don't think this makes much of a difference, WDYT?
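For reference, a default body along the lines suggested could be sketched like this (toy trait names, not the PR's actual code), using a `OnceLock`-backed empty map so existing implementors keep compiling:

```rust
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};

trait HigherOrderUDF: Send + Sync {}

trait FunctionRegistry {
    // A default body means existing third-party implementors keep
    // compiling; registries that do support higher-order functions
    // override it with their own map.
    fn higher_order_functions(&self) -> &HashMap<String, Arc<dyn HigherOrderUDF>> {
        static EMPTY: OnceLock<HashMap<String, Arc<dyn HigherOrderUDF>>> = OnceLock::new();
        EMPTY.get_or_init(HashMap::new)
    }
}

// A pre-existing registry that does not override the new method.
struct LegacyRegistry;
impl FunctionRegistry for LegacyRegistry {}

fn main() {
    let registry = LegacyRegistry;
    assert!(registry.higher_order_functions().is_empty());
}
```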
Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com>
…e projection/optimization Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com>
…unparser Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com>
```rust
fun: Arc<dyn HigherOrderUDF>,
name: String,
args: Vec<Arc<dyn PhysicalExpr>>,
lambda_positions: Vec<usize>,
return_field: FieldRef,
```
Please add comments explaining what each property is, and give an example using `array_transform`.
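A sketch of the kind of documentation being asked for, using `array_transform(a, v -> v + 1)` as the running example; the types here are toy stand-ins, not DataFusion's:

```rust
use std::sync::Arc;

// Toy stand-ins, for illustration only.
#[derive(Debug, PartialEq)]
struct Field(String);
type FieldRef = Arc<Field>;
trait HigherOrderUDF {}
trait PhysicalExpr {}

struct ArrayTransformUdf;
impl HigherOrderUDF for ArrayTransformUdf {}
struct ColExpr(&'static str);
impl PhysicalExpr for ColExpr {}

struct HigherOrderFunctionExpr {
    /// The function implementation, e.g. the `array_transform` UDF.
    fun: Arc<dyn HigherOrderUDF>,
    /// Display name, e.g. `"array_transform(a, v -> v + 1)"`.
    name: String,
    /// All arguments, lambdas included: here the column `a` at
    /// position 0 and the lambda `v -> v + 1` at position 1.
    args: Vec<Arc<dyn PhysicalExpr>>,
    /// Indexes into `args` that hold lambdas: `[1]` in the example.
    lambda_positions: Vec<usize>,
    /// The field this expression evaluates to.
    return_field: FieldRef,
}

fn main() {
    let expr = HigherOrderFunctionExpr {
        fun: Arc::new(ArrayTransformUdf),
        name: "array_transform(a, v -> v + 1)".into(),
        args: vec![Arc::new(ColExpr("a")), Arc::new(ColExpr("v + 1"))],
        lambda_positions: vec![1],
        return_field: Arc::new(Field("List(Int64)".into())),
    };
    assert_eq!(expr.lambda_positions, vec![1]);
    assert_eq!(expr.args.len(), 2);
    assert_eq!(*expr.return_field, Field("List(Int64)".into()));
}
```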
```rust
/// [PhysicalExpr::evaluate] will not be called. The lambda *body* should be wrapped instead.
/// If any arg referenced by `lambda_positions` does not contain a lambda or contains a wrapper
/// with multiple children before finding the lambda, the function evaluation will error.
pub fn new(
```
I would rename this to `try_new` and return a `Result`, and rename the current `try_new` to something else,
so we could add validation later without breaking the API, like verification of the lambda positions.
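The shape being suggested might look like the following sketch (toy types; the bounds check on `lambda_positions` stands in for whatever validation gets added later):

```rust
use std::sync::Arc;

// Toy stand-ins, for illustration only.
trait PhysicalExpr {}
struct Col;
impl PhysicalExpr for Col {}

struct HigherOrderFunctionExpr {
    args: Vec<Arc<dyn PhysicalExpr>>,
    lambda_positions: Vec<usize>,
}

impl HigherOrderFunctionExpr {
    // Fallible constructor: validation (here just a bounds check on the
    // lambda positions) can grow later without breaking the API.
    fn try_new(
        args: Vec<Arc<dyn PhysicalExpr>>,
        lambda_positions: Vec<usize>,
    ) -> Result<Self, String> {
        if let Some(&bad) = lambda_positions.iter().find(|&&p| p >= args.len()) {
            return Err(format!(
                "lambda position {bad} out of bounds for {} args",
                args.len()
            ));
        }
        Ok(Self { args, lambda_positions })
    }
}

fn main() {
    let args: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Col), Arc::new(Col)];
    assert!(HigherOrderFunctionExpr::try_new(args, vec![1]).is_ok());
    assert!(HigherOrderFunctionExpr::try_new(vec![Arc::new(Col)], vec![1]).is_err());
}
```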
```rust
fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
    Ok(self.return_field.data_type().clone())
}

fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
    Ok(self.return_field.is_nullable())
}
```
Because users can create `HigherOrderFunctionExpr` with a `return_field` using `new`, which is not marked as `unsafe`, this could lead to a datatype mismatch. Can you please validate that the type matches the function's return type? Same for `nullable`.
```rust
fn return_field(&self, _input_schema: &Schema) -> Result<FieldRef> {
    Ok(Arc::clone(&self.return_field))
}
```
If you have `return_field`, you don't need `data_type` and `nullable`. Also, please add the validation that the function outputs the same data type as what you return here.
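The cross-check being requested could be sketched like this (toy `Field` and trait, not the real arrow/DataFusion types): compare the caller-supplied return field against the one the UDF declares, so the `data_type`/`nullable` values derived from `return_field` can't silently disagree with what the function produces.

```rust
// Toy types, for illustration only.
#[derive(Clone, PartialEq, Debug)]
struct Field {
    data_type: String,
    nullable: bool,
}

trait HigherOrderUDF {
    fn return_field(&self) -> Field;
}

struct ArrayTransform;
impl HigherOrderUDF for ArrayTransform {
    fn return_field(&self) -> Field {
        Field { data_type: "List(Int64)".into(), nullable: true }
    }
}

// Validate the caller-supplied field against the UDF's declared one.
fn validate_return_field(fun: &dyn HigherOrderUDF, provided: &Field) -> Result<(), String> {
    let expected = fun.return_field();
    if *provided != expected {
        return Err(format!(
            "return field mismatch: got {provided:?}, expected {expected:?}"
        ));
    }
    Ok(())
}

fn main() {
    let good = Field { data_type: "List(Int64)".into(), nullable: true };
    assert!(validate_return_field(&ArrayTransform, &good).is_ok());

    let bad = Field { data_type: "Int64".into(), nullable: true };
    assert!(validate_return_field(&ArrayTransform, &bad).is_err());
}
```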
```rust
&self.name,
Arc::clone(&self.fun),
children,
self.lambda_positions.clone(),
```
I think we should verify that the lambda positions are still valid, no?
run benchmark tpcds tpch
🤖 Benchmark running (GKE): comparing lambda_and_array_transform (1b668db) to dc6142e (merge-base) using tpch
🤖 Benchmark running (GKE): comparing lambda_and_array_transform (1b668db) to dc6142e (merge-base) using tpcds
🤖 Benchmark completed (GKE). Resource usage: tpcds — base (merge-base); tpcds — branch
This is a clean version of #18921, to make it easier to review.
This is a breaking change due to adding variants to the `Expr` enum, new methods on the traits `Session`, `FunctionRegistry` and `ContextProvider`, and a new arg on `TaskContext::new`.

This PR adds support for lambdas with column capture and the `array_transform` function, used to test the lambda implementation.

Example usage:
Note: column capture has been removed for now and will be added in a follow-up PR, see #21172
Some comments on code snippets in this doc show what value each struct, variant or field would hold after planning the first example above. Some literals are simplified pseudocode.
3 new `Expr` variants are added: `HigherOrderFunction`, owning a new trait `HigherOrderUDF`, which is like a `ScalarFunction`/`ScalarUDFImpl` with support for lambdas; `Lambda`, for the lambda body and its parameter names; and `LambdaVariable`, which is like `Column` but for lambda parameters.

Their logical representations:
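The collapsed definitions aren't reproduced in this scrape; a rough sketch of what the three variants could look like, with field types simplified (the real `LambdaVariable` also carries a `Field`, per the discussion below):

```rust
use std::sync::Arc;

trait HigherOrderUDF {
    fn name(&self) -> &str;
}

// Simplified sketch; the real variants live in DataFusion's `Expr` enum
// and use its own argument and field types.
enum Expr {
    Literal(i64),
    HigherOrderFunction { func: Arc<dyn HigherOrderUDF>, args: Vec<Expr> },
    Lambda { params: Vec<String>, body: Box<Expr> },
    LambdaVariable { name: String },
}

struct ArrayTransform;
impl HigherOrderUDF for ArrayTransform {
    fn name(&self) -> &str {
        "array_transform"
    }
}

fn main() {
    // Roughly: array_transform([1], v -> v)
    let expr = Expr::HigherOrderFunction {
        func: Arc::new(ArrayTransform),
        args: vec![
            Expr::Literal(1),
            Expr::Lambda {
                params: vec!["v".into()],
                body: Box::new(Expr::LambdaVariable { name: "v".into() }),
            },
        ],
    };
    if let Expr::HigherOrderFunction { func, args } = &expr {
        assert_eq!(func.name(), "array_transform");
        assert_eq!(args.len(), 2);
    } else {
        panic!("expected HigherOrderFunction");
    }
}
```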
The example would be planned into a tree like this:
The physical counterparts definition:
Note: for those who primarily want to check whether this lambda implementation supports their use case and don't want to spend much time here, it's okay to skip most collapsed blocks, as those serve mostly to help code reviewers, with the exception of `HigherOrderUDF` and the `array_transform` implementation of `HigherOrderUDF`'s relevant methods, collapsed due to their size.

The added `HigherOrderUDF` trait is almost a clone of `ScalarUDFImpl`, with the exception of:

- `return_field_from_args` and `invoke_with_args`, where now `args.args` is a list of enums with two variants, `Value` or `Lambda`, instead of a list of values
- `lambda_parameters`, which returns a `Field` for each parameter supported for every lambda argument, based on the `Field`s of the non-lambda arguments
- the removal of `return_field` and of the deprecated `is_nullable` and `display_name`.

HigherOrderUDF
array_transform lambda_parameters implementation
array_transform return_field_from_args implementation
array_transform invoke_with_args implementation
How relevant HigherOrderUDF methods would be called and what they would return during planning and evaluation of the example
A pair `HigherOrderUDF`/`HigherOrderUDFImpl` like `ScalarFunction`'s was not used, because those exist only to maintain backwards compatibility with the older API, #8045.
Why `LambdaVariable` and not `Column`:

Existing tree traversals that operate on columns would break if some column nodes referenced a lambda parameter and not a real column. In the example query, projection pushdown would try to push the lambda parameter "v", which doesn't exist in table "t".
Example code of another traversal that would break:
Furthermore, the implementation of `ExprSchemable` and `PhysicalExpr::return_field` for `Column` expects that the schema it receives as an argument contains an entry for its name, which is not the case for lambda parameters.

By including a `FieldRef` on `LambdaVariable` that should be resolved at construction time in the SQL planner, `ExprSchemable` and `PhysicalExpr::return_field` simply return its own `Field`:

LambdaVariable ExprSchemable and PhysicalExpr::return_field implementation
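The collapsed implementation isn't shown in this scrape; the idea reduces to a sketch like this (toy `Field`/`FieldRef` types, not arrow's):

```rust
use std::sync::Arc;

// Toy Field/FieldRef, for illustration only.
#[derive(Debug, PartialEq)]
struct Field {
    name: String,
    data_type: String,
}
type FieldRef = Arc<Field>;

struct LambdaVariable {
    name: String,
    field: FieldRef,
}

impl LambdaVariable {
    // No schema lookup needed: the field was resolved at construction
    // time by the planner, so we just hand back our own copy.
    fn return_field(&self) -> FieldRef {
        Arc::clone(&self.field)
    }
}

fn main() {
    let v = LambdaVariable {
        name: "v".into(),
        field: Arc::new(Field { name: "v".into(), data_type: "Int64".into() }),
    };
    assert_eq!(v.name, "v");
    assert_eq!(v.return_field().data_type, "Int64");
}
```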
Possible alternatives, discarded due to complexity, required downstream changes, and implementation size:
How `minimize_join_filter` would look:
For any given `HigherOrderFunction` found during the traversal, a new schema is created for each lambda argument that contains its parameters, returned from `HigherOrderUDF::lambda_parameters`.
How it would look: