Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
d958b2a
feat: add Microsoft Fabric support to elementary CLI
devin-ai-integration[bot] Mar 7, 2026
ac95204
test: add Fabric target to test profiles
devin-ai-integration[bot] Mar 7, 2026
6a737e8
feat: add SQL Server support to elementary CLI
devin-ai-integration[bot] Mar 7, 2026
b54a7a1
test: add SQL Server target to test profiles
devin-ai-integration[bot] Mar 7, 2026
62a36e6
ci: add Fabric and SQL Server to CI workflows
devin-ai-integration[bot] Mar 7, 2026
ba52254
ci: temporarily limit matrix to fabric+sqlserver, use ELE-5282 branch…
devin-ai-integration[bot] Mar 7, 2026
cfc9b68
fix: use cloud auth (ServicePrincipal) for fabric profile, keep sqlse…
devin-ai-integration[bot] Mar 7, 2026
7b51cd7
fix: replace || with concat() for T-SQL compatibility in CLI dbt macros
devin-ai-integration[bot] Mar 7, 2026
1739662
fix: replace LIMIT with TOP for T-SQL compatibility in CLI dbt macros
devin-ai-integration[bot] Mar 7, 2026
65d032e
ci: trigger rebuild with updated dbt-data-reliability dispatches
devin-ai-integration[bot] Mar 7, 2026
f51dfc9
fix: add fabric__get_test_results to avoid nested CTE issue in T-SQL
devin-ai-integration[bot] Mar 7, 2026
dff9bc3
fix: move ORDER BY out of create_temp_table query for T-SQL compatibi…
devin-ai-integration[bot] Mar 7, 2026
699bee3
fix: quote reserved keyword aliases (schema, database) for T-SQL comp…
devin-ai-integration[bot] Mar 7, 2026
1f23903
ci: restore all adapters in CI matrix after fabric+sqlserver pass
devin-ai-integration[bot] Mar 7, 2026
a86d990
fix: use adapter dispatch for reserved keyword aliases instead of dou…
devin-ai-integration[bot] Mar 7, 2026
613d8ed
fix: use dbt.concat() cross-DB macro instead of concat() for Redshift…
devin-ai-integration[bot] Mar 7, 2026
d566f90
refactor: add edr_quote_identifier macro to eliminate fabric__ duplic…
devin-ai-integration[bot] Mar 7, 2026
40c8044
refactor: use elementary.is_tsql() instead of target.type checks
devin-ai-integration[bot] Mar 7, 2026
39a3c8d
refactor: address PR review feedback - extract _TSQL_TRANSIENT, remov…
devin-ai-integration[bot] Mar 8, 2026
5461453
chore: update dbt-data-reliability to merged master commit, remove CI…
devin-ai-integration[bot] Mar 8, 2026
6989836
ci: use docker compose --wait for sqlserver healthcheck instead of ma…
devin-ai-integration[bot] Mar 8, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/test-all-warehouses.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ jobs:
trino,
dremio,
spark,
fabric,
sqlserver,
]
uses: ./.github/workflows/test-warehouse.yml
with:
Expand Down
18 changes: 17 additions & 1 deletion .github/workflows/test-warehouse.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ on:
- duckdb
- trino
- dremio
- fabric
- sqlserver
elementary-ref:
type: string
required: false
Expand Down Expand Up @@ -159,6 +161,12 @@ jobs:
run: |
docker compose up -d --build --wait spark-thrift

- name: Start SQL Server
if: inputs.warehouse-type == 'sqlserver'
working-directory: ${{ env.E2E_DBT_PROJECT_DIR }}
run: |
docker compose up -d --wait sqlserver

- name: Setup Python
uses: actions/setup-python@v5
with:
Expand All @@ -168,6 +176,14 @@ jobs:
if: inputs.warehouse-type == 'spark'
run: sudo apt-get install -y python3-dev libsasl2-dev gcc

- name: Install ODBC driver for SQL Server
if: inputs.warehouse-type == 'fabric' || inputs.warehouse-type == 'sqlserver'
run: |
curl -fsSL https://packages.microsoft.com/keys/microsoft.asc | sudo tee /etc/apt/trusted.gpg.d/microsoft.asc > /dev/null
curl -fsSL https://packages.microsoft.com/config/ubuntu/$(lsb_release -rs)/prod.list | sudo tee /etc/apt/sources.list.d/mssql-release.list > /dev/null
sudo apt-get update
sudo ACCEPT_EULA=Y apt-get install -y msodbcsql18 unixodbc-dev

