class SomeTransform(beam.PTransform):
  """Stand-in transform that hands its input PCollection back unchanged."""
  def __init__(self, *_args, **_kwargs):
    # Any constructor arguments are accepted and deliberately discarded;
    # nothing is forwarded to the PTransform base initializer.
    super(SomeTransform, self).__init__()

  def expand(self, pcoll):
    """Returns `pcoll` as-is."""
    return pcoll


class _Fakes:
  """Namespace bundling the placeholder names `fn` and `SomeTransform`."""
  # `fn` is simply the builtin `str`.
  fn = str
  # Re-export the module-level transform under the same attribute name.
  SomeTransform = SomeTransform
# Import quickjs package if it exists
try:
  import quickjs
except ImportError:
  quickjs = None

# Per-thread store of compiled JS functions. Each thread lazily gets its own
# `functions` dict, keyed by (source_code, entrypoint_name).
_THREAD_LOCAL_JS_CACHE = threading.local()


class _JsFunctionWrapper:
  """Callable wrapper around a JavaScript UDF, identified by source and name.

  Only the source text and the entrypoint name are stored on the instance, so
  the wrapper pickles as two strings. The compiled `quickjs.Function` is built
  on first use and kept in a thread-local cache rather than on the instance.

  Args:
    source_code: JavaScript source that defines the entrypoint function.
    entrypoint_name: name of the function inside `source_code` to invoke.
  """
  def __init__(self, source_code, entrypoint_name):
    self.source_code = source_code
    self.entrypoint_name = entrypoint_name

  def _get_fn(self):
    """Returns the compiled function for this thread, compiling if needed.

    Raises:
      RuntimeError: if the quickjs module could not be imported in this
        process (e.g. the wrapper was unpickled in an environment without it).
    """
    if quickjs is None:
      # Fail with an actionable message instead of an AttributeError on None.
      raise RuntimeError(
          "Javascript mapping functions are not supported because the "
          "quickjs-ng library is not installed.")

    try:
      compiled = _THREAD_LOCAL_JS_CACHE.functions
    except AttributeError:
      # First use on this thread.
      compiled = _THREAD_LOCAL_JS_CACHE.functions = {}

    cache_key = (self.source_code, self.entrypoint_name)
    try:
      return compiled[cache_key]
    except KeyError:
      fn = compiled[cache_key] = quickjs.Function(
          self.entrypoint_name, self.source_code)
      return fn

  def __call__(self, row):
    """Applies the JS function to `row` and converts the result to rows.

    Raises:
      RuntimeError: if compiling or evaluating the JavaScript raises the
        engine's JSException (chained as the cause), or if quickjs is missing.
    """
    try:
      fn = self._get_fn()
      js_result = fn(py_value_to_js_dict(row))
    # NOTE(review): assumes the installed quickjs module exposes JSException;
    # if it does not, the empty tuple makes this clause catch nothing and the
    # original exception propagates unchanged.
    except getattr(quickjs, 'JSException', ()) as exn:
      raise RuntimeError(
          f"Error evaluating javascript expression: {exn}") from exn
    return dicts_to_rows(js_result)
# Words reserved in every JavaScript context. Python's str.isidentifier()
# accepts all of them, but `var <word> = ...` is a JS syntax error.
_JS_RESERVED_WORDS = frozenset({
    'break', 'case', 'catch', 'class', 'const', 'continue', 'debugger',
    'default', 'delete', 'do', 'else', 'enum', 'export', 'extends', 'false',
    'finally', 'for', 'function', 'if', 'import', 'in', 'instanceof', 'new',
    'null', 'return', 'super', 'switch', 'this', 'throw', 'true', 'try',
    'typeof', 'var', 'void', 'while', 'with'
})


def _is_js_bindable_field(field, expression):
  """Returns True if `field` occurs in `expression` as a whole identifier and
  can legally be declared as a JavaScript `var`."""
  if not field.isidentifier() or field in _JS_RESERVED_WORDS:
    return False
  # Whole-token match: `a` must not match inside `bar` or `$a`.
  pattern = rf'(?<![\w$]){re.escape(field)}(?![\w$])'
  return re.search(pattern, expression) is not None


