Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ policyengine = [
"microimpute==1.15.1 ; python_full_version >= '3.12' and python_full_version < '3.15'",
"policyengine-us==1.715.2; python_version >= '3.11' and python_version < '3.15'",
"spm-calculator>=0.3.1",
# Standalone tax-unit construction engine (the extraction of eCPS's
# tax-unit logic), used by the PolicyEngine pipeline to reconstruct tax
# units from CPS-like person frames (issue #113).
"microunit>=0.1.0",
]

[project.urls]
Expand Down
204 changes: 204 additions & 0 deletions src/microplex_us/pipelines/us.py
Original file line number Diff line number Diff line change
Expand Up @@ -6262,12 +6262,198 @@ def _build_policyengine_tax_units(
tax_units = pd.DataFrame(tax_unit_rows)
return tax_units, person_rows

# Raw CPS ASEC columns that ``microunit.construct_tax_units`` consumes to
# reconstruct tax units. ``microunit`` is the standalone extraction of
# eCPS's tax-unit logic (issue #113); it is *source-agnostic* and expects
# this normalized CPS-like contract rather than microplex's collapsed
# ``relationship_to_head`` coding. We only delegate when the frame actually
# carries these columns, so the delegation is behavior-preserving on
# today's frames (which do not carry them) and only becomes active once an
# upstream change threads CPS columns through to entity construction.
#
# The tuple is used purely as a presence check (every name must be a column
# of the person frame); the order of the entries carries no meaning.
_MICROUNIT_REQUIRED_CPS_COLUMNS = (
    "PH_SEQ",  # with A_LINENO, the key microunit uses for its CPS-style frame
    "A_LINENO",  # person line number; second half of the (PH_SEQ, A_LINENO) key
    "A_AGE",  # presumably the raw CPS age -- TODO confirm against microunit
    "A_MARITL",  # marital status
    "A_SPOUSE",  # spouse line pointer
    "PEPAR1",  # parent line pointer
    "PEPAR2",  # parent line pointer
    "A_EXPRRP",  # CPS relationship recode
)

def _build_policyengine_tax_units_via_microunit(
self,
persons: pd.DataFrame,
*,
start_tax_unit_id: int = 0,
) -> tuple[pd.DataFrame, pd.DataFrame, set[Any]] | None:
"""Reconstruct tax units by delegating to ``microunit`` (issue #113).

This is the *reconstruction-from-scratch* path. The authoritative-ID
path (#112, :meth:`_build_policyengine_tax_units_from_existing_ids`) is
handled separately and is never routed here.

Delegation only happens when ``persons`` carries the raw CPS columns in
:attr:`_MICROUNIT_REQUIRED_CPS_COLUMNS`. ``microunit``'s logic genuinely
depends on marital status (``A_MARITL``), spouse/parent line pointers
(``A_SPOUSE``/``PEPAR1``/``PEPAR2``) and the CPS relationship recode
(``A_EXPRRP``); microplex's reconstruction-stage frame collapses
relationship into a 0/1/2/3 coding and drops the pointer columns, so a
*faithful* mapping is not possible from that frame. Rather than fabricate
microunit inputs (which would silently change behavior), we return
``None`` when the columns are absent and let the caller fall back to the
legacy role-flag reconstruction.

.. warning::
``microunit`` *is* eCPS's tax-unit construction. Routing microplex
through it makes microplex's constructed tax units **converge toward
eCPS's**. Any loss change from enabling this delegation is an
*entity-convergence* effect and must be interpreted as such, not as
a quality improvement. See issue #113.

Returns the same ``(tax_units, person_rows, households)`` triple shape as
:meth:`_build_policyengine_tax_units_from_role_flags`, or ``None`` to
defer to the caller's fallback.
"""
if "person_id" not in persons.columns or "household_id" not in persons.columns:
return None
if not set(self._MICROUNIT_REQUIRED_CPS_COLUMNS).issubset(persons.columns):
return None

# Imported lazily to match this module's optional-dependency convention:
# ``microunit`` ships in the ``policyengine`` extra, and the base test
# suite must import this module without that extra installed.
from microunit import POLICYENGINE_MODE, construct_tax_units

# microunit keys its CPS-style frame on (PH_SEQ, A_LINENO); resetting the
# index keeps row order so the returned per-person TAX_ID and role align
# positionally back onto person_rows.
person_rows = persons.reset_index(drop=True).copy()
person_assignments, tax_unit = construct_tax_units(
person_rows.copy(),
year=self._microunit_reference_year(person_rows),
mode=POLICYENGINE_MODE,
)

tax_id = pd.to_numeric(person_assignments["TAX_ID"], errors="coerce")
person_rows["tax_unit_id"] = (
tax_id.to_numpy() + int(start_tax_unit_id)
).astype(np.int64)
# microunit emits an authoritative per-person HEAD/SPOUSE/DEPENDENT role;
# use it directly for the filer/dependent split rather than re-deriving
# from the (possibly absent) collapsed relationship_to_head coding.
person_rows["_microunit_role"] = [
self._decode_microunit_bytes(role)
for role in person_assignments["tax_unit_role_input"].tolist()
]

# microunit emits the canonical filing-status vocabulary already, but
# normalize defensively so this path can never diverge from the legacy
# paths if microunit ever changes its spelling/casing.
filing_status_by_unit = {
int(row_tax_id) + int(start_tax_unit_id): (
self._normalize_policyengine_filing_status(
self._decode_microunit_bytes(filing_value)
)
)
for row_tax_id, filing_value in zip(
tax_unit["TAX_ID"].tolist(),
tax_unit["filing_status_input"].tolist(),
strict=True,
)
}

tax_unit_rows: list[dict[str, Any]] = []
for unit_id, unit_persons in person_rows.groupby("tax_unit_id", sort=False):
ordered = unit_persons.sort_values(
["_microunit_role", "age", "person_id"],
ascending=[True, False, True],
).reset_index(drop=True)
is_filer = ordered["_microunit_role"].isin(["HEAD", "SPOUSE"])
filer_ids = [
int(person_id) for person_id in ordered.loc[is_filer, "person_id"]
]
dependent_ids = [
int(person_id) for person_id in ordered.loc[~is_filer, "person_id"]
]
if not filer_ids:
filer_ids = [int(ordered.iloc[0]["person_id"])]
dependent_ids = [
int(person_id)
for person_id in ordered["person_id"].tolist()
if int(person_id) not in filer_ids
]
tax_unit_rows.append(
{
"tax_unit_id": int(unit_id),
"household_id": int(ordered.iloc[0]["household_id"]),
"filing_status": filing_status_by_unit.get(int(unit_id), "SINGLE"),
"member_ids": [
int(person_id) for person_id in ordered["person_id"]
],
"filer_ids": filer_ids,
"dependent_ids": dependent_ids,
"n_dependents": len(dependent_ids),
"total_income": float(
pd.to_numeric(ordered.get("income", 0.0), errors="coerce")
.fillna(0.0)
.sum()
),
"tax_liability": 0.0,
**self._aggregate_policyengine_tax_unit_input_columns(ordered),
}
)

if not tax_unit_rows:
return None

households = set(person_rows["household_id"].drop_duplicates().tolist())
person_rows = person_rows.drop(columns=["_microunit_role"], errors="ignore")
return pd.DataFrame(tax_unit_rows), person_rows, households

@staticmethod
def _decode_microunit_bytes(value: Any) -> str:
"""Decode a ``microunit`` bytes-typed status/role into an upper string."""
if isinstance(value, bytes):
return value.decode()
return str(value)

def _microunit_reference_year(self, persons: pd.DataFrame) -> int:
"""Year passed to ``microunit`` for its dependency income thresholds.

Prefers an explicit ``year``/``tax_year`` column when the frame carries
one; otherwise falls back to the pipeline's configured reference year so
the only year-dependent behavior (the qualifying-relative gross income
limit) matches the rest of the pipeline. TODO(#113): thread the dataset
reference year through entity construction explicitly.
"""
for column in ("year", "tax_year"):
if column in persons.columns:
values = pd.to_numeric(persons[column], errors="coerce").dropna()
if not values.empty:
return int(values.iloc[0])
configured = getattr(self.config, "reference_year", None)
if configured is not None:
return int(configured)
return 2024

def _build_policyengine_tax_units_from_role_flags(
self,
persons: pd.DataFrame,
*,
start_tax_unit_id: int = 0,
) -> tuple[pd.DataFrame, pd.DataFrame, set[Any]] | None:
# Issue #113: when the frame carries microunit's CPS-style input
# columns, delegate the reconstruction to microunit. Otherwise fall
# through to the legacy role-flag reconstruction below (the current
# production path, since these columns are not yet threaded through).
microunit_result = self._build_policyengine_tax_units_via_microunit(
persons,
start_tax_unit_id=start_tax_unit_id,
)
if microunit_result is not None:
return microunit_result

role_columns = {
"is_tax_unit_head",
"is_tax_unit_spouse",
Expand Down Expand Up @@ -7326,6 +7512,16 @@ def _coerce_policyengine_status_code(self, value: Any) -> int | None:
return int(numeric)

def _assign_family_and_spm_units(self, persons: pd.DataFrame) -> pd.DataFrame:
"""Assign family and SPM units, preserving authoritative IDs when present.

NOT delegated to ``microunit`` in this pass (issue #113). At the pinned
commit ``microunit.units.spm.assign_spm_partition`` is documented as "a
conservative adapter, not yet the full Census-parity constructor" and is
not exported from microunit's public API, and microunit has no
family-unit constructor. The authoritative-ID fast path is preserved
here. TODO(#113): delegate once microunit grows a Census-parity
SPM/family constructor.
"""
result = persons.copy()
preserved_family_ids = self._normalized_complete_existing_group_ids(
result,
Expand Down Expand Up @@ -7399,6 +7595,14 @@ def _assign_marital_units(
self,
persons: pd.DataFrame,
) -> pd.DataFrame:
"""Assign marital units, preserving authoritative IDs when present.

NOT delegated to ``microunit`` in this pass (issue #113): microunit does
not construct marital units at the pinned commit (filing status is its
only marital-related output; there is no ``construct_marital_units``).
The authoritative-ID fast path is preserved here. TODO(#113): revisit if
microunit grows marital-unit support.
"""
result = persons.copy()
preserved_marital_unit_ids = self._normalized_complete_existing_group_ids(
result,
Expand Down
Loading
Loading