diff --git a/.github/workflows/paimon-python-checks.yml b/.github/workflows/paimon-python-checks.yml index 0154c637f63a..fde69953ee75 100755 --- a/.github/workflows/paimon-python-checks.yml +++ b/.github/workflows/paimon-python-checks.yml @@ -71,6 +71,8 @@ jobs: build-essential \ git \ curl \ + pkg-config \ + libssl-dev \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* @@ -139,12 +141,22 @@ jobs: if: matrix.python-version != '3.6.15' shell: bash run: | - pip install maturin + pip install maturin[patchelf] git clone -b support_directory https://github.com/JingsongLi/tantivy-py.git /tmp/tantivy-py cd /tmp/tantivy-py maturin build --release pip install target/wheels/tantivy-*.whl + - name: Build and install pypaimon-rust from source + if: matrix.python-version != '3.6.15' + shell: bash + run: | + git clone https://github.com/apache/paimon-rust.git /tmp/paimon-rust + cd /tmp/paimon-rust/bindings/python + maturin build --release -o dist + pip install dist/pypaimon_rust-*.whl + pip install 'datafusion>=52' + - name: Run lint-python.sh shell: bash run: | diff --git a/docs/content/pypaimon/cli.md b/docs/content/pypaimon/cli.md index a328fc4744ce..6485ca58768e 100644 --- a/docs/content/pypaimon/cli.md +++ b/docs/content/pypaimon/cli.md @@ -621,3 +621,105 @@ default mydb analytics ``` + +## SQL Command + +Execute SQL queries on Paimon tables directly from the command line. This feature is powered by pypaimon-rust and DataFusion. 
+ +**Prerequisites:** + +```shell +pip install pypaimon[sql] +``` + +### One-Shot Query + +Execute a single SQL query and display the result: + +```shell +paimon sql "SELECT * FROM users LIMIT 10" +``` + +Output: +``` + id name age city + 1 Alice 25 Beijing + 2 Bob 30 Shanghai + 3 Charlie 35 Guangzhou +``` + +**Options:** + +- `--format, -f`: Output format: `table` (default) or `json` + +**Examples:** + +```shell +# Direct table name (uses default catalog and database) +paimon sql "SELECT * FROM users" + +# Two-part: database.table +paimon sql "SELECT * FROM mydb.users" + +# Query with filter and aggregation +paimon sql "SELECT city, COUNT(*) AS cnt FROM users GROUP BY city ORDER BY cnt DESC" + +# Output as JSON +paimon sql "SELECT * FROM users LIMIT 5" --format json +``` + +### Interactive REPL + +Start an interactive SQL session by running `paimon sql` without a query argument. The REPL supports arrow keys for line editing, and command history is persisted across sessions in `~/.paimon_history`. + +```shell +paimon sql +``` + +Output: +``` + ____ _ + / __ \____ _(_)___ ___ ____ ____ + / /_/ / __ `/ / __ `__ \/ __ \/ __ \ + / ____/ /_/ / / / / / / / /_/ / / / / +/_/ \__,_/_/_/ /_/ /_/\____/_/ /_/ + + Powered by pypaimon-rust + DataFusion + Type 'help' for usage, 'exit' to quit. + +paimon> SHOW DATABASES; +default +mydb + +paimon> USE mydb; +Using database 'mydb'. + +paimon> SHOW TABLES; +orders +users + +paimon> SELECT count(*) AS cnt + > FROM users + > WHERE age > 18; + cnt + 42 +(1 row in 0.05s) + +paimon> exit +Bye! +``` + +SQL statements end with `;` and can span multiple lines. The continuation prompt ` >` indicates that more input is expected. 
+ +**REPL Commands:** + +| Command | Description | +|---|---| +| `USE <database>;` | Switch the default database | +| `SHOW DATABASES;` | List all databases | +| `SHOW TABLES;` | List tables in the current database | +| `SELECT ...;` | Execute a SQL query | +| `help` | Show usage information | +| `exit` / `quit` | Exit the REPL | + +For more details on SQL syntax and the Python API, see [SQL Query]({{< ref "pypaimon/sql" >}}). diff --git a/docs/content/pypaimon/sql.md b/docs/content/pypaimon/sql.md new file mode 100644 index 000000000000..88d08a01d07b --- /dev/null +++ b/docs/content/pypaimon/sql.md @@ -0,0 +1,168 @@ +--- +title: "SQL Query" +weight: 8 +type: docs +aliases: + - /pypaimon/sql.html +--- + + + +# SQL Query + +PyPaimon supports executing SQL queries on Paimon tables, powered by [pypaimon-rust](https://github.com/apache/paimon-rust/tree/main/bindings/python) and [DataFusion](https://datafusion.apache.org/python/). + +## Installation + +SQL query support requires additional dependencies. Install them with: + +```shell +pip install pypaimon[sql] +``` + +This will install `pypaimon-rust` and `datafusion`. + +## Usage + +Create a `SQLContext`, register one or more catalogs with their options, and run SQL queries. 
+ +### Basic Query + +```python +from pypaimon.sql import SQLContext + +ctx = SQLContext() +ctx.register_catalog("paimon", {"warehouse": "/path/to/warehouse"}) +ctx.set_current_catalog("paimon") +ctx.set_current_database("default") + +# Execute SQL and get PyArrow Table +table = ctx.sql("SELECT * FROM my_table") +print(table) + +# Convert to Pandas DataFrame +df = table.to_pandas() +print(df) +``` + +### Table Reference Format + +The default catalog and default database can be configured via `set_current_catalog()` and `set_current_database()`, so you can reference tables in two ways: + +```python +# Direct table name (uses default database) +ctx.sql("SELECT * FROM my_table") + +# Two-part: database.table +ctx.sql("SELECT * FROM mydb.my_table") +``` + +### Filtering + +```python +table = ctx.sql(""" + SELECT id, name, age + FROM users + WHERE age > 18 AND city = 'Beijing' +""") +``` + +### Aggregation + +```python +table = ctx.sql(""" + SELECT city, COUNT(*) AS cnt, AVG(age) AS avg_age + FROM users + GROUP BY city + ORDER BY cnt DESC +""") +``` + +### Join + +```python +table = ctx.sql(""" + SELECT u.name, o.order_id, o.amount + FROM users u + JOIN orders o ON u.id = o.user_id + WHERE o.amount > 100 +""") +``` + +### Subquery + +```python +table = ctx.sql(""" + SELECT * FROM users + WHERE id IN ( + SELECT user_id FROM orders + WHERE amount > 1000 + ) +""") +``` + +### Cross-Database Query + +```python +# Query a table in another database using two-part syntax +table = ctx.sql(""" + SELECT u.name, o.amount + FROM default.users u + JOIN analytics.orders o ON u.id = o.user_id +""") +``` + +### Multi-Catalog Query + +`SQLContext` supports registering multiple catalogs for cross-catalog queries: + +```python +from pypaimon.sql import SQLContext + +ctx = SQLContext() +ctx.register_catalog("a", {"warehouse": "/path/to/warehouse_a"}) +ctx.register_catalog("b", { + "metastore": "rest", + "uri": "http://localhost:8080", + "warehouse": "warehouse_b", +}) 
+ctx.set_current_catalog("a") +ctx.set_current_database("default") + +# Cross-catalog join +table = ctx.sql(""" + SELECT a_users.name, b_orders.amount + FROM a.default.users AS a_users + JOIN b.default.orders AS b_orders ON a_users.id = b_orders.user_id +""") +``` + +## Supported SQL Syntax + +The SQL engine is powered by Apache DataFusion, which supports a rich set of SQL syntax including: + +- `SELECT`, `WHERE`, `GROUP BY`, `HAVING`, `ORDER BY`, `LIMIT` +- `JOIN` (INNER, LEFT, RIGHT, FULL, CROSS) +- Subqueries and CTEs (`WITH`) +- Aggregate functions (`COUNT`, `SUM`, `AVG`, `MIN`, `MAX`, etc.) +- Window functions (`ROW_NUMBER`, `RANK`, `LAG`, `LEAD`, etc.) +- `UNION`, `INTERSECT`, `EXCEPT` + +For the full SQL reference, see the [DataFusion SQL documentation](https://datafusion.apache.org/user-guide/sql/index.html). diff --git a/paimon-python/pypaimon/__init__.py b/paimon-python/pypaimon/__init__.py index 77965c3a14a2..e07179fb28d6 100644 --- a/paimon-python/pypaimon/__init__.py +++ b/paimon-python/pypaimon/__init__.py @@ -28,6 +28,7 @@ from pypaimon.schema.schema import Schema from pypaimon.tag.tag import Tag from pypaimon.tag.tag_manager import TagManager +from pypaimon.sql.sql_context import SQLContext __all__ = [ "PaimonVirtualFileSystem", @@ -35,4 +36,5 @@ "Schema", "Tag", "TagManager", + "SQLContext", ] diff --git a/paimon-python/pypaimon/cli/cli.py b/paimon-python/pypaimon/cli/cli.py index 2ffdbd206dc6..37a0d3cbb853 100644 --- a/paimon-python/pypaimon/cli/cli.py +++ b/paimon-python/pypaimon/cli/cli.py @@ -121,6 +121,10 @@ def main(): from pypaimon.cli.cli_catalog import add_catalog_subcommands add_catalog_subcommands(catalog_parser) + # SQL command + from pypaimon.cli.cli_sql import add_sql_subcommand + add_sql_subcommand(subparsers) + args = parser.parse_args() if args.command is None: diff --git a/paimon-python/pypaimon/cli/cli_sql.py b/paimon-python/pypaimon/cli/cli_sql.py new file mode 100644 index 000000000000..170cda871037 --- /dev/null +++ 
# b/paimon-python/pypaimon/cli/cli_sql.py @@ -0,0 +1,313 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""
SQL commands for Paimon CLI.

