Skip to content

Commit 5af3429

Browse files
[PECOBLR-2461] Flip TestMstMetadata: server now blocks Thrift Get* in MST
The TestMstMetadata tests were written speculatively expecting the Thrift Get{Catalogs,Schemas,Tables,Columns} RPCs to succeed inside an active multi-statement transaction. The server's MST guard now rejects these RPCs with TRANSACTION_NOT_SUPPORTED ("... is not supported within a multi-statement transaction."), matching the intentional design where MST exposes a small allowlist of SQL forms. Flip the four `*_in_mst` tests into `*_blocked` tests asserting the expected DatabaseError. The two `*_non_transactional_after_concurrent_*` freshness tests are deleted outright — their entire premise ("Thrift RPCs bypass MST and see concurrent DDL") is no longer valid, so there is nothing meaningful left to assert. Note: the rejection happens at the MST guard before reaching the txn, so the active transaction remains usable after a blocked Get* (unlike SQL forms in TestMstBlockedSql, where the failure aborts the txn). The shared helper documents this distinction. Verified against dogfood: 4 passed in 33.67s. Co-authored-by: Isaac Signed-off-by: Vikrant Puppala <vikrant.puppala@databricks.com>
1 parent 4089f3f commit 5af3429

1 file changed

Lines changed: 56 additions & 126 deletions

File tree

tests/e2e/test_transactions.py

Lines changed: 56 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import pytest
3030

3131
import databricks.sql as sql
32+
from databricks.sql.exc import DatabaseError
3233

3334
logger = logging.getLogger(__name__)
3435

@@ -472,150 +473,78 @@ def test_unsupported_isolation_level_rejected(self, mst_conn_params):
472473

473474

