Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"revision": 2
"revision": 3
}
16 changes: 9 additions & 7 deletions sdks/python/apache_beam/yaml/readme_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,17 @@ def expand(self, pcoll):
lambda _: 1, sum, 'count')


class _Fakes:
fn = str
class SomeTransform(beam.PTransform):
def __init__(self, *args, **kwargs):
super().__init__()

class SomeTransform(beam.PTransform):
def __init__(*args, **kwargs):
pass
def expand(self, pcoll):
return pcoll

def expand(self, pcoll):
return pcoll

class _Fakes:
fn = str
SomeTransform = SomeTransform


RENDER_DIR = None
Expand Down
136 changes: 58 additions & 78 deletions sdks/python/apache_beam/yaml/yaml_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@
#

"""This module defines the basic MapToFields operation."""
import datetime
import itertools
import json
import re
import threading
Comment thread
derrickaw marked this conversation as resolved.
from collections import abc
from collections.abc import Callable
from collections.abc import Collection
from collections.abc import Iterable
from collections.abc import Mapping
from decimal import Decimal
from typing import Any
from typing import NamedTuple
from typing import Optional
Expand Down Expand Up @@ -53,13 +57,11 @@
from apache_beam.yaml.yaml_errors import maybe_with_exception_handling_transform_fn
from apache_beam.yaml.yaml_provider import dicts_to_rows

# Import js2py package if it exists
# Import quickjs package if it exists
try:
import js2py
from js2py.base import JsObjectWrapper
import quickjs
except ImportError:
js2py = None
JsObjectWrapper = object
quickjs = None

_str_expression_fields = {
'AssignTimestamps': 'timestamp',
Expand Down Expand Up @@ -178,18 +180,29 @@ def _check_mapping_arguments(
raise ValueError(f'{transform_name} cannot specify "name" without "path"')


# js2py's JsObjectWrapper object has a self-referencing __dict__ property
# that cannot be pickled without implementing the __getstate__ and
# __setstate__ methods.
class _CustomJsObjectWrapper(JsObjectWrapper):
def __init__(self, js_obj):
super().__init__(js_obj.__dict__['_obj'])
_THREAD_LOCAL_JS_CACHE = threading.local()

def __getstate__(self):
return self.__dict__.copy()

def __setstate__(self, state):
self.__dict__.update(state)
class _JsFunctionWrapper:
def __init__(self, source_code, entrypoint_name):
self.source_code = source_code
self.entrypoint_name = entrypoint_name

def _get_fn(self):
cache = _THREAD_LOCAL_JS_CACHE
if not hasattr(cache, 'functions'):
cache.functions = {}

cache_key = (self.source_code, self.entrypoint_name)
if cache_key not in cache.functions:
cache.functions[cache_key] = quickjs.Function(
self.entrypoint_name, self.source_code)

return cache.functions[cache_key]

def __call__(self, row):
fn = self._get_fn()
return dicts_to_rows(fn(py_value_to_js_dict(row)))


# TODO(yaml) Improve type inferencing for JS UDF's
Expand All @@ -199,6 +212,12 @@ def py_value_to_js_dict(py_value):
py_value = py_value._asdict()
if isinstance(py_value, dict):
return {key: py_value_to_js_dict(value) for key, value in py_value.items()}
elif isinstance(py_value, bytes):
return py_value.decode('utf-8', errors='replace')
elif isinstance(py_value, (datetime.datetime, datetime.date, datetime.time)):
return py_value.isoformat()
elif isinstance(py_value, Decimal):
return float(py_value)
elif not isinstance(py_value, str) and isinstance(py_value, abc.Iterable):
return [py_value_to_js_dict(value) for value in list(py_value)]
else:
Expand All @@ -210,80 +229,41 @@ def py_value_to_js_dict(py_value):
def _expand_javascript_mapping_func(
original_fields, expression=None, callable=None, path=None, name=None):

# Check for installed js2py package
if js2py is None:
if quickjs is None:
raise ValueError(
"Javascript mapping functions are not supported on"
" Python 3.12 or later.")

# import remaining js2py objects
from js2py import base
from js2py.constructors import jsdate
from js2py.internals import simplex

js_array_type = (
base.PyJsArray,
base.PyJsArrayBuffer,
base.PyJsInt8Array,
base.PyJsUint8Array,
base.PyJsUint8ClampedArray,
base.PyJsInt16Array,
base.PyJsUint16Array,
base.PyJsInt32Array,
base.PyJsUint32Array,
base.PyJsFloat32Array,
base.PyJsFloat64Array)

def _js_object_to_py_object(obj):
if isinstance(obj, (base.PyJsNumber, base.PyJsString, base.PyJsBoolean)):
return base.to_python(obj)
elif isinstance(obj, js_array_type):
return [_js_object_to_py_object(value) for value in obj.to_list()]
elif isinstance(obj, jsdate.PyJsDate):
return obj.to_utc_dt()
elif isinstance(obj, (base.PyJsNull, base.PyJsUndefined)):
return None
elif isinstance(obj, base.PyJsError):
raise RuntimeError(obj['message'])
elif isinstance(obj, base.PyJsObject):
return {
key: _js_object_to_py_object(value['value'])
for (key, value) in obj.own.items()
}
elif isinstance(obj, base.JsObjectWrapper):
return _js_object_to_py_object(obj._obj)

return obj
"Javascript mapping functions are not supported because the "
"quickjs-ng library is not installed.")

if expression:
source = '\n'.join(['function(__row__) {'] + [
f' {name} = __row__.{name}'
for name in original_fields if name in expression
] + [' return (' + expression + ')'] + ['}'])
js_func = _CustomJsObjectWrapper(js2py.eval_js(source))
unpacking_code = '\n'.join([
f" var {name} = __row__['{name}'];" for name in original_fields
if name in expression and name.isidentifier()
])
Comment thread
derrickaw marked this conversation as resolved.
source_code = f"""
function udf(__row__) {{
{unpacking_code}
return ({expression});
}}
"""
entrypoint = 'udf'

elif callable:
js_func = _CustomJsObjectWrapper(js2py.eval_js(callable))
match = re.search(r'(?:async\s+)?function\s+([a-zA-Z0-9_]+)', callable)
Comment thread
derrickaw marked this conversation as resolved.
if not match:
raise ValueError(
f"Could not find function declaration in callable: {callable}")
udf_name = match.group(1)
source_code = callable
entrypoint = udf_name

else:
if not path.endswith('.js'):
raise ValueError(f'File "{path}" is not a valid .js file.')
udf_code = FileSystems.open(path).read().decode()
js = js2py.EvalJs()
js.eval(udf_code)
js_func = _CustomJsObjectWrapper(getattr(js, name))
source_code = udf_code
entrypoint = name

def js_wrapper(row):
row_as_dict = py_value_to_js_dict(row)
try:
js_result = js_func(row_as_dict)
except simplex.JsException as exn:
raise RuntimeError(
f"Error evaluating javascript expression: "
f"{exn.mes['message']}") from exn
return dicts_to_rows(_js_object_to_py_object(js_result))

return js_wrapper
return _JsFunctionWrapper(source_code, entrypoint)


def _expand_python_mapping_func(
Expand Down
50 changes: 43 additions & 7 deletions sdks/python/apache_beam/yaml/yaml_udf_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import datetime
import logging
import os
import shutil
import tempfile
import unittest
from decimal import Decimal

import apache_beam as beam
from apache_beam.io import localfilesystem
Expand All @@ -32,10 +34,10 @@
from apache_beam.yaml.yaml_transform import YamlTransform

try:
import js2py
import quickjs
except ImportError:
js2py = None
logging.warning('js2py is not installed; some tests will be skipped.')
quickjs = None
logging.warning('quickjs-ng is not installed; some tests will be skipped.')


def as_rows():
Expand Down Expand Up @@ -63,7 +65,7 @@ def setUp(self):
def tearDown(self):
shutil.rmtree(self.tmpdir)

@unittest.skipIf(js2py is None, 'js2py not installed.')
@unittest.skipIf(quickjs is None, 'quickjs-ng not installed.')
def test_map_to_fields_filter_inline_js(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle', yaml_experimental_features=['javascript'
Expand Down Expand Up @@ -197,7 +199,7 @@ def test_map_to_fields_sql_reserved_keyword_append():
beam.Row(label='389a', timestamp=2, label_copy="389a"),
]))