This module provides SQL query capability via pypaimon-rust + DataFusion.
"""

import os
import re
import sys
import time

_PAIMON_BANNER = r"""
    ____        _
   / __ \____ _(_)___ ___  ____  ____
  / /_/ / __ `/ / __ `__ \/ __ \/ __ \
 / ____/ /_/ / / / / / / / /_/ / / / /
/_/    \__,_/_/_/ /_/ /_/\____/_/ /_/

  Powered by pypaimon-rust + DataFusion
  Type 'help' for usage, 'exit' to quit.
"""

# Matches "USE <identifier>" with an optional trailing ';'. Only \w characters
# are accepted, so the captured name can never contain quotes or whitespace.
_USE_PATTERN = re.compile(
    r"^\s*use\s+(\w+)\s*;?\s*$",
    re.IGNORECASE,
)

_HISTORY_FILE = os.path.expanduser("~/.paimon_history")
_HISTORY_MAX_LENGTH = 1000

_PROMPT = "paimon> "
# Same width as _PROMPT so continuation lines line up under the first line.
_CONTINUATION_PROMPT = " " * (len(_PROMPT) - 2) + "> "


def _get_readline():
    """Return the best available readline module, or None if there is none.

    Prefers gnureadline (full GNU readline) over the built-in readline
    (which is libedit on macOS and may have limited features).
    """
    try:
        import gnureadline as readline
        return readline
    except ImportError:
        pass
    try:
        import readline
        return readline
    except ImportError:
        return None


def _is_libedit(rl):
    """Return True if the readline module is backed by libedit (macOS default)."""
    return 'libedit' in (getattr(rl, '__doc__', None) or '')


def _setup_readline():
    """Enable readline for arrow key support and persistent command history.

    Loading history is best-effort: a missing, unreadable or undecodable
    history file must never prevent the REPL from starting.
    """
    rl = _get_readline()
    if rl is None:
        return
    rl.set_history_length(_HISTORY_MAX_LENGTH)
    try:
        if _is_libedit(rl):
            # libedit escapes spaces as \040 in history files, so we load manually.
            with open(_HISTORY_FILE, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.rstrip('\n')
                    if line:
                        rl.add_history(line)
        else:
            rl.read_history_file(_HISTORY_FILE)
    except (OSError, UnicodeDecodeError):
        # FileNotFoundError (first run), permission problems, or a corrupt file.
        pass


def _save_history():
    """Save readline history to file (best-effort, I/O errors are ignored)."""
    rl = _get_readline()
    if rl is None:
        return
    try:
        if _is_libedit(rl):
            # Write history manually to avoid libedit's \040 escaping.
            length = rl.get_current_history_length()
            items = (rl.get_history_item(i) for i in range(1, length + 1))
            lines = [item for item in items if item is not None]
            # Keep only the last N entries
            lines = lines[-_HISTORY_MAX_LENGTH:]
            with open(_HISTORY_FILE, 'w', encoding='utf-8') as f:
                f.writelines(line + '\n' for line in lines)
        else:
            rl.write_history_file(_HISTORY_FILE)
    except OSError:
        pass


def cmd_sql(args):
    """
    Execute the 'sql' command.

    Runs a SQL query against Paimon tables, or starts an interactive SQL REPL.
    Exits the process with status 1 if the SQL context cannot be created.

    Args:
        args: Parsed command line arguments.
    """
    from pypaimon.cli.cli import load_catalog_config

    config_path = args.config
    config = load_catalog_config(config_path)

    try:
        from pypaimon.sql.sql_context import SQLContext
        catalog_options = {str(k): str(v) for k, v in config.items()}
        ctx = SQLContext()
        ctx.register_catalog("paimon", catalog_options)
        ctx.set_current_catalog("paimon")
        ctx.set_current_database("default")
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)

    output_format = getattr(args, 'format', 'table')
    query = args.query
    if query:
        _execute_query(ctx, query, output_format)
    else:
        _interactive_repl(ctx, output_format)


def _execute_query(ctx, query, output_format):
    """Execute a single SQL query and print the result.

    Any failure while running the query or rendering its result is reported
    on stderr and terminates the process with status 1.
    """
    try:
        table = ctx.sql(query)
        _print_table(table, output_format)
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


def _json_default(value):
    """Serialize values the json module cannot encode natively.

    Binary data is decoded as UTF-8 (undecodable bytes are replaced), objects
    exposing ``isoformat()`` (date/time/datetime) use it, and everything else
    (e.g. Decimal, timedelta) falls back to ``str()``.
    """
    if isinstance(value, (bytes, bytearray, memoryview)):
        return bytes(value).decode('utf-8', errors='replace')
    isoformat = getattr(value, 'isoformat', None)
    if callable(isoformat):
        return isoformat()
    return str(value)


def _print_table(table, output_format, elapsed=None):
    """Print a PyArrow Table in the requested format.

    Args:
        table: The query result (a PyArrow Table).
        output_format: 'json' for a JSON array of row objects; anything else
            renders a plain-text table via pandas.
        elapsed: Optional query duration in seconds; when given, a
            "(N rows in X.XXs)" footer is printed.
    """
    if output_format == 'json':
        import json
        # to_pylist() yields plain Python values with None for SQL NULL, so the
        # output is valid JSON (no NaN) and integer columns containing nulls
        # are not silently widened to floats as they are by pandas.
        rows = table.to_pylist()
        print(json.dumps(rows, ensure_ascii=False, default=_json_default))
        row_count = len(rows)
    else:
        df = table.to_pandas()
        print(df.to_string(index=False))
        row_count = len(df)

    if elapsed is not None:
        print(f"({row_count} {'row' if row_count == 1 else 'rows'} in {elapsed:.2f}s)")


def _read_multiline_query():
    """Read a potentially multi-line SQL query, terminated by ';'.

    Returns:
        The complete input string; "" when a partially typed statement was
        cancelled with Ctrl-C/Ctrl-D; None on EOF/interrupt at an empty prompt.
    """
    lines = []
    prompt = _PROMPT
    while True:
        try:
            line = input(prompt)
        except (EOFError, KeyboardInterrupt):
            if lines:
                # Cancel current multi-line input
                print()
                return ""
            return None

        lines.append(line)
        joined = "\n".join(lines).strip()

        if not joined:
            # Only blank lines so far: start over with the primary prompt.
            lines.clear()
            prompt = _PROMPT
            continue

        # Single-word commands that don't need ';'
        lower = joined.lower().rstrip(';').strip()
        if lower in ('exit', 'quit', 'help'):
            return joined

        # USE command doesn't strictly need ';'
        if _USE_PATTERN.match(joined):
            return joined

        # For SQL statements, wait for ';'
        if joined.endswith(';'):
            return joined

        prompt = _CONTINUATION_PROMPT


def _handle_use(ctx, match):
    """Handle USE command: switch the default database of *ctx*."""
    database = match.group(1)
    try:
        ctx.set_current_database(database)
        print(f"Using database '{database}'.")
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)


def _interactive_repl(ctx, output_format):
    """Run an interactive SQL REPL until 'exit'/'quit' or EOF.

    Command history is written back to disk when the loop ends, however it ends.
    """
    _setup_readline()
    print(_PAIMON_BANNER)

    try:
        while True:
            query = _read_multiline_query()
            if query is None:
                print("\nBye!")
                break

            if not query:
                continue

            lower = query.lower().rstrip(';').strip()
            if lower in ('exit', 'quit'):
                print("Bye!")
                break
            if lower == 'help':
                _print_help()
                continue

            # Handle USE
            use_match = _USE_PATTERN.match(query)
            if use_match:
                _handle_use(ctx, use_match)
                continue

            try:
                # perf_counter is monotonic, unlike time.time().
                start = time.perf_counter()
                table = ctx.sql(query)
                elapsed = time.perf_counter() - start
                _print_table(table, output_format, elapsed)
                print()
            except KeyboardInterrupt:
                # Ctrl-C aborts the running statement, not the whole session.
                print("\nQuery cancelled.\n", file=sys.stderr)
            except Exception as e:
                print(f"Error: {e}\n", file=sys.stderr)
    finally:
        _save_history()


def _print_help():
    """Print REPL help information."""
    print("""
