Skip to content

Commit 5d0f3a3

Browse files
authored
Merge pull request #3405 from PolicyEngine/pr/simulation-telemetry-envelope
Generate simulation telemetry envelopes for Modal requests
2 parents c965fcb + 62a6096 commit 5d0f3a3

File tree

9 files changed

+161
-15
lines changed

9 files changed

+161
-15
lines changed

.github/workflows/pr.yml

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,16 @@ jobs:
2323
runs-on: ubuntu-latest
2424
steps:
2525
- uses: actions/checkout@v4
26+
with:
27+
fetch-depth: 0
28+
- name: Setup Python
29+
uses: actions/setup-python@v5
30+
with:
31+
python-version: "3.12"
32+
- name: Setup uv
33+
uses: astral-sh/setup-uv@v5
2634
- name: Check for changelog fragment
27-
run: |
28-
FRAGMENTS=$(find changelog.d -type f ! -name '.gitkeep' | wc -l)
29-
if [ "$FRAGMENTS" -eq 0 ]; then
30-
echo "::error::No changelog fragment found in changelog.d/"
31-
echo "Add one with: echo 'Description.' > changelog.d/\$(git branch --show-current).<type>.md"
32-
echo "Types: added, changed, fixed, removed, breaking"
33-
exit 1
34-
fi
35+
run: uv run --with "towncrier>=24.8.0" towncrier check --compare-with origin/master
3536
test_container_builds:
3637
name: Docker
3738
runs-on: ubuntu-latest

changelog.d/3394.fixed.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Attach PolicyEngine bundle metadata to economy results.

changelog.d/fixed/3394.md

Lines changed: 0 additions & 1 deletion
This file was deleted.

policyengine_api/libs/simulation_api_modal.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ class ModalSimulationExecution:
2222

2323
job_id: str
2424
status: str
25+
run_id: Optional[str] = None
2526
result: Optional[dict] = None
2627
error: Optional[str] = None
2728
policyengine_bundle: Optional[dict] = None
@@ -88,6 +89,7 @@ def run(self, payload: dict) -> ModalSimulationExecution:
8889
{
8990
"message": "Modal simulation job submitted",
9091
"job_id": data.get("job_id"),
92+
"run_id": data.get("run_id"),
9193
"status": data.get("status"),
9294
},
9395
severity="INFO",
@@ -98,12 +100,14 @@ def run(self, payload: dict) -> ModalSimulationExecution:
98100
status=data["status"],
99101
policyengine_bundle=data.get("policyengine_bundle"),
100102
resolved_app_name=data.get("resolved_app_name"),
103+
run_id=data.get("run_id"),
101104
)
102105

