diff --git a/src/microplex_us/pipelines/us.py b/src/microplex_us/pipelines/us.py index bdbcbb6..201f5cf 100644 --- a/src/microplex_us/pipelines/us.py +++ b/src/microplex_us/pipelines/us.py @@ -7756,7 +7756,11 @@ def _assign_family_and_spm_units(self, persons: pd.DataFrame) -> pd.DataFrame: result, "family_id", ) - preserved_spm_unit_ids = self._normalized_complete_existing_group_ids( + # SPM unit ids from the source are trustworthy and must survive synthesis + # even when partially missing (a single missing id must not collapse the + # whole frame to one SPM unit per household). Tax-unit ids, by contrast, + # are reconstructed, not preserved (see _build_policyengine_tax_units). + preserved_spm_unit_ids = self._preserve_present_group_ids( result, "spm_unit_id", ) @@ -7923,6 +7927,50 @@ def _normalized_complete_existing_group_ids( ) return raw_numeric.astype(np.int64).rename(id_column) + def _preserve_present_group_ids( + self, + persons: pd.DataFrame, + id_column: str, + ) -> pd.Series | None: + """Preserve existing per-person unit ids where present, regenerating only + the rows that are missing one. + + Unlike :meth:`_normalized_complete_existing_group_ids` (which discards the + whole column if *any* id is missing), this keeps the authoritative + grouping for every row that carries an id and collapses rows with a + missing id into a single per-household fallback unit. Used for SPM units, + whose source ids are trustworthy and should survive synthesis even when + partially missing (otherwise a single missing id drops the whole frame to + one SPM unit per household). Returns ``None`` only when the column is + absent or entirely empty. + """ + if id_column not in persons.columns: + return None + raw_ids = persons[id_column] + present = raw_ids.notna() + if not present.any(): + return None + hh = persons["household_id"] + codes = pd.Series(-1, index=persons.index, dtype=np.int64) + # Present rows: stable unit code from factorizing (household_id, real id). 
+ present_key = pd.MultiIndex.from_frame( + pd.DataFrame({"hh": hh[present], "id": raw_ids[present].astype("string")}) + ) + codes.loc[present] = pd.factorize(present_key, sort=False)[0] + if (~present).any(): + # Missing rows fold into their household's first present unit so they + # never fabricate a spurious unit; households with no present id at + # all get one fresh fallback unit each. + first_present = codes[present].groupby(hh[present]).first() + miss_hh = hh[~present] + fallback = miss_hh.map(first_present) + no_present = fallback.isna() + if no_present.any(): + fresh = pd.factorize(miss_hh[no_present], sort=False)[0] + fallback.loc[no_present] = fresh + (int(codes.max()) + 1) + codes.loc[~present] = fallback.astype(np.int64).to_numpy() + return codes.rename(id_column) + def _collapse_group_table( self, persons: pd.DataFrame, diff --git a/tests/pipelines/test_us_spm_preservation.py b/tests/pipelines/test_us_spm_preservation.py new file mode 100644 index 0000000..002343d --- /dev/null +++ b/tests/pipelines/test_us_spm_preservation.py @@ -0,0 +1,103 @@ +"""SPM-unit-id preservation (#113). + +The authoritative SPM unit ids carried by the source records are eCPS-quality +(~1.04 units/household). Synthesis can leave some records without an id; the old +all-or-nothing logic discarded the whole column on any missing id, collapsing to +one SPM unit per household (~1.00). These tests pin the preserve-present-fill- +missing behavior that keeps the authoritative structure. +""" + +from __future__ import annotations + +import numpy as np +import pandas as pd + +from microplex_us.pipelines.us import USMicroplexPipeline + + +def _pipe() -> USMicroplexPipeline: + return USMicroplexPipeline() + + +def test_preserve_present_keeps_distinct_units(): + # Two SPM units in one household, all ids present -> both preserved. 
def test_preserve_present_keeps_distinct_units():
    """Fully populated ids: both SPM units of the household stay separate."""
    frame = pd.DataFrame(
        {"household_id": [1, 1, 1, 1], "spm_unit_id": [10, 10, 11, 11]}
    )
    units = _pipe()._preserve_present_group_ids(frame, "spm_unit_id")
    first, second, third, fourth = (units.iloc[pos] for pos in range(4))
    assert units.nunique() == 2
    assert first == second
    assert third == fourth
    assert first != third


def test_missing_rows_fold_into_present_unit_not_new():
    """A missing id next to present ids must not fabricate an extra unit."""
    frame = pd.DataFrame(
        {"household_id": [1, 1, 1], "spm_unit_id": [10.0, 10.0, np.nan]}
    )
    units = _pipe()._preserve_present_group_ids(frame, "spm_unit_id")
    # Folded into the present unit rather than split into two.
    assert units.nunique() == 1


def test_fully_missing_household_gets_one_fallback_when_others_present():
    """A household with no ids at all receives exactly one fallback unit."""
    frame = pd.DataFrame(
        {"household_id": [1, 1, 2, 2], "spm_unit_id": [10.0, 10.0, np.nan, np.nan]}
    )
    units = _pipe()._preserve_present_group_ids(frame, "spm_unit_id")
    hh1_a, hh1_b, hh2_a, hh2_b = (units.iloc[pos] for pos in range(4))
    assert hh1_a == hh1_b  # household 1 preserved
    assert hh2_a == hh2_b  # household 2 -> one fallback unit
    assert hh1_a != hh2_a


def test_all_missing_returns_none():
    """An entirely empty column yields None so the caller regenerates ids."""
    frame = pd.DataFrame({"household_id": [1, 1], "spm_unit_id": [np.nan, np.nan]})
    units = _pipe()._preserve_present_group_ids(frame, "spm_unit_id")
    assert units is None


def test_assign_family_and_spm_preserves_partial_spm():
    """End to end: a partially missing SPM column keeps its present structure."""
    columns = {
        "person_id": [1, 2, 3, 4, 5],
        "household_id": [1, 1, 1, 2, 2],
        "relationship_to_head": [0, 1, 2, 0, 2],
        "spm_unit_id": [10.0, 10.0, 11.0, np.nan, np.nan],
    }
    frame = pd.DataFrame(columns)
    assigned = _pipe()._assign_family_and_spm_units(frame)
    units_per_household = assigned.groupby("household_id")["spm_unit_id"].nunique()
    assert int(units_per_household.loc[1]) == 2  # two present units preserved
    assert int(units_per_household.loc[2]) == 1  # fully missing -> one fallback


def test_missing_row_folds_without_merging_distinct_present_units():
    """Two present units plus one missing row: fold, never merge or add.

    The missing row must join an existing unit without merging the two
    distinct present units and without creating a third one.
    """
    frame = pd.DataFrame(
        {
            "household_id": [1, 1, 1, 1, 1],
            "spm_unit_id": [10.0, 10.0, 11.0, 11.0, np.nan],
        }
    )
    units = _pipe()._preserve_present_group_ids(frame, "spm_unit_id")
    ten_a, ten_b, eleven_a, eleven_b, filled = (units.iloc[pos] for pos in range(5))
    assert units.nunique() == 2  # the two present units stay distinct
    assert ten_a == ten_b  # unit 10
    assert eleven_a == eleven_b  # unit 11
    assert ten_a != eleven_a
    assert filled == ten_a  # missing row folded into the first unit


def test_preserve_present_aligns_under_non_default_index():
    """A shuffled, non-default index must not misalign the filled rows."""
    frame = pd.DataFrame(
        {"household_id": [1, 1, 2, 2], "spm_unit_id": [10.0, np.nan, np.nan, 20.0]},
        index=[100, 5, 42, 7],
    )
    units = _pipe()._preserve_present_group_ids(frame, "spm_unit_id")
    assert units.loc[100] == units.loc[5]  # hh1: present 10 + missing fold together
    assert units.loc[42] == units.loc[7]  # hh2: missing + present 20 fold together
    assert units.loc[100] != units.loc[42]  # distinct households stay distinct