@unittest.skipIf(js2py is None, 'js2py not installed.')
@unittest.skipIf(quickjs is None, 'quickjs-ng not installed.')
def test_filter_inline_js(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle', yaml_experimental_features=['javascript'
Expand Down Expand Up @@ -252,7 +254,7 @@ def test_filter_inline_py(self):
row=beam.Row(rank=2, values=[7, 8, 9])),
]))

@unittest.skipIf(js2py is None, 'js2py not installed.')
@unittest.skipIf(quickjs is None, 'quickjs-ng not installed.')
def test_filter_expression_js(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle', yaml_experimental_features=['javascript'
Expand Down Expand Up @@ -296,7 +298,7 @@ def test_filter_expression_py(self):
row=beam.Row(rank=0, values=[1, 2, 3])),
]))

@unittest.skipIf(js2py is None, 'js2py not installed.')
@unittest.skipIf(quickjs is None, 'quickjs-ng not installed.')
def test_filter_inline_js_file(self):
data = '''
function f(x) {
Expand Down Expand Up @@ -374,6 +376,40 @@ def g(x):
row=beam.Row(rank=2, values=[7, 8, 9])),
]))

@unittest.skipIf(quickjs is None, 'quickjs-ng not installed.')
def test_map_to_fields_js_non_serializable_types(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle', yaml_experimental_features=['javascript'
])) as p:
data = [
beam.Row(
b=b'hello',
dt=datetime.datetime(2026, 5, 14, 12, 0, 0),
dec=Decimal('10.5'))
]
elements = p | beam.Create(data)
result = elements | YamlTransform(
'''
type: MapToFields
config:
language: javascript
fields:
b_out:
expression: "b + '_world'"
dt_out:
expression: "dt"
dec_out:
expression: "dec * 2"
''')
assert_that(
result,
equal_to([
beam.Row(
b_out='hello_world',
dt_out='2026-05-14T12:00:00',
dec_out=21.0),
]))


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
Expand Down
3 changes: 1 addition & 2 deletions sdks/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -638,8 +638,7 @@ def get_portability_package_data():
'docstring-parser>=0.15,<1.0',
'jinja2>=3.0,<3.2',
'virtualenv-clone>=0.5,<1.0',
# https://github.com/PiotrDabkowski/Js2Py/issues/317
'js2py>=0.74,<1; python_version<"3.12"',
'quickjs-ng>=0.14.0,<1.0.0',
'jsonschema>=4.0.0,<5.0.0',
] + dataframe_dependency,
# Keep the following dependencies in line with what we test against
Expand Down
Loading