Skip to content

Commit c89c489

Browse files
Add scripts files
1 parent b31b117 commit c89c489

2 files changed

Lines changed: 819 additions & 0 deletions

File tree

scripts/analyze_codee_reports.py

Lines changed: 389 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,389 @@
1+
#!/usr/bin/env python3
2+
"""Analyze Codee JSON or HTML reports over time and generate time-series visualizations."""
3+
4+
import argparse
5+
import json
6+
import logging
7+
import re
8+
import sys
9+
import pandas as pd
10+
from collections import defaultdict
11+
from datetime import datetime
12+
from pathlib import Path
13+
14+
15+
def setup_logging() -> logging.Logger:
    """Configure and return the script's logger.

    Returns the "codee_analyzer" logger, set to INFO, writing to stderr.
    The handler is attached only when none exists yet, so calling this
    more than once no longer duplicates every log line (the original
    added a fresh handler on each call).
    """
    logger = logging.getLogger("codee_analyzer")
    logger.setLevel(logging.INFO)
    if not logger.handlers:  # avoid duplicate handlers on repeated calls
        handler = logging.StreamHandler(sys.stderr)
        handler.setFormatter(
            logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
        )
        logger.addHandler(handler)
    return logger
23+
24+
25+
def load_json_file(file_path: Path, logger: logging.Logger) -> dict | None:
26+
"""Load and parse a JSON file, returning None on failure."""
27+
try:
28+
with open(file_path, "r", encoding="utf-8") as f:
29+
return json.load(f)
30+
except json.JSONDecodeError as e:
31+
logger.warning(f"Malformed JSON in {file_path.name}: {e}")
32+
return None
33+
except OSError as e:
34+
logger.warning(f"Failed to read {file_path.name}: {e}")
35+
return None
36+
37+
38+
def load_html_report(report_dir: Path, logger: logging.Logger) -> dict | None:
39+
"""Load a Codee HTML report by parsing report.js file."""
40+
report_js_path = report_dir / "report.js"
41+
if not report_js_path.exists():
42+
logger.warning(f"report.js not found in {report_dir.name}")
43+
return None
44+
45+
try:
46+
content = report_js_path.read_text(encoding="utf-8")
47+
match = re.search(r"const\s+report\s*=\s*(\{.*\});", content, re.DOTALL)
48+
if not match:
49+
match = re.search(r"var\s+report\s*=\s*(\{.*\});", content, re.DOTALL)
50+
51+
if not match:
52+
logger.warning(f"Could not find report object in {report_js_path}")
53+
return None
54+
55+
json_str = match.group(1)
56+
return json.loads(json_str)
57+
except Exception as e:
58+
logger.warning(f"Failed to parse report.js in {report_dir.name}: {e}")
59+
return None
60+
61+
62+
def extract_timestamp(report: dict, logger: logging.Logger) -> int | None:
63+
"""Extract timestamp from report."""
64+
try:
65+
exec_info = report.get("CodeeExecutionInfo", {})
66+
ts_utc = exec_info.get("TimestampUTC")
67+
if ts_utc:
68+
dt = datetime.fromisoformat(ts_utc.replace("Z", "+00:00"))
69+
return int(dt.timestamp())
70+
return exec_info.get("TimestampEpochSeconds")
71+
except Exception as e:
72+
logger.warning(f"Failed to extract timestamp: {e}")
73+
return None
74+
75+
76+
def extract_checker_counts(report: dict, logger: logging.Logger) -> dict[str, int]:
    """Extract per-checker finding counts from the Screening rankings.

    Reads both the Quality and Optimization checker tables, skipping the
    synthetic "Total" row.  Rows whose "#" column is not an integer are
    logged and ignored.  Returns {checker name: count}.
    """
    counts: dict[str, int] = {}
    try:
        screening = report.get("Screening", {})
        # Both ranking tables share the same row schema, so walk them with
        # one loop instead of two copy-pasted blocks.
        for table_name in (
            "Ranking of Quality Checkers",
            "Ranking of Optimization Checkers",
        ):
            for row in screening.get(table_name, {}).get("DataTable", []):
                checker = row.get("Checker", "")
                if not checker or checker == "Total":
                    continue
                try:
                    counts[checker] = int(row.get("#", "0"))
                except ValueError:
                    logger.warning(f"Invalid count for checker {checker}")
    except Exception as e:
        logger.warning(f"Failed to extract checker counts: {e}")
    return counts
