Skip to content

Commit 034a036

Browse files
committed
Use context managers for CPS raw stores
1 parent 228a02f commit 034a036

2 files changed

Lines changed: 120 additions & 128 deletions

File tree

policyengine_us_data/datasets/cps/cps.py

Lines changed: 113 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,10 @@ def generate(self):
5353
cps = {}
5454

5555
ENTITIES = ("person", "tax_unit", "family", "spm_unit", "household")
56-
raw_data = self.raw_cps(require=True).load()
57-
try:
56+
with self.raw_cps(require=True).load() as raw_data:
5857
person, tax_unit, family, spm_unit, household = [
5958
raw_data[entity] for entity in ENTITIES
6059
]
61-
finally:
62-
raw_data.close()
6360

6461
logging.info("Adding ID variables")
6562
add_id_variables(cps, person, tax_unit, family, spm_unit, household)
@@ -807,14 +804,10 @@ def add_previous_year_income(self, cps: h5py.File) -> None:
807804
)
808805
return
809806

810-
cps_current_year_data = self.raw_cps(require=True).load()
811-
try:
812-
cps_previous_year_data = self.previous_year_raw_cps(require=True).load()
813-
except Exception:
814-
cps_current_year_data.close()
815-
raise
816-
817-
try:
807+
with (
808+
self.raw_cps(require=True).load() as cps_current_year_data,
809+
self.previous_year_raw_cps(require=True).load() as cps_previous_year_data,
810+
):
818811
cps_previous_year = cps_previous_year_data.person.set_index(
819812
cps_previous_year_data.person.PERIDNUM
820813
)
@@ -846,9 +839,6 @@ def add_previous_year_income(self, cps: h5py.File) -> None:
846839
"I_SEVAL",
847840
]
848841
]
849-
finally:
850-
cps_previous_year_data.close()
851-
cps_current_year_data.close()
852842
joined_data["previous_year_income_available"] = (
853843
~joined_data.employment_income_last_year.isna()
854844
& ~joined_data.self_employment_income_last_year.isna()
@@ -1775,12 +1765,9 @@ def add_tips(self, cps: h5py.File):
17751765
# Get is_married from raw CPS data (A_MARITL codes: 1,2 = married)
17761766
# Note: is_married in policyengine-us is Family-level, but we need
17771767
# person-level for imputation models
1778-
raw_data = self.raw_cps(require=True).load()
1779-
try:
1768+
with self.raw_cps(require=True).load() as raw_data:
17801769
raw_person = raw_data["person"]
17811770
cps["is_married"] = raw_person.A_MARITL.isin([1, 2]).values
1782-
finally:
1783-
raw_data.close()
17841771

17851772
cps["is_under_18"] = cps.age < 18
17861773
cps["is_under_6"] = cps.age < 6
@@ -1940,51 +1927,50 @@ def add_auto_loan_interest_and_net_worth(self, cps: h5py.File) -> None:
19401927
cps_data = self.load_dataset()
19411928

19421929
# Access raw CPS for additional variables
1943-
raw_data_instance = self.raw_cps(require=True)
1944-
raw_data = raw_data_instance.load()
1945-
person_data = raw_data.person
1946-
1947-
# Preprocess the CPS for imputation
1948-
lengths = {k: len(v) for k, v in cps_data.items()}
1949-
var_len = cps_data["person_household_id"].shape[0]
1950-
vars_of_interest = [name for name, ln in lengths.items() if ln == var_len]
1951-
agg_data = pd.DataFrame({n: cps_data[n] for n in vars_of_interest})
1952-
agg_data["interest_dividend_income"] = np.sum(
1953-
[
1954-
agg_data["taxable_interest_income"],
1955-
agg_data["tax_exempt_interest_income"],
1956-
agg_data["qualified_dividend_income"],
1957-
agg_data["non_qualified_dividend_income"],
1958-
],
1959-
axis=0,
1960-
)
1961-
agg_data["social_security_pension_income"] = np.sum(
1962-
[
1963-
agg_data["tax_exempt_private_pension_income"],
1964-
agg_data["taxable_private_pension_income"],
1965-
agg_data["social_security_retirement"],
1966-
],
1967-
axis=0,
1968-
)
1969-
1970-
agg = (
1971-
agg_data.groupby("person_household_id")[
1930+
with self.raw_cps(require=True).load() as raw_data:
1931+
person_data = raw_data.person
1932+
1933+
# Preprocess the CPS for imputation
1934+
lengths = {k: len(v) for k, v in cps_data.items()}
1935+
var_len = cps_data["person_household_id"].shape[0]
1936+
vars_of_interest = [name for name, ln in lengths.items() if ln == var_len]
1937+
agg_data = pd.DataFrame({n: cps_data[n] for n in vars_of_interest})
1938+
agg_data["interest_dividend_income"] = np.sum(
19721939
[
1973-
"employment_income",
1974-
"interest_dividend_income",
1975-
"social_security_pension_income",
1940+
agg_data["taxable_interest_income"],
1941+
agg_data["tax_exempt_interest_income"],
1942+
agg_data["qualified_dividend_income"],
1943+
agg_data["non_qualified_dividend_income"],
1944+
],
1945+
axis=0,
1946+
)
1947+
agg_data["social_security_pension_income"] = np.sum(
1948+
[
1949+
agg_data["tax_exempt_private_pension_income"],
1950+
agg_data["taxable_private_pension_income"],
1951+
agg_data["social_security_retirement"],
1952+
],
1953+
axis=0,
1954+
)
1955+
1956+
agg = (
1957+
agg_data.groupby("person_household_id")[
1958+
[
1959+
"employment_income",
1960+
"interest_dividend_income",
1961+
"social_security_pension_income",
1962+
]
19761963
]
1977-
]
1978-
.sum()
1979-
.rename(
1980-
columns={
1981-
"employment_income": "household_employment_income",
1982-
"interest_dividend_income": "household_interest_dividend_income",
1983-
"social_security_pension_income": "household_social_security_pension_income",
1984-
}
1964+
.sum()
1965+
.rename(
1966+
columns={
1967+
"employment_income": "household_employment_income",
1968+
"interest_dividend_income": "household_interest_dividend_income",
1969+
"social_security_pension_income": "household_social_security_pension_income",
1970+
}
1971+
)
1972+
.reset_index()
19851973
)
1986-
.reset_index()
1987-
)
19881974

19891975
def create_scf_reference_person_mask(cps_data, raw_person_data):
19901976
"""
@@ -2094,78 +2080,77 @@ def determine_reference_person(group):
20942080

20952081
return all_persons_data["is_scf_reference_person"].values
20962082

2097-
mask = create_scf_reference_person_mask(cps_data, person_data)
2098-
mask_len = mask.shape[0]
2083+
mask = create_scf_reference_person_mask(cps_data, person_data)
2084+
mask_len = mask.shape[0]
20992085

2100-
cps_data = {
2101-
var: data[mask] if data.shape[0] == mask_len else data
2102-
for var, data in cps_data.items()
2103-
}
2104-
2105-
CPS_RACE_MAPPING = {
2106-
1: 1, # White only -> WHITE
2107-
2: 2, # Black only -> BLACK/AFRICAN-AMERICAN
2108-
3: 5, # American Indian, Alaskan Native only -> OTHER
2109-
4: 4, # Asian only -> ASIAN
2110-
5: 5, # Hawaiian/Pacific Islander only -> OTHER
2111-
6: 5, # White-Black -> OTHER
2112-
7: 5, # White-AI -> OTHER
2113-
8: 5, # White-Asian -> OTHER
2114-
9: 3, # White-HP -> HISPANIC
2115-
10: 5, # Black-AI -> OTHER
2116-
11: 5, # Black-Asian -> OTHER
2117-
12: 3, # Black-HP -> HISPANIC
2118-
13: 5, # AI-Asian -> OTHER
2119-
14: 5, # AI-HP -> OTHER
2120-
15: 3, # Asian-HP -> HISPANIC
2121-
16: 5, # White-Black-AI -> OTHER
2122-
17: 5, # White-Black-Asian -> OTHER
2123-
18: 5, # White-Black-HP -> OTHER
2124-
19: 5, # White-AI-Asian -> OTHER
2125-
20: 5, # White-AI-HP -> OTHER
2126-
21: 5, # White-Asian-HP -> OTHER
2127-
22: 5, # Black-AI-Asian -> OTHER
2128-
23: 5, # White-Black-AI-Asian -> OTHER
2129-
24: 5, # White-AI-Asian-HP -> OTHER
2130-
25: 5, # Other 3 race comb. -> OTHER
2131-
26: 5, # Other 4 or 5 race comb. -> OTHER
2132-
}
2133-
2134-
# Apply the mapping to recode the race values
2135-
cps_data["cps_race"] = np.vectorize(CPS_RACE_MAPPING.get)(cps_data["cps_race"])
2086+
cps_data = {
2087+
var: data[mask] if data.shape[0] == mask_len else data
2088+
for var, data in cps_data.items()
2089+
}
21362090

2137-
lengths = {k: len(v) for k, v in cps_data.items()}
2138-
var_len = cps_data["person_household_id"].shape[0]
2139-
vars_of_interest = [name for name, ln in lengths.items() if ln == var_len]
2140-
receiver_data = pd.DataFrame({n: cps_data[n] for n in vars_of_interest})
2091+
CPS_RACE_MAPPING = {
2092+
1: 1, # White only -> WHITE
2093+
2: 2, # Black only -> BLACK/AFRICAN-AMERICAN
2094+
3: 5, # American Indian, Alaskan Native only -> OTHER
2095+
4: 4, # Asian only -> ASIAN
2096+
5: 5, # Hawaiian/Pacific Islander only -> OTHER
2097+
6: 5, # White-Black -> OTHER
2098+
7: 5, # White-AI -> OTHER
2099+
8: 5, # White-Asian -> OTHER
2100+
9: 3, # White-HP -> HISPANIC
2101+
10: 5, # Black-AI -> OTHER
2102+
11: 5, # Black-Asian -> OTHER
2103+
12: 3, # Black-HP -> HISPANIC
2104+
13: 5, # AI-Asian -> OTHER
2105+
14: 5, # AI-HP -> OTHER
2106+
15: 3, # Asian-HP -> HISPANIC
2107+
16: 5, # White-Black-AI -> OTHER
2108+
17: 5, # White-Black-Asian -> OTHER
2109+
18: 5, # White-Black-HP -> OTHER
2110+
19: 5, # White-AI-Asian -> OTHER
2111+
20: 5, # White-AI-HP -> OTHER
2112+
21: 5, # White-Asian-HP -> OTHER
2113+
22: 5, # Black-AI-Asian -> OTHER
2114+
23: 5, # White-Black-AI-Asian -> OTHER
2115+
24: 5, # White-AI-Asian-HP -> OTHER
2116+
25: 5, # Other 3 race comb. -> OTHER
2117+
26: 5, # Other 4 or 5 race comb. -> OTHER
2118+
}
21412119

2142-
receiver_data = receiver_data.merge(
2143-
agg[
2144-
[
2145-
"person_household_id",
2146-
"household_employment_income",
2147-
"household_interest_dividend_income",
2148-
"household_social_security_pension_income",
2149-
]
2150-
],
2151-
on="person_household_id",
2152-
how="left",
2153-
)
2154-
receiver_data.drop("employment_income", axis=1, inplace=True)
2120+
# Apply the mapping to recode the race values
2121+
cps_data["cps_race"] = np.vectorize(CPS_RACE_MAPPING.get)(cps_data["cps_race"])
2122+
2123+
lengths = {k: len(v) for k, v in cps_data.items()}
2124+
var_len = cps_data["person_household_id"].shape[0]
2125+
vars_of_interest = [name for name, ln in lengths.items() if ln == var_len]
2126+
receiver_data = pd.DataFrame({n: cps_data[n] for n in vars_of_interest})
2127+
2128+
receiver_data = receiver_data.merge(
2129+
agg[
2130+
[
2131+
"person_household_id",
2132+
"household_employment_income",
2133+
"household_interest_dividend_income",
2134+
"household_social_security_pension_income",
2135+
]
2136+
],
2137+
on="person_household_id",
2138+
how="left",
2139+
)
2140+
receiver_data.drop("employment_income", axis=1, inplace=True)
21552141

2156-
receiver_data.rename(
2157-
columns={
2158-
"household_employment_income": "employment_income",
2159-
"household_interest_dividend_income": "interest_dividend_income",
2160-
"household_social_security_pension_income": "social_security_pension_income",
2161-
},
2162-
inplace=True,
2163-
)
2142+
receiver_data.rename(
2143+
columns={
2144+
"household_employment_income": "employment_income",
2145+
"household_interest_dividend_income": "interest_dividend_income",
2146+
"household_social_security_pension_income": "social_security_pension_income",
2147+
},
2148+
inplace=True,
2149+
)
21642150

2165-
# Add is_married variable for household heads based on raw person data
2166-
reference_persons = person_data[mask]
2167-
receiver_data["is_married"] = reference_persons.A_MARITL.isin([1, 2]).values
2168-
raw_data.close()
2151+
# Add is_married variable for household heads based on raw person data
2152+
reference_persons = person_data[mask]
2153+
receiver_data["is_married"] = reference_persons.A_MARITL.isin([1, 2]).values
21692154

21702155
# Impute auto loan balance from the SCF
21712156
from policyengine_us_data.datasets.scf.scf import SCF_2022

tests/unit/datasets/test_cps_file_handles.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,13 @@ def __init__(self, person: pd.DataFrame):
1111
self.person = person
1212
self.closed = False
1313

14+
def __enter__(self):
15+
return self
16+
17+
def __exit__(self, exc_type, exc, tb):
18+
self.close()
19+
return False
20+
1421
def close(self):
1522
self.closed = True
1623

0 commit comments

Comments
 (0)