Commands:
  USE <database>;              Switch the default database
  SHOW DATABASES;              List all databases
  SHOW TABLES;                 List tables in the current database
  SELECT ... FROM <table>;     Execute a SQL query
  exit / quit                  Exit the REPL

Table reference:
  <table>                      Table in the current default database
  <database>.<table>           Table in a specific database

Tips:
  - SQL statements end with ';' and can span multiple lines
  - Arrow keys are supported for line editing and command history
  - Command history is saved across sessions (~/.paimon_history)
""")


def add_sql_subcommand(subparsers):
    """
    Add the sql subcommand to the main parser.

    Args:
        subparsers: The subparsers object from the main argument parser.
    """
    sql_parser = subparsers.add_parser(
        'sql',
        help='Execute SQL queries on Paimon tables (requires pypaimon-rust)'
    )
    sql_parser.add_argument(
        'query',
        nargs='?',
        default=None,
        help='SQL query to execute. If omitted, starts interactive REPL.'
    )
    sql_parser.add_argument(
        '--format', '-f',
        type=str,
        choices=['table', 'json'],
        default='table',
        help='Output format: table (default) or json'
    )
    sql_parser.set_defaults(func=cmd_sql)


# diff --git a/paimon-python/pypaimon/sql/__init__.py b/paimon-python/pypaimon/sql/__init__.py
# new file mode 100644
# index 000000000000..e059c025a60f
# --- /dev/null
# +++ b/paimon-python/pypaimon/sql/__init__.py
# @@ -0,0 +1,25 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
# (continuation of paimon-python/pypaimon/sql/__init__.py)

__all__ = ['SQLContext']


def __getattr__(name):
    """Resolve ``SQLContext`` lazily on first attribute access.

    NOTE: module-level ``__getattr__`` (PEP 562) is only honoured by
    Python 3.7+; on older interpreters import from
    ``pypaimon.sql.sql_context`` directly.
    """
    if name == "SQLContext":
        from pypaimon.sql.sql_context import SQLContext
        return SQLContext
    raise AttributeError("module 'pypaimon.sql' has no attribute {!r}".format(name))


# diff --git a/paimon-python/pypaimon/sql/sql_context.py b/paimon-python/pypaimon/sql/sql_context.py
# new file mode 100644
# index 000000000000..67afa30f34e8
# --- /dev/null
# +++ b/paimon-python/pypaimon/sql/sql_context.py
# @@ -0,0 +1,81 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import pyarrow as pa


class SQLContext:
    """SQL query context for Paimon tables.

    Uses pypaimon-rust and DataFusion to execute SQL queries against Paimon tables.
    Supports registering multiple catalogs for cross-catalog queries.

    Example::

        from pypaimon.sql import SQLContext

        ctx = SQLContext()
        ctx.register_catalog("paimon", {"warehouse": "/path/to/warehouse"})
        ctx.set_current_catalog("paimon")
        ctx.set_current_database("default")
        result = ctx.sql("SELECT * FROM my_table")
    """

    def __init__(self):
        """Create the underlying DataFusion session.

        Raises:
            ImportError: If the optional ``datafusion`` package is not installed.
        """
        try:
            from datafusion import SessionContext
        except ImportError as e:
            raise ImportError(
                "datafusion is required for SQL query support. "
                "Install it with: pip install pypaimon[sql]"
            ) from e

        self._ctx = SessionContext()

    def register_catalog(self, name: str, options: dict):
        """Register a Paimon catalog as a DataFusion catalog provider.

        Args:
            name: The catalog name to register under.
            options: A dict of catalog options (e.g. {"warehouse": "/path/to/warehouse"}).

        Raises:
            ImportError: If the optional ``pypaimon-rust`` package is not installed.
        """
        try:
            from pypaimon_rust.datafusion import PaimonCatalog
        except ImportError as e:
            raise ImportError(
                "pypaimon-rust is required for SQL query support. "
                "Install it with: pip install pypaimon[sql]"
            ) from e

        paimon_catalog = PaimonCatalog(options)
        self._ctx.register_catalog_provider(name, paimon_catalog)

    def _set_option(self, key: str, value: str):
        """Issue ``SET <key> = '<value>'`` with the value safely quoted.

        Single quotes inside *value* are doubled so that a name containing a
        quote cannot terminate the string literal early.
        """
        escaped = str(value).replace("'", "''")
        self._ctx.sql(f"SET {key} = '{escaped}'")

    def set_current_catalog(self, catalog_name: str):
        """Set the default catalog for SQL queries."""
        self._set_option("datafusion.catalog.default_catalog", catalog_name)

    def set_current_database(self, database: str):
        """Set the default database for SQL queries."""
        self._set_option("datafusion.catalog.default_schema", database)

    def sql(self, query: str) -> pa.Table:
        """Execute a SQL query and return the result as a PyArrow Table.

        The whole result is collected before returning. An empty result still
        carries the query's schema.
        """
        df = self._ctx.sql(query)
        batches = df.collect()
        if not batches:
            # from_batches cannot infer a schema from an empty list.
            return pa.Table.from_batches([], schema=df.schema())
        return pa.Table.from_batches(batches)


# diff --git a/paimon-python/pypaimon/tests/sql_context_test.py b/paimon-python/pypaimon/tests/sql_context_test.py
# new file mode 100644
# index 000000000000..931052fae13d
# --- /dev/null
# +++ b/paimon-python/pypaimon/tests/sql_context_test.py
# @@ -0,0 +1,152 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import os
import unittest

import pyarrow as pa

from pypaimon import CatalogFactory


WAREHOUSE = os.environ.get("PAIMON_TEST_WAREHOUSE", "/tmp/paimon-warehouse")
_TEST_TABLE = "default.sql_test_table"


def _sql_dependencies_available():
    """Return True when the optional ``pypaimon[sql]`` packages can be imported."""
    try:
        import datafusion  # noqa: F401
        from pypaimon_rust.datafusion import PaimonCatalog  # noqa: F401
    except ImportError:
        return False
    return True


@unittest.skipUnless(
    _sql_dependencies_available(),
    "requires the optional SQL dependencies: pip install pypaimon[sql]",
)
class SQLContextTest(unittest.TestCase):
    """End-to-end tests for SQLContext against a small three-row table."""

    @staticmethod
    def _create_catalog():
        """Open the filesystem catalog rooted at WAREHOUSE."""
        return CatalogFactory.create({"warehouse": WAREHOUSE})

    def _create_sql_context(self):
        """Build a SQLContext whose defaults point at ``paimon.default``."""
        from pypaimon.sql.sql_context import SQLContext
        ctx = SQLContext()
        ctx.register_catalog("paimon", {"warehouse": WAREHOUSE})
        ctx.set_current_catalog("paimon")
        ctx.set_current_database("default")
        return ctx

    @classmethod
    def setUpClass(cls):
        """Create the test table once before all tests in this class."""
        from pypaimon import Schema
        from pypaimon.schema.data_types import DataField, AtomicType

        catalog = cls._create_catalog()
        try:
            catalog.create_database("default", ignore_if_exists=True)
        except Exception:
            # Best-effort: the database may already exist; if it does not,
            # create_table below fails with the real error.
            pass

        # Drop existing table to ensure clean state
        catalog.drop_table(_TEST_TABLE, ignore_if_not_exists=True)

        schema = Schema(
            fields=[
                DataField(0, "id", AtomicType("INT")),
                DataField(1, "name", AtomicType("STRING")),
            ],
            primary_keys=[],
            partition_keys=[],
            options={},
            comment="",
        )
        catalog.create_table(_TEST_TABLE, schema, ignore_if_exists=False)

        table = catalog.get_table(_TEST_TABLE)
        write_builder = table.new_batch_write_builder()
        table_write = write_builder.new_write()
        table_commit = write_builder.new_commit()
        try:
            pa_table = pa.table({
                "id": pa.array([1, 2, 3], type=pa.int32()),
                "name": pa.array(["alice", "bob", "carol"], type=pa.string()),
            })
            table_write.write_arrow(pa_table)
            table_commit.commit(table_write.prepare_commit())
        finally:
            table_write.close()
            table_commit.close()

    @classmethod
    def tearDownClass(cls):
        """Clean up the test table after all tests."""
        cls._create_catalog().drop_table(_TEST_TABLE, ignore_if_not_exists=True)

    def test_sql_returns_table(self):
        ctx = self._create_sql_context()
        table = ctx.sql("SELECT id, name FROM sql_test_table ORDER BY id")
        self.assertIsInstance(table, pa.Table)
        self.assertEqual(table.num_rows, 3)
        self.assertEqual(table.column("id").to_pylist(), [1, 2, 3])
        self.assertEqual(table.column("name").to_pylist(), ["alice", "bob", "carol"])

    def test_sql_to_pandas(self):
        ctx = self._create_sql_context()
        table = ctx.sql("SELECT id, name FROM sql_test_table ORDER BY id")
        df = table.to_pandas()
        self.assertEqual(len(df), 3)
        self.assertListEqual(list(df.columns), ["id", "name"])

    def test_sql_with_filter(self):
        ctx = self._create_sql_context()
        table = ctx.sql("SELECT id, name FROM sql_test_table WHERE id > 1 ORDER BY id")
        self.assertEqual(table.num_rows, 2)
        self.assertEqual(table.column("id").to_pylist(), [2, 3])

    def test_sql_with_empty_result(self):
        ctx = self._create_sql_context()
        table = ctx.sql("SELECT id, name FROM sql_test_table WHERE id > 4 ORDER BY id")
        self.assertIsInstance(table, pa.Table)
        self.assertEqual(table.num_rows, 0)
        self.assertEqual(table.schema.names, ["id", "name"])

    def test_sql_with_aggregation(self):
        ctx = self._create_sql_context()
        table = ctx.sql("SELECT count(*) AS cnt FROM sql_test_table")
        self.assertEqual(table.column("cnt").to_pylist(), [3])

    def test_sql_two_part_reference(self):
        ctx = self._create_sql_context()
        table = ctx.sql("SELECT count(*) AS cnt FROM default.sql_test_table")
        self.assertEqual(table.column("cnt").to_pylist(), [3])

    def test_import_error_without_pypaimon_rust(self):
        """register_catalog should raise ImportError when pypaimon-rust is missing."""
        import unittest.mock as mock
        import builtins
        original_import = builtins.__import__

        def mock_import(name, *args, **kwargs):
            # Simulate an environment where pypaimon-rust is not installed.
            if name == "pypaimon_rust.datafusion" or name == "pypaimon_rust":
                raise ImportError("No module named 'pypaimon_rust'")
            return original_import(name, *args, **kwargs)

        from pypaimon.sql.sql_context import SQLContext
        ctx = SQLContext()
        with mock.patch("builtins.__import__", side_effect=mock_import):
            with self.assertRaises(ImportError) as cm:
                ctx.register_catalog("paimon", {"warehouse": WAREHOUSE})
        self.assertIn("pypaimon-rust", str(cm.exception))


if __name__ == "__main__":
    unittest.main()


# diff --git a/paimon-python/setup.py b/paimon-python/setup.py
# index 876d7f7ce9df..3b0b6833f457 100644
# --- a/paimon-python/setup.py
# +++ b/paimon-python/setup.py
# @@ -70,6 +70,10 @@ def read_requirements():
#          'pylance>=0.20,<1; python_version>="3.9"',
#          'pylance>=0.10,<1; python_version>="3.8" and python_version<"3.9"'
#      ],
# +    'sql': [
# +        'pypaimon-rust; python_version>="3.10"',
# +        'datafusion>=52; python_version>="3.10"',
# +    ],
#  },
#  description="Apache Paimon Python API",
#  long_description=long_description,