109+
110+
111+
def extract_checker_priorities(report: dict, logger: logging.Logger) -> dict[str, str]:
    """Extract the priority label for each checker in the Screening rankings.

    Reads both the Quality and Optimization checker tables, skipping the
    synthetic "Total" row and rows without a "Priority" value.  Returns
    {checker name: priority string, e.g. "P18 (L1)"}.
    """
    priorities: dict[str, str] = {}
    try:
        screening = report.get("Screening", {})
        # Same dedup as extract_checker_counts: both tables share one schema.
        for table_name in (
            "Ranking of Quality Checkers",
            "Ranking of Optimization Checkers",
        ):
            for row in screening.get(table_name, {}).get("DataTable", []):
                checker = row.get("Checker", "")
                if not checker or checker == "Total":
                    continue
                priority = row.get("Priority", "")
                if priority:
                    priorities[checker] = priority
    except Exception as e:
        logger.warning(f"Failed to extract checker priorities: {e}")
    return priorities
142+
143+
144+
def extract_l_level(priority_str: str) -> str:
    """Map a priority string like 'P18 (L1)' to its L-level label.

    Levels are matched by substring, L1 through L4 in order; empty input
    or an unrecognized string yields "Unknown".
    """
    if priority_str:
        for level in ("L1", "L2", "L3", "L4"):
            if level in priority_str:
                return level
    return "Unknown"
157+
158+
159+
def detect_input_type(input_dir: Path) -> str:
    """Classify *input_dir* as "html", "json", or "unknown" (recursive scan).

    HTML takes precedence over JSON when both kinds of files are present,
    matching the original behavior.  Uses ``next()`` on the recursive
    glob iterators so the directory walk stops at the first match instead
    of materializing every path in the tree.
    """
    if next(input_dir.rglob("report.js"), None) is not None:
        return "html"
    if next(input_dir.rglob("*.json"), None) is not None:
        return "json"
    return "unknown"
169+
170+
171+
def load_reports(
    input_dir: Path, logger: logging.Logger
) -> list[tuple[int, dict, Path | None]]:
    """Load all reports from *input_dir* recursively, sorted by timestamp.

    Returns a list of (timestamp, report_dict, path) tuples, where *path*
    is the JSON file or the HTML report directory (used for linking back
    to the original report).  Reports that fail to parse or lack a valid
    timestamp are skipped with a warning.

    The per-report validation (parse check, timestamp check, append) was
    duplicated across the HTML and JSON branches in the original; here
    each branch only gathers (path, parsed-report) pairs and one shared
    loop does the rest.
    """
    reports: list[tuple[int, dict, Path | None]] = []
    input_type = detect_input_type(input_dir)

    if input_type == "html":
        logger.info("Detected HTML format (recursive search for report.js)")
        # One entry per report.js; the enclosing directory is what we link to.
        sources = [
            (js.parent, load_html_report(js.parent, logger))
            for js in sorted(input_dir.rglob("report.js"))
        ]
    elif input_type == "json":
        logger.info("Detected JSON format (recursive search for *.json)")
        json_files = sorted(input_dir.rglob("*.json"))
        if not json_files:
            logger.warning(f"No JSON files found in {input_dir}")
            return reports
        sources = [(p, load_json_file(p, logger)) for p in json_files]
    else:
        logger.error(f"No valid reports found in {input_dir}")
        return reports

    for path, report in sources:
        if report is None:
            continue
        timestamp = extract_timestamp(report, logger)
        if timestamp is None:
            logger.warning(f"Skipping {path.name}: no valid timestamp")
            continue
        reports.append((timestamp, report, path))

    reports.sort(key=lambda item: item[0])
    return reports
223+
224+
225+
def build_checker_dataframe(
    reports: list[tuple[int, dict, Path | None]], logger: logging.Logger
) -> pd.DataFrame:
    """Build a time-indexed DataFrame of per-checker finding counts.

    Index: report timestamps converted to datetime (named "timestamp").
    Columns: every checker seen in any report, sorted by name; a checker
    absent from a given report contributes 0 for that row.
    """
    # First pass: discover the full column set across all reports.
    seen: set[str] = set()
    for _, report, _ in reports:
        seen.update(extract_checker_counts(report, logger))
    columns = sorted(seen)

    # Second pass: one row dict per report, zero-filling absent checkers.
    timestamps = [ts for ts, _, _ in reports]
    rows: list[dict[str, int]] = []
    for _, report, _ in reports:
        counts = extract_checker_counts(report, logger)
        rows.append({name: counts.get(name, 0) for name in columns})

    df = pd.DataFrame(
        rows, columns=columns, index=pd.to_datetime(timestamps, unit="s")
    )
    df.index.name = "timestamp"
    return df
248+
249+
250+
def get_checker_priorities_for_df(
    df: pd.DataFrame,
    reports: list[tuple[int, dict, Path | None]],
    logger: logging.Logger,
) -> dict[str, str]:
    """Merge priority labels from every report into one mapping.

    Later reports win on conflicting checkers, matching the original
    update order.  *df* is accepted for interface compatibility but the
    mapping is derived entirely from *reports*.
    """
    merged: dict[str, str] = {}
    for _, report, _ in reports:
        merged.update(extract_checker_priorities(report, logger))
    return merged
