Merged
27 commits
b20f0e4
Log exceptions instead of silently swallowing them in calculate
MaxGhenis Mar 10, 2026
8e81079
Add budget window batch economy endpoint
MaxGhenis Apr 9, 2026
14018b7
Harden budget window batch API
MaxGhenis Apr 9, 2026
530c3bd
Address budget window review findings
MaxGhenis Apr 9, 2026
7f71565
Prevent duplicate budget window jobs across workers
MaxGhenis Apr 9, 2026
f216578
Harden reform impact claim deduping
MaxGhenis Apr 9, 2026
83dd648
Tighten budget window claim recovery
MaxGhenis Apr 9, 2026
1be8771
Prevent stale claim takeover
MaxGhenis Apr 9, 2026
c663e73
Backfill reform impact schema lazily
MaxGhenis Apr 9, 2026
7b1c3ad
Backfill reform impact dataset column
MaxGhenis Apr 10, 2026
f7baae4
Address budget window review findings
MaxGhenis Apr 10, 2026
5edef99
Mark pre-submission setup failures as errors
MaxGhenis Apr 10, 2026
474513c
Adapt budget-window flow to batch simulation API
anth-volk Apr 16, 2026
324b29f
Fix lint after budget-window rebase
anth-volk May 1, 2026
9c3d10f
Address budget-window review feedback
anth-volk May 1, 2026
c968a2e
Ignore budget-window version override
anth-volk May 1, 2026
ad461c3
Stop passing budget-window version
anth-volk May 1, 2026
a64db3f
Align budget-window version handling
anth-volk May 1, 2026
09ba29a
Patch remote database accessor in unit tests
anth-volk May 4, 2026
c1c5ea3
Add budget-window cache test coverage
anth-volk May 4, 2026
35e7d2f
Harden budget-window cache responses
anth-volk May 4, 2026
ba93db9
Cover budget-window dedupe and failure paths
anth-volk May 4, 2026
42cf9dd
Canonicalize budget-window cache options
anth-volk May 4, 2026
707a896
Narrow budget-window PR scope
anth-volk May 4, 2026
fdda395
Update live budget-window test years
anth-volk May 5, 2026
0d0252a
Document required Redis runtime
anth-volk May 5, 2026
55e4f2f
Extract budget-window setup utilities
anth-volk May 5, 2026
6 changes: 6 additions & 0 deletions .env.example
@@ -18,3 +18,9 @@ OPENAI_API_KEY=policyengine_openai_api_key

# Token for Hugging Face models
HUGGING_FACE_TOKEN=policyengine_huggingface_token

# Redis is required for budget-window economy requests and other API cache paths.
# Local development and App Engine use an in-container/local Redis by default.
CACHE_REDIS_HOST=127.0.0.1
CACHE_REDIS_PORT=6379
CACHE_REDIS_DB=0
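These defaults mirror how the API resolves its Redis connection from the environment. A minimal sketch of that resolution logic (`redis_settings` is a hypothetical helper name, not part of this PR):

```python
import os

def redis_settings(env=os.environ):
    """Resolve Redis connection settings, falling back to the
    documented local defaults (127.0.0.1:6379, database 0)."""
    return {
        "host": env.get("CACHE_REDIS_HOST", "127.0.0.1"),
        "port": int(env.get("CACHE_REDIS_PORT", "6379")),
        "db": int(env.get("CACHE_REDIS_DB", "0")),
    }
```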
2 changes: 1 addition & 1 deletion .github/workflows/push.yml
@@ -226,7 +226,7 @@ jobs:
- name: Install staging test dependencies
run: pip install pytest httpx
- name: Run staging smoke test
run: python -m pytest tests/integration/test_live_calculate.py tests/integration/test_live_economy.py -v
run: python -m pytest tests/integration/test_live_calculate.py tests/integration/test_live_economy.py tests/integration/test_live_budget_window_cache.py -v
env:
API_BASE_URL: ${{ needs.deploy-staging.outputs.url }}
STAGING_API_TEST_PROBE_ID: ${{ needs.deploy-staging.outputs.version }}
2 changes: 1 addition & 1 deletion Makefile
@@ -11,7 +11,7 @@ test-env-vars:
pytest tests/env_variables

test:
MAX_HOUSEHOLDS=1000 coverage run -a --branch -m pytest tests/to_refactor tests/unit --disable-pytest-warnings
MAX_HOUSEHOLDS=1000 coverage run -a --branch -m pytest tests/to_refactor tests/unit tests/integration/test_budget_window_in_flight_dedupe.py --disable-pytest-warnings
coverage xml -i

debug-test:
6 changes: 6 additions & 0 deletions README.md
@@ -120,6 +120,8 @@ NOTE: Any output that needs to be calculated will not work. Therefore, only hous

### 6. Testing calculations

Redis is required for API cache paths, including budget-window economy requests. The budget-window endpoint uses Redis for completed-result caching and in-flight batch deduplication; if Redis is unavailable, those requests fail instead of falling back to the database or an in-process cache.

To test anything that utilizes Redis or the API's service workers (e.g. anything that requires society-wide calculations with the policy calculator), you'll also need to complete the following steps:

1. Start Redis
@@ -136,6 +138,8 @@ brew install redis
redis-server
```

By default the API connects to Redis at `127.0.0.1:6379`, database `0`. Override this with `CACHE_REDIS_HOST`, `CACHE_REDIS_PORT`, and `CACHE_REDIS_DB` if your local Redis uses different connection settings.

2. Start the API

Run the command below:
@@ -144,6 +148,8 @@
FLASK_DEBUG=1 python -m flask --app policyengine_api.api run
```

App Engine staging and production deployments install and start Redis in the API container before Gunicorn starts.

NOTE: Calculations are not possible in the uk app without access to a specific dataset. Expect an error: "ValueError: Invalid response code 404 for url https://api.github.com/repos/policyengine/non-public-microdata/releases/tags/uk-2024-march-efo."

## Testing, Formatting, Changelogging
1 change: 1 addition & 0 deletions changelog.d/budget-window-batch.fixed.md
@@ -0,0 +1 @@
Added a budget-window economy endpoint that batches yearly impact calculations with bounded server-side concurrency and returns aggregated progress plus totals.
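The changelog's "bounded server-side concurrency" is a generic pattern, sketched here with an `asyncio.Semaphore` (this is an illustration of the technique, not the PR's actual implementation; `run_years` and `simulate` are hypothetical names):

```python
import asyncio

async def run_years(years, simulate, max_concurrency=3):
    """Run one simulation coroutine per year, at most
    max_concurrency at a time, collecting results by year."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded(year):
        # The semaphore caps how many simulations run concurrently.
        async with semaphore:
            return year, await simulate(year)

    pairs = await asyncio.gather(*(bounded(y) for y in years))
    return dict(pairs)
```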
2 changes: 2 additions & 0 deletions gcp/README.md
@@ -2,6 +2,8 @@

The deployment actions build Docker images and deploy them to Google App Engine. The docker images themselves are based off a starter image (to save each API docker image having to spend 5 minutes installing the same dependencies). The starter image is the `Dockerfile` in this directory.

The App Engine API image installs `redis-server` and starts it through `gcp/policyengine_api/start.sh`. Redis is required at runtime for budget-window economy request caching and in-flight batch deduplication. The API reads `CACHE_REDIS_HOST`, `CACHE_REDIS_PORT`, and `CACHE_REDIS_DB`, defaulting to `127.0.0.1`, `6379`, and `0`.

To update the starter image:
* `python setup.py sdist` to build the python package
* `twine upload dist/*` to upload the package to pypi as `policyengine-api`
21 changes: 14 additions & 7 deletions gcp/policyengine_api/start.sh
@@ -1,18 +1,25 @@
#!/bin/sh
# Environment variables
PORT="${PORT:-8080}"
REDIS_PORT="${REDIS_PORT:-6379}"
CACHE_REDIS_HOST="${CACHE_REDIS_HOST:-127.0.0.1}"
CACHE_REDIS_PORT="${CACHE_REDIS_PORT:-6379}"
CACHE_REDIS_DB="${CACHE_REDIS_DB:-0}"
export CACHE_REDIS_HOST CACHE_REDIS_PORT CACHE_REDIS_DB

# Start the API
gunicorn -b :"$PORT" policyengine_api.api --timeout 300 --workers 5 --preload &

# Start Redis with configuration for multiple clients
redis-server --protected-mode no \
# Start Redis with configuration for multiple clients.
redis-server --bind "$CACHE_REDIS_HOST" \
--port "$CACHE_REDIS_PORT" \
--protected-mode yes \
--maxclients 10000 \
--timeout 0 &

# Wait for Redis to be ready
sleep 2
until redis-cli -h "$CACHE_REDIS_HOST" -p "$CACHE_REDIS_PORT" ping >/dev/null 2>&1; do
sleep 1
done

# Start the API
gunicorn -b :"$PORT" policyengine_api.api --timeout 300 --workers 5 --preload &

# Keep the script running and handle shutdown gracefully
trap "pkill -P $$; exit 1" INT TERM
6 changes: 4 additions & 2 deletions policyengine_api/api.py
@@ -4,6 +4,7 @@

import time
import sys
import os

start_time = time.time()

@@ -89,8 +90,9 @@ def log_timing(message):
{
"CACHE_TYPE": "RedisCache",
"CACHE_KEY_PREFIX": "policyengine",
"CACHE_REDIS_HOST": "127.0.0.1",
"CACHE_REDIS_PORT": 6379,
"CACHE_REDIS_HOST": os.environ.get("CACHE_REDIS_HOST", "127.0.0.1"),
"CACHE_REDIS_PORT": int(os.environ.get("CACHE_REDIS_PORT", "6379")),
"CACHE_REDIS_DB": int(os.environ.get("CACHE_REDIS_DB", "0")),
"CACHE_DEFAULT_TIMEOUT": 300,
}
)
120 changes: 119 additions & 1 deletion policyengine_api/libs/simulation_api_modal.py
@@ -7,7 +7,7 @@

import os
import sys
from dataclasses import dataclass
from dataclasses import dataclass, field
from typing import Optional

import httpx
@@ -42,6 +42,28 @@ def name(self) -> str:
return self.job_id


@dataclass
class ModalBudgetWindowBatchExecution:
"""
Represents a budget-window batch execution in the Modal simulation API.
"""

batch_job_id: str
status: str
progress: Optional[int] = None
completed_years: list[str] = field(default_factory=list)
running_years: list[str] = field(default_factory=list)
queued_years: list[str] = field(default_factory=list)
failed_years: list[str] = field(default_factory=list)
result: Optional[dict] = None
error: Optional[str] = None

@property
def name(self) -> str:
"""Alias for batch_job_id."""
return self.batch_job_id
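A note on the `field(default_factory=list)` defaults above: `dataclass` rejects a bare `= []` default at class-definition time (mutable default), and `default_factory` gives each execution its own lists rather than one shared list. A stripped-down stand-in class (`BatchStatus` is hypothetical, for illustration only):

```python
from dataclasses import dataclass, field

@dataclass
class BatchStatus:
    # Each instance gets a fresh list; a bare `= []` would be rejected.
    completed_years: list = field(default_factory=list)

a, b = BatchStatus(), BatchStatus()
a.completed_years.append("2026")
print(b.completed_years)  # []
```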


class SimulationAPIModal:
"""
HTTP client for the Modal Simulation API.
@@ -154,6 +176,57 @@ def run(self, payload: dict) -> ModalSimulationExecution:
)
raise

def run_budget_window_batch(self, payload: dict) -> ModalBudgetWindowBatchExecution:
"""
Submit a budget-window batch job to the Modal API.
"""
try:
modal_payload = dict(payload)
if "model_version" in modal_payload:
modal_payload["version"] = modal_payload.pop("model_version")
modal_payload.pop("data_version", None)

response = self.client.post(
f"{self.base_url}/simulate/economy/budget-window",
json=modal_payload,
)
response.raise_for_status()
data = response.json()

logger.log_struct(
{
"message": "Modal budget-window batch submitted",
"batch_job_id": data.get("batch_job_id"),
"status": data.get("status"),
},
severity="INFO",
)

return ModalBudgetWindowBatchExecution(
batch_job_id=data["batch_job_id"],
status=data["status"],
)

except httpx.HTTPStatusError as e:
logger.log_struct(
{
"message": f"Modal batch API HTTP error: {e.response.status_code}",
"response_text": e.response.text[:500],
},
severity="ERROR",
)
raise

except httpx.RequestError as e:
logger.log_struct(
{
"message": f"Modal batch API request error: {str(e)}",
"run_id": (payload.get("_telemetry") or {}).get("run_id"),
},
severity="ERROR",
)
raise
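Taken on its own, the payload normalization at the top of `run_budget_window_batch` (rename `model_version` to `version`, drop `data_version`, leave the caller's dict untouched) behaves like this hypothetical standalone helper:

```python
def to_modal_payload(payload: dict) -> dict:
    """Copy the payload, renaming model_version -> version and
    dropping data_version, without mutating the caller's dict."""
    out = dict(payload)
    if "model_version" in out:
        out["version"] = out.pop("model_version")
    out.pop("data_version", None)
    return out
```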

def resolve_app_name(
self, country: str, version: Optional[str] = None
) -> tuple[str, str]:
@@ -235,6 +308,51 @@ def get_execution_by_id(self, job_id: str) -> ModalSimulationExecution:
)
raise

def get_budget_window_batch_by_id(
self, batch_job_id: str
) -> ModalBudgetWindowBatchExecution:
"""
Poll the Modal API for the current status of a budget-window batch.
"""
try:
response = self.client.get(
f"{self.base_url}/budget-window-jobs/{batch_job_id}"
)
if response.status_code not in (200, 202, 500):
response.raise_for_status()
data = response.json()

return ModalBudgetWindowBatchExecution(
batch_job_id=batch_job_id,
status=data["status"],
progress=data.get("progress"),
completed_years=data.get("completed_years", []),
running_years=data.get("running_years", []),
queued_years=data.get("queued_years", []),
failed_years=data.get("failed_years", []),
result=data.get("result"),
error=data.get("error"),
)

except httpx.HTTPStatusError as e:
logger.log_struct(
{
"message": f"Modal batch API HTTP error polling job {batch_job_id}: {e.response.status_code}",
"response_text": e.response.text[:500],
},
severity="ERROR",
)
raise

except httpx.RequestError as e:
logger.log_struct(
{
"message": f"Modal batch API request error polling job {batch_job_id}: {str(e)}",
},
severity="ERROR",
)
raise
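A hypothetical polling loop built on `get_budget_window_batch_by_id` (only the method name comes from the diff; the terminal status values, poll interval, and timeout are assumptions):

```python
import time

TERMINAL_STATUSES = {"complete", "error"}  # assumed terminal values

def wait_for_batch(api, batch_job_id, interval=5.0, timeout=3600.0):
    """Poll until the budget-window batch reaches a terminal status."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        execution = api.get_budget_window_batch_by_id(batch_job_id)
        if execution.status in TERMINAL_STATUSES:
            return execution
        time.sleep(interval)
    raise TimeoutError(f"batch {batch_job_id} still running after {timeout}s")
```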

def get_execution_status(self, execution: ModalSimulationExecution) -> str:
"""
Get the status string from an execution.