
[RFC] Add lambda support and array_transform udf #18921

Draft

gstvg wants to merge 7 commits into apache:main from gstvg:lambda4

gstvg commented Nov 25, 2025

Closes #14205

This PR adds support for lambdas with column capture and the array_transform function used to test the lambda implementation. Example usage:

CREATE TABLE t as SELECT 2 as n;

SELECT array_transform([2, 3], v -> v != t.n) from t;

[false, true]

-- arbitrarily nested lambdas are also supported
SELECT array_transform([[[2, 3]]], m -> array_transform(m, l -> array_transform(l, v -> v*2)));

[[[4, 6]]]

Some comments in the code snippets of this doc show what value each struct, variant or field would hold after planning the first example above. Some literals are simplified pseudocode.

Three new Expr variants are added: LambdaFunction, owning an instance of a new trait, LambdaUDF, which is like a ScalarFunction/ScalarUDFImpl with support for lambdas; Lambda, for the lambda body and its parameter names; and LambdaVariable, which is like Column but for lambda parameters. The reasoning for not using Column instead is covered later in this doc. The initial version of this PR modified ScalarUDF instead of using a new LambdaUDF, and some comments below are about that first version.

Their logical representations:

enum Expr {
    LambdaFunction(LambdaFunction), // array_transform([2, 3], v -> v != t.n)
    Lambda(Lambda), // v -> v != t.n
    LambdaVariable(LambdaVariable), // v, of the lambda body: v != t.n
   ...
}

// array_transform([2, 3], v -> v != t.n)
struct LambdaFunction {
    pub func: Arc<dyn LambdaUDF>, // global instance of array_transform
    pub args: Vec<Expr>, // [Expr::ScalarValue([2, 3]), Expr::Lambda(v -> v != n)]
}

// v -> v != t.n
struct Lambda {
    pub params: Vec<String>, // ["v"]
    pub body: Box<Expr>, // v != n
}

// v, of the lambda body: v != t.n
struct LambdaVariable {
    pub name: String, // "v"
    pub field: Option<FieldRef>, // Some(Field::new("", DataType::Int32, false))
    pub spans: Spans,
}

The example would be planned into a tree like this:

LambdaFunctionExpression
  name: array_transform
  children:
    1. ListExpression [2,3]
    2. LambdaExpression
         parameters: ["v"]
         body:
            ComparisonExpression (!=)
              left:
                 LambdaVariableExpression("v", Some(Field::new("", Int32, false)))
              right:
                 ColumnExpression("t.n")

The definitions of the physical counterparts:

struct LambdaFunctionExpr {
    fun: Arc<dyn LambdaUDF>, // global instance of array_transform
    name: String, // "array_transform"
    args: Vec<Arc<dyn PhysicalExpr>>, // [LiteralExpr([2, 3]), LambdaExpr("v -> v != t.n")]
    return_field: FieldRef, // Field::new("", DataType::new_list(DataType::Boolean, false), false)
    config_options: Arc<ConfigOptions>, 
}


struct LambdaExpr {
    params: Vec<String>, // ["v"]
    body: Arc<dyn PhysicalExpr>, // v != t.n
}

struct LambdaVariable {
    name: String, // "v", of the lambda body: v != t.n
    field: FieldRef, // Field::new("", DataType::Int32, false)
    value: Option<ColumnarValue>, // reasoning later on
}

Note: for those who primarily want to check whether this lambda implementation supports their use case and don't want to spend much time here, it's okay to skip most collapsed blocks, as those serve mostly to help code reviewers. The exceptions are LambdaUDF and the array_transform implementation of the relevant LambdaUDF methods, which are collapsed only due to their size.

Physical planning implementation is trivial:
fn create_physical_expr(
    e: &Expr,
    input_dfschema: &DFSchema,
    execution_props: &ExecutionProps,
) -> Result<Arc<dyn PhysicalExpr>> {
    let input_schema = input_dfschema.as_arrow();

    match e {
        ...
        Expr::LambdaFunction(LambdaFunction { func, args}) => {
            let physical_args =
                create_physical_exprs(args, input_dfschema, execution_props)?;

            Ok(Arc::new(LambdaFunctionExpr::try_new(
                Arc::clone(func),
                physical_args,
                input_schema,
                config_options: ... // irrelevant
            )?))
        }
        Expr::Lambda(Lambda { params, body }) => Ok(Arc::new(LambdaExpr::new(
            params.clone(),
            create_physical_expr(body, input_dfschema, execution_props)?,
        ))),
        Expr::LambdaVariable(LambdaVariable {
            name,
            field,
            spans: _,
        }) => lambda_variable(
            name,
            Arc::clone(field),
        ),
    }
}

The added LambdaUDF trait is almost a clone of ScalarUDFImpl, with the following exceptions:

  1. return_field_from_args and invoke_with_args, where args.args is now a list of enums with two variants, Value or Lambda, instead of a list of values
  2. the addition of lambdas_parameters, which returns a Field for each parameter supported by every lambda argument, based on the Fields of the non-lambda arguments
  3. the removal of return_field and of the deprecated is_nullable and display_name.
LambdaUDF
trait LambdaUDF {
    /// Returns a list of the same size as `args`, where each entry is derived
    /// from the argument at the corresponding position in `args`:
    /// 
    /// If it's a value, return None
    /// If it's a lambda, return the list of all parameters that the lambda supports,
    /// based on the Fields of the non-lambda arguments
    /// 
    /// Example for array_transform:
    /// 
    /// `array_transform([2, 8], v -> v > 4)`
    /// 
    /// let lambdas_parameters = array_transform.lambdas_parameters(&[
    ///      ValueOrLambdaParameter::Value(Field::new("", DataType::new_list(DataType::Int32, false), false)), // the Field associated with the literal `[2, 8]`
    ///      ValueOrLambdaParameter::Lambda, // A lambda
    /// ])?;
    ///
    /// assert_eq!(
    ///      lambdas_parameters,
    ///      vec![
    ///         None, // it's a value, return None
    ///         // it's a lambda, return its supported parameters, regardless of how many are actually used
    ///         Some(vec![
    ///             Field::new("", DataType::Int32, false), // the value being transformed
    ///             Field::new("", DataType::Int32, false), // the 1-based index of the element being transformed; not used in the example above, but implementations don't need to care about that
    ///         ])
    ///      ]
    /// )
    fn lambdas_parameters(
        &self,
        args: &[ValueOrLambdaParameter],
    ) -> Result<Vec<Option<Vec<Field>>>>;
    fn return_field_from_args(&self, args: LambdaReturnFieldArgs) -> Result<FieldRef>;
    fn invoke_with_args(&self, args: LambdaFunctionArgs) -> Result<ColumnarValue>;
   // ... omitted methods that are similar in ScalarUDFImpl
}

pub enum ValueOrLambdaParameter {
    /// A columnar value with the given field
    Value(FieldRef),
    /// A lambda
    Lambda,
}

/// Information about arguments passed to the function
///
/// This structure contains metadata about how the function was called
/// such as the type of the arguments, any scalar arguments and if the
/// arguments can (ever) be null
///
/// See [`LambdaUDF::return_field_from_args`] for more information
pub struct LambdaReturnFieldArgs<'a> {
    /// The data types of the arguments to the function
    ///
    /// If argument `i` to the function is a lambda, it will be the field returned by the
    /// lambda when executed with the arguments returned from `LambdaUDF::lambdas_parameters`
    ///
    /// For example, with `array_transform([1], v -> v == 5)`
    /// this field will be `[
    ///      ValueOrLambdaField::Value(Field::new("", DataType::new_list(DataType::Int32, false), false)),
    ///      ValueOrLambdaField::Lambda(Field::new("", DataType::Boolean, false))
    ///  ]`
    pub arg_fields: &'a [ValueOrLambdaField],
    /// Is argument `i` to the function a scalar (constant)?
    ///
    /// If the argument `i` is not a scalar, it will be None
    ///
    /// For example, if a function is called like `my_function(column_a, 5)`
    /// this field will be `[None, Some(ScalarValue::Int32(Some(5)))]`
    pub scalar_arguments: &'a [Option<&'a ScalarValue>],
}

/// A tagged FieldRef indicating whether it corresponds to the field of a value or to the field of the output of a lambda argument
pub enum ValueOrLambdaField {
    /// The FieldRef of a ColumnarValue argument
    Value(FieldRef),
    /// The return FieldRef of the lambda body when evaluated with the parameters from LambdaUDF::lambda_parameters
    Lambda(FieldRef),
}

/// Arguments passed to [`LambdaUDF::invoke_with_args`] when invoking a
/// lambda function.
pub struct LambdaFunctionArgs {
    /// The evaluated arguments to the function
    pub args: Vec<ValueOrLambda>,
    /// Field associated with each arg, if it exists
    pub arg_fields: Vec<ValueOrLambdaField>,
    /// The number of rows in record batch being evaluated
    pub number_rows: usize,
    /// The return field of the lambda function returned (from `return_type`
    /// or `return_field_from_args`) when creating the physical expression
    /// from the logical expression
    pub return_field: FieldRef,
    /// The config options at execution time
    pub config_options: Arc<ConfigOptions>,
}

/// A lambda argument to a LambdaFunction
pub struct LambdaFunctionLambdaArg {
    /// The parameters defined in this lambda
    ///
    /// For example, for `array_transform([2], v -> -v)`,
    /// this will be `vec![Field::new("v", DataType::Int32, true)]`
    pub params: Vec<FieldRef>,
    /// The body of the lambda
    ///
    /// For example, for `array_transform([2], v -> -v)`,
    /// this will be the physical expression of `-v`
    pub body: Arc<dyn PhysicalExpr>,
    /// A RecordBatch containing at least the captured columns inside this lambda body, if any
    /// Note that it may contain additional, non-specified columns,
    /// but that's implementation detail and should not be relied upon
    ///
    /// For example, for `array_transform([2], v -> v + t.a + t.b)`,
    /// this will be a `RecordBatch` with at least two columns, `t.a` and `t.b`
    pub captures: Option<RecordBatch>,
}

/// An argument to a LambdaUDF
pub enum ValueOrLambda {
    Value(ColumnarValue),
    Lambda(LambdaFunctionLambdaArg),
}
array_transform lambdas_parameters implementation
impl LambdaUDF for ArrayTransform {
    fn lambdas_parameters(
        &self,
        args: &[ValueOrLambdaParameter],
    ) -> Result<Vec<Option<Vec<Field>>>> {
        // list is the field of [2, 3]: Field::new("", DataType::new_list(DataType::Int32, false), false)
        let [ValueOrLambdaParameter::Value(list), ValueOrLambdaParameter::Lambda] = args
        else {
            return exec_err!(
                "{} expects a value follewed by a lambda, got {:?}",
                self.name(),
                args
            );
        };

        // the field of [2, 3] inner values: Field::new("", DataType::Int32, false)
        let (field, index_type) = match list.data_type() {
            DataType::List(field) => (field, DataType::Int32),
            DataType::LargeList(field) => (field, DataType::Int64),
            DataType::FixedSizeList(field, _) => (field, DataType::Int32),
            _ => return exec_err!("expected list, got {list}"),
        };

        // we don't need to omit the index when the lambda doesn't declare it, e.g. array_transform([], v -> v*2),
        // nor check whether the lambda declares more than two parameters, e.g. array_transform([], (v, i, j) -> v+i+j),
        // as datafusion will do that for us
        let value = Field::new("", field.data_type().clone(), field.is_nullable())
            .with_metadata(field.metadata().clone());
        let index = Field::new("", index_type, false);

        Ok(vec![None, Some(vec![value, index])])
    }
}
array_transform return_field_from_args implementation
impl LambdaUDF for ArrayTransform {
    fn return_field_from_args(
        &self,
        args: datafusion_expr::LambdaReturnFieldArgs,
    ) -> Result<Arc<Field>> {
        // [
        //    Field::new("", DataType::new_list(DataType::Int32, false), false),
        //    Field::new("", DataType::Boolean, false),
        // ]
        let [ValueOrLambdaField::Value(list), ValueOrLambdaField::Lambda(lambda)] =
            take_function_args(self.name(), args.arg_fields)?
        else {
            return exec_err!(
                "{} expects a value follewed by a lambda, got {:?}",
                self.name(),
                args
            );
        };

        // lambda is the return_field of the lambda body
        // when evaluated with the parameters from lambdas_parameters
        let field = Arc::new(Field::new(
            Field::LIST_FIELD_DEFAULT_NAME,
            lambda.data_type().clone(),
            lambda.is_nullable(),
        ));

        let return_type = match list.data_type() {
            DataType::List(_) => DataType::List(field),
            DataType::LargeList(_) => DataType::LargeList(field),
            DataType::FixedSizeList(_, size) => DataType::FixedSizeList(field, *size),
            other => return plan_err!("expected list, got {other}"),
        };

        Ok(Arc::new(Field::new("", return_type, list.is_nullable())))
    }
}
array_transform invoke_with_args implementation
impl LambdaUDF for ArrayTransform {
    fn invoke_with_args(&self, args: LambdaFunctionArgs) -> Result<ColumnarValue> {
        let [list_value, lambda] = take_function_args(self.name(), &args.args)?;

        // list = [2, 3]
        // lambda = LambdaFunctionLambdaArg {
        //    params: vec![Field::new("v", DataType::Int32, false)],
        //    body: PhysicalExpr("v != t.n"),// the physical expression of the lambda *body*, and not the lambda itself: this is not a LambdaExpr. 
        //    captures: Some(record_batch!("t.n", Int32, [2]))
        // }
        let (ValueOrLambda::Value(list_value), ValueOrLambda::Lambda(lambda)) =
            (list_value, lambda)
        else {
            return exec_err!(
                "{} expects a value followed by a lambda, got {} and {}",
                self.name(),
                list_value,
                lambda,
            );
        };

        let list_array = list_value.to_array(args.number_rows)?;
        let list_values = match list_array.data_type() {
            DataType::List(_) => list_array.as_list::<i32>().values(),
            DataType::LargeList(_) => list_array.as_list::<i64>().values(),
            DataType::FixedSizeList(_, _) => list_array.as_fixed_size_list().values(),
            other => return exec_err!("expected list, got {other}"),
        };

        // if any column got captured, we need to adjust it to the values arrays,
        // duplicating values for lists with multiple values and removing values of empty lists.
        // list_indices is not cheap, so it's important to avoid it when no column is captured
        let adjusted_captures = lambda
            .captures
            .as_ref()
            //list_indices returns the row number for each sublist element: [[1, 2], [3], [4]] => [0,0,1,2], not included here
            .map(|captures| take_record_batch(captures, &list_indices(&list_array)?))
            .transpose()?
            .unwrap_or_else(|| {
                RecordBatch::try_new_with_options(
                    Arc::new(Schema::empty()),
                    vec![],
                    &RecordBatchOptions::new().with_row_count(Some(list_values.len())),
                )
                .unwrap()
            });

        // by using closures, bind_lambda_variables can evaluate only the parameters actually used, avoiding unnecessary computation
        let values_param = || Ok(Arc::clone(list_values));
        //elements_indices returns the 1-based index of each element within its sublist: [[5, 3], [7, 1, 1]] => [1, 2, 1, 2, 3], not included here
        let indices_param = || elements_indices(&list_array);

        let bound_body = bind_lambda_variables(
            Arc::clone(&lambda.body),
            &lambda.params,
            &[&values_param, &indices_param],
        )?;

        // call the transforming expression with the record batch
        let transformed_values = bound_body
            .evaluate(&adjusted_captures)?
            .into_array(list_values.len())?;

        let field = match args.return_field.data_type() {
            DataType::List(field)
            | DataType::LargeList(field)
            | DataType::FixedSizeList(field, _) => Arc::clone(field),
            _ => {
                return exec_err!(
                    "{} expected ScalarFunctionArgs.return_field to be a list, got {}",
                    self.name(),
                    args.return_field
                )
            }
        };

        let transformed_list = match list_array.data_type() {
            DataType::List(_) => {
                let list = list_array.as_list::<i32>();

                Arc::new(ListArray::new(
                    field,
                    list.offsets().clone(),
                    transformed_values,
                    list.nulls().cloned(),
                )) as ArrayRef
            }
            DataType::LargeList(_) => {
                let large_list = list_array.as_list::<i64>();

                Arc::new(LargeListArray::new(
                    field,
                    large_list.offsets().clone(),
                    transformed_values,
                    large_list.nulls().cloned(),
                ))
            }
            DataType::FixedSizeList(_, value_length) => {
                Arc::new(FixedSizeListArray::new(
                    field,
                    *value_length,
                    transformed_values,
                    list_array.as_fixed_size_list().nulls().cloned(),
                ))
            }
            other => return exec_err!("expected list, got {other}"),
        };

        Ok(ColumnarValue::Array(transformed_list))
    }
}
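The helpers list_indices and elements_indices referenced in the comments above are not included in the PR snippet. A minimal sketch of what they compute, written against plain arrow-rs and assuming i32 offsets only (the real helpers would also need to handle large and fixed-size lists):

use arrow::array::{Int32Array, ListArray};
use arrow::datatypes::Int32Type;

/// Row number of each flattened element: [[1, 2], [3], [4]] => [0, 0, 1, 2]
fn list_indices(list: &ListArray) -> Int32Array {
    let offsets = list.value_offsets();
    (0..list.len())
        .flat_map(|row| {
            let len = (offsets[row + 1] - offsets[row]) as usize;
            std::iter::repeat(row as i32).take(len)
        })
        .collect()
}

/// 1-based index of each element within its sublist: [[5, 3], [7, 1, 1]] => [1, 2, 1, 2, 3]
fn elements_indices(list: &ListArray) -> Int32Array {
    let offsets = list.value_offsets();
    (0..list.len())
        .flat_map(|row| 1..=(offsets[row + 1] - offsets[row]))
        .collect()
}

fn main() {
    let list = ListArray::from_iter_primitive::<Int32Type, _, _>([
        Some(vec![Some(1), Some(2)]),
        Some(vec![Some(3)]),
        Some(vec![Some(4)]),
    ]);
    assert_eq!(list_indices(&list).values().to_vec(), vec![0, 0, 1, 2]);
    assert_eq!(elements_indices(&list).values().to_vec(), vec![1, 2, 1, 1]);
}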
How the relevant LambdaUDF methods would be called and what they would return during planning and evaluation of the example
// this is called at sql planning
let lambdas_parameters = lambda_udf.lambdas_parameters(&[
    ValueOrLambdaParameter::Value(Field::new("", DataType::new_list(DataType::Int32, false), false)), // the Field of the [2, 3] literal
    ValueOrLambdaParameter::Lambda, // An unspecified lambda. In the example, v -> v != t.n
])?;

assert_eq!(
    lambdas_parameters,
    vec![
            // the [2, 3] argument, not a lambda so no parameters
            None,
            // the parameters that *can* be declared on the lambda, not only
            // those actually declared: the implementation doesn't need to
            // care about which ones are used
            Some(vec![
                Field::new("", DataType::Int32, false), // the list inner value
                Field::new("", DataType::Int32, false), // the 1-based index of the element being transformed
            ])]
);



// this is called every time ExprSchemable is called on a LambdaFunction
let return_field = array_transform.return_field_from_args(LambdaReturnFieldArgs {
    arg_fields: &[
        ValueOrLambdaField::Value(Field::new("", DataType::new_list(DataType::Int32, false), false)),
        ValueOrLambdaField::Lambda(Field::new("", DataType::Boolean, false)), // the return_field of the expression "v != t.n" when "v" is of the type returned in lambdas_parameters
    ],
    scalar_arguments, // irrelevant
})?;

assert_eq!(return_field, Field::new("", DataType::new_list(DataType::Boolean, false), false));



let value = array_transform.invoke_with_args(LambdaFunctionArgs {
    args: vec![
        ValueOrLambda::Value(List([2, 3])),
        ValueOrLambda::Lambda(LambdaFunctionLambdaArg {
            params: vec![Field::new("v", DataType::Int32, false)],
            body: PhysicalExpr("v != t.n"),// the physical expression of the lambda *body*, and not the lambda itself: this is not a LambdaExpr. 
            captures: Some(record_batch!("t.n", Int32, [2]))
        }),
    ],
    arg_fields, // same as above
    number_rows: 1,
    return_field, // same as above
    config_options, // irrelevant
})?;

assert_eq!(value, BooleanArray::from([false, true]))


A LambdaUDF/LambdaUDFImpl pair, like the one backing ScalarFunction, was not used because such pairs exist only to maintain backwards compatibility with the older API #8045

LambdaFunction invocation:

Instead of evaluating all of its arguments as ScalarFunction does, LambdaFunction does the following:

  1. If an argument is not a lambda, evaluate it as usual and provide the resulting ColumnarValue to LambdaUDF::invoke_with_args as a ValueOrLambda::Value
  2. If it's a lambda, construct a LambdaFunctionLambdaArg containing the lambda body physical expression and a record batch with any captured columns, and provide it to LambdaUDF::invoke_with_args as a ValueOrLambda::Lambda. To avoid costly copies of uncaptured columns, we swap them with a NullArray while keeping the number of columns in the batch the same, so captured column indices stay stable across the whole tree; see the standalone sketch after the collapsed block below. The recent Project record batches to avoid filtering unused columns in CASE evaluation #18329 instead projects out uncaptured columns and rewrites the expr, adjusting column indexes. If that is preferable, we can generalize that implementation and use it here too.
LambdaFunction evaluation
impl PhysicalExpr for LambdaFunctionExpr {
    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
        let args = self.args
            .iter()
            .map(|arg| {
                match arg.as_any().downcast_ref::<LambdaExpr>() {
                    Some(lambda) => {
                        // helper method that returns the indices of the captured columns. In the example, the only column available (index 0) is captured, so this would be HashSet(0)
                        let captures = lambda.captures();

                        let captures = if !captures.is_empty() {
                            let (fields, columns): (Vec<_>, _) = std::iter::zip(
                                batch.schema_ref().fields(),
                                batch.columns(),
                            )
                            .enumerate()
                            .map(|(column_index, (field, column))| {
                                if captures.contains(&column_index) {
                                    (Arc::clone(field), Arc::clone(column))
                                } else {
                                    (
                                        Arc::new(Field::new(
                                            field.name(),
                                            DataType::Null,
                                            false,
                                        )),
                                        Arc::new(NullArray::new(column.len())) as _,
                                    )
                                }
                            })
                            .unzip();

                            let schema = Arc::new(Schema::new(fields));

                            Some(RecordBatch::try_new(schema, columns)?)
                        } else {
                            None
                        };

                        Ok(ValueOrLambda::Lambda(LambdaFunctionLambdaArg {
                            params, // irrelevant,
                            body: Arc::clone(lambda.body()), // use the lambda body and not the lambda itself
                            captures,
                        }))
                    }
                    None => Ok(ValueOrLambda::Value(arg.evaluate(batch)?)),
                }
            })
            .collect::<Result<Vec<_>>>()?;

        // evaluate the function
        let output = self.fun.invoke_with_args(LambdaFunctionArgs {
            args,
            arg_fields, // irrelevant
            number_rows: batch.num_rows(),
            return_field: Arc::clone(&self.return_field),
            config_options: Arc::clone(&self.config_options),
        })?;

        Ok(output)
    }
}
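Distilled from the collapsed block above into a runnable form, a sketch of the NullArray swap on a concrete batch. mask_uncaptured is a hypothetical standalone helper; the placeholder fields are marked nullable here so that RecordBatch::try_new accepts the all-null arrays:

use std::collections::HashSet;
use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array, NullArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

/// Swap uncaptured columns with NullArray placeholders, keeping the column
/// count, and therefore the captured columns' indices, unchanged
fn mask_uncaptured(batch: &RecordBatch, captured: &HashSet<usize>) -> RecordBatch {
    let (fields, columns): (Vec<_>, Vec<_>) = batch
        .schema()
        .fields()
        .iter()
        .zip(batch.columns())
        .enumerate()
        .map(|(i, (field, column))| {
            if captured.contains(&i) {
                (Arc::clone(field), Arc::clone(column))
            } else {
                (
                    Arc::new(Field::new(field.name(), DataType::Null, true)),
                    Arc::new(NullArray::new(column.len())) as ArrayRef,
                )
            }
        })
        .unzip();

    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns).unwrap()
}

fn main() {
    let batch = RecordBatch::try_new(
        Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("n", DataType::Int32, false),
        ])),
        vec![
            Arc::new(Int32Array::from(vec![1])) as ArrayRef,
            Arc::new(Int32Array::from(vec![2])),
        ],
    )
    .unwrap();

    // only column 1 ("n") is captured by the lambda
    let masked = mask_uncaptured(&batch, &HashSet::from([1]));
    assert_eq!(masked.num_columns(), batch.num_columns()); // indices stay stable
    assert_eq!(masked.column(0).data_type(), &DataType::Null); // "a" swapped out
    assert_eq!(masked.column(1).data_type(), &DataType::Int32); // "n" kept at index 1
}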

Why LambdaVariable and not Column:

Existing tree traversals that operate on columns would break if some Column nodes referred to a lambda parameter rather than a real column. In the example query, projection pushdown would try to push down the lambda parameter "v", which doesn't exist in table "t".

Example code of another traversal that would break:

fn minimize_join_filter(expr: Arc<dyn PhysicalExpr>, ...) -> JoinFilter {
    let mut used_columns = HashSet::new();
    expr.apply(|expr| {
        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
            // if this is a lambda column, this function will break
            used_columns.insert(col.index());
        }
        Ok(TreeNodeRecursion::Continue)
    });
    ...
}

Furthermore, the implementations of ExprSchemable and PhysicalExpr::return_field for Column expect the schema received as an argument to contain an entry for the column's name, which is not the case for lambda parameters.

By including a FieldRef on LambdaVariable, resolved either at construction time, as in the sql planner, or later by an AnalyzerRule, the ExprSchemable and PhysicalExpr::return_field implementations simply return their own Field:

LambdaVariable ExprSchemable and PhysicalExpr::return_field implementation
impl ExprSchemable for Expr {
   fn to_field(
        &self,
        schema: &dyn ExprSchema,
    ) -> Result<(Option<TableReference>, Arc<Field>)> {
        let (relation, schema_name) = self.qualified_name();
        let field = match self {
           Expr::LambdaVariable(l) => l.field.clone().ok_or_else(|| plan_datafusion_err!("Unresolved LambdaVariable {}", l.name)),
           ...
        }?;

        Ok((
            relation,
            Arc::new(field.as_ref().clone().with_name(schema_name)),
        ))
    }
    ...
}

impl PhysicalExpr for LambdaVariable {
    fn return_field(&self, _input_schema: &Schema) -> Result<FieldRef> {
        Ok(Arc::clone(&self.field))
    }
    ...
}

For reference, Spark and Substrait also use a specialized node instead of a regular column.

There are also discussions about making every expr own its type: #18845, #12604

Possible fixes discarded due to complexity, required downstream changes, and implementation size:
  1. Add a new set of TreeNode methods that provide the set of lambda parameter names seen during the traversal, so Column nodes can be tested for whether they refer to a regular column or to a lambda parameter. Any downstream user that wants to support lambdas would need to use those methods instead of the existing ones. This would also add 1k+ lines to the PR.
impl Expr {
    pub fn transform_with_lambdas_params<
        F: FnMut(Self, &HashSet<String>) -> Result<Transformed<Self>>,
    >(
        self,
        mut f: F,
    ) -> Result<Transformed<Self>> {}
}

How minimize_join_filter would look:

fn minimize_join_filter(expr: Arc<dyn PhysicalExpr>, ...) -> JoinFilter {
    let mut used_columns = HashSet::new();
    expr.apply_with_lambdas_params(|expr, lambdas_params| {
        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
            // dont include lambdas parameters
            if !lambdas_params.contains(col.name()) {
                used_columns.insert(col.index());
            }
        }
        Ok(TreeNodeRecursion::Continue)
    })
    ...
}
  2. Add a flag to the Column node indicating whether it refers to a lambda parameter. This still requires checking the flag in existing tree traversals that work on Columns (30+), and downstream as well.
//logical
struct Column {
    pub relation: Option<TableReference>,
    pub name: String,
    pub spans: Spans,
    pub is_lambda_parameter: bool,
}

//physical
struct Column {
    name: String,
    index: usize,
    is_lambda_parameter: bool,
}

How minimize_join_filter would look:

fn minimize_join_filter(expr: Arc<dyn PhysicalExpr>, ...) -> JoinFilter {
    let mut used_columns = HashSet::new();
    expr.apply(|expr| {
        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
            // dont include lambdas parameters
            if !col.is_lambda_parameter {
                used_columns.insert(col.index());
            }
        }
        Ok(TreeNodeRecursion::Continue)
    })
    ...
}
  3. Add a new set of TreeNode methods that provide a schema including the lambda parameters in scope for the node being visited/transformed:
impl Expr {
    pub fn transform_with_schema<
        F: FnMut(Self, &DFSchema) -> Result<Transformed<Self>>,
    >(
        self,
        schema: &DFSchema,
        f: F,
    ) -> Result<Transformed<Self>> { ... }
    ... other methods
}

For any given LambdaFunction found during the traversal, a new schema is created for each lambda argument that contains its parameters, as returned from LambdaUDF::lambdas_parameters.
How it would look:

pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result<(Expr, bool)> {
        let mut has_placeholder = false;
        // Provide the schema as the first argument. 
        // Transforming closure receive an adjusted_schema as argument
        self.transform_with_schema(schema, |mut expr, adjusted_schema| {
            match &mut expr {
                // Default to assuming the arguments are the same type
                Expr::BinaryExpr(BinaryExpr { left, op: _, right }) => {
                    // use adjusted_schema and not schema. Those expressions may contain 
                    // columns referring to a lambda parameter, which Field would only be
                    // available in adjusted_schema and not in schema
                    rewrite_placeholder(left.as_mut(), right.as_ref(), adjusted_schema)?;
                    rewrite_placeholder(right.as_mut(), left.as_ref(), adjusted_schema)?;
                }
    ....
  4. Make available, through LogicalPlan and ExecutionPlan nodes, a schema that includes all lambda parameters from all expressions owned by the node, and use this schema for tree traversals. Nodes that don't own any expressions can return their regular schema.
impl LogicalPlan {
    fn lambda_extended_schema(&self) -> &DFSchema;
}

trait ExecutionPlan {
    fn lambda_extended_schema(&self) -> &DFSchema;
}

//usage
impl LogicalPlan {
    pub fn replace_params_with_values(
            self,
            param_values: &ParamValues,
        ) -> Result<LogicalPlan> {
            self.transform_up_with_subqueries(|plan| {
                // use plan.lambda_extended_schema(), which contains the lambda parameters,
                // instead of plan.schema(), which won't
                let lambda_extended_schema = Arc::clone(plan.lambda_extended_schema());
                let name_preserver = NamePreserver::new(&plan);
                plan.map_expressions(|e| {
                    // if this expression is a child of a lambda and contains columns referring to its parameters,
                    // the lambda_extended_schema already contains them
                    let (e, has_placeholder) = e.infer_placeholder_types(&lambda_extended_schema)?;
    ....

LambdaVariable evaluation, current implementation:

The physical LambdaVariable contains an optional ColumnarValue that must be bound for each batch before evaluation with the helper function bind_lambda_variables, which rewrites the whole lambda body, binding any variable in the tree.

LambdaVariable::evaluate
impl PhysicalExpr for LambdaVariable {
    fn evaluate(&self, _batch: &RecordBatch) -> Result<ColumnarValue> {
        self.value.clone().ok_or_else(|| exec_datafusion_err!("Physical LambdaVariable {} has an unbound value", self.name))
    }
}

Unbound:

LambdaExpression
    parameters: ["v"]
    body:
    ComparisonExpression(!=)
        left:
            LambdaVariableExpression("v", Field::new("", Int32, false), None)
        right:
            ColumnExpression("n")

After binding:

LambdaExpression
    parameters: ["v"]
    body:
    ComparisonExpression(!=)
        left:
            LambdaVariableExpression("v", Field::new("", Int32, false), Some([2, 3]))
        right:
            ColumnExpression("n")

Alternative:

Make the LambdaVariable evaluate its value from the batch passed to PhysicalExpr::evaluate, like a regular column. For that, instead of binding the body, the LambdaUDF implementation would merge the captured batch of a lambda with the values of its parameters. For this lookup to happen via an index, as with a regular Column, the schema used to plan the physical LambdaVariable must contain the lambda parameters; this would be the only place during planning where a schema contains those parameters. Otherwise, it can only get the value from the batch via name instead of index.

  1. Add an index to LambdaVariable, similar to Column, and remove the optional value.
struct LambdaVariable {
    name: String, // "v", of the lambda body: v != t.n
    field: FieldRef, // Field::new("", DataType::Int32, false)
    index: usize, // 1
}
  2. Insert the lambda parameters only into the schema used for physical planning, to compute the index of each LambdaVariable
How physical planning would look
fn create_physical_expr(
    e: &Expr,
    input_dfschema: &DFSchema,
    execution_props: &ExecutionProps,
) -> Result<Arc<dyn PhysicalExpr>> {
    let input_schema = input_dfschema.as_arrow();

    match e {
        ...
        Expr::LambdaFunction(LambdaFunction { func, args}) => {
            let args_metadata = args.iter()
                .map(|arg| if matches!(arg, Expr::Lambda(_)) {
                    Ok(ValueOrLambdaParameter::Lambda)
                } else {
                    Ok(ValueOrLambdaParameter::Value(arg.to_field(input_dfschema)?.1))
                })
                .collect::<Result<Vec<_>>>()?;

            let lambdas_parameters = func.lambdas_parameters(&args_metadata)?;

            let physical_args = std::iter::zip(args, lambdas_parameters)
                .map(|(arg, lambda_parameters)| {
                    match (arg, lambda_parameters) {
                        (Expr::Lambda(lambda), Some(lambda_parameters)) => {
                            let extended_dfschema = merge_schema_and_parameters(input_dfschema, lambda_parameters)?;

                            create_physical_expr(&lambda.body, &extended_dfschema, execution_props)
                        }
                        (Expr::Lambda(_), None) => plan_err!("lambdas_parameters returned None for a lambda"),
                        (_, Some(_)) => plan_err!("lambdas_parameters returned Some for a non lambda"),
                        (arg, None) => create_physical_expr(arg, input_dfschema, execution_props),
                    }
                })
                .collect::<Result<Vec<_>>>()?;

            Ok(Arc::new(LambdaFunctionExpr::try_new(
                Arc::clone(func),
                physical_args,
                input_schema,
                config_options: ... // irrelevant
            )?))
        }
    }
}

  3. Insert the lambda parameter values into the RecordBatch during the evaluation phase: the LambdaUDF, instead of binding the lambda body variables, inserts its parameters into the captured RecordBatch it receives in LambdaFunctionLambdaArg.

How ArrayTransform::invoke_with_args would look:

        ...
        let values_param = || Ok(Arc::clone(list_values));
        let indices_param = || elements_indices(&list_array);

        let merged_batch = merge_captures_with_params(
            adjusted_captures,
            &lambda.params,
            &[&values_param, &indices_param],
        )?;

        // call the transforming expression with the record batch
        let transformed_values = lambda.body
            .evaluate(&merged_batch)?
            .into_array(list_values.len())?;
        
        ...

Why is the LambdaVariable field an Option?

So expr_api users can construct a LambdaVariable just by using its name, without having to set its field. An AnalyzerRule will then set the LambdaVariable field based on the values returned from LambdaUDF::lambdas_parameters of any LambdaFunction it finds while traversing down an expr tree. We may include that rule in the default rules list, to cover the case where another rule transforms the plan/expression tree in a way that changes the types of the non-lambda arguments of a lambda function: that may change the types of its lambda parameters, leaving the LambdaVariable fields out of sync, and the rule would fix them. Or, to not increase planning time, we don't include it by default and instruct expr_api users to add it manually if needed. A sketch of the core of such a rule follows the example below.

array_transform(
    col("my_array"),
    lambda(
        vec!["current_value"],
        2 * lambda_variable("current_value")
    )
)

//instead of 

array_transform(
    col("my_array"),
    lambda(
        vec!["current_value"],
        2 * lambda_variable("current_value", Field::new("", DataType::Int32, false))
    )
)
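A minimal sketch of the core of such a rule, assuming the parameter Fields for one lambda have already been derived via LambdaUDF::lambdas_parameters and keyed by the declared parameter names (nested lambdas that shadow a name would need extra scoping care):

use std::collections::HashMap;
use arrow::datatypes::FieldRef;
use datafusion_common::Result;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_expr::Expr;

/// Fill in the `field` of every still-unresolved LambdaVariable in a lambda
/// body, given the parameter fields derived for that lambda
fn resolve_body(body: Expr, params: &HashMap<String, FieldRef>) -> Result<Expr> {
    Ok(body
        .transform(|e| match e {
            Expr::LambdaVariable(mut v) if v.field.is_none() => {
                v.field = params.get(&v.name).cloned();
                Ok(Transformed::yes(Expr::LambdaVariable(v)))
            }
            other => Ok(Transformed::no(other)),
        })?
        .data)
}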

Why set the LambdaVariable field during sql planning if it's optional and can be set later via an AnalyzerRule?

Some parts of sql planning check the type/nullability of the already-planned child expressions of the expr being planned, and would error when doing so on an unresolved LambdaVariable.
Take this expression as an example: array_transform([[0, 1]], v -> v[1]). Planning the field access v[1] is handled by the ExprPlanner FieldAccessPlanner, which checks the datatype of v, a lambda variable, whose ExprSchemable implementation depends on its field being resolved rather than on the PlannerContext schema. This requires the sql planner to plan LambdaVariables with a resolved field.

FieldAccessPlanner
pub struct FieldAccessPlanner;

impl ExprPlanner for FieldAccessPlanner {
    fn plan_field_access(
        &self,
        expr: RawFieldAccessExpr, // "v[1]"
        schema: &DFSchema,
    ) -> Result<PlannerResult<RawFieldAccessExpr>> {
        // { "v", "[1]" }
        let RawFieldAccessExpr { expr, field_access } = expr;

        match field_access {
            ...
            // expr[idx] ==> array_element(expr, idx)
            GetFieldAccess::ListIndex { key: index } => {
                match expr {
                    ...
                    // ExprSchemable::get_type called
                    _ if matches!(expr.get_type(schema)?, DataType::Map(_, _)) => {
                        Ok(PlannerResult::Planned(Expr::ScalarFunction(
                            ScalarFunction::new_udf(
                                get_field_inner(),
                                vec![expr, *index],
                            ),
                        )))
                    }
                }
            }
        }
    }
}

Therefore we can't plan all arguments in a single pass. We must first plan the non-lambda arguments, collect their types and nullability, and pass them to LambdaUDF::lambdas_parameters, which derives the types of the lambda parameters from the types of the non-lambda arguments and returns them to the planner. For each unplanned lambda argument, the planner then creates a new PlannerContext via with_lambda_parameters, which contains a mapping from lambda parameter names to their types. Then, when planning an ast::Identifier, it first checks whether a lambda parameter with the given name exists: if so, it plans it into an Expr::LambdaVariable with a resolved field, otherwise it plans it into a regular Expr::Column.

sql planning
struct PlannerContext {
    /// The parameters of all lambdas seen so far
    lambdas_parameters: HashMap<String, FieldRef>,
    // ... omitted fields
}

impl PlannerContext {
    pub fn with_lambda_parameters(
        mut self,
        arguments: impl IntoIterator<Item = FieldRef>,
    ) -> Self {
        self.lambdas_parameters
            .extend(arguments.into_iter().map(|f| (f.name().clone(), f)));

        self
    }
}

// copied from sqlparser
struct LambdaFunction {
    pub params: OneOrManyWithParens<Ident>, // One("v")
    pub body: Box<Expr>, // v != t.n
}

// copied from sqlparser
enum OneOrManyWithParens<T> {
    One(T), // "v"
    Many(Vec<T>),
}

// the planning would happen as follows:

enum ExprOrLambda {
    Expr(Expr), // planned [2, 3]
    Lambda(ast::LambdaFunction), // unplanned v -> v != t.n
}

impl SqlToRel {
    // example function, won't exist
    fn plan_array_transform(&self, array_transform: Arc<dyn LambdaUDF>, args: Vec<ast::Expr>, schema: &DFSchema, planner_context: &mut PlannerContext) -> Result<Expr> {
        let args = args.into_iter()
            .map(|arg| match arg {
                ast::Expr::LambdaFunction(l) => Ok(ExprOrLambda::Lambda(l)),//skip planning until we plan non lambda args
                arg => Ok(ExprOrLambda::Expr(
                    self.sql_fn_arg_to_logical_expr_with_name(
                        arg,
                        schema,
                        planner_context,
                    )?,
                ))
            })
            .collect::<Result<Vec<_>>>()?;

        let args_metadata = args.iter()
            .map(|arg| match arg {
                ExprOrLambda::Expr(expr) => Ok(ValueOrLambdaParameter::Value(expr.to_field(schema)?.1)),
                ExprOrLambda::Lambda(_) => Ok(ValueOrLambdaParameter::Lambda),
            })
            .collect::<Result<Vec<_>>>()?;
        
        let lambdas_parameters = array_transform.lambdas_parameters(&args_metadata)?;

        let args = std::iter::zip(args, lambdas_parameters)
            .map(|(arg, lambda_parameters)| match (arg, lambda_parameters) {
                (ExprOrLambda::Expr(planned_expr), None) => Ok(planned_expr),
                (ExprOrLambda::Lambda(unplanned_lambda), Some(lambda_parameters)) => {
                    let params: Vec<String> =
                        unplanned_lambda.params
                            .iter()
                            .map(|p| p.value.clone())
                            .collect();

                    let lambda_parameters = lambda_parameters
                        .into_iter()
                        .zip(&params)
                        .map(|(field, name)| Arc::new(field.with_name(name)));

                    let mut planner_context = planner_context
                        .clone()
                        .with_lambda_parameters(lambda_parameters);

                    Ok(Expr::Lambda(Lambda {
                        params,
                        body: Box::new(self.sql_expr_to_logical_expr(
                            *unplanned_lambda.body,
                            schema,
                            &mut planner_context,
                        )?),
                    }))
                }
                (ExprOrLambda::Expr(_), Some(_)) => plan_err!("lambdas_parameters returned Some for a value"),
                (ExprOrLambda::Lambda(_), None) => plan_err!("lambdas_parameters returned None for a lambda"),
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(Expr::LambdaFunction(LambdaFunction {
            func: array_transform,
            args,
        }))    
    }

    fn sql_identifier_to_expr(
        &self,
        id: ast::Ident,
        schema: &DFSchema,
        planner_context: &mut PlannerContext,
    ) -> Result<Expr> {
        // simplified implementation
        if let Some(field) = planner_context.lambdas_parameters.get(id) {
            Ok(Expr::LambdaVariable(LambdaVariable {
                name: id, // "v"
                field, // Field::new("", DataType::Int32, false)
            }))
        } else {
            Ok(Expr::Column(Column::new(id)))
        }
    }
}

LambdaFunction Signature is non-functional

Currently, LambdaUDF::signature returns the same Signature as ScalarUDF, but its type_signature field is never used: most variants of the TypeSignature enum aren't applicable to a lambda, and no type coercion is applied to lambda function arguments, which is currently the implementation's responsibility. We should either add lambda-compatible variants to the TypeSignature enum, create new LambdaTypeSignature and LambdaSignature types, or support no automatic type coercion at all on lambda functions. An illustrative sketch of the second option follows.
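For illustration only, one possible shape for a lambda-aware signature; none of these types exist in the PR or in DataFusion today:

use arrow::datatypes::DataType;

/// A signature that can describe lambda arguments
pub enum LambdaTypeSignature {
    /// One entry per positional argument
    Exact(Vec<ValueOrLambdaKind>),
    /// No coercion: the implementation validates its own arguments
    UserDefined,
}

pub enum ValueOrLambdaKind {
    /// A value argument, coercible to one of the listed types
    Value(Vec<DataType>),
    /// A lambda argument declaring at most `max_params` parameters
    Lambda { max_params: usize },
}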

gstvg commented Nov 25, 2025

# Traversing Expr trees with a schema that includes lambda parameters

The parameters of a lambda aren't present in the schema of the plan they belong to. During tree traversals that use a schema to check expressions' datatypes, nullability and metadata, there must be a way to access a schema that includes those parameters.

Expr tree traversal with wrong schema
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷        
╷                                                                  ╷        
╷                        a = Int32                                 ╷        
╷                        b = List(List(Int32))                     ╷        
╷                        c = Int32                                 ╷        
╷                                                                  ╷        
╷                                                                  ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷             (b, i) -> array_transform(b, b -> b + c + i)         ╷        
╷                                                                  ╷        
╷                         a = Int32                                ╷        
╷                         b = List(List(Int32))                    ╷        
╷                         c = Int32                                ╷        
╷                                                                  ╷        
╷              !! missing "i", incorrect "b" type !!               ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                 array_transform(b, b -> b + c + i)               ╷        
╷                                                                  ╷        
╷                        a = Int32                                 ╷        
╷                        b = List(List(Int32))                     ╷        
╷                        c = Int32                                 ╷        
╷                                                                  ╷        
╷              !! missing "i", incorrect "b" type !!               ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                          b -> b + c + i                          ╷        
╷                                                                  ╷        
╷                          a = Int32                               ╷        
╷                          b = List(List(Int32))                   ╷        
╷                          c = Int32                               ╷        
╷                                                                  ╷        
╷              !! missing "i", incorrect "b" type !!               ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        

Option 1. Per-lambda schema with a new set of TreeNode methods: *_with_schema

Once again, this PR adds another set of TreeNode-like methods on logical and physical expressions. While traversing expression trees, when these methods find a ScalarUDF with a lambda among its arguments, they use ScalarUDF::lambdas_parameters to create a schema adjusted for each of its arguments, and pass it as an argument to the visiting/transforming function.

impl Expr {
pub fn transform_with_schema<
        F: FnMut(Self, &DFSchema) -> Result<Transformed<Self>>,
    >(
        self,
        schema: &DFSchema,
        f: F,
    ) -> Result<Transformed<Self>> {}
}

Example usage:

pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result<(Expr, bool)> {
        let mut has_placeholder = false;
        // Provide the schema as the first argument. 
        // Transforming closure receive an adjusted_schema as argument
        self.transform_with_schema(schema, |mut expr, adjusted_schema| {
            match &mut expr {
                // Default to assuming the arguments are the same type
                Expr::BinaryExpr(BinaryExpr { left, op: _, right }) => {
                    // use adjusted_schema and not schema. Those expressions may contain 
                    // columns referring to a lambda parameter, which Field would only be
                    // available in adjusted_schema and not in schema
                    rewrite_placeholder(left.as_mut(), right.as_ref(), adjusted_schema)?;
                    rewrite_placeholder(right.as_mut(), left.as_ref(), adjusted_schema)?;
                }
    ....

In order to add the lambda parameters to the schema, we need to take into account DFSchema's properties:

"Unqualified fields must be unique not only amongst themselves, but also must have a distinct name from any qualified field names"

Since lambda parameters are always unqualified, they may conflict with columns of the outer schema, even though those are qualified. To fix this conflict, we can either:

1: Replace the existing column with the lambda parameter, at the same index in the schema's vec of fields, in order to not change the index of the columns to the right of it. That's the current approach in this PR. A sketch of this merge follows the diagram below.

Expr tree traversal with adjusted schema, replacing conflicts
 +------------------------------------------------------------------+  
 | array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) |  
 |                                                                  |  
 |                         a = Int32                                |  
 |                         b = List(List(Int32))                    |  
 |                         c = Int32                                |  
 +------------------------------------------------------------------+  
                                   |                                   
                                   |                                   
                                   v                                   
 +------------------------------------------------------------------+  
 |             (b, i) -> array_transform(b, b -> b + c + i)         |  
 |                                                                  |  
 |                         a = Int32                                |  
 |                         b = List(Int32)  ! replaced !            |  
 |                         c = Int32                                |  
 |                         i = Int32                                |  
 +------------------------------------------------------------------+  
                                   |                                   
                                   |                                   
                                   v                                   
 +------------------------------------------------------------------+  
 |                 array_transform(b, b -> b + c + i)               |  
 |                                                                  |  
 |                         a = Int32                                |  
 |                         b = List(Int32)   ! replaced !           |  
 |                         c = Int32                                |  
 |                         i = Int32                                |  
 +------------------------------------------------------------------+  
                                   |                                   
                                   |                                   
                                   v                                   
 +------------------------------------------------------------------+ 
 |                          b -> b + c + i                          | 
 |                                                                  | 
 |                           a = Int32                              | 
 |                           b = Int32     ! replaced !             | 
 |                           c = Int32                              | 
 |                           i = Int32                              | 
 +------------------------------------------------------------------+ 
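A sketch of this replace-in-place merge, kept consistent with the merge_schema_and_parameters helper name used in the physical-planning sketch earlier; the DFSchema iteration and construction details are assumptions:

use std::collections::HashMap;
use std::sync::Arc;
use arrow::datatypes::FieldRef;
use datafusion_common::{DFSchema, Result, TableReference};

/// Build the per-lambda schema: a parameter shadows any same-named outer
/// field in place, so every other field keeps its index; parameters that
/// conflict with nothing are appended at the end
fn merge_schema_and_parameters(outer: &DFSchema, params: &[FieldRef]) -> Result<DFSchema> {
    let mut by_name: HashMap<&str, &FieldRef> =
        params.iter().map(|p| (p.name().as_str(), p)).collect();

    let mut fields: Vec<(Option<TableReference>, FieldRef)> = outer
        .iter()
        .map(|(qualifier, field)| match by_name.remove(field.name().as_str()) {
            // the parameter shadows this column: swap it in, unqualified
            Some(param) => (None, Arc::clone(param)),
            None => (qualifier.cloned(), Arc::clone(field)),
        })
        .collect();

    // remaining, non-conflicting parameters go at the end
    fields.extend(
        params
            .iter()
            .filter(|p| by_name.contains_key(p.name().as_str()))
            .map(|p| (None, Arc::clone(p))),
    );

    DFSchema::new_with_metadata(fields, outer.as_arrow().metadata().clone())
}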
                                                                      


2: Rename the shadowed column to a unique, non-conflicting name and add the lambda parameter to the end of the schema's vec of fields. This option allows checking whether a physical column refers to a lambda parameter by checking whether its index is greater than or equal to the number of fields of the outer schema. When this information is available, it eliminates the need to use the with_lambdas_params variations of the TreeNode methods. It's trivial to change the PR to use this.

Expr tree traversal with adjusted schema, renaming conflicts
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                        a = Int32                                 ╷        
╷                        b = List(List(Int32))                     ╷        
╷                        c = Int32                                 ╷        
╷                                                                  ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷             (b, i) -> array_transform(b, b -> b + c + i)         ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                         a = Int32                                ╷        
╷                         b_shadowed1 = List(List(Int32))          ╷        
╷                         c = Int32                                ╷        
╷                         b = List(Int32)                          ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                 array_transform(b, b -> b + c + i)               ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                        a = Int32                                 ╷        
╷                        b_shadowed1 = List(List(Int32))           ╷        
╷                        c = Int32                                 ╷        
╷                        b = List(Int32)                           ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                          b -> b + c + i                          ╷        
╷                                                                  ╷        
╷                          a = Int32                               ╷        
╷                          b_shadowed1 = List(List(Int32))         ╷        
╷                          c = Int32                               ╷        
╷                          b_shadowed2 = List(Int32)               ╷        
╷                          b = Int32                               ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
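
A minimal sketch of this option, assuming a hypothetical adjusted_schema helper (the renaming scheme is illustrative, not the PR's code):

use arrow::datatypes::{Field, Schema};

/// Rename fields shadowed by lambda parameters and append the parameters at
/// the end, so that index >= outer field count identifies a lambda parameter.
/// `level` numbers the nesting depth, matching b_shadowed1/b_shadowed2 above.
fn adjusted_schema(outer: &Schema, params: &[Field], level: usize) -> Schema {
    let mut fields: Vec<Field> = outer
        .fields()
        .iter()
        .map(|f| {
            if params.iter().any(|p| p.name() == f.name()) {
                f.as_ref()
                    .clone()
                    .with_name(format!("{}_shadowed{level}", f.name()))
            } else {
                f.as_ref().clone()
            }
        })
        .collect();
    fields.extend(params.iter().cloned());
    Schema::new(fields)
}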



Lambdas are usually evaluated with a different number of rows than the outer scope, as in the example, where array_transform is executed with one row and its lambda with two rows, one for each element of the array. The UDF implementation is responsible for adjusting the captured columns to the number of rows of its parameters, with whatever logic makes sense to it. For array_transform, that is to copy the value of the captured column once for each element of the corresponding array:
        copied once  a [1]------------------> a 1  
                                                   
     copied 2 times  b [2, 3] --------------> b 2  
                               \                   
         not copied  c []       ------------> b 3     
                                                

This adjustment is costly, so it's necessary to provide a way for the implementation to avoid adjusting uncaptured columns.
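
As a minimal sketch of the kind of adjustment array_transform performs, using arrow's take kernel (expand_captured is a hypothetical helper, not the PR's code):

use arrow::array::{Array, ArrayRef, Int32Array, ListArray};
use arrow::compute::take;
use arrow::error::ArrowError;

/// Repeat the captured value of each outer row once per element of that row's
/// list, so the captured column lines up with the lambda's row count.
fn expand_captured(captured: &ArrayRef, lists: &ListArray) -> Result<ArrayRef, ArrowError> {
    let offsets = lists.value_offsets();
    let mut indices = Vec::with_capacity(lists.values().len());
    for row in 0..lists.len() {
        let len = (offsets[row + 1] - offsets[row]) as usize;
        indices.extend(std::iter::repeat(row as i32).take(len));
    }
    take(captured.as_ref(), &Int32Array::from(indices), None)
}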

It's intuitive to simply remove the uncaptured columns, but note in the diagram and in the query below that doing so can change the index of captured columns: the "c" column has index 2 in the outer scope but ends up with index 1 in the other scopes.

Expr tree traversal with a schema with uncaptured columns removed
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                        a@0 = Int32                               ╷        
╷                        b@1 = List(List(Int32))                   ╷        
╷                        c@2 = Int32                               ╷        
╷                                                                  ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷             (b, i) -> array_transform(b, b -> b + c + i)         ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                         b@0 = List(Int32)                        ╷        
╷                         c@1 = Int32                              ╷        
╷                         i@2 = Int32                              ╷        
╷                                                                  ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                 array_transform(b, b -> b + c + i)               ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                         b@0 = List(Int32)                        ╷        
╷                         c@1 = Int32                              ╷        
╷                         i@2 = Int32                              ╷        
╷                                                                  ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                          b -> b + c + i                          ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                          b@0 = Int32                             ╷        
╷                          c@1 = Int32                             ╷        
╷                          i@2 = Int32                             ╷        
╷                                                                  ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        

select a@0, b@1, c@2, array_transform(b@1, (b@0, i@2) -> array_transform(b@0, b@0 -> b@0 + c@1 + i@2)) from t;

Option 1a: Nullify uncaptured columns

To keep the indices stable, this PR does not remove uncaptured columns; they remain present in the adjusted schema with their original type during tree traversals with the new _with_schema methods. However, to avoid the costly adjustment, when they are passed to the UDF in invoke_with_args they are substituted with columns of the Null datatype.

Expr execution/evaluation RecordBatch schema with uncaptured columns substituted with Null columns
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                        a = Int32                                 ╷        
╷                        b = List(List(Int32))                     ╷        
╷                        c = Int32                                 ╷        
╷                                                                  ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷             (b, i) -> array_transform(b, b -> b + c + i)         ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                         a = Null  ! nullified !                  ╷        
╷                         b = List(Int32)                          ╷        
╷                         c = Int32                                ╷        
╷                         i = Int32                                ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                 array_transform(b, b -> b + c + i)               ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                        a = Null  ! nullified !                   ╷        
╷                        b = List(Int32)                           ╷        
╷                        c = Int32                                 ╷        
╷                        i = Int32                                 ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                          b -> b + c + i                          ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                          a = Null  ! nullified !                 ╷        
╷                          b = Int32                               ╷        
╷                          c = Int32                               ╷        
╷                          i = Int32                               ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
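
A minimal sketch of this substitution, assuming a hypothetical nullify_uncaptured helper that receives the outer columns and the set of captured indices (not the PR's code):

use std::collections::HashSet;
use std::sync::Arc;

use arrow::array::{new_null_array, ArrayRef};
use arrow::datatypes::DataType;

/// Replace every uncaptured column with a Null-typed array of the same length,
/// keeping column indices stable while skipping the costly row-count adjustment.
fn nullify_uncaptured(
    columns: &[ArrayRef],
    num_rows: usize,
    captured: &HashSet<usize>,
) -> Vec<ArrayRef> {
    columns
        .iter()
        .enumerate()
        .map(|(i, col)| {
            if captured.contains(&i) {
                Arc::clone(col)
            } else {
                new_null_array(&DataType::Null, num_rows)
            }
        })
        .collect()
}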

Option 1b: TreeNode *_with_indices_mapping

To avoid keeping uncaptured columns in the schema and substituting them with null in the batch, it is possible to add another set of TreeNode-like methods on physical expressions that call the visiting/transforming function with a second parameter of type HashMap<usize, usize>, mapping the indices of the current scope to the ones from the outermost scope. This requires that, before calling the visiting/transforming function for a physical lambda expression, its whole subtree be visited to collect all the captured columns and build the indices mapping. Inner lambdas require the process again and can't reuse the work of the outer lambda. This may be costly for lambdas with complex expressions and/or deep nesting.

/// Implemented for Arc<dyn PhysicalExpr>
pub trait PhysicalExprExt: Sized {
    /// Like transform, but the callback also receives a mapping from column
    /// indices in the current (lambda) scope to indices in the outermost scope
    fn transform_with_indices_mapping<
        F: FnMut(Self, &HashMap<usize, usize>) -> Result<Transformed<Self>>,
    >(
        self,
        f: F,
    ) -> Result<Transformed<Self>>;
}
Expr tree traversal with indices_mapping: "c" has index 2 in the root scope but 1 in the other scopes
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                        indices_mapping = {}                      ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷             (b, i) -> array_transform(b, b -> b + c + i)         ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                     indices_mapping = { 1 => 2 }                 ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                 array_transform(b, b -> b + c + i)               ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                    indices_mapping = { 1 => 2 }                  ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                          b -> b + c + i                          ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                    indices_mapping = { 1 => 2 }                  ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        

The code in minimize_join_filter would change to:

fn minimize_join_filter(expr: Arc<dyn PhysicalExpr>, ...) -> JoinFilter {
    let mut used_columns = HashSet::new();
    expr.apply_with_indices_mapping(|expr, indices_mapping| {
        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
            // this column may be a child of a lambda, in which case its index refers
            // to the lambda-scoped schema, which won't include uncaptured columns from
            // the plan input and may therefore differ from the input plan's schema indices.
            // In such cases, indices_mapping contains the mapping to the input plan's index;
            // if no mapping is found, the column refers to a lambda parameter
            let scoped_index = col.index();
            if let Some(plan_index) = indices_mapping.get(&scoped_index) {
                used_columns.insert(*plan_index);
            }
        }
        Ok(TreeNodeRecursion::Continue)
    })
    ...
}

Option 2: Create a schema with all parameters from all lambdas for tree traversals

Use a secondary schema containing all parameters from all lambdas. For that, expressions must be transformed, normalizing all lambda parameters and their references with a unique qualifier per lambda, so they can coexist without conflicts in this schema. A qualifier field would be added to the lambda expr:

pub struct Lambda {
    pub qualifier: Option<String>,
    pub params: Vec<String>,
    pub body: Box<Expr>,
}

Schema of the example:

t.a: Int32
t.b: List(List(Int32))
t.c: Int32
lambda1.b: List(Int32)
lambda1.i: Int32
lambda2.b: Int32

From my understanding of this video, this is similar to what DuckDB does in its binder, although with differences in the evaluation part. I didn't find comparable resources for other open source engines with lambda support, like ClickHouse and Spark.

This works well when dealing with plan nodes: during plan creation or schema recomputation, we can normalize its lambdas, create the extended schema, and save it as a plan field, exposing it through a method like "lambda_extended_schema", although with an added cost to plan creation/schema recomputation. The lambda normalization actually requires two passes: a first pass to collect any existing lambda qualifiers, so they aren't reused in the second, normalizing pass.
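
A minimal sketch of that two-pass normalization, assuming the qualifier field above and DataFusion's TreeNode API (normalize_lambdas and the naming scheme are hypothetical):

use std::collections::HashSet;

use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
use datafusion_common::Result;
use datafusion_expr::Expr;

fn normalize_lambdas(expr: Expr) -> Result<Expr> {
    // pass 1: collect the qualifiers already present so they are not reused
    let mut used = HashSet::new();
    expr.apply(|e| {
        if let Expr::Lambda(lambda) = e {
            if let Some(qualifier) = &lambda.qualifier {
                used.insert(qualifier.clone());
            }
        }
        Ok(TreeNodeRecursion::Continue)
    })?;

    // pass 2: assign a fresh, non-conflicting qualifier to unqualified lambdas
    let mut next = 1;
    expr.transform(|e| match e {
        Expr::Lambda(mut lambda) if lambda.qualifier.is_none() => {
            let mut qualifier = format!("lambda{next}");
            while used.contains(&qualifier) {
                next += 1;
                qualifier = format!("lambda{next}");
            }
            next += 1;
            lambda.qualifier = Some(qualifier);
            Ok(Transformed::yes(Expr::Lambda(lambda)))
        }
        other => Ok(Transformed::no(other)),
    })
    .map(|t| t.data)
}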

How the code would look:

//from
expr.transform_with_schema(plan.schema(), |node, adjusted_schema| ...)
//to
let schema = plan.lambda_extended_schema();
expr.transform(|node| ...)

Another example:

impl LogicalPlan {
    pub fn replace_params_with_values(
            self,
            param_values: &ParamValues,
        ) -> Result<LogicalPlan> {
            self.transform_up_with_subqueries(|plan| {
            // use plan.lambda_extended_schema(), which contains the lambdas' parameters,
            // instead of plan.schema(), which won't
                let lambda_extended_schema = Arc::clone(plan.lambda_extended_schema());
                let name_preserver = NamePreserver::new(&plan);
                plan.map_expressions(|e| {
                // if this expression is a child of a lambda and contains columns referring
                // to its parameters, the lambda_extended_schema already contains them
                    let (e, has_placeholder) = e.infer_placeholder_types(&lambda_extended_schema)?;
    ....

However, when working with functions/methods that deal directly with expressions, detached from a plan, the expression's lambdas may be unnormalized and the extended schema unavailable. There are a few public methods/functions like that, such as infer_placeholder_types:

impl Expr {
    pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result<(Expr, bool)> {
        let mut has_placeholder = false;
        self.transform(|mut expr| ...)
        ...
    }
}   

It could either:

1: Require being called only with normalized expressions and with the extended schema as the schema argument, returning an error otherwise. This is restrictive and puts strain on users.

2: Allow being called with unnormalized expressions: visit the whole expr tree collecting the existing lambda qualifiers to avoid duplicates in the next step, perform a first transformation to guarantee that the expression's lambdas are normalized, create the extended schema, and only then perform the second transformation to infer the placeholder types using the extended schema. While it can be documented that the returned expression is normalized, it's still a regular Expr, which doesn't encode that property in its type. Also, without changing the method signature, it wouldn't even be possible to return the extended schema so it could be reused elsewhere without recomputation. This is costly, and the costly work can't be reused.



Normalized example:

select t.a, t.b, array_transform(t.b, (lambda1.b, lambda1.i) -> array_transform(lambda1.b, lambda2.b -> lambda2.b + t.c + lambda1.i)) from t;

Just like the first option, this also sets uncaptured columns to Null, as well as unavailable/out-of-scope lambda parameters.

Expr tree batch evaluation with a single extended schema
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷        
╷                                                                  ╷        
╷                        t.a = Int32                               ╷        
╷                        t.b = List(List(Int32))                   ╷        
╷                        t.c = Int32                               ╷        
╷                        lambda1.b = Null                          ╷        
╷                        lambda1.i = Null                          ╷        
╷                        lambda2.b = Null                          ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷             (b, i) -> array_transform(b, b -> b + c + i)         ╷        
╷                                                                  ╷        
╷                        t.a = Null                                ╷        
╷                        t.b = Null                                ╷        
╷                        t.c = Int32                               ╷        
╷                        lambda1.b = List(Int32)                   ╷        
╷                        lambda1.i = Int32                         ╷        
╷                        lambda2.b = Null                          ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                 array_transform(b, b -> b + c + i)               ╷        
╷                                                                  ╷        
╷                        t.a = Null                                ╷        
╷                        t.b = Null                                ╷        
╷                        t.c = Int32                               ╷        
╷                        lambda1.b = List(Int32)                   ╷        
╷                        lambda1.i = Int32                         ╷        
╷                        lambda2.b = Null                          ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                          b -> b + c + i                          ╷        
╷                                                                  ╷        
╷                          t.a = Null                              ╷        
╷                          t.b = Null                              ╷        
╷                          t.c = Int32                             ╷        
╷                          lambda1.b = Null                        ╷        
╷                          lambda1.i = Int32                       ╷        
╷                          lambda2.b = Int32                       ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        

With this option, indices are always stable across the whole tree.

This option also allows checking whether a physical column refers to a lambda parameter by checking whether its index is greater than or equal to the number of fields in the outer schema. When this information is available, it eliminates the need to use the with_lambdas_params variations of the TreeNode methods.
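
For illustration, a minimal sketch of that check (is_lambda_param is a hypothetical helper, assuming lambda parameters are appended after the outer schema's own fields):

use datafusion_physical_expr::expressions::Column;

/// True if this column's index falls past the outer schema's own fields,
/// i.e. it refers to a lambda parameter appended at the end of the schema
fn is_lambda_param(col: &Column, outer_field_count: usize) -> bool {
    col.index() >= outer_field_count
}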

Comparison between options:

| | Per-lambda schema with uncaptured columns set to Null (1a) | Per-lambda schema with indices_mapping (1b) | Single extended schema (2) |
| --- | --- | --- | --- |
| New set of TreeNode methods | Yes, 1: _with_schema for both logical and physical expressions | Yes, 2: _with_schema for both logical and physical expressions, and _with_indices_mapping for physical expressions | No |
| Tree traversal added cost | Only when encountering a lambda | Only when encountering a lambda | Zero |
| Plan creation / schema recomputation added cost | Zero | Zero | Always, regardless of the existence of any lambda |
| Code change, internal | New set of TreeNode methods, used instead of the current ones when applicable | 2 new sets of TreeNode methods, used instead of the current ones when applicable | Untried, unpredictable |
| Code change, downstream | If lambda support is desired, use the new TreeNode methods instead of the current ones when applicable; otherwise none | If lambda support is desired, use the new TreeNode methods instead of the current ones when applicable; otherwise none | Variable: medium when closely associated with a Plan (just call plan.lambda_extended_schema()); unpredictable when a plan is unavailable or doesn't exist |
| Changes uncaptured columns' DataType to Null | Yes | No | Yes |
| Unneeded Null columns in the schema during planning/optimizing and in the RecordBatch during execution, as padding to keep indices stable | Yes | No | Yes |
| Stable column indices across the whole expr tree | Yes | No | Yes |
| Makes _with_lambdas_params unnecessary for physical expressions if Expr::Column is used | No | Yes | No |

Splitting this into smaller PRs

If this PR moves forward, it will likely be as smaller PRs. In that case, I have already planned a division, shown below. It's not necessary to analyze the graph, as it doesn't help with the discussion in this text; it's included just to show a reasonable granularity of smaller PRs, in case it helps decide whether to move this forward or not.

Each rectangular node is a PR. Asymmetrical nodes are collections of smaller PRs which share the same dependencies and aren't a dependency of any other PR. Green ones can be opened immediately. Gray ones have unmet dependencies. Merged PRs will be colored blue.

Left-to-Right full-screen link

Details
graph LR
    classDef blue fill:blue
    classDef green fill:green

    subgraph "SQL" [SQL 84LOC]
        SQLP[SQL Parse 65LOC]
        SQLU[SQL Unparse 19LOC]
    end
    
    LL[Logical Expr::Lambda 100LOC]:::green
    LL --> CSE[CSE 50LOC]
    P[Plan logical lambda into physical expr 25LOC]
    UDF1["Extend ScalarUDF[Impl] 300LOC"]
    UDF2[ScalarUDF lambda schema helpers 100LOC]
    AM[Non functional array_transform 270LOC]
    PSFL[ScalarFunction PhysicalExpr lambda aware 30LOC]

    PL --> PSFL
    UDF1 --> PSFL
    UDF1 --> AM

    %%CILP[Column::is_lambda_parameter 6LOC]
    
    subgraph "Expr::*_with_schema"
        LTN[Expr::*_with_schema API def 50LOC]:::green
        LTNB[make Expr::*_with_schema lambda aware 70LOC]
        LTN --> LTNES[Expr Simplifier 100LOC]
        LTNA>"
            use Expr::*_with_schema in existing code
            Type Coercion 110LOC
            normalize_col[with_schemas_and_ambiguity_check] 31LOC
            SessionState::create_physical_expr 4LOC
            Expr::infer_placeholder_type 1LOC
            ApplyFunctionRewrites 10LOC
            optimize_projections::rewrite_expr 2LOC
            SqlToRel::try_process_group_by_unnest 10LOC
            sql resolve_columns 5LOC
            Example type_coercion_demo 10LOC
        "]
    end
    
    PL[Physical Lambda Expr 108LOC]:::green
    
    subgraph "PhysicalExpr::*_with_schema"
        PTN[PhysicalExpr::*_with_schema API def 25LOC]:::green
        PTNA>"
            use PhysicalExpr::*_with_schema in ProjectionMapping 32LOC
            PhysicalExprSimplifier 20LOC
            unwrap_cast_in_comparison 10LOC
            AsyncMapper::find_references 1LOC
            Example CustomCastsPhysicalExprAdapter 10LOC
        "]
        PTNB[make PhysicalExpr::*_with_schema lambda aware 160LOC]
    end
    
    subgraph capture
        CS[Capture support 30LOC]
        LCH["List capture helpers 60LOC(4x15)"]
        MCH["Merge capture with arguments helper 66LOC(2x33)"]
        AMCT[array_transform capture support 20LOC]
    end

    PSFL --> CS
    PTN --> CS
    LL --> SQL
    LL --> P
    UDF2 --> LTNES
    UDF2 --> LTNB
    UDF2 --> PTNB
    LTN --> LTNA
    LTN --> LTNB
    LL --> LTNB
    LTN --> UDF2
    LL --> UDF1 --> UDF2
    UDF2 --> P
    PL --> P
    PL --> PTNB
    PTN --> PTNB
    PTN --> PTNA
    AM --> AMCT
    CS --> AMCT
    LCH --> AMCT
    MCH --> AMCT

    LL --> LTNLP2

subgraph "Expr::*_with_lambdas_params"
        LTNLP --> LTNLP2[make Expr::*_with_lambdas_params lambda aware]
        LTNLP[Expr::*_with_lambdas_params API def 50LOC]:::green -->
        AAAAA>"
            expr_applicable_for_cols 5LOC
            Expr::add_column_refs[_counts]/any_column_refs 15LOC
            expr_to_columns 10LOC
            columnize_expr 15LOC
            find_columns_referenced_by_expr 10LOC
            find_column_indexes_referenced_by_expr 5LOC
            normalize_col 10LOC
            normalize_col_with_schemas_and_ambiguity_check 15LOC
            replace_col 10LOC
            transform_up_with_lambdas_params 10LOC
            filter_exprs_evaluation_result_on_empty_batch 10LOC
            replace_cols_by_name 10LOC
            optimizer::push_down_filter::contain 15LOC
            ScalarSubqueryToJoin 40LOC
            TableAliasRewriter 20LOC
            Example ShreddedJsonRewriter 30LOC
        "]
    end

    PL --> PTNLP2

    subgraph "PhysicalExpr::*_with_lambdas_params"
        PTNLP --> PTNLP2[make PhysicalExpr::*_with_lambdas_params lambda aware]

        PTNLP[PhysicalExpr::*_with_lambdas_params API def 50LOC]:::green -->
        BBBBB>"
            CustomPhysicalExprAdapter 3LOC
            ParquetPushdownChecker 13LOC
            add_offset_to_expr 3LOC
            update_expr 10LOC
            project_ordering 20LOC
            with_new_schema 10LOC
            collect_columns 10LOC
            reassign_expr_columns 10LOC
            DefaultPhysicalExprAdapter 40LOC
            projection pushdown 50LOC
            sort pushdown 40LOC
            projection 50LOC
            stream_join_utils 40LOC
            pruning predicate rewrite_column_expr 5LOC
            Example DefaultValuePhysicalExprAdapter 20LOC
        "]
    end



@gstvg changed the title from [DRAFT] Add lambda support and array_transform udf to [RFC] Add lambda support and array_transform udf on Dec 9, 2025
@fbx31

fbx31 commented Dec 11, 2025

Thanks a lot @gstvg, apparently a huge amount of work (even if I am not skilled enough to judge). I have been waiting for such functions for a very long time. IMHO, this is a MUST HAVE in DataFusion. All serious analytics engines provide this kind of function: DuckDB's implementation is very mature, and Polars seems to have it as well, even if I haven't tried it yet. I really hope that DataFusion will jump into this enhancement very quickly, as it is again a big miss.

Use cases like list_filter(list, expr), list_map(list, expr) and list_reduce(list, base, expr) will be available soon.

Again, thanks a lot for your work, and I'm crossing my fingers for quick progress.

@shehabgamin
Contributor

@gstvg This is super exciting! I'll review over the next couple of days.

cc @SparkApplicationMaster @andygrove would be great to get your thoughts too 😃

@gstvg
Contributor Author

gstvg commented Dec 14, 2025

Thanks @fbx31. This is high priority for me right now, and I also hope we can finish this quickly. It's indeed a lot of work, but I now believe it ended up that way because I went too fast in the wrong direction... More on that in the comment below. Thanks again!

@gstvg
Contributor Author

gstvg commented Dec 14, 2025

Thanks @shehabgamin. I analyzed the Spark implementation one last time and realized I could have done it similarly from the beginning... I've already tested it locally and intend to push soon; the diff is reduced to ~2K LOC, mostly new, self-contained code, with small changes to existing code and without requiring changes in downstream code. I hope you haven't put time into reviewing it yet, because I believe the Spark approach is much better and also easier/faster to review.

Basically, for planning, we lazily set the Field in the lambda column itself, instead of injecting it into a DFSchema

//logical
struct LambdaColumn {
    name: String,
    field: Option<FieldRef>,
}

And for evaluation, again, we lazily set the value of a lambda column instead of injecting it into a RecordBatch

//physical
struct LambdaColumn {
    name: String,
    field: FieldRef,
    value: Option<ColumnarValue>,
}
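
A minimal sketch of how these lazily-set fields might be used (hypothetical, simplified methods; not the PR's code as-is):

use arrow::datatypes::DataType;
use datafusion_common::{exec_datafusion_err, Result};
use datafusion_expr::ColumnarValue;

impl LambdaColumn {
    /// the parameter's type comes from the embedded field, not from a schema
    fn data_type(&self) -> &DataType {
        self.field.data_type()
    }

    /// evaluation just returns the value bound by the enclosing lambda UDF
    fn evaluate(&self) -> Result<ColumnarValue> {
        self.value
            .clone()
            .ok_or_else(|| exec_datafusion_err!("lambda column {} was not bound", self.name))
    }
}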

I think I initially ruled that out because other expressions don't contain their own Field, and this had been discussed before in #12604 * without moving forward. The difference is that other expressions' Fields naturally exist in the plan schema, which I now believe justifies treating lambdas differently.
And most importantly, because I had no idea how much work and downstream churn injecting the lambda parameters into the Schema/RecordBatch would cause 🤦

I will push soon and update the PR description, thanks again!

*This is being discussed again in #18845

@linhr
Contributor

linhr commented Dec 18, 2025

This is an exciting initiative!

I skimmed through the PR, and I have some thoughts regarding how this can fit into DataFusion's existing setup. I wonder if we can have Expr::LambdaFunction(LambdaFunction), similar to Expr::ScalarFunction(ScalarFunction)? Here is my reasoning:

  1. LambdaFunction can represent a "resolved" lambda function call in the logical plan. In contrast, Expr::Lambda and Expr::LambdaColumn are only fragments whose data types etc. are not well-defined, which may be challenging to work with in other parts of the library (e.g. ExprSchemable).
  2. I feel ScalarUDF may not fit cleanly for lambda functions, so we can then have a separate abstraction (e.g. LambdaUDF) inside Expr::LambdaFunction. The ArrayTransform example shows that we would have to use ScalarFunctionArgs which was not originally designed for the lambda use case. (IMO when the function is actually "invoked" with Arrow arrays, the lambda parameter should have been resolved and removed from the argument list.)
  3. Once we have a self-contained logical expression, the entire query analysis flow might become easier to reason about (from SqlToRel to Expr::LambdaFunction to a dedicated PhysicalExpr and the corresponding physical planner).

This is just my rough thoughts. Happy to discuss!

@gstvg
Contributor Author

gstvg commented Dec 19, 2025

Thanks @linhr
I think a new LambdaUDF is reasonable, and I'm not strongly inclined towards using ScalarUDF either

Expr::Lambda and Expr::LambdaColumn are only fragments whose data types etc. are not well-defined which may be challenging to work with in other parts of the library (e.g. ExprSchemable).

Yeah, this is the major challenge in this PR. Currently, Expr::Lambda always returns DataType::Null, and Expr::LambdaVariable embeds a FieldRef which is used to implement ExprSchemable and doesn't even look at the DFSchema.

when the function is actually "invoked" with Arrow arrays, the lambda parameter should have been resolved and removed from the argument list.

Could you expand on this further? Is it like LambdaUDF having a resolve_lambdas method whose result is passed to the invoke_with_args method?

@linhr
Contributor

linhr commented Dec 21, 2025

Could you expand this further? It is like LambdaUDF having a resolve_lambdas method which result is passed to the invoke_with_args method?

Suppose we have array_transform([1, 2], v -> v*2). We could have a trait LambdaUDF and have impl LambdaUDF for ArrayTransform. (Or we follow the existing convention to have struct LambdaUDF and trait LambdaUDFImpl separately.) A logical representation of v -> v*2 is passed to ArrayTransform::new(). For Expr::LambdaFunction(LambdaFunction), we can have LambdaFunction { func: Arc<dyn LambdaUDF>, args: Vec<Expr> } where the non-lambda parameter [1, 2] is stored in args.

During physical planning, we could resolve ArrayTransform into PhysicalArrayTransform, which stores v -> v*2 resolved as a certain PhysicalExpr. We have a trait PhysicalLambdaUDF and impl PhysicalLambdaUDF for PhysicalArrayTransform. The trait method PhysicalLambdaUDF::invoke_with_args accepts the Arrow array [1, 2] and computes the results. I'd imagine this invocation can be done in a general (physical) LambdaFunctionExpr that works for all lambda functions, similar to ScalarFunctionExpr.

When I worked with ScalarUDF, I noticed that the logic required for the logical representation, physical planning, and the actual execution all lives within a single ScalarUDFImpl trait. If we look at how these trait methods are used by the various planning/execution stages, we might get the big picture of how a parallel code structure (with multiple traits to separate the responsibilities) can be designed for lambda functions.

I haven't thought about function registry, documentation etc. which we can get for free in the existing ScalarUDF setup. So some more investigation is needed to estimate the amount of work if we explore the route I described above.

@gstvg
Contributor Author

gstvg commented Dec 23, 2025

@linhr Thanks, I get it now. I think it's possible to resolve lambda parameters and remove them from the arguments list with ScalarUDF itself, with a few cons compared to a dedicated LambdaUDF, but with far fewer code changes, and similar enough to compare the alternatives. I believe I'm already finishing it and will push to another branch soon.

@keen85

keen85 commented Jan 25, 2026

@gstvg any updates on this one? 😇

@gstvg
Contributor Author

gstvg commented Feb 9, 2026

@linhr
Really really sorry for the delay


I think it's possible to resolve lambda parameters and remove them from the arguments list with ScalarUDF itself, with a few cons compared to a dedicated to LambdaUDF, but with much less code changes and similar enough to compare the alternatives. I believe I'm already finishing it, and will push to another branch soon.

Unfortunately, trying to resolve lambdas with ScalarUDF didn't work well.
So, before moving to a LambdaUDF based implementation, I would like to further discuss the current approach.


Expr::Lambda and Expr::LambdaColumn are only fragments whose data types etc. are not well-defined which may be challenging to work with in other parts of the library (e.g. ExprSchemable).

Yeah, this is the major challenge in this PR. Currently, Expr::Lambda always return DataType::Null and Expr::LambdaVariable embeds a FieldRef which is used to implement the ExprSchemable and don't even look at the DFSchema

I mean, this was the major challenge of the PR in its first implementation; now that we use LambdaVariable with a Field, I consider it solved. Currently Expr::Lambda actually returns the return_field of its body, but I believe we could also return a field with DataType::Null if deemed better.


LambdaFunction can represent a "resolved" lambda function call in the logical plan. In contrast, Expr::Lambda and Expr::LambdaColumn are only fragments whose data types etc. are not well-defined which may be challenging to work with in other parts of the library (e.g. ExprSchemable).
... when the function is actually "invoked" with Arrow arrays, the lambda parameter should have been resolved and removed from the argument list

Sorry, at first I thought this only meant resolving/embedding the lambda body within the UDF itself and removing it from Expr::ScalarFunction.args/LambdaFunction.args, and also removing the Expr::Lambda variant from the logical Expr enum and its physical counterpart, but now I'm not sure whether it also means omitting the lambda body from TreeNode traversals?

To streamline the discussion, I have a few comments for each of these options, if they're applicable:

Omitting the lambda body from TreeNode traversals

At least the following traversals currently use TreeNode and would require adjustment if the body is omitted:

Projection pushdown for column capture, type coercion for correctness, expr simplifier and CSE for performance.

Being only 4 internal traversals, it is easy to handle them specially, but since DataFusion is an open system, I believe there should be a way for downstream users to visit/transform lambda expressions. We could document that they need to be handled specially, outside TreeNode, or we could offer an API for it; after all, I believe the ideal API would be exactly the TreeNode API.

In my first iteration of this PR (the "outdated" sections of the description and of the first comment), I manually checked every TreeNode usage on logical and physical expressions and found none where Expr::Lambda should be specially handled (it is simply ignored), and Expr::LambdaVariable only required handling in a few places.

Removing the Expr::Lambda variant from the Expr enum and its physical counterpart

Removing Expr::Lambda from the AST makes some tree traversals more difficult, while keeping it makes no difference: it's simply ignored, as most expressions are in most traversals.

One example is BindLambdaVariable in this PR. It's not important to understand what it does, just that it's bigger, with more boilerplate, and harder to read without Expr::Lambda:

struct BindLambdaVariable<'a> {
    variables: HashMap<&'a str, (ArrayRef, usize)>, 
    //(variable value, number of times it has been shadowed by other lambdas' variables in inner scopes at the current position of the tree)
}

impl TreeNodeRewriter for BindLambdaVariable<'_> {
    type Node = Arc<dyn PhysicalExpr>;

    fn f_down(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
        if let Some(lambda_variable) = node.as_any().downcast_ref::<LambdaVariable>() {
            if let Some((value, shadows)) = self.variables.get(lambda_variable.name()) {
                if *shadows == 0 {
                    return Ok(Transformed::yes(Arc::new(
                        lambda_variable.clone().with_value(value.clone()),
                    )));
                }
            }
        } else if let Some(inner_lambda) = node.as_any().downcast_ref::<LambdaExpr>() {
            for param in inner_lambda.params() {
                if let Some((_value, shadows)) = self.variables.get_mut(param.as_str()) {
                    *shadows += 1;
                }
            }
        }

        Ok(Transformed::no(node))
    }

    fn f_up(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
        if let Some(inner_lambda) = node.as_any().downcast_ref::<LambdaExpr>() {
            for param in inner_lambda.params() {
                if let Some((_value, shadows)) = self.variables.get_mut(param.as_str()) {
                    *shadows -= 1;
                }
            }
        }

        Ok(Transformed::no(node))
    }
}
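
For context, a rewriter like this would typically be driven through TreeNode::rewrite; a minimal usage sketch (the construction of the variables map is elided):

// hypothetical driver: bind the lambda's parameter values, then evaluate the body
let mut binder = BindLambdaVariable { variables };
let bound_body = body.rewrite(&mut binder)?.data;
let result = bound_body.evaluate(&batch)?;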

Implementation without a physical Lambda in the AST

fn bind_lambda_variables(
    node: Arc<dyn PhysicalExpr>,
    params: &mut HashMap<&str, (ArrayRef, usize)>,
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
    node.transform_down(|node| {
        if let Some(lambda_variable) = node.as_any().downcast_ref::<LambdaVariable>() {
            if let Some((value, shadows)) = params.get(lambda_variable.name()) {
                if *shadows == 0 {
                    return Ok(Transformed::yes(Arc::new(
                        lambda_variable.clone().with_value(value.clone()),
                    )));
                }
            }
        } else if let Some(fun) = node.as_any().downcast_ref::<LambdaUDFExpr>() {
            let mut transformed = false;

            let new_lambdas = fun
                .lambdas()
                .iter()
                .map(|(params_names, body)| {
                    for param in params_names.iter() {
                        if let Some((_value, shadows)) = params.get_mut(param.as_str()) {
                            *shadows += 1;
                        }
                    }

                    let new_body = bind_lambda_variables(Arc::clone(body), params)?;
                    transformed |= new_body.transformed;

                    for param in params_names.iter() {
                        if let Some((_value, shadows)) = params.get_mut(param.as_str()) {
                            *shadows -= 1;
                        }
                    }

                    Ok((params_names.to_vec(), new_body.data))
                })
                .collect::<Result<_>>()?;

            let new_args = fun
                .args()
                .iter()
                .map(|arg| {
                    let new_arg = bind_lambda_variables(Arc::clone(arg), params)?;
                    transformed |= new_arg.transformed;
                    Ok(new_arg.data)
                })
                .collect::<Result<_>>()?;

            let fun = fun.with_new_children(new_args, new_lambdas);

            return Ok(Transformed::new(
                Arc::new(fun),
                transformed,
                TreeNodeRecursion::Stop,
            ));
        }

        Ok(Transformed::no(node))
    })
}

Note that this transformation always returns a constant TreeNodeRecursion value. If it didn't, TreeNodeRecursion would also need to be handled manually, adding more boilerplate that TreeNode handles automatically when Expr::Lambda is present.

Resolve/embed lambdas on the UDF implementation itself (essentially partitioning args from lambdas)

By partitioning args and lambdas, we lose positional data, which is necessary to implement SqlUnparser and logical and physical formatting. This leads to one of two outcomes: either the implementation itself has to implement them, adding boilerplate for what is today handled automatically by ScalarUDF and SqlUnparser, or the implementation saves and exposes positional data, which is then used by ScalarUDF and SqlUnparser to de-partition the args list and proceed normally.

Some closed systems without support for UDFs with lambdas use a fixed position for lambdas, and can simply rely on it without having to store positional data or keep lambdas and other args in the same ordered list:

Always last for Spark, DuckDB and Snowflake:

transform(array(1), v -> v*2) -- spark
list_transform([1, 2, NULL, 3], lambda x: x + 1)  -- duckdb
transform([1, 2, 3], a INT -> a * 2) -- snowflake

Always first for clickhouse:

 arrayMap(x -> (x + 2), [1, 2, 3])

But I think that DataFusion, as an open and extensible system, shouldn't stick to a single convention, as different deployments may want to support one, the other, or both conventions.

Furthermore, my interest in lambda functions is to implement union-manipulating UDFs, which receive multiple lambdas interleaved with regular arguments. Consider a Union<'str' = Utf8, 'num' = Float64>, which could be transformed like this:

union_transform(
    union_value, 
    'str', str -> trim(str),
    'num', num -> num*2
)

where today the only way is with this (if/when support for union_value lands):

case union_tag(union_value)
    when 'str' then union_value('str', trim(union_extract(union_value, 'str')))
    when 'num' then union_value('num', union_extract(union_value, 'num') * 2)
end

For such cases, storing positional data or manually implementing logical and physical formatting and SqlUnparser for every UDF is mandatory, as that positioning doesn't fit a simple convention like always-first or always-last.

This also omits positional info from the implementation. No lambda UDF discussed so far, including union_transform, requires positional info for evaluation, only for logical and physical formatting and SqlUnparser, but I don't like the idea of blocking any future hypothetical UDF that requires positional info for evaluation. A theoretical example is union_transform itself supporting a slightly shorter syntax for lambdas that return constants:

union_transform(
    union_value, 
    'str', str -> trim(str),
    'num', _num -> 0 -- lambda that returns constant/scalar
)

union_transform(
    union_value, 
    'str', str -> trim(str),
    'num', 0 -- shorter syntax, pass constant directly
)

That syntax does require positional info for evaluation. It's really just an example, and I don't plan to support it in my union_transform implementation.

Instead, if we add a new LambdaFunction expression and LambdaUDF trait, the implementation could own both the lambdas and the other args, so that no partitioning would occur:

struct LambdaFunction { invocation: Box<dyn LambdaInvocation> } //invocation includes all args, both lambdas and non-lambdas
Expr::LambdaFunction(LambdaFunction::new(ArrayTransform::new(all_args_including_lambdas)))

// instead of
struct LambdaFunction { func: Arc<dyn LambdaUDF>, args: Vec<Expr> } // func owns only the lambdas; non-lambdas are stored on args
Expr::LambdaFunction(LambdaFunction::new(ArrayTransform::new(lambdas), args_without_lambdas))

Insights from Spark implementation

I don't have much experience with Spark and Scala, I've only implemented a custom data source a few years ago, so if something below seems wrong, it probably is.

Spark includes both a HigherOrderFunction trait and a LambdaFunction class, similar to our current Expr::Lambda (and not the proposed LambdaFunction), as well as a NamedLambdaVariable class, similar to our Expr::LambdaVariable, all of them extending Expression, which in turn extends TreeNode.

Some traversals branch directly on LambdaFunction expressions, which in my view supports the idea of both keeping Expr::Lambda and exposing the lambda body on the TreeNode API:

Lambda Binder (similar to our BindLambdaVariable)
ResolveLambdaVariables
ColumnResolutionHelper
ColumnNodeToExpressionConverter
CheckAnalysis
ColumnNodeToProtoConverter

Some branch on LambdaVariable, which in my view supports the idea of exposing the lambda body on the TreeNode API:

SessionCatalog
HigherOrderFunction.functionForEval
NormalizePlan
TableOutputResolver

Some branch on HigherOrderFunction:

CheckAnalysis
Analyzer
ResolveLambdaVariables

It is true that some traversals branch on HigherOrderFunction expressions, supporting the idea of a new Expr::LambdaFunction, but since checking whether an Expr::ScalarFunction invocation contains lambdas is as simple as the code below, and can be made even simpler by adding a helper method contains_lambdas(&self) -> bool to ScalarFunction, I don't think that alone justifies adding a new Expr::LambdaFunction holding a ScalarFunction. If the LambdaUDF trait is added, then a new Expr::LambdaFunction variant has to be added anyway, so this becomes a non-issue.

match expr {
    Expr::ScalarFunction(fun) if fun.args.iter().any(|arg| matches!(arg, Expr::Lambda(_))) => ...,
    // with helper
    Expr::ScalarFunction(fun) if fun.contains_lambdas() => ...
    ...
}

impl ScalarFunction {
    fn contains_lambdas(&self) -> bool {
        self.args.iter().any(|arg| matches!(arg, Expr::Lambda(_)))
    }

    // Other helpers could be added too, like:
    fn lambda_args(&self) -> impl Iterator<Item = &Lambda> {
        self.args.iter().filter_map(|arg| match arg {
            Expr::Lambda(l) => Some(l),
            _ => None,
        })
    }

    fn non_lambda_args(&self) -> impl Iterator<Item = &Expr> {
        self.args.iter().filter(|arg| !matches!(arg, Expr::Lambda(_)))
    }
}

Higher-order functions, like ArrayTransform, receive both the regular arg and the lambda as regular Expressions, the lambda being expected to be a LambdaFunction:

case class ArrayTransform(
    argument: Expression,
    function: Expression)
  extends ArrayBasedSimpleHigherOrderFunction with CodegenFallback {

  override def dataType: ArrayType = ArrayType(function.dataType, function.nullable)

  override protected def bindInternal(
      f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArrayTransform = {
    val ArrayType(elementType, containsNull) = argument.dataType
    function match {
      case LambdaFunction(_, arguments, _) if arguments.size == 2 =>
        copy(function = f(function, (elementType, containsNull) :: (IntegerType, false) :: Nil))
      case _ =>
        copy(function = f(function, (elementType, containsNull) :: Nil))
    }
  }

And it's instantiated like this:

        val param = NamedLambdaVariable("x", et, containsNull)
        val funcBody = insertMapSortRecursively(param)

        ArrayTransform(e, LambdaFunction(funcBody, Seq(param)))

So, regarding resolving lambdas/partitioning lambdas from args: all args are owned by the implementation, not by HigherOrderFunction, which is just a trait. The lambdas are indeed resolved, but so are the other args, and no partitioning occurs.

I believe our current approach would look like this in Spark: a trait Expression, implemented by a class HigherOrderFunction, which owns a trait HigherOrderFunctionImpl, which is implemented by ArrayTransform, where the lambda and the arg would be stored in the class HigherOrderFunction, not in ArrayTransform. The Spark approach implemented in DF would look like the new LambdaFunction expression and LambdaUDF trait cited above, where the implementation owns both the lambdas and the other args: Expr::LambdaFunction(LambdaFunction::new(ArrayTransform::new(all_args_including_lambdas))), instead of Expr::LambdaFunction(LambdaFunction::new(ArrayTransform::new(lambdas), args_without_lambdas))

Finally, running this script on spark-shell returns the following output, showing that both lambda bodies and LambdaFunction nodes are present in TreeNode traversals:

val df = spark.sql("SELECT transform(array(1), v -> v*2)")
val transform = df.queryExecution.logical.expressions(0)

transform.foreach { node =>
  println(s"${node.nodeName}: ${node.toString}")
}

println(transform.treeString)
UnresolvedAlias: unresolvedalias(transform(array(1), lambdafunction((v * 2), v)))
UnresolvedFunction: transform(array(1), lambdafunction((v * 2), v))
UnresolvedFunction: array(1)
Literal: 1
LambdaFunction: lambdafunction((v * 2), v)
Multiply: (v * 2)
UnresolvedNamedLambdaVariable: v
Literal: 2
UnresolvedNamedLambdaVariable: v

unresolvedalias('transform('array(1), lambdafunction((lambda 'v * 2), lambda 'v, false)))
+- 'transform('array(1), lambdafunction((lambda 'v * 2), lambda 'v, false))
   :- 'array(1)
   :  +- 1
   +- lambdafunction((lambda 'v * 2), lambda 'v, false)
      :- (lambda 'v * 2)
      :  :- lambda 'v
      :  +- 2
      +- lambda 'v

In short, if we more or less agree with my points above, adding a new LambdaUDF just to avoid modifying ScalarFunctionArgs, IMHO, doesn't seem worth the additional code that gets to be reviewed and maintained, both in this PR (which is already big) and in subsequent PRs adding more lambda UDFs like array_filter and array_fold.

What do you think? Am I missing or misunderstanding something? Thanks!
And again, sorry for the delay

cc @keen85

@linhr
Contributor

linhr commented Feb 19, 2026

Sorry for missing the update on this thread and thanks for the detailed analysis @gstvg!

The reasoning makes a lot of sense to me. And it's great to see your investigation on the Spark codebase.

I didn't think deeply enough about how the lambda function would need to interact with various parts in DataFusion, such as tree traversal, SQL unparser, and the general ability to preserve argument positions. So the LambdaUDF trait I mentioned indeed has many limitations.

Therefore, feel free to continue the ideas you are pursuing! And let's see if DataFusion maintainers who are familiar with this part of the codebase have anything to add for the approach.

@comphead
Contributor

Thanks @gstvg for driving this. I missed this PR and was actually advocating for lambda support in the DataFusion roadmap. Let me explore this approach; at a high level, IMO it makes sense to introduce a LambdaUDF trait. Taking as an example SELECT list_filter([1,2,3,4], x -> x % 2 = 0); it could look like

FunctionExpression
  name: list_filter
  children:
    1. ListExpression [1,2,3,4]
    2. LambdaExpression
         parameters: ["x"]
         body:
            ComparisonExpression (=)
              left:
                 ArithmeticExpression (%)
                    left: ColumnRef("x")
                    right: Constant(2)
              right:
                 Constant(0)

Then we likely need to bind x to know what it is, and finally rewrite it into something computable, perhaps using a UDF of (Int) => Boolean that provides a bitmap of valid indices as the result, and then extract the valid indices from the array. It might preserve SIMD execution.

It is great to see this effort is moving

@gstvg
Contributor Author

gstvg commented Feb 21, 2026

Thanks @linhr! It mostly applies to having lambdas and args partitioned, omitting the body on TreeNode, and removing Expr::Lambda. Changing the PR to just use a new Expr::LambdaFunction(Arc<dyn LambdaUDF>), where args are unpartitioned, the body is exposed on TreeNode, and Expr::Lambda is kept, is okay; it wouldn't incur the other cited problems, but would make the PR bigger.

Thanks @comphead! The current representation looks somewhat similar to your suggestion, except that it's an Expr::ScalarFunction/ScalarFunctionExpr instead of a dedicated LambdaFunction, and that references to lambda parameters use LambdaVariable instead of Column, containing an optional value that should be bound before execution:

FunctionExpression //regular ScalarFunction
  name: list_filter
  children:
    1. ListExpression [1,2,3,4]
    2. LambdaExpression // the added physical LambdaExpr / logical Expr::Lambda
         parameters: ["x"]
         body:
            ComparisonExpression (=)
              left:
                 ArithmeticExpression (%)
                    left: LambdaVariable("x", Field::new("", Int32, false), None) that after binding becomes
                          LambdaVariable("x", Field::new("", Int32, false), Some([1, 2, 3, 4]))
                    right: Constant(2)
              right:
                 Constant(0)

The reason why adding LambdaVariable is easier than trying to use Column is shown in the outdated sections of this PR description and in the first comment.

If the added code for a LambdaUDF is considered worthwhile to avoid modifying ScalarUDF, and/or because it's a better representation, while keeping other things unchanged, I'm okay with it. But since you mentioned vectorized execution, I just want to make sure that we are not moving to LambdaUDF only for performance. Contrary to Spark, where the LambdaVariable value is set and re-set for every row, which would obviously be terrible for a vectorized engine, here it is only set once per batch, and so can perform as fast as regular expressions. The LambdaVariable value being a ColumnarValue is just a niche optimization for scalar evaluation; we can change it to support ArrayRef only. Native performance has been a goal since the beginning, and a list_filter implementation based on this can be easily vectorized. As an example, #17220 got vectorized execution while also based on ScalarUDF and regular physical expressions: the lambda body is evaluated once per batch, its output is used to filter the list values with the filter kernel, and then the offsets are adjusted:

    let filter_array = lambda.evaluate(&batch)?;
    let ColumnarValue::Array(filter_array) = filter_array else {
        return exec_err!("array_filter requires a lambda that returns an array of booleans");
    };

    let filter_array = as_boolean_array(&filter_array)?;
    let filtered = filter(&values, filter_array)?;

    for row_index in 0..list_array.len() {
        if list_array.is_null(row_index) {
            // Handle null arrays by keeping the offset unchanged
            offsets.push_length(0);
            continue;
        }
        let start = value_offsets[row_index];
        let end = value_offsets[row_index + 1];
        let num_true = filter_array
            .slice(start.as_usize(), (end - start).as_usize())
            .true_count();
        offsets.push_length(num_true);
    }
    let offsets = offsets.finish();
    let list_array = GenericListArray::<OffsetSize>::try_new(
        Arc::clone(field),
        offsets,
        filtered,
        nulls.cloned(),
    )?;

    Ok(Arc::new(list_array))

One thing that could be optimized is to use Buffer::count_set_bits_offset instead of BooleanArray::slice followed by true_count, to avoid 1-2 Arc::clones per loop iteration. There's also a faster way to adjust the offsets when the average sub-list length is small, something like 64 or less, but it's bigger and a bit more intricate, so I chose to implement array_transform instead to make the PR smaller. A reasonably fast array_fold is even more complicated, but still implementable in any lambda approach discussed here.
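A minimal sketch of that first optimization, reusing the filter_array, start, end and offsets bindings from the loop above, and assuming filter_array has no validity bitmap (a null-aware version would also need to AND in the null buffer):

    // count the true values of the sub-list directly on the underlying bits,
    // avoiding the Arc clones performed by BooleanArray::slice
    let bools = filter_array.values(); // &BooleanBuffer
    let num_true = bools.inner().count_set_bits_offset(
        bools.offset() + start.as_usize(),
        (end - start).as_usize(),
    );
    offsets.push_length(num_true);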

array_transform here is also vectorized: the transforming lambda body is evaluated only once per batch/invoke/evaluate call.


Then we likely need to Bind x to know what is it and finally rewrite it into something computable, perhaps using UDF of (Int) => Boolean

I believe that by using LambdaVariable, planning the lambda body into a physical expr as usual is enough to compute it, without having to use UDFs. One thing that we must take into account is column capture, which is already fully supported here, including shadowing and projection pushdown (columns from the input plan that only appear within a lambda body). As an example, #17220 does not support that, and if it tried to, while using Expr::Column for lambda variables and passing those variables in the RecordBatch to PhysicalExpr::evaluate, it would likely hit the same problems I got in my first approach, which are also shown in the outdated sections of the description and of the first comment here. That's why LambdaVariable exists. Supporting lambdas without capture support is relatively easy, regardless of the approach.

@rluvaton I see you also were not fond of adding physical exprs to ScalarFunctionArgs on #17220. Any comments here?


Finally, how would the new LambdaUDF trait work?
I believe it should look like ScalarFunction?

struct LambdaFunction {
    args: Vec<Expr>,
    fun: Arc<dyn LambdaUDF>,
}

enum Expr {
    ...
    Expr::LambdaFunction(LambdaFunction),
}

It also can be made more like Spark:

trait LambdaInvoker {
    ...
    fn invoke(&self, args: Vec<Expr>) -> Box<dyn LambdaInvocation>;
}

enum Expr {
    ...
    Expr::LambdaFunction(Box<dyn LambdaInvocation>),
}

I believe the LambdaUDF/LambdaInvocation trait would look like ScalarUDFImpl, except for the lambdas_parameters method, ReturnFieldArgs in return_field_from_args, and LambdaFunctionArgs in the invoke method. Or do any of you imagine more fundamental changes?
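For concreteness, a rough sketch assembled from the methods and types discussed in this thread (signatures illustrative, not final):

// Illustrative only: mirrors ScalarUDFImpl, with the lambda-aware types
// (ValueOrLambdaParameter, LambdaReturnFieldArgs, LambdaFunctionArgs)
// discussed elsewhere in this thread
trait LambdaUDF: std::fmt::Debug + Send + Sync {
    fn name(&self) -> &str;
    fn signature(&self) -> &Signature;
    /// For each argument, the parameter fields its lambda (if any) receives,
    /// or None for non-lambda arguments
    fn lambdas_parameters(
        &self,
        args: &[ValueOrLambdaParameter],
    ) -> Result<Vec<Option<Vec<Field>>>>;
    fn return_field_from_args(
        &self,
        args: LambdaReturnFieldArgs,
    ) -> Result<FieldRef>;
    fn invoke_with_args(&self, args: LambdaFunctionArgs) -> Result<ColumnarValue>;
}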

Thanks!

@rluvaton
Member

rluvaton commented Feb 23, 2026

thanks a lot for this PR.

Here are my thoughts:

Future features that the API should support without or minimal breaking changes

Couple of things to make sure we support or have a way to add them in the future without breaking changes:

  1. Support Map, (Large)List, (Large)ListView, FixedSizeList as the input for lambda
  2. multiple lambdas in a single expression, for example map_key_value(some_map_col, map_key_lambda, map_value_lambda), where each lambda gets different variables
  3. Lambda expression that access columns that are not in the list itself
    so I can do the following:
    | year |    grades      |
    |------|----------------|
    | 1998 | [1, 2, 3]      |
    | 1999 | [4, 99, 5, 10] |
    | 2000 | [6, 0, null]   |
    
    array_transform(grades, x -> if year <= 1990 then x * 10 else x)
  4. optional arguments for lambda, for example the index of the item in the list
    the optional here is important as I want to avoid creating that input if I don't need to.
  5. Nested lambda expressions: array_transform(matrix, x -> array_transform(x, y -> y * 2))

Things that every lambda implementation would need to handle

  1. replace null lists that are not empty underneath with nulls that have an empty list underneath.
    consider the following when the expression can fail: array_transform(list, x -> 1 / x)
    with this input

    fn get_list() -> GenericListArray<i32> {
      GenericListArray::new(
        Arc::new(Field::new_list_field(DataType::Int8, false)),
        OffsetBuffer::<i32>::from_lengths(vec![2, 2, 1]),
        Arc::new(Int8Array::from(vec![1, 2, 0, 3, 4])),
        Some(NullBuffer::from(&[true, false, true])),
      )
    }

    where the second list is null but the underlying values are [0, 3]; if we run the transform on the 0, it will fail with a division by zero.

    I have a lot of helpers to clean up the nulls BTW (a sketch of one such cleanup appears after this list)

  2. sliced list and not computing values outside the sliced data

  3. fixed size list (which is different from list, as you can't change the underlying nulls to be an empty list) with a null list whose underlying values can cause failures; consider the expression array_transform(list, x -> 1 / x) on this input:

fn get_list() -> FixedSizeListArray<i32> {
    FixedSizeListArray::new(
       Arc::new(Field::new_list_field(DataType::Int8, false)),
       3,
       Arc::new(Int8Array::from(vec![
           1, 2, 3,
           0, 1, 1,
           4, 5, 6
       ])),
       Some(NullBuffer::from(&[true, false, true])),
    )
}
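A minimal sketch of the cleanup referenced in case 1 (an illustration, not the helpers mentioned above): rebuild the list so null entries have zero length, so the lambda never sees values hidden under a null list:

use std::sync::Arc;
use arrow::array::{Array, GenericListArray, UInt32Array};
use arrow::buffer::OffsetBuffer;
use arrow::compute::take;
use arrow::datatypes::DataType;
use arrow::error::ArrowError;

// Null lists keep their null bit but lose the values underneath them
fn null_lists_to_empty(
    list: &GenericListArray<i32>,
) -> Result<GenericListArray<i32>, ArrowError> {
    let offsets = list.offsets();
    let mut keep = Vec::new(); // child indices belonging to non-null lists
    let mut lengths = Vec::with_capacity(list.len());
    for row in 0..list.len() {
        if list.is_null(row) {
            lengths.push(0); // null list now has an empty list underneath
        } else {
            let (start, end) = (offsets[row] as u32, offsets[row + 1] as u32);
            keep.extend(start..end);
            lengths.push((end - start) as usize);
        }
    }
    let values = take(list.values().as_ref(), &UInt32Array::from(keep), None)?;
    let DataType::List(field) = list.data_type() else {
        unreachable!("GenericListArray<i32> is always DataType::List")
    };
    GenericListArray::try_new(
        Arc::clone(field),
        OffsetBuffer::from_lengths(lengths),
        values,
        list.nulls().cloned(),
    )
}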

In every case here, we should answer:

  1. how would the user handle that?
  2. how could we make it easier for the user?
  3. How would we help them avoid forgetting handling?

I'm conflicted on whether we should fix case 1 for them as it is costly and the user might have prior knowledge to avoid that.

Why a new trait LambdaUDFImpl instead of adding functions on ScalarUDF

I think having a new LambdaUDFImpl is better than adding functions on existing ScalarUDF because:

  1. The ScalarUDF trait will not grow too much, keeping regular scalar UDFs easy to implement and lambda support from becoming overwhelming
  2. if we need to add a required function but only for lambdas, we can add it on the new trait with ease, and we won't need to do weird stuff to avoid breaking changes.
  3. Less ambiguity on the API.

Implementation specific:

I want to keep the simplicity of ScalarUDF, which means that in order to evaluate a lambda expression I don't need to construct stuff, only provide the input and maybe some options for future use.

@comphead
Contributor

Thanks @rluvaton and @gstvg, it's nice you mentioned array_transform; the tricky part for this function is that its return type depends on the lambda:

array_transform(array<T>, function<T, U>) -> array<U>

I want to keep the simplicity of ScalarUDF which means that in order to evaluate a lambda expression I don't need to construct stuff, only need to provide the input and maybe some options for future use.

Right, on high level it could be like

pub struct LambdaExpr {
    /// Parameter names/types already resolved
    pub param_types: Vec<DataType>,

    /// Expression body, what needs to be evaluated, this thing potentially can be UDF
    pub body: Arc<dyn PhysicalExpr>,
}

Impl

impl LambdaExpr {
    pub fn new(
        param_types: Vec<DataType>,
        body: Arc<dyn PhysicalExpr>,
    ) -> Self {
        Self { param_types, body }
    }

    /// Evaluate lambda over provided arrays
    pub fn evaluate_with_args(
        &self,
        args: Vec<ArrayRef>,
    ) -> Result<ArrayRef> {
        // Build synthetic schema
        let fields: Vec<Field> = self.param_types
            .iter()
            .enumerate()
            .map(|(i, dt)| Field::new(format!("arg{}", i), dt.clone(), true))
            .collect();

        let schema = Arc::new(Schema::new(fields));

        let batch = RecordBatch::try_new(schema, args)?;

        // PhysicalExpr::evaluate returns a ColumnarValue; materialize it as an
        // array to match the signature above (this is where our UDF would be called)
        self.body.evaluate(&batch)?.into_array(batch.num_rows())
    }
}

So, for example, for x -> x + 1 we need to parse the expression and create our Lambda, so we need to modify the parser to get the structures below from user-defined code; there is an existing ticket apache/datafusion-sqlparser-rs#1273

// Parameter x at column 0 (DataFusion's physical Column expr)
let x = Arc::new(Column::new("x", 0));

// Literal 1
let one = Arc::new(Literal::new(ScalarValue::Int32(Some(1))));

// x + 1 (note: BinaryExpr::new takes the operator as the middle argument)
let body = Arc::new(BinaryExpr::new(
    x,
    Operator::Add,
    one,
));

// Lambda(x) -> x + 1
let lambda = LambdaExpr::new(
    vec![DataType::Int32],
    body,
);

and call it from caller built in function

fn array_transform(
    list_array: &ListArray,
    lambda: &LambdaExpr,
) -> Result<ListArray> {

    let values = Arc::clone(list_array.values());

    // evaluate lambda on the flattened child array
    let transformed = lambda.evaluate_with_args(vec![values])?;

    // rebuild the list field, since the lambda may change the item type;
    // ListArray::new takes a FieldRef, not a DataType
    let field = Arc::new(Field::new_list_field(
        transformed.data_type().clone(),
        true,
    ));

    Ok(ListArray::new(
        field,
        list_array.offsets().clone(),
        transformed,
        list_array.nulls().cloned(),
    ))
}
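A hypothetical end-to-end use of the two sketches above, reusing the lambda built for x -> x + 1:

use arrow::array::ListArray;
use arrow::datatypes::Int32Type;

// [[1, 2], [3]]
let list = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
    Some(vec![Some(1), Some(2)]),
    Some(vec![Some(3)]),
]);

// `lambda` is the LambdaExpr for x -> x + 1 built above
let transformed = array_transform(&list, &lambda)?;
// transformed is [[2, 3], [4]]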

@gstvg
Contributor Author

gstvg commented Feb 24, 2026

@rluvaton @comphead

I updated the PR to a LambdaUDF trait based implementation. It added 1300 lines, totaling 3000, mostly boilerplate from ScalarUDF, including a lot of documentation. Was that in the range you were expecting?
Based on the ScalarUDF docs stating that it exists to maintain backwards compatibility with an older API, I included only a LambdaUDF trait, and not a struct LambdaUDF + trait LambdaUDFImpl pair, to not make it even bigger. There are a few small things missing that I want to implement tomorrow, and if all of you are okay with the results, I want to open this for review. My only concern is that the PR size may delay the review, as in #17220 (comment), but since IMHO this PR is simpler, I hope it won't take that long to review, despite the size.

  1. The ScalarUDF trait will not grow too much, keeping regular scalar UDFs easy to implement and lambda support from becoming overwhelming
  2. if we need to add a required function but only for lambdas, we can add it on the new trait with ease, and we won't need to do weird stuff to avoid breaking changes.
  3. Less ambiguity on the API.

Yeah, I think that 2 is the main point. The previous ScalarUDF-based approach only added a single method to a trait that already contains 20, it didn't require any change for non-lambda UDFs (that would be unacceptable), and compared to the current LambdaUDF-based one, lambda UDFs only required 2 additional lines of code per implementation. But, while I can't think of any, all my previous counter-arguments can fall apart in the future with a single requirement change. There's an upfront cost in review time and time to merge, but also more room to work in the future.

Support Map, (Large)List, (Large)ListView, FixedSizeList as the input for lambda

Currently any type is accepted and passed to the implementation to derive the parameters from it, usually the inner values of a list, but I want to work with unions too, for example. As of now, array_transform doesn't handle ListViews (I thought there were no plans to support them overall?). Since the ListView values may contain unreferenced values, should it be compacted, or cast to a regular compacted List? And if so, should we return the transformed List or cast it back to ListView?

multiple lambdas in a single expression, for example map_key_value(some_map_col, map_key_lambda, map_value_lambda), where each lambda gets different variables

Supported: LambdaFunctionArgs.args can hold multiple lambdas

    fn invoke_with_args(&self, args: LambdaFunctionArgs) -> Result<ColumnarValue> {
        let [list_value, lambda] = take_function_args(self.name(), &args.args)?;

        let (ValueOrLambda::Value(list_value), ValueOrLambda::Lambda(lambda)) =
            (list_value, lambda)
        else {
            return exec_err!(...

Since the beginning I worked to support multiple lambdas to implement union manipulating functions like this:

union_transform(
    union_value, 
    'str', str -> trim(str),
    'num', num -> num*2,
    'bool', bool -> NOT bool,
)

Lambda expression that access columns that are not in the list itself

Already supported and tested (I call it column capture throughout the PR and comments; please ignore the comment above the test, I'm going to remove it)

CREATE TABLE t as SELECT 1 as n;
query ?
SELECT array_transform([1, 2], (e) -> n) from t;
----
[1, 1]

optional arguments for lambda, for example the index of the item in the list
the optional here is important as I want to avoid creating that input if I don't need to.

Also supported

        // use closures so that bind_lambda_variables evaluates only the params that are actually referenced
        // avoiding unnecessary computations
        let values_param = || Ok(Arc::clone(list_values));
        let indices_param = || elements_indices(&list_array);

        let binded_body = bind_lambda_variables(
            Arc::clone(&lambda.body),
            &lambda.params,
            &[&values_param, &indices_param],
        )?;

*after a refactor, bind_lambda_params actually eagerly evaluates all params, but I'll fix that

Nested lambda expressions

Supported

SELECT array_transform(t.v, (v1, i) -> array_transform(v1, (v2, j) -> array_transform(v2, v3 -> j)) ) from t;
----
[[[0, 1], [0]], [[0]], [[]]]

array_transform, the tricky part for this function is its return type depends on lambda

LambdaUDF contains a method lambdas_parameters, where the implementation must return the types of the parameters of all its lambdas when evaluated with a given set of values. Then DF uses this info to compute the type of the lambdas and passes it to return_field_from_args, so the implementation can easily compute its return type. For example, for the expr array_transform([1, 2], v -> repeat('a', v)), lambdas_parameters would receive [ValueOrLambda::Value(List(Int32)), ValueOrLambda::Lambda] and should return vec![None, Some(vec![DataType::Int32])]. Then return_field_from_args would be called with [ValueOrLambda::Value(List(Int32)), ValueOrLambda::Lambda(Utf8)], where the implementation would just need to return List(Utf8).

return_field_from_args method
pub struct LambdaReturnFieldArgs<'a> {
    /// The data types of the arguments to the function
    ///
    /// If argument `i` to the function is a lambda, it will be the field returned by the
    /// lambda when executed with the arguments returned from `LambdaUDF::lambdas_parameters`
    ///
    /// For example, with `array_transform([1], v -> v == 5)`
    /// this field will be `[Field::new("", DataType::List(DataType::Int32), false), Field::new("", DataType::Boolean, false)]`
    pub arg_fields: &'a [ValueOrLambdaField],
    /// Is argument `i` to the function a scalar (constant)?
    ///
    /// If the argument `i` is not a scalar, it will be None
    ///
    /// For example, if a function is called like `my_function(column_a, 5)`
    /// this field will be `[None, Some(ScalarValue::Int32(Some(5)))]`
    pub scalar_arguments: &'a [Option<&'a ScalarValue>],
}

/// A tagged Field indicating whether it correspond to a value or a lambda argument
#[derive(Clone, Debug)]
pub enum ValueOrLambdaField {
    /// The Field of a ColumnarValue argument
    Value(FieldRef),
    /// The Field of the return of the lambda body when evaluated with the parameters from LambdaUDF::lambdas_parameters
    Lambda(FieldRef),
}

    fn return_field_from_args(
        &self,
        args: datafusion_expr::LambdaReturnFieldArgs,
    ) -> Result<Arc<Field>> {
        let [ValueOrLambdaField::Value(list), ValueOrLambdaField::Lambda(lambda)] =
            take_function_args(self.name(), args.arg_fields)?
        else {
            return exec_err!(
                "{} expects a value follewed by a lambda, got {:?}",
                self.name(),
                args
            );
        };

        //TODO: should metadata be copied into the transformed array?

        // lambda is the resulting field of executing the lambda body
        // with the parameters returned in lambdas_parameters
        let field = Arc::new(Field::new(
            Field::LIST_FIELD_DEFAULT_NAME,
            lambda.data_type().clone(),
            lambda.is_nullable(),
        ));

        let return_type = match list.data_type() {
            DataType::List(_) => DataType::List(field),
            DataType::LargeList(_) => DataType::LargeList(field),
            DataType::FixedSizeList(_, size) => DataType::FixedSizeList(field, *size),
            _ => unreachable!(),
        };

        Ok(Arc::new(Field::new("", return_type, list.is_nullable())))
    }
lambdas_parameters method
trait LambdaUDF {
    /// Returns the parameters that any lambda supports
    fn lambdas_parameters(
        &self,
        args: &[ValueOrLambdaParameter],
    ) -> Result<Vec<Option<Vec<Field>>>>;
}

pub enum ValueOrLambdaParameter<'a> {
    /// A columnar value with the given field
    Value(FieldRef),
    /// A lambda
    Lambda,
}

// array_transform implementation

impl LambdaUDF for ArrayTransform {
    fn lambdas_parameters(
        &self,
        args: &[ValueOrLambdaParameter],
    ) -> Result<Vec<Option<Vec<Field>>>> {
        let [ValueOrLambdaParameter::Value(list), ValueOrLambdaParameter::Lambda] =
            args
        else {
            return exec_err!(
                "{} expects a value followed by a lambda, got {:?}",
                self.name(),
                args
            );
        };

        let (field, index_type) = match list.data_type() {
            DataType::List(field) => (field, DataType::Int32),
            DataType::LargeList(field) => (field, DataType::Int64),
            DataType::FixedSizeList(field, _) => (field, DataType::Int32),
            _ => return exec_err!("expected list, got {list}"),
        };

        // we don't need to omit the index in case the lambda doesn't specify it, e.g. array_transform([], v -> v*2),
        // nor check whether the lambda contains more than two parameters, e.g. array_transform([], (v, i, j) -> v+i+j),
        // as datafusion will do that for us
        let value = Field::new("value", field.data_type().clone(), field.is_nullable())
            .with_metadata(field.metadata().clone());
        let index = Field::new("index", index_type, false);

        Ok(vec![None, Some(vec![value, index])])
    }
}

pub struct LambdaExpr {
    /// Parameter names/types already resolved
    pub param_types: Vec<DataType>,
    /// Expression body, what needs to be evaluated, this thing potentially can be UDF
    pub body: Arc<dyn PhysicalExpr>,
}

The current implementation works with only the parameter names, without requiring their datatypes, and IMHO it's easier for users that way, in both the expr API and SQL. The expr API could look like this:

    array_transform(
        col("my_array"),
        lambda(
            ["current_value"], // define the parameters name 
            lambda_variable("current_value") * lit(2) // the lambda body
        )
    )
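where lambda and lambda_variable could be small hypothetical builders over the PR's logical types (these helpers are illustrative, not part of the PR):

// Illustrative helpers, not part of the PR's public API
fn lambda<const N: usize>(params: [&str; N], body: Expr) -> Expr {
    Expr::Lambda(Lambda {
        params: params.iter().map(|p| p.to_string()).collect(),
        body: Box::new(body),
    })
}

fn lambda_variable(name: &str) -> Expr {
    Expr::LambdaVariable(LambdaVariable {
        name: name.to_string(),
        field: None, // resolved later, during planning
        spans: Spans::new(),
    })
}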

How to parse the ClickHouse syntax is, I believe, a question for the future: either check that the types match the ones returned by LambdaUDF::lambdas_parameters, or cast the parameters to the specified type.

  1. replace null lists that are not empty underneath
  2. fixed size list with a null list
    I'm conflicted on whether we should fix case 1 for them as it is costly and the user might have prior knowledge to avoid that.

Saw your review in #17220 but thought it mattered only for performance and didn't think about fallible expressions.
Yeah, maybe we can add a helper UDF that cleans the list on demand, to be called before the lambda UDF?

For fixed size lists, I think we can replace null sublists with the first non-null sublist of the array; if the non-null one also fails, then it's a user problem.

2. sliced list and not computing values outside the sliced data

Also saw this on #17220 but forgot about it, thanks for reminding me, will fix soon

In every case here, we should answer:

how would the user handle that?
how could we make it easier for the user?
How would we help them avoid forgetting handling?

I think the best we can do is document it, add examples and helper functions, and add a lot of comments in one or a few LambdaUDFs as reference implementations.

@comphead
Contributor

Thanks @gstvg, I feel we need some structure here.
I pinged in the ASF Slack that we are discussing lambda support in this RFC. It would also be great for people not to have to read through all the comments, but instead have a simple doc on the proposed objects, how they would be parsed/evaluated, and what traits are involved.

I can try to start this doc by going through this RFC, or if you feel more comfortable, you can create it. WDYT?

@timsaucer
Member

I had a partial implementation for array_transform in #17289 but I wasn't able to get it over the line before I had to work on other things. There is a lot of discussion here, so I'm going to try to make some time in the next few days to go over it all, but I am very interested in the direction this goes.

@gstvg
Contributor Author

gstvg commented Feb 24, 2026

You are right @comphead, my first implementation was more complex and I thoroughly documented it, but after simplifying it to the current version, I thought it was easy to grasp and documented it poorly; I was obviously wrong. Despite not thinking much of my high-level writing skills, I believe that writing the doc is my responsibility (there are a few edge cases and alternative approaches not discussed yet). But I would really appreciate it if you reviewed it; I can push a DOC.md to this branch and we can start a review on it, to not add even more comments here, and then finally update the PR description. WDYT?

@comphead
Contributor

But I would really appreciate it if you reviewed it; I can push a DOC.md to this branch and we can start a review on it, to not add even more comments here, and then finally update the PR description. WDYT?

it sounds great IMO, there are a lot of people expressing interest in the topic who would love to review as well

@comphead
Contributor

@andygrove made another attempt at taming lambdas for Comet in apache/datafusion-comet#3611

@gstvg
Contributor Author

gstvg commented Mar 1, 2026

@comphead Very cool, the LambdaVariable I'm using here was inspired by Spark btw; both ended up looking similar.
I updated the PR description and also pushed DOC.md in case you have any comments

@LiaCastaneda LiaCastaneda left a comment

👋 We're very interested in this feature and would love to see it land -- thanks for the effort on this epic PR and the detailed DOC.md! Let us know if there's anything we can do to help move it forward 🙇‍♀️

Comment on lines +155 to +157
Expr::LambdaFunction(expr) => producer.handle_lambda_function(expr, schema),
Expr::Lambda(expr) => not_impl_err!("Cannot convert {expr:?} to Substrait"), // not yet implemented in substrait-rs
Expr::LambdaVariable(expr) => not_impl_err!("Cannot convert {expr:?} to Substrait"), // not yet implemented in substrait-rs
@LiaCastaneda LiaCastaneda Mar 6, 2026

@benbellick added some in the substrait spec in substrait-io/substrait#889, but might not be released yet. I wonder if the expressions can actually be mapped 1:1

Contributor

I will address this comment more fully later. The protos for lambdas do exist in substrait-rs but haven't been released yet. I am looking into how to do that release right now.

Contributor

I have done the release. v0.62.3 contains the protos for lambdas.

Considering the size of this PR, I think it is appropriate to leave the handling of substrait lambdas in datafusion for a subsequent PR. But let's definitely make sure that we do this PR with the substrait interaction in mind :)

I will review the relevant parts for that reason. Thanks!

@LiaCastaneda LiaCastaneda Mar 6, 2026

nice, I agree the substrait consuming/producing can be left as a follow-up; should we bump substrait in datafusion in the meantime?

@gstvg gstvg Mar 6, 2026

Thanks @LiaCastaneda for the info and @benbellick for the release

I wonder if the expressions can actually be mapped 1:1

I did a quick check and I believe it's possible, yes.

LambdaFunction produces a regular substrait ScalarFunction, and to consume it we just check whether any LambdaUDF exists with the given function name and/or whether any arg is a lambda.

To produce a substrait Lambda with the parameters struct field, where each field corresponds to a parameter, we use the return of LambdaUDF::lambdas_parameters; for consuming, we can use default parameter names like p1, p2, etc.

To produce a FieldReference/LambdaParameterReference with a correct steps_out and direct_reference.struct_field.field, we keep a HashMap<String, (usize, usize)> of lambda_parameter_name => (steps_out, struct_field_index) that gets updated for every Lambda to be produced: steps_out is reset to 0 for shadowed parameters, the others are incremented by one, and struct_field_index is set according to the parameter's position within the lambda it originates from. Consuming into a LambdaVariable with its field resolved is similar to SQL planning: keep a HashMap<usize, FieldRef> of direct_reference.struct_field.field => field that gets updated for every LambdaUDF using the return of its lambdas_parameters method.
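A small sketch of that producer-side bookkeeping (enter_lambda and the map shape are illustrative, not code from this PR):

use std::collections::HashMap;

// name => (steps_out, struct_field_index)
type LambdaDepths = HashMap<String, (usize, usize)>;

// Called when producing a nested substrait Lambda: enclosing parameters move
// one level further out; this lambda's own parameters enter at steps_out = 0,
// overwriting (shadowing) any outer parameter with the same name
fn enter_lambda(depths: &mut LambdaDepths, params: &[String]) {
    for (steps_out, _) in depths.values_mut() {
        *steps_out += 1;
    }
    for (index, name) in params.iter().enumerate() {
        depths.insert(name.clone(), (0, index));
    }
}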

I'll address your other comments and then implement substrait support on another branch to make sure it's viable.


`LambdaFunction` `Signature` is non-functional

Currently, `LambdaUDF::signature` returns the same `Signature` as `ScalarUDF`, but its `type_signature` field is never used, as most variants of the `TypeSignature` enum aren't applicable to a lambda, and no type coercion is applied to its arguments, which is currently an implementation responsibility. We should either add lambda-compatible variants to the `TypeSignature` enum, create a new `LambdaTypeSignature` and `LambdaSignature`, or support no automatic type coercion at all on lambda functions.
Contributor

I imagine type coercion for lambda functions would only apply to value arguments, right?
For example, arr in array_transform(arr, v -> v): if arr is List<Int32> but the function only handles List<Int64>, people might forget to coerce this themselves.

What if we changed the coerce_types signature of LambdaUDF to:

fn coerce_types(&self, args: &[ValueOrLambdaParameter]) -> Result<Vec<Option<DataType>>>
So it coerces value args (returning Some(DataType)) but skips lambda args (returning None)? I think coerce_value_types would be a better name for this
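If it helps, a sketch of that suggestion applied to array_transform (the Int64 promotion is purely illustrative):

// Illustrative impl under the suggested signature: Some(..) for value args
// that should be coerced, None for lambda args
fn coerce_types(
    &self,
    args: &[ValueOrLambdaParameter],
) -> Result<Vec<Option<DataType>>> {
    Ok(args
        .iter()
        .map(|arg| match arg {
            ValueOrLambdaParameter::Value(field) => match field.data_type() {
                DataType::List(item) => Some(DataType::List(Arc::new(
                    Field::new_list_field(DataType::Int64, item.is_nullable()),
                ))),
                other => Some(other.clone()),
            },
            // lambdas have no single DataType to coerce to
            ValueOrLambdaParameter::Lambda => None,
        })
        .collect())
}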

Contributor

should we also add an unimplemented note in the substrait_consumer?

impl ArrayTransform {
    pub fn new() -> Self {
        Self {
            signature: Signature::any(2, Volatility::Immutable),
Contributor

would it be clearer if we had a special signature for lambda functions that has both volatility and parameter names?

pub struct LambdaSignature {
    pub volatility: Volatility,
    pub parameter_names: Option<Vec<String>>,
}

since this is a different trait from ScalarUDF, and types can't really be defined in a static way.