474475
class TestMstMetadata:
475-
"""Metadata RPCs inside active transactions.
476-
477-
Python uses Thrift RPCs for cursor.columns, cursor.tables, etc. These
478-
RPCs bypass MST context and return non-transactional data — they see
479-
concurrent DDL changes that the transaction shouldn't see.
476+
"""Thrift metadata RPCs inside active transactions.
477+
478+
Python's cursor.columns/tables/schemas/catalogs map to Thrift
479+
Get{Columns,Tables,Schemas,Catalogs} RPCs. The server's MST guard
480+
rejects these RPCs with a "not supported within a multi-statement
481+
transaction" error. The rejection happens before reaching the txn,
482+
so the active transaction itself remains usable (unlike the SQL
483+
forms in TestMstBlockedSql, which abort the txn).
480484
"""
481485

482-
def test_cursor_columns_in_mst(
483-
self, mst_conn_params, mst_table, mst_catalog, mst_schema
484-
):
485-
fq_table, table_name = mst_table
486-
with sql.connect(**mst_conn_params) as conn:
487-
conn.autocommit = False
488-
with conn.cursor() as cursor:
489-
cursor.execute(f"INSERT INTO {fq_table} VALUES (1, 'test')")
490-
cursor.columns(
491-
catalog_name=mst_catalog, schema_name=mst_schema, table_name=table_name
492-
)
493-
columns = cursor.fetchall()
494-
assert len(columns) > 0
495-
conn.rollback()
486+
def _assert_metadata_rpc_blocked(self, mst_conn_params, fq_table, rpc):
487+
"""Assert the metadata RPC raises inside an active MST.
496488
497-
def test_cursor_tables_in_mst(
498-
self, mst_conn_params, mst_table, mst_catalog, mst_schema
499-
):
500-
fq_table, table_name = mst_table
501-
with sql.connect(**mst_conn_params) as conn:
502-
conn.autocommit = False
503-
with conn.cursor() as cursor:
504-
cursor.execute(f"INSERT INTO {fq_table} VALUES (1, 'test')")
505-
cursor.tables(
506-
catalog_name=mst_catalog, schema_name=mst_schema, table_name=table_name
507-
)
508-
tables = cursor.fetchall()
509-
assert len(tables) > 0
510-
conn.rollback()
489+
The Thrift Get* RPCs are rejected by the MST gateway before reaching
490+
the transaction, so the txn itself remains usable — only the RPC
491+
call fails.
511492
512-
def test_cursor_schemas_in_mst(self, mst_conn_params, mst_table, mst_catalog):
513-
fq_table, _ = mst_table
493+
`rpc` is a callable that takes a cursor and invokes the metadata
494+
RPC under test.
495+
"""
514496
with sql.connect(**mst_conn_params) as conn:
515497
conn.autocommit = False
516498
with conn.cursor() as cursor:
517-
cursor.execute(f"INSERT INTO {fq_table} VALUES (1, 'test')")
518-
cursor.schemas(catalog_name=mst_catalog)
519-
schemas = cursor.fetchall()
520-
assert len(schemas) > 0
521-
conn.rollback()
499+
cursor.execute(f"INSERT INTO {fq_table} VALUES (1, 'before_blocked')")
522500

523-
def test_cursor_catalogs_in_mst(self, mst_conn_params, mst_table):
524-
fq_table, _ = mst_table
525-
with sql.connect(**mst_conn_params) as conn:
526-
conn.autocommit = False
527-
with conn.cursor() as cursor:
528-
cursor.execute(f"INSERT INTO {fq_table} VALUES (1, 'test')")
529-
cursor.catalogs()
530-
catalogs = cursor.fetchall()
531-
assert len(catalogs) > 0
501+
with pytest.raises(DatabaseError, match="multi-statement transaction"):
502+
rpc(cursor)
532503
conn.rollback()
533504

534-
@pytest.mark.xdist_group(name="mst_freshness_columns")
535-
def test_cursor_columns_non_transactional_after_concurrent_ddl(
505+
def test_cursor_columns_blocked(
536506
self, mst_conn_params, mst_table, mst_catalog, mst_schema
537507
):
538-
"""Thrift cursor.columns() bypasses MST — sees concurrent ALTER TABLE."""
539508
fq_table, table_name = mst_table
540-
with sql.connect(**mst_conn_params) as conn:
541-
conn.autocommit = False
542-
with conn.cursor() as cursor:
543-
cursor.execute(f"INSERT INTO {fq_table} VALUES (1, 'test')")
544-
cursor.columns(
545-
catalog_name=mst_catalog, schema_name=mst_schema, table_name=table_name
546-
)
547-
before_cols = {row[3].lower() for row in cursor.fetchall()}
548-
549-
# External connection alters schema
550-
with sql.connect(**mst_conn_params) as ext_conn:
551-
with ext_conn.cursor() as ext_cursor:
552-
ext_cursor.execute(
553-
f"ALTER TABLE {fq_table} ADD COLUMN new_col STRING"
554-
)
555-
556-
# Re-read columns in same txn — Thrift RPC bypasses txn isolation,
557-
# so new_col IS visible (proves non-transactional behavior)
558-
with conn.cursor() as cursor:
559-
cursor.columns(
560-
catalog_name=mst_catalog, schema_name=mst_schema, table_name=table_name
561-
)
562-
after_cols = {row[3].lower() for row in cursor.fetchall()}
563-
564-
assert "new_col" in after_cols, (
565-
"Thrift cursor.columns() should see concurrent DDL "
566-
"(non-transactional behavior)"
567-
)
568-
assert before_cols != after_cols
569-
conn.rollback()
509+
self._assert_metadata_rpc_blocked(
510+
mst_conn_params,
511+
fq_table,
512+
lambda cursor: cursor.columns(
513+
catalog_name=mst_catalog,
514+
schema_name=mst_schema,
515+
table_name=table_name,
516+
),
517+
)
570518

571-
@pytest.mark.xdist_group(name="mst_freshness_tables")
572-
def test_cursor_tables_non_transactional_after_concurrent_create(
519+
def test_cursor_tables_blocked(
573520
self, mst_conn_params, mst_table, mst_catalog, mst_schema
574521
):
575-
"""Thrift cursor.tables() bypasses MST — sees concurrent CREATE TABLE."""
576-
fq_table, _ = mst_table
577-
new_table_name = _unique_table_name_raw("freshness_new_tbl")
578-
fq_new_table = f"{mst_catalog}.{mst_schema}.{new_table_name}"
579-
580-
try:
581-
with sql.connect(**mst_conn_params) as conn:
582-
conn.autocommit = False
583-
with conn.cursor() as cursor:
584-
cursor.execute(f"INSERT INTO {fq_table} VALUES (1, 'test')")
585-
cursor.tables(
586-
catalog_name=mst_catalog,
587-
schema_name=mst_schema,
588-
table_name=new_table_name,
589-
)
590-
assert len(cursor.fetchall()) == 0
522+
fq_table, table_name = mst_table
523+
self._assert_metadata_rpc_blocked(
524+
mst_conn_params,
525+
fq_table,
526+
lambda cursor: cursor.tables(
527+
catalog_name=mst_catalog,
528+
schema_name=mst_schema,
529+
table_name=table_name,
530+
),
531+
)
591532

592-
# External connection creates the table
593-
with sql.connect(**mst_conn_params) as ext_conn:
594-
with ext_conn.cursor() as ext_cursor:
595-
ext_cursor.execute(
596-
f"CREATE TABLE {fq_new_table} (id INT) USING DELTA "
597-
f"TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported')"
598-
)
533+
def test_cursor_schemas_blocked(self, mst_conn_params, mst_table, mst_catalog):
534+
fq_table, _ = mst_table
535+
self._assert_metadata_rpc_blocked(
536+
mst_conn_params,
537+
fq_table,
538+
lambda cursor: cursor.schemas(catalog_name=mst_catalog),
539+
)
599540

600-
# Re-read in same txn — should see the new table
601-
with conn.cursor() as cursor:
602-
cursor.tables(
603-
catalog_name=mst_catalog,
604-
schema_name=mst_schema,
605-
table_name=new_table_name,
606-
)
607-
assert len(cursor.fetchall()) > 0, (
608-
"Thrift cursor.tables() should see concurrent CREATE TABLE "
609-
"(non-transactional behavior)"
610-
)
611-
conn.rollback()
612-
finally:
613-
try:
614-
with sql.connect(**mst_conn_params) as conn:
615-
with conn.cursor() as cursor:
616-
cursor.execute(f"DROP TABLE IF EXISTS {fq_new_table}")
617-
except Exception as e:
618-
logger.warning(f"Failed to drop {fq_new_table}: {e}")
541+
def test_cursor_catalogs_blocked(self, mst_conn_params, mst_table):
542+
fq_table, _ = mst_table
543+
self._assert_metadata_rpc_blocked(
544+
mst_conn_params,
545+
fq_table,
546+
lambda cursor: cursor.catalogs(),
547+
)
619548

620549

621550
# ==================== D. BLOCKED SQL (MSTCheckRule) ====================
@@ -635,6 +564,7 @@ class TestMstBlockedSql:
635564
- SHOW TABLES, SHOW SCHEMAS, SHOW CATALOGS, SHOW FUNCTIONS
636565
- DESCRIBE QUERY, DESCRIBE TABLE EXTENDED
637566
- SELECT FROM information_schema
567+
- Thrift Get{Catalogs,Schemas,Tables,Columns} RPCs (see TestMstMetadata)
638568
639569
Allowed:
640570
- DESCRIBE TABLE (basic form)

0 commit comments

Comments
 (0)