Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 49 additions & 1 deletion src/microplex_us/pipelines/us.py
Original file line number Diff line number Diff line change
Expand Up @@ -7756,7 +7756,11 @@ def _assign_family_and_spm_units(self, persons: pd.DataFrame) -> pd.DataFrame:
result,
"family_id",
)
preserved_spm_unit_ids = self._normalized_complete_existing_group_ids(
# SPM unit ids from the source are trustworthy and must survive synthesis
# even when partially missing (a single missing id must not collapse the
# whole frame to one SPM unit per household). Tax-unit ids, by contrast,
# are reconstructed, not preserved (see _build_policyengine_tax_units).
preserved_spm_unit_ids = self._preserve_present_group_ids(
result,
"spm_unit_id",
)
Expand Down Expand Up @@ -7923,6 +7927,50 @@ def _normalized_complete_existing_group_ids(
)
return raw_numeric.astype(np.int64).rename(id_column)

def _preserve_present_group_ids(
self,
persons: pd.DataFrame,
id_column: str,
) -> pd.Series | None:
"""Preserve existing per-person unit ids where present, regenerating only
the rows that are missing one.

Unlike :meth:`_normalized_complete_existing_group_ids` (which discards the
whole column if *any* id is missing), this keeps the authoritative
grouping for every row that carries an id and collapses rows with a
missing id into a single per-household fallback unit. Used for SPM units,
whose source ids are trustworthy and should survive synthesis even when
partially missing (otherwise a single missing id drops the whole frame to
one SPM unit per household). Returns ``None`` only when the column is
absent or entirely empty.
"""
if id_column not in persons.columns:
return None
raw_ids = persons[id_column]
present = raw_ids.notna()
if not present.any():
return None
hh = persons["household_id"]
codes = pd.Series(-1, index=persons.index, dtype=np.int64)
# Present rows: stable unit code from factorizing (household_id, real id).
present_key = pd.MultiIndex.from_frame(
pd.DataFrame({"hh": hh[present], "id": raw_ids[present].astype("string")})
)
codes.loc[present] = pd.factorize(present_key, sort=False)[0]
if (~present).any():
# Missing rows fold into their household's first present unit so they
# never fabricate a spurious unit; households with no present id at
# all get one fresh fallback unit each.
first_present = codes[present].groupby(hh[present]).first()
miss_hh = hh[~present]
fallback = miss_hh.map(first_present)
no_present = fallback.isna()
if no_present.any():
fresh = pd.factorize(miss_hh[no_present], sort=False)[0]
fallback.loc[no_present] = fresh + (int(codes.max()) + 1)
codes.loc[~present] = fallback.astype(np.int64).to_numpy()
return codes.rename(id_column)

def _collapse_group_table(
self,
persons: pd.DataFrame,
Expand Down
103 changes: 103 additions & 0 deletions tests/pipelines/test_us_spm_preservation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
"""SPM-unit-id preservation (#113).

The authoritative SPM unit ids carried by the source records are eCPS-quality
(~1.04 units/household). Synthesis can leave some records without an id; the old
all-or-nothing logic discarded the whole column on any missing id, collapsing to
one SPM unit per household (~1.00). These tests pin the preserve-present-fill-
missing behavior that keeps the authoritative structure.
"""

from __future__ import annotations

import numpy as np
import pandas as pd

from microplex_us.pipelines.us import USMicroplexPipeline


def _pipe() -> USMicroplexPipeline:
    """Construct a pipeline with no constructor arguments for the tests to use."""
    pipeline = USMicroplexPipeline()
    return pipeline


def test_preserve_present_keeps_distinct_units():
    """With every id present, two SPM units in one household stay two units."""
    frame = pd.DataFrame(
        {
            "household_id": [1, 1, 1, 1],
            "spm_unit_id": [10, 10, 11, 11],
        }
    )
    units = _pipe()._preserve_present_group_ids(frame, "spm_unit_id")
    assert units.nunique() == 2
    # Rows 0-1 carry id 10 and rows 2-3 carry id 11.
    row0, row1, row2, row3 = (units.iloc[pos] for pos in range(4))
    assert row0 == row1
    assert row2 == row3
    assert row0 != row2


def test_missing_rows_fold_into_present_unit_not_new():
    """A missing id must join the household's present unit, not start a new one."""
    frame = pd.DataFrame(
        {
            "household_id": [1, 1, 1],
            "spm_unit_id": [10.0, 10.0, np.nan],
        }
    )
    units = _pipe()._preserve_present_group_ids(frame, "spm_unit_id")
    # One unit in total: the missing row was folded in rather than split off.
    distinct_units = units.nunique()
    assert distinct_units == 1


def test_fully_missing_household_gets_one_fallback_when_others_present():
    """A household with no ids at all gets a single fallback unit of its own."""
    frame = pd.DataFrame(
        {
            "household_id": [1, 1, 2, 2],
            "spm_unit_id": [10.0, 10.0, np.nan, np.nan],
        }
    )
    units = _pipe()._preserve_present_group_ids(frame, "spm_unit_id")
    hh1_first, hh1_second = units.iloc[0], units.iloc[1]
    hh2_first, hh2_second = units.iloc[2], units.iloc[3]
    assert hh1_first == hh1_second  # household 1 keeps its present unit
    assert hh2_first == hh2_second  # household 2 collapses to one fallback unit
    assert hh1_first != hh2_first


def test_all_missing_returns_none():
    """An entirely empty id column yields None so the caller regenerates from scratch."""
    frame = pd.DataFrame(
        {
            "household_id": [1, 1],
            "spm_unit_id": [np.nan, np.nan],
        }
    )
    result = _pipe()._preserve_present_group_ids(frame, "spm_unit_id")
    assert result is None


def test_assign_family_and_spm_preserves_partial_spm():
    """End to end: a partially missing SPM column keeps its present structure.

    The frame must not collapse to one SPM unit per household.
    """
    columns = {
        "person_id": [1, 2, 3, 4, 5],
        "household_id": [1, 1, 1, 2, 2],
        "relationship_to_head": [0, 1, 2, 0, 2],
        "spm_unit_id": [10.0, 10.0, 11.0, np.nan, np.nan],
    }
    frame = pd.DataFrame(columns)
    assigned = _pipe()._assign_family_and_spm_units(frame)
    units_per_household = assigned.groupby("household_id")["spm_unit_id"].nunique()
    # Household 1 keeps both present units.
    assert int(units_per_household.loc[1]) == 2
    # Household 2 had no ids at all and gets exactly one fallback unit.
    assert int(units_per_household.loc[2]) == 1


def test_missing_row_folds_without_merging_distinct_present_units():
    """A household with two present SPM units plus one missing row.

    The missing row must join one existing unit without merging the two
    genuinely distinct present units and without fabricating a third.
    """
    frame = pd.DataFrame(
        {
            "household_id": [1, 1, 1, 1, 1],
            "spm_unit_id": [10.0, 10.0, 11.0, 11.0, np.nan],
        }
    )
    units = _pipe()._preserve_present_group_ids(frame, "spm_unit_id")
    assert units.nunique() == 2  # the two present units stay distinct
    unit_10_a, unit_10_b = units.iloc[0], units.iloc[1]
    unit_11_a, unit_11_b = units.iloc[2], units.iloc[3]
    assert unit_10_a == unit_10_b
    assert unit_11_a == unit_11_b
    assert unit_10_a != unit_11_a
    # The missing row lands in the first unit of the household.
    assert units.iloc[4] == unit_10_a


def test_preserve_present_aligns_under_non_default_index():
    """A shuffled, non-default index must not misalign the missing-row assignment."""
    labels = [100, 5, 42, 7]
    frame = pd.DataFrame(
        {
            "household_id": [1, 1, 2, 2],
            "spm_unit_id": [10.0, np.nan, np.nan, 20.0],
        },
        index=labels,
    )
    units = _pipe()._preserve_present_group_ids(frame, "spm_unit_id")
    # Household 1: the row with id 10 and the missing row share a unit.
    assert units.loc[100] == units.loc[5]
    # Household 2: the missing row and the row with id 20 share a unit.
    assert units.loc[42] == units.loc[7]
    # Different households never share a unit.
    assert units.loc[100] != units.loc[42]
Loading