diff --git a/.github/workflows/base.yml b/.github/workflows/base.yml
index d5e4ec7..9c70ca3 100644
--- a/.github/workflows/base.yml
+++ b/.github/workflows/base.yml
@@ -12,28 +12,45 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        PYSPARK_VERSION: ["3.1.3", "3.2", "3.3", "3.5"]
+        include:
+          - PYSPARK_VERSION: "3.1.3"
+            JAVA_VERSION: "11"
+            PYTHON_VERSION: "3.8"
+          - PYSPARK_VERSION: "3.2"
+            JAVA_VERSION: "11"
+            PYTHON_VERSION: "3.8"
+          - PYSPARK_VERSION: "3.3"
+            JAVA_VERSION: "11"
+            PYTHON_VERSION: "3.8"
+          - PYSPARK_VERSION: "3.5"
+            JAVA_VERSION: "11"
+            PYTHON_VERSION: "3.8"
+          - PYSPARK_VERSION: "4.0.0"
+            JAVA_VERSION: "17"
+            PYTHON_VERSION: "3.9"
+            PANDAS_VERSION: ">=2.0.0"
     steps:
       - uses: actions/checkout@v3
       - uses: actions/setup-python@v2
-        name: Install Python 3.8
+        name: Install Python ${{matrix.PYTHON_VERSION}}
         with:
-          python-version: 3.8
+          python-version: ${{matrix.PYTHON_VERSION}}
       - uses: actions/setup-java@v1
-        name: Setup Java 11
-        if: startsWith(matrix.PYSPARK_VERSION, '3')
+        name: Setup Java ${{matrix.JAVA_VERSION}}
         with:
-          java-version: "11"
+          java-version: ${{matrix.JAVA_VERSION}}
       - name: Running tests with pyspark==${{matrix.PYSPARK_VERSION}}
         env:
           SPARK_VERSION: ${{matrix.PYSPARK_VERSION}}
+          PANDAS_VERSION: ${{matrix.PANDAS_VERSION}}
         run: |
           pip install --upgrade pip
           pip install poetry==1.7.1
           poetry install
-          poetry add pyspark==$SPARK_VERSION
+          poetry run pip install pyspark==$SPARK_VERSION
+          if [ -n "$PANDAS_VERSION" ]; then poetry run pip install "pandas$PANDAS_VERSION"; fi
           poetry run python -m pytest -s tests
diff --git a/pydeequ/analyzers.py b/pydeequ/analyzers.py
index 3952c93..3750f90 100644
--- a/pydeequ/analyzers.py
+++ b/pydeequ/analyzers.py
@@ -9,7 +9,7 @@
 from pydeequ.pandas_utils import ensure_pyspark_df
 from pydeequ.repository import MetricsRepository, ResultKey
 from enum import Enum
-from pydeequ.scala_utils import to_scala_seq
+from pydeequ.scala_utils import empty_scala_seq, to_scala_seq
 from pydeequ.configs import SPARK_VERSION
 
 class _AnalyzerObject:
@@ -311,7 +311,7 @@ def _analyzer_jvm(self):
             self.instance,
             self.predicate,
             self._jvm.scala.Option.apply(self.where),
-            self._jvm.scala.collection.Seq.empty(),
+            empty_scala_seq(self._jvm),
             self._jvm.scala.Option.apply(None)
         )
diff --git a/pydeequ/checks.py b/pydeequ/checks.py
index 749f74d..012b880 100644
--- a/pydeequ/checks.py
+++ b/pydeequ/checks.py
@@ -5,7 +5,7 @@
 from pyspark.sql import SparkSession
 
 from pydeequ.check_functions import is_one
-from pydeequ.scala_utils import ScalaFunction1, to_scala_seq
+from pydeequ.scala_utils import ScalaFunction1, empty_scala_seq, to_scala_seq
 from pydeequ.configs import SPARK_VERSION
 
 # TODO implement custom assertions
@@ -563,7 +563,7 @@ def satisfies(self, columnCondition, constraintName, assertion=None, hint=None):
             constraintName,
             assertion_func,
             hint,
-            self._jvm.scala.collection.Seq.empty(),
+            empty_scala_seq(self._jvm),
             self._jvm.scala.Option.apply(None)
         )
         return self
diff --git a/pydeequ/configs.py b/pydeequ/configs.py
index e56c97d..60c3403 100644
--- a/pydeequ/configs.py
+++ b/pydeequ/configs.py
@@ -5,6 +5,7 @@
 SPARK_TO_DEEQU_COORD_MAPPING = {
+    "4.0": "com.amazon.deequ:deequ:2.0.14-spark-4.0",
     "3.5": "com.amazon.deequ:deequ:2.0.8-spark-3.5",
     "3.3": "com.amazon.deequ:deequ:2.0.8-spark-3.3",
     "3.2": "com.amazon.deequ:deequ:2.0.8-spark-3.2",
diff --git a/pydeequ/profiles.py b/pydeequ/profiles.py
index fbbfd84..c82dca2 100644
--- a/pydeequ/profiles.py
+++ b/pydeequ/profiles.py
@@ -254,7 +254,7 @@ def _columnProfilesFromColumnRunBuilderRun(self, run):
         :return: a setter for columnProfilerRunner result
         """
         self._run_result = run
-        profile_map = self._jvm.scala.collection.JavaConversions.mapAsJavaMap(run.profiles())  # TODO from ScalaUtils
+        profile_map = self._jvm.scala.collection.JavaConverters.mapAsJavaMapConverter(run.profiles()).asJava()  # TODO from ScalaUtils
         self._profiles = {column: self._columnProfileBuilder(column, profile_map[column]) for column in profile_map}
         return self
diff --git a/pydeequ/scala_utils.py b/pydeequ/scala_utils.py
index b6d3e83..20506c3 100644
--- a/pydeequ/scala_utils.py
+++ b/pydeequ/scala_utils.py
@@ -77,7 +77,19 @@ def to_scala_seq(jvm, iterable):
     Returns:
         Scala sequence
     """
-    return jvm.scala.collection.JavaConversions.iterableAsScalaIterable(iterable).toSeq()
+    return jvm.scala.collection.JavaConverters.iterableAsScalaIterableConverter(iterable).asScala().toSeq()
+
+
+def empty_scala_seq(jvm):
+    """
+    Returns an empty Scala immutable List (Nil), usable as Seq[_].
+    Uses JavaConverters with .toList() to produce an immutable List rather than
+    a Stream, which is required for Py4J constructor/method lookup to succeed
+    across both Scala 2.12 (Spark 3.x) and Scala 2.13 (Spark 4+).
+    """
+    return jvm.scala.collection.JavaConverters.iterableAsScalaIterableConverter(
+        jvm.java.util.ArrayList()
+    ).asScala().toList()
 
 
 def to_scala_map(spark_session, d):
@@ -93,11 +105,11 @@ def to_scala_map(spark_session, d):
 
 
 def scala_map_to_dict(jvm, scala_map):
-    return dict(jvm.scala.collection.JavaConversions.mapAsJavaMap(scala_map))
+    return dict(jvm.scala.collection.JavaConverters.mapAsJavaMapConverter(scala_map).asJava())
 
 
 def scala_map_to_java_map(jvm, scala_map):
-    return jvm.scala.collection.JavaConversions.mapAsJavaMap(scala_map)
+    return jvm.scala.collection.JavaConverters.mapAsJavaMapConverter(scala_map).asJava()
 
 
 def java_list_to_python_list(java_list: str, datatype):
diff --git a/pyproject.toml b/pyproject.toml
index dcb6a11..eefe17c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -31,7 +31,10 @@ classifiers = [
 python = ">=3.8,<4"
 numpy = ">=1.14.1"
 pandas = ">=0.23.0"
-pyspark = { version = ">=2.4.7,<3.4.0", optional = true }
+pyspark = [
+    { version = ">=2.4.7,<4.0.0", optional = true, python = ">=3.8,<3.9" },
+    { version = ">=2.4.7,<5.0.0", optional = true, python = ">=3.9" },
+]
 
 [tool.poetry.dev-dependencies]
 pytest = "^6.2.4"
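
For anyone who wants to sanity-check the new JavaConverters-based helpers locally, a minimal sketch follows. It is not part of the patch; it assumes a local PySpark install and the SPARK_VERSION environment variable that pydeequ reads at import time (set to the installed PySpark's major.minor, assumed to be 3.5 here):

    import os

    # pydeequ resolves its deequ Maven coordinate from this env var at import
    # time; the "3.5" value is an assumption -- match your installed PySpark.
    os.environ.setdefault("SPARK_VERSION", "3.5")

    from pyspark.sql import SparkSession
    from pydeequ.scala_utils import empty_scala_seq, to_scala_seq

    spark = SparkSession.builder.getOrCreate()
    jvm = spark.sparkContext._jvm

    # empty_scala_seq should hand back scala.collection.immutable.Nil, which is
    # accepted wherever deequ expects a Seq[_], on Scala 2.12 and 2.13 alike.
    assert empty_scala_seq(jvm).isEmpty()
    assert to_scala_seq(jvm, ["a", "b"]).size() == 2

    spark.stop()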