- name: Install dbt
run: >
pip install
Expand All @@ -188,7 +204,7 @@ jobs:
# This enables caching the seeded database state between runs.
IS_DOCKER=false
case "${{ inputs.warehouse-type }}" in
postgres|clickhouse|trino|dremio|duckdb|spark) IS_DOCKER=true ;;
postgres|clickhouse|trino|dremio|duckdb|spark|sqlserver) IS_DOCKER=true ;;
esac

if [ "$IS_DOCKER" = "true" ]; then
Expand Down
11 changes: 11 additions & 0 deletions elementary/clients/dbt/transient_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@
"service unavailable",
)

_TSQL_TRANSIENT: Tuple[str, ...] = (
"connection timed out",
"could not connect to the server",
"ssl syscall error",
"communication link failure",
"tcp provider: an existing connection was forcibly closed",
"login timeout expired",
)

_ADAPTER_PATTERNS: Dict[str, Tuple[str, ...]] = {
"bigquery": (
# Streaming-buffer delay after a streaming insert.
Expand Down Expand Up @@ -109,6 +118,8 @@
# DuckDB runs in-process; transient errors are rare.
# Common patterns (connection reset, broken pipe) are in _COMMON.
),
"fabric": _TSQL_TRANSIENT,
"sqlserver": _TSQL_TRANSIENT,
}