103106
except httpx.HTTPStatusError as e:
104107
logger.log_struct(
105108
{
106109
"message": f"Modal API HTTP error: {e.response.status_code}",
110+
"run_id": (payload.get("_telemetry") or {}).get("run_id"),
107111
"response_text": e.response.text[:500],
108112
},
109113
severity="ERROR",
@@ -114,6 +118,7 @@ def run(self, payload: dict) -> ModalSimulationExecution:
114118
logger.log_struct(
115119
{
116120
"message": f"Modal API request error: {str(e)}",
121+
"run_id": (payload.get("_telemetry") or {}).get("run_id"),
117122
},
118123
severity="ERROR",
119124
)
@@ -174,6 +179,7 @@ def get_execution_by_id(self, job_id: str) -> ModalSimulationExecution:
174179
return ModalSimulationExecution(
175180
job_id=job_id,
176181
status=data["status"],
182+
run_id=data.get("run_id"),
177183
result=data.get("result"),
178184
error=data.get("error"),
179185
policyengine_bundle=data.get("policyengine_bundle"),

policyengine_api/services/economy_service.py

Lines changed: 80 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
from policyengine.utils.data.datasets import get_default_dataset
2525
import json
2626
import datetime
27+
import hashlib
28+
import uuid
2729
from typing import Literal, Any, Optional, Annotated
2830
from dotenv import load_dotenv
2931
from pydantic import BaseModel
@@ -357,7 +359,6 @@ def _determine_impact_action(
357359
self,
358360
most_recent_impact: dict | None,
359361
) -> ImpactAction:
360-
361362
if not most_recent_impact:
362363
return ImpactAction.CREATE
363364

@@ -448,7 +449,6 @@ def _handle_computing_impact(
448449
setup_options: EconomicImpactSetupOptions,
449450
most_recent_impact: dict,
450451
) -> EconomicImpactResult:
451-
452452
execution = simulation_api.get_execution_by_id(
453453
most_recent_impact["execution_id"]
454454
)
@@ -484,17 +484,21 @@ def _handle_create_impact(
484484
data_version=setup_options.data_version,
485485
)
486486

487+
sim_params = sim_config.model_dump(mode="json")
488+
telemetry = self._build_simulation_telemetry(
489+
setup_options=setup_options,
490+
sim_config=sim_params,
491+
)
492+
487493
logger.log_struct(
488494
{
489495
"message": "Setting up sim API job",
496+
"run_id": telemetry["run_id"],
490497
**setup_options.model_dump(),
491498
}
492499
)
493500

494-
# Build params with metadata for Logfire tracing in the simulation API.
495-
# The _metadata field will be captured by the Logfire span before
496-
# SimulationOptions validation (which silently ignores extra fields).
497-
sim_params = sim_config.model_dump()
501+
# Preserve both legacy metadata and the new telemetry envelope.
498502
sim_params["_metadata"] = {
499503
"reform_policy_id": setup_options.reform_policy_id,
500504
"baseline_policy_id": setup_options.baseline_policy_id,
@@ -505,14 +509,17 @@ def _handle_create_impact(
505509
"dataset": setup_options.dataset,
506510
"resolved_app_name": setup_options.runtime_app_name,
507511
}
512+
sim_params["_telemetry"] = telemetry
508513

509514
sim_api_execution = simulation_api.run(sim_params)
510515
execution_id = simulation_api.get_execution_id(sim_api_execution)
516+
run_id = getattr(sim_api_execution, "run_id", None) or telemetry["run_id"]
511517

512518
progress_log = {
513519
**setup_options.model_dump(),
514520
"message": "Sim API job started",
515521
"execution_id": execution_id,
522+
"run_id": run_id,
516523
}
517524
logger.log_struct(progress_log, severity="INFO")
518525

@@ -759,6 +766,73 @@ def _setup_data(
759766
)
760767
raise
761768

769+
def _build_simulation_telemetry(
770+
self,
771+
setup_options: EconomicImpactSetupOptions,
772+
sim_config: dict[str, Any],
773+
) -> dict[str, Any]:
774+
simulation_kind, geography_type, geography_code = (
775+
self._classify_simulation_geography(
776+
country_id=setup_options.country_id,
777+
region=setup_options.region,
778+
)
779+
)
780+
781+
return {
782+
"run_id": str(uuid.uuid4()),
783+
"process_id": setup_options.process_id,
784+
"traceparent": self._get_current_traceparent(),
785+
"requested_at": datetime.datetime.now(datetime.UTC).isoformat(),
786+
"simulation_kind": simulation_kind,
787+
"geography_code": geography_code,
788+
"geography_type": geography_type,
789+
"config_hash": self._stable_config_hash(sim_config),
790+
"capture_mode": "disabled",
791+
}
792+
793+
def _classify_simulation_geography(
794+
self,
795+
country_id: str,
796+
region: str,
797+
) -> tuple[str, str, str]:
798+
if region == country_id:
799+
return "national", "national", country_id
800+
801+
if "/" not in region:
802+
return "other", "other", region
803+
804+
geography_type, geography_code = region.split("/", maxsplit=1)
805+
simulation_kind = (
806+
"district" if geography_type == "congressional_district" else geography_type
807+
)
808+
return simulation_kind, geography_type, geography_code
809+
810+
def _stable_config_hash(self, payload: dict[str, Any]) -> str:
811+
encoded = json.dumps(
812+
payload,
813+
sort_keys=True,
814+
separators=(",", ":"),
815+
default=str,
816+
).encode("utf-8")
817+
return f"sha256:{hashlib.sha256(encoded).hexdigest()}"
818+
819+
def _get_current_traceparent(self) -> str | None:
820+
try:
821+
from opentelemetry import trace
822+
except Exception:
823+
return None
824+
825+
span = trace.get_current_span()
826+
span_context = span.get_span_context()
827+
if not getattr(span_context, "is_valid", False):
828+
return None
829+
830+
trace_flags = int(getattr(span_context, "trace_flags", 0))
831+
return (
832+
f"00-{span_context.trace_id:032x}-"
833+
f"{span_context.span_id:016x}-{trace_flags:02x}"
834+
)
835+
762836
# Note: The following methods that interface with the ReformImpactsService
763837
# are written separately because the service relies upon mutating an original
764838
# 'computing' record to 'ok' or 'error' status, rather than creating a new record.

tests/fixtures/libs/simulation_api_modal.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
# Mock data constants
2020
MOCK_MODAL_JOB_ID = "fc-abc123xyz"
21+
MOCK_RUN_ID = "run-abc123xyz"
2122
MOCK_MODAL_BASE_URL = "https://test-modal-api.modal.run"
2223

2324
MOCK_SIMULATION_PAYLOAD = {
@@ -31,6 +32,15 @@
3132
"include_cliffs": False,
3233
}
3334

35+
MOCK_SIMULATION_PAYLOAD_WITH_TELEMETRY = {
36+
**MOCK_SIMULATION_PAYLOAD,
37+
"_telemetry": {
38+
"run_id": MOCK_RUN_ID,
39+
"process_id": "job_20250626120000_1234",
40+
"capture_mode": "disabled",
41+
},
42+
}
43+
3444
MOCK_SIMULATION_RESULT = {
3545
"poverty_impact": {"baseline": 0.12, "reform": 0.10},
3646
"budget_impact": {"baseline": 1000, "reform": 1200},
@@ -46,6 +56,7 @@
4656

4757
MOCK_SUBMIT_RESPONSE_SUCCESS = {
4858
"job_id": MOCK_MODAL_JOB_ID,
59+
"run_id": MOCK_RUN_ID,
4960
"status": MODAL_EXECUTION_STATUS_SUBMITTED,
5061
"poll_url": f"/jobs/{MOCK_MODAL_JOB_ID}",
5162
"country": "us",

tests/fixtures/services/economy_service.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
)
3131
MOCK_MODAL_JOB_ID = "fc-test123xyz"
3232
MOCK_EXECUTION_ID = MOCK_MODAL_JOB_ID # Alias for test compatibility
33+
MOCK_RUN_ID = "run-test123xyz"
3334
MOCK_PROCESS_ID = "job_20250626120000_1234"
3435
MOCK_MODEL_VERSION = "1.2.3"
3536
MOCK_POLICYENGINE_VERSION = "3.4.0"
@@ -248,6 +249,7 @@ def create_mock_modal_execution(
248249
"""
249250
mock_execution = MagicMock()
250251
mock_execution.job_id = job_id
252+
mock_execution.run_id = MOCK_RUN_ID
251253
mock_execution.name = job_id # Alias for compatibility
252254
mock_execution.status = status
253255
mock_execution.result = result

tests/unit/libs/test_simulation_api_modal.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
MOCK_MODAL_JOB_ID,
2424
MOCK_MODAL_BASE_URL,
2525
MOCK_SIMULATION_PAYLOAD,
26+
MOCK_SIMULATION_PAYLOAD_WITH_TELEMETRY,
27+
MOCK_RUN_ID,
2628
MOCK_SIMULATION_RESULT,
2729
MOCK_POLICYENGINE_BUNDLE,
2830
MOCK_RESOLVED_APP_NAME,
@@ -136,6 +138,7 @@ def test__given_valid_payload__then_returns_execution_with_job_id(
136138

137139
# Then
138140
assert execution.job_id == MOCK_MODAL_JOB_ID
141+
assert execution.run_id == MOCK_RUN_ID
139142
assert execution.status == MODAL_EXECUTION_STATUS_SUBMITTED
140143
assert execution.policyengine_bundle == MOCK_POLICYENGINE_BUNDLE
141144
assert execution.resolved_app_name == MOCK_RESOLVED_APP_NAME
@@ -161,6 +164,22 @@ def test__given_valid_payload__then_posts_to_correct_endpoint(
161164
assert "/simulate/economy/comparison" in call_args[0][0]
162165
assert call_args[1]["json"] == MOCK_SIMULATION_PAYLOAD
163166

167+
def test__given_telemetry_payload__then_preserves_it_in_post_body(
168+
self,
169+
mock_httpx_client,
170+
mock_modal_logger,
171+
):
172+
mock_httpx_client.post.return_value = create_mock_httpx_response(
173+
status_code=202,
174+
json_data=MOCK_SUBMIT_RESPONSE_SUCCESS,
175+
)
176+
api = SimulationAPIModal()
177+
178+
api.run(MOCK_SIMULATION_PAYLOAD_WITH_TELEMETRY)
179+
180+
call_args = mock_httpx_client.post.call_args
181+
assert call_args[1]["json"]["_telemetry"]["run_id"] == MOCK_RUN_ID
182+
164183
def test__given_http_error__then_raises_exception(
165184
self,
166185
mock_httpx_client,

tests/unit/services/test_economy_service.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
MOCK_OPTIONS_HASH,
2727
MOCK_EXECUTION_ID,
2828
MOCK_PROCESS_ID,
29+
MOCK_RUN_ID,
2930
MOCK_REFORM_IMPACT_DATA,
3031
MOCK_RESOLVED_DATASET,
3132
MOCK_RESOLVED_APP_NAME,
@@ -276,6 +277,38 @@ def test__given_no_previous_impact__includes_metadata_in_simulation_params(
276277
sim_params["_metadata"]["resolved_app_name"] == MOCK_RESOLVED_APP_NAME
277278
)
278279

280+
def test__given_no_previous_impact__includes_telemetry_in_simulation_params(
281+
self,
282+
economy_service,
283+
base_params,
284+
mock_country_package_versions,
285+
mock_get_dataset_version,
286+
mock_policy_service,
287+
mock_reform_impacts_service,
288+
mock_simulation_api,
289+
mock_logger,
290+
mock_datetime,
291+
mock_numpy_random,
292+
):
293+
mock_reform_impacts_service.get_all_reform_impacts.return_value = []
294+
295+
economy_service.get_economic_impact(**base_params)
296+
297+
sim_params = mock_simulation_api.run.call_args[0][0]
298+
299+
assert sim_params["_telemetry"]["run_id"]
300+
assert sim_params["_telemetry"]["process_id"] == MOCK_PROCESS_ID
301+
assert sim_params["_telemetry"]["simulation_kind"] == "national"
302+
assert sim_params["_telemetry"]["geography_type"] == "national"
303+
assert sim_params["_telemetry"]["geography_code"] == MOCK_COUNTRY_ID
304+
assert sim_params["_telemetry"]["capture_mode"] == "disabled"
305+
assert sim_params["_telemetry"]["config_hash"].startswith("sha256:")
306+
progress_log = mock_logger.log_struct.call_args_list[-1].args[0]
307+
assert progress_log["run_id"] == MOCK_RUN_ID
308+
assert (
309+
mock_logger.log_struct.call_args_list[-1].kwargs["severity"] == "INFO"
310+
)
311+
279312
def test__given_runtime_cache_version__uses_versioned_economy_cache_key(
280313
self,
281314
economy_service,

0 commit comments

Comments
 (0)