263+
264+
265+
def generate_html_report(
    df: pd.DataFrame,
    reports: list[tuple[int, dict, Path | None]],
    input_dir: Path,
    output_dir: Path,
    logger: logging.Logger,
) -> None:
    """Render the interactive Chart.js dashboard to output_dir/codee_analysis.html.

    Fills the ``templates/codee_report.html`` template (located next to
    this script) with JSON chart data: total findings per report date,
    one series per checker, per-L-level stacked counts, and links back
    to the originating reports.
    """
    logger.info("Generating interactive HTML report")

    priorities = get_checker_priorities_for_df(df, reports, logger)
    labels = [ts.strftime("%Y-%m-%d") for ts in df.index]
    total_data = df.sum(axis=1).tolist()
    checker_data = {name: df[name].tolist() for name in df.columns}

    priority_order = ["L1", "L2", "L3", "L4", "Unknown"]
    priority_colors = {
        "L1": "#DC143C",
        "L2": "#FF8C00",
        "L3": "#228B22",
        "L4": "#90EE90",
        "Unknown": "#D3D3D3",
    }

    # Aggregate each row's checker counts into L-level buckets so the
    # chart can stack findings by priority level.
    l_level_data: dict[str, list[int]] = {lvl: [] for lvl in priority_order}
    for ts in df.index:
        bucket = {lvl: 0 for lvl in priority_order}
        for checker, count in df.loc[ts].items():
            bucket[extract_l_level(priorities.get(checker, ""))] += count
        for lvl in priority_order:
            l_level_data[lvl].append(bucket[lvl])

    # Link each data point back to its source: the JSON file itself, or
    # the HTML report's index.html when present (directory otherwise).
    report_links: list[dict | None] = []
    for _, _, report_path in reports:
        if not report_path:
            report_links.append(None)
            continue
        resolved = report_path.resolve()
        if report_path.suffix == ".json":
            report_links.append({"link": str(resolved), "type": "json"})
            continue
        index_html = resolved / "index.html"
        target = index_html if index_html.exists() else resolved
        report_links.append({"link": str(target), "type": "html"})

    chart_data = {
        "labels": labels,
        "total": total_data,
        "checkers": checker_data,
        "priorities": l_level_data,
        "priorityColors": priority_colors,
        "reportLinks": report_links,
        "hasLinks": any(link is not None for link in report_links),
    }

    replacements = {
        "${REPORT_COUNT}": str(len(reports)),
        "${CHECKER_COUNT}": str(len(checker_data)),
        "${TOTAL_FINDINGS}": str(int(total_data[-1])),
        "${INPUT_DIR}": str(input_dir),
        "${CHART_DATA}": json.dumps(chart_data, indent=2),
    }

    template_path = Path(__file__).parent / "templates" / "codee_report.html"
    page = template_path.read_text(encoding="utf-8")
    for placeholder, value in replacements.items():
        page = page.replace(placeholder, value)

    html_path = output_dir / "codee_analysis.html"
    html_path.write_text(page, encoding="utf-8")
    logger.info(f"HTML report saved to {html_path}")
345+
346+
347+
def main() -> int:
    """CLI entry point: parse arguments, load reports, emit the dashboard.

    Returns a process exit code: 1 when the input directory does not
    exist; 0 otherwise, including the no-reports case (which only warns).
    """
    parser = argparse.ArgumentParser(
        description="Analyze Codee JSON or HTML reports and generate HTML visualizations."
    )
    positional = (
        (
            "input_dir",
            "Directory containing Codee report files (JSON or HTML with report.js)",
        ),
        (
            "output_dir",
            "Directory where HTML report will be saved",
        ),
    )
    for name, help_text in positional:
        parser.add_argument(name, type=Path, help=help_text)
    args = parser.parse_args()

    logger = setup_logging()

    # Fail fast on a missing input directory; the output dir is created.
    if not args.input_dir.is_dir():
        logger.error(f"Input directory does not exist: {args.input_dir}")
        return 1
    args.output_dir.mkdir(parents=True, exist_ok=True)

    logger.info(f"Loading reports from {args.input_dir}")
    reports = load_reports(args.input_dir, logger)
    if not reports:
        logger.warning("No valid reports loaded. Exiting.")
        return 0
    logger.info(f"Loaded {len(reports)} reports")

    logger.info("Building checker DataFrame")
    df = build_checker_dataframe(reports, logger)

    generate_html_report(df, reports, args.input_dir, args.output_dir, logger)
    return 0
386+
387+
388+
# Run as a script: propagate main()'s return value as the exit code.
if __name__ == "__main__":
    sys.exit(main())

0 commit comments

Comments
 (0)