# Pre-computed union of all adapter-specific patterns for the fallback path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,12 @@
select distinct
failed_tests.alert_id,
{# Generate elementary unique id which is used to identify between tests, and set it as alert_class_id #}
coalesce(failed_tests.test_unique_id, 'None') || '.' || coalesce(failed_tests.column_name, 'None') || '.' || coalesce(failed_tests.sub_type, 'None') as alert_class_id,
{{ dbt.concat(["coalesce(failed_tests.test_unique_id, 'None')", "'.'", "coalesce(failed_tests.column_name, 'None')", "'.'", "coalesce(failed_tests.sub_type, 'None')"]) }} as alert_class_id,
case
when failed_tests.test_type = 'schema_change' then failed_tests.test_unique_id
{# In old versions of elementary, elementary_test_results doesn't contain test_short_name, so we use dbt_test short_name. #}
when tests.short_name = 'dimension_anomalies' then failed_tests.test_unique_id
else coalesce(failed_tests.test_unique_id, 'None') || '.' || coalesce(failed_tests.column_name, 'None') || '.' || coalesce(failed_tests.sub_type, 'None')
else {{ dbt.concat(["coalesce(failed_tests.test_unique_id, 'None')", "'.'", "coalesce(failed_tests.column_name, 'None')", "'.'", "coalesce(failed_tests.sub_type, 'None')"]) }}
end as elementary_unique_id,
failed_tests.data_issue_id,
failed_tests.test_execution_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
when elementary_test_results.test_type = 'schema_change' then elementary_test_results.test_unique_id
{# In old versions of elementary, elementary_test_results doesn't contain test_short_name, so we use dbt_test short_name. #}
when dbt_tests.short_name = 'dimension_anomalies' then elementary_test_results.test_unique_id
else coalesce(elementary_test_results.test_unique_id, 'None') || '.' || coalesce(nullif(elementary_test_results.column_name, ''), 'None') || '.' || coalesce(elementary_test_results.test_sub_type, 'None')
else {{ dbt.concat(["coalesce(elementary_test_results.test_unique_id, 'None')", "'.'", "coalesce(nullif(elementary_test_results.column_name, ''), 'None')", "'.'", "coalesce(elementary_test_results.test_sub_type, 'None')"]) }}
end as elementary_unique_id,
elementary_test_results.invocation_id,
elementary_test_results.data_issue_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
select
unique_id,
name,
schema_name as schema,
schema_name as {{ elementary_cli.edr_quote_identifier('schema') }},
tags,
owner as owners,
database_name as database
database_name as {{ elementary_cli.edr_quote_identifier('database') }}
from dbt_models
{% if exclude_elementary %}
where package_name != 'elementary'
Expand All @@ -31,10 +31,10 @@
unique_id,
name,
source_name,
schema_name AS schema,
schema_name AS {{ elementary_cli.edr_quote_identifier('schema') }},
tags,
owner AS owners,
database_name as database
database_name as {{ elementary_cli.edr_quote_identifier('database') }}
from dbt_sources
{% if exclude_elementary %}
where package_name != 'elementary'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,7 @@
{% macro duckdb__get_adapter_unique_id() %}
{{ return(target.path) }}
{% endmacro %}

{% macro fabric__get_adapter_unique_id() %}
{{ return(target.server) }}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
{% endif %}

{% set get_pkg_version_query %}
{% if elementary.is_tsql() %}
select top 1 * from {{ invocations_relation }} order by generated_at desc
{% else %}
select * from {{ invocations_relation }} order by generated_at desc limit 1
{% endif %}
{% endset %}
{% set result = elementary.run_query(get_pkg_version_query) %}
{% if not result %}
Expand Down
2 changes: 1 addition & 1 deletion elementary/monitor/dbt_project/macros/get_models_runs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
unique_id,
invocation_id,
name,
schema_name as schema,
schema_name as {{ elementary_cli.edr_quote_identifier('schema') }},
status,
case
when status != 'success' then 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
),

test_invocation as (
select distinct invocation_id, detected_at
select {% if elementary.is_tsql() %}top 1{% endif %} distinct invocation_id, detected_at
from elementary_test_results
{% if invocation_id %}
where invocation_id = {{ "'" ~ invocation_id ~ "'" }}
{% elif invocation_max_time %}
where detected_at < {{ "'" ~ invocation_max_time ~ "'" }}
{% endif %}
order by detected_at desc
limit 1
{% if not elementary.is_tsql() %}limit 1{% endif %}
)

{% if invocations_relation %}
Expand Down
148 changes: 95 additions & 53 deletions elementary/monitor/dbt_project/macros/get_test_results.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,44 @@
{{ return(adapter.dispatch('get_test_results', 'elementary_cli')(days_back, invocations_per_test, disable_passed_test_metrics)) }}
{%- endmacro -%}

{#
Shared post-processing helper: filters tests by meta, attaches sample data.
Called by both default__ and fabric__ dispatches to avoid duplicating the
Jinja processing loop.
#}
{%- macro _process_raw_test_results(test_results_agate, test_result_rows_agate, elementary_tests_allowlist_status) -%}
{% set test_results = [] %}
{% set tests = elementary.agate_to_dicts(test_results_agate) %}

{% set filtered_tests = [] %}
{% for test in tests %}
{% set test_meta = fromjson(test.meta) %}
{% if test_meta.get("elementary", {}).get("include", true) %}
{% do filtered_tests.append(test) %}
{% endif %}
{% endfor %}

{% for test in filtered_tests %}
{% set test_rows_sample = none %}
{% if test.invocations_rank_index == 1 %}
{% set test_type = test.test_type %}
{% set test_params = fromjson(test.test_params) %}
{% set status = test.status | lower %}

{%- if (test_type == 'dbt_test' and status in ['fail', 'warn']) or (test_type != 'dbt_test' and status in elementary_tests_allowlist_status) -%}
{% set test_rows_sample = elementary_cli.get_test_rows_sample(test.result_rows, test_result_rows_agate.get(test.id)) %}
{%- endif -%}
{% else %}
{# Null out test_results_query for non-latest invocations to save memory #}
{% do test.update({"test_results_query": none}) %}
{% endif %}
{# Adding sample data to test results #}
{% do test.update({"sample_data": test_rows_sample}) %}
{% do test_results.append(test) %}
{%- endfor -%}
{% do return(test_results) %}
{%- endmacro -%}

{%- macro default__get_test_results(days_back = 7, invocations_per_test = 720, disable_passed_test_metrics = false) -%}
{% set elementary_tests_allowlist_status = ['fail', 'warn'] if disable_passed_test_metrics else ['fail', 'warn', 'pass'] %}
{% set select_test_results %}
Expand Down Expand Up @@ -59,8 +97,6 @@
order by test_results.elementary_unique_id, test_results.invocations_rank_index desc
{%- endset -%}

{% set test_results = [] %}

{% set elementary_database, elementary_schema = elementary.get_package_database_and_schema() %}
{% set ordered_test_results_relation = elementary.create_temp_table(elementary_database, elementary_schema, 'ordered_test_results', select_test_results) %}

Expand All @@ -79,32 +115,63 @@
{% if not elementary.has_temp_table_support() %}
{% do elementary.fully_drop_relation(ordered_test_results_relation) %}
{% endif %}
{% set tests = elementary.agate_to_dicts(test_results_agate) %}

{% set filtered_tests = [] %}
{% for test in tests %}
{% set test_meta = fromjson(test.meta) %}
{% if test_meta.get("elementary", {}).get("include", true) %}
{% do filtered_tests.append(test) %}
{% endif %}
{% endfor %}
{% do return(elementary_cli._process_raw_test_results(test_results_agate, test_result_rows_agate, elementary_tests_allowlist_status)) %}
{%- endmacro -%}

{% for test in filtered_tests %}
{% set test_rows_sample = none %}
{% if test.invocations_rank_index == 1 %}
{% set test_type = test.test_type %}
{% set test_params = fromjson(test.test_params) %}
{% set status = test.status | lower %}
{%- macro fabric__get_test_results(days_back = 7, invocations_per_test = 720, disable_passed_test_metrics = false) -%}
{#
T-SQL does not allow nested CTEs (WITH inside WITH).
current_tests_run_results_query already starts with WITH, so we
cannot wrap it in another CTE. Instead we materialise it into a
temp table first, then build ordered_test_results on top.
Note: sqlserver adapter inherits from fabric, so this dispatch
covers both fabric and sqlserver targets automatically.
#}
{% set elementary_tests_allowlist_status = ['fail', 'warn'] if disable_passed_test_metrics else ['fail', 'warn', 'pass'] %}

{%- if (test_type == 'dbt_test' and status in ['fail', 'warn']) or (test_type != 'dbt_test' and status in elementary_tests_allowlist_status) -%}
{% set test_rows_sample = elementary_cli.get_test_rows_sample(test.result_rows, test_result_rows_agate.get(test.id)) %}
{%- endif -%}
{% endif %}
{# Adding sample data to test results #}
{% do test.update({"sample_data": test_rows_sample}) %}
{% do test_results.append(test) %}
{%- endfor -%}
{% do return(test_results) %}
{# Step 1 – materialise the base test-results query into a temp table #}
{% set base_query %}
{{ elementary_cli.current_tests_run_results_query(days_back=days_back) }}
{% endset %}

{% set elementary_database, elementary_schema = elementary.get_package_database_and_schema() %}
{% set base_relation = elementary.create_temp_table(elementary_database, elementary_schema, 'test_results_base', base_query) %}

{# Step 2 – build ordered_test_results from the materialised base (no nested CTE) #}
{% set select_ordered %}
select
*,
{{ elementary.edr_datediff(elementary.edr_cast_as_timestamp('detected_at'), elementary.edr_current_timestamp(), 'day') }} as days_diff,
row_number() over (partition by elementary_unique_id order by {{elementary.edr_cast_as_timestamp('detected_at')}} desc) as invocations_rank_index
from {{ base_relation }}
{% endset %}

{% set ordered_relation = elementary.create_temp_table(elementary_database, elementary_schema, 'ordered_test_results', select_ordered) %}

{# Step 3 – final query: filter by invocations_per_test #}
{# ORDER BY must be here, not inside create_temp_table — T-SQL forbids ORDER BY in views/subqueries without TOP #}
{% set test_results_agate_sql %}
select *
from {{ ordered_relation }}
where invocations_rank_index <= {{ invocations_per_test }}
order by elementary_unique_id, invocations_rank_index desc
{% endset %}

{% set valid_ids_query %}
select distinct id
from {{ ordered_relation }}
where invocations_rank_index = 1
{% endset %}

{% set test_results_agate = elementary.run_query(test_results_agate_sql) %}
{% set test_result_rows_agate = elementary_cli.get_result_rows_agate(days_back, valid_ids_query) %}

{# Clean up intermediate tables #}
{% do elementary.fully_drop_relation(base_relation) %}
{% do elementary.fully_drop_relation(ordered_relation) %}

{% do return(elementary_cli._process_raw_test_results(test_results_agate, test_result_rows_agate, elementary_tests_allowlist_status)) %}
{%- endmacro -%}

{%- macro clickhouse__get_test_results(days_back = 7, invocations_per_test = 720, disable_passed_test_metrics = false) -%}
Expand Down Expand Up @@ -162,7 +229,7 @@
CASE
WHEN etr.test_type = 'schema_change' THEN etr.test_unique_id
WHEN dt.short_name = 'dimension_anomalies' THEN etr.test_unique_id
ELSE coalesce(etr.test_unique_id, 'None') || '.' || coalesce(nullif(etr.column_name, ''), 'None') || '.' || coalesce(etr.test_sub_type, 'None')
ELSE {{ dbt.concat(["coalesce(etr.test_unique_id, 'None')", "'.'", "coalesce(nullif(etr.column_name, ''), 'None')", "'.'", "coalesce(etr.test_sub_type, 'None')"]) }}
END AS elementary_unique_id,
etr.detected_at,
etr.database_name,
Expand Down Expand Up @@ -239,31 +306,6 @@
{% if not elementary.has_temp_table_support() %}
{% do elementary.fully_drop_relation(ordered_test_results_relation) %}
{% endif %}
{% set tests = elementary.agate_to_dicts(test_results_agate) %}

{% set filtered_tests = [] %}
{% for test in tests %}
{% set test_meta = fromjson(test.meta) %}
{% if test_meta.get("elementary", {}).get("include", true) %}
{% do filtered_tests.append(test) %}
{% endif %}
{% endfor %}

{% for test in filtered_tests %}
{% set test_rows_sample = none %}
{% if test.invocations_rank_index == 1 %}
{% set test_type = test.test_type %}
{% set test_params = fromjson(test.test_params) %}
{% set status = test.status | lower %}

{%- if (test_type == 'dbt_test' and status in ['fail', 'warn']) or (test_type != 'dbt_test' and status in elementary_tests_allowlist_status) -%}
{% set test_rows_sample = elementary_cli.get_test_rows_sample(test.result_rows, test_result_rows_agate.get(test.id)) %}
{%- endif -%}
{% endif %}
{# Adding sample data to test results #}
{% do test.update({"sample_data": test_rows_sample}) %}
{% do test_results.append(test) %}
{%- endfor -%}

{% do return(test_results) %}
{%- endmacro -%}
{% do return(elementary_cli._process_raw_test_results(test_results_agate, test_result_rows_agate, elementary_tests_allowlist_status)) %}
{%- endmacro -%}
4 changes: 4 additions & 0 deletions elementary/monitor/dbt_project/macros/test_conn.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
{% set elementary_database, elementary_schema = elementary.get_package_database_and_schema() %}
{% set elementary_model_relation = api.Relation.create(elementary_database, elementary_schema, "dbt_models") %}
{% set query %}
{% if elementary.is_tsql() %}
select top 10 * from {{ elementary_model_relation }}
{% else %}
select * from {{ elementary_model_relation }} limit 10
{% endif %}
{% endset %}
{% do elementary.run_query(query) %}
{% endmacro %}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{%- macro edr_quote_identifier(identifier) -%}
{%- if elementary.is_tsql() -%}
[{{ identifier }}]
{%- else -%}
{{ identifier }}
{%- endif -%}
{%- endmacro -%}
8 changes: 5 additions & 3 deletions elementary/monitor/dbt_project/package-lock.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
packages:
- package: dbt-labs/dbt_utils
- name: dbt_utils
package: dbt-labs/dbt_utils
version: 0.8.6
- git: https://github.com/elementary-data/dbt-data-reliability.git
revision: ab21363935c42490a60a779557ba99bed96b754c
sha1_hash: 661e08669f6a005c445ed631a333de316b58d57f
name: elementary
revision: 534afc63c75d28b87d7cbd3b222dd3ea9a980f7b
sha1_hash: cb18b7df65415901187dcf469dcd377e56c0dc70
Loading
Loading