def _expand_javascript_mapping_func(
    original_fields, expression=None, callable=None, path=None, name=None):
  """Builds a `_JsFunctionWrapper` from one of three JavaScript UDF forms.

  Args:
    original_fields: field names of the input; used only for `expression`, to
      bind referenced fields as local JS variables.
    expression: a JS expression, wrapped into a generated `udf(__row__)`
      function that returns its value.
    callable: JS source containing a named `function` declaration; the first
      such name found is used as the entrypoint.
    path: path to a `.js` file, read through `FileSystems`.
    name: entrypoint function name inside the file at `path`.

  Returns:
    A `_JsFunctionWrapper` holding the JS source and entrypoint name.

  Raises:
    ValueError: if quickjs is not installed, if `callable` contains no named
      function declaration, or if `path` does not end in `.js`.
  """
  if quickjs is None:
    raise ValueError(
        "Javascript mapping functions are not supported because the "
        "quickjs-ng library is not installed.")

  if expression:
    # Bind only the fields the expression actually references, and skip names
    # that would make the generated `var` declaration invalid JavaScript.
    unpacking_code = '\n'.join(
        f"      var {field} = __row__['{field}'];"
        for field in original_fields
        if _is_js_bindable_field(field, expression))
    source_code = f"""
    function udf(__row__) {{
      {unpacking_code}
      return ({expression});
    }}
    """
    entrypoint = 'udf'

  elif callable:
    # `$` is legal in JS identifiers, so it must be part of the captured name;
    # otherwise `function foo$bar` would resolve to the wrong entrypoint.
    match = re.search(
        r'(?:async\s+)?function\s+([a-zA-Z_$][a-zA-Z0-9_$]*)', callable)
    if not match:
      raise ValueError(
          f"Could not find function declaration in callable: {callable}")
    source_code = callable
    entrypoint = match.group(1)

  else:
    if not path.endswith('.js'):
      raise ValueError(f'File "{path}" is not a valid .js file.')
    # Close the handle deterministically instead of leaving it to the GC.
    with FileSystems.open(path) as js_file:
      source_code = js_file.read().decode()
    entrypoint = name

  return _JsFunctionWrapper(source_code, entrypoint)
# +import datetime import logging import os import shutil import tempfile import unittest +from decimal import Decimal import apache_beam as beam from apache_beam.io import localfilesystem @@ -32,10 +34,10 @@ from apache_beam.yaml.yaml_transform import YamlTransform try: - import js2py + import quickjs except ImportError: - js2py = None - logging.warning('js2py is not installed; some tests will be skipped.') + quickjs = None + logging.warning('quickjs-ng is not installed; some tests will be skipped.') def as_rows(): @@ -63,7 +65,7 @@ def setUp(self): def tearDown(self): shutil.rmtree(self.tmpdir) - @unittest.skipIf(js2py is None, 'js2py not installed.') + @unittest.skipIf(quickjs is None, 'quickjs-ng not installed.') def test_map_to_fields_filter_inline_js(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle', yaml_experimental_features=['javascript' @@ -197,7 +199,7 @@ def test_map_to_fields_sql_reserved_keyword_append(): beam.Row(label='389a', timestamp=2, label_copy="389a"), ])) - @unittest.skipIf(js2py is None, 'js2py not installed.') + @unittest.skipIf(quickjs is None, 'quickjs-ng not installed.') def test_filter_inline_js(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle', yaml_experimental_features=['javascript' @@ -252,7 +254,7 @@ def test_filter_inline_py(self): row=beam.Row(rank=2, values=[7, 8, 9])), ])) - @unittest.skipIf(js2py is None, 'js2py not installed.') + @unittest.skipIf(quickjs is None, 'quickjs-ng not installed.') def test_filter_expression_js(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle', yaml_experimental_features=['javascript' @@ -296,7 +298,7 @@ def test_filter_expression_py(self): row=beam.Row(rank=0, values=[1, 2, 3])), ])) - @unittest.skipIf(js2py is None, 'js2py not installed.') + @unittest.skipIf(quickjs is None, 'quickjs-ng not installed.') def 
test_filter_inline_js_file(self): data = ''' function f(x) { @@ -374,6 +376,40 @@ def g(x): row=beam.Row(rank=2, values=[7, 8, 9])), ])) + @unittest.skipIf(quickjs is None, 'quickjs-ng not installed.') + def test_map_to_fields_js_non_serializable_types(self): + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle', yaml_experimental_features=['javascript' + ])) as p: + data = [ + beam.Row( + b=b'hello', + dt=datetime.datetime(2026, 5, 14, 12, 0, 0), + dec=Decimal('10.5')) + ] + elements = p | beam.Create(data) + result = elements | YamlTransform( + ''' + type: MapToFields + config: + language: javascript + fields: + b_out: + expression: "b + '_world'" + dt_out: + expression: "dt" + dec_out: + expression: "dec * 2" + ''') + assert_that( + result, + equal_to([ + beam.Row( + b_out='hello_world', + dt_out='2026-05-14T12:00:00', + dec_out=21.0), + ])) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 4c1384c31517..535524b46a1f 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -638,8 +638,7 @@ def get_portability_package_data(): 'docstring-parser>=0.15,<1.0', 'jinja2>=3.0,<3.2', 'virtualenv-clone>=0.5,<1.0', - # https://github.com/PiotrDabkowski/Js2Py/issues/317 - 'js2py>=0.74,<1; python_version<"3.12"', + 'quickjs-ng>=0.14.0,<1.0.0', 'jsonschema>=4.0.0,<5.0.0', ] + dataframe_dependency, # Keep the following dependencies in line with what we test against