833 lines
31 KiB
Python
Executable file
833 lines
31 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Aggregate auto_play batch results into a CSV + summary + assertions.
|
|
|
|
Reads all game_<stamp>_seed<N>/ directories under <results_dir>.
|
|
Pulls the last line of turn_stats.jsonl as the final game state (fast path).
|
|
Counts events from events.jsonl.
|
|
Optionally reads .save files with --deep.
|
|
|
|
Usage:
|
|
tools/autoplay-report.py <results_dir> [--baseline PATH] [--update-baseline] [--deep]
|
|
|
|
Exits:
|
|
0 all games parsed, validated, and assertions passed
|
|
1 schema validation failure OR assertion failure OR missing results
|
|
2 usage error
|
|
|
|
stdlib only — no pip installs.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
import json
|
|
import statistics
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import importlib.util as _iu
|
|
|
|
# The validator lives in "autoplay-validate.py" next to this script. The hyphen
# makes the file name an invalid module identifier, so it cannot be pulled in
# with a plain `import` statement; load it from its file path instead.
_validate_path = Path(__file__).resolve().parent / "autoplay-validate.py"
_spec = _iu.spec_from_file_location("autoplay_validate", _validate_path)
if _spec is None or _spec.loader is None:
    # No usable spec/loader could be built for the path: fail at import time.
    raise ImportError(f"cannot load {_validate_path}")
_mod = _iu.module_from_spec(_spec)
_spec.loader.exec_module(_mod)
# Re-bind the two validator helpers this script calls.
load_schema = _mod.load_schema
validate = _mod.validate
|
|
|
|
# Schema names handed to load_schema().
TURN_STATS_SCHEMA_NAME = "turn-stats-line"
# NOTE(review): not referenced by the code visible in this file — confirm it is still needed.
EVENTS_SCHEMA_NAME = "events-line"
META_SCHEMA_NAME = "meta"

# Event types that each get an `evt_<type>` CSV column and a per-type total
# in the printed summary. Other event types still count towards `event_count`.
EVENT_TYPES = [
    "city_founded", "city_captured", "city_grew", "city_starved",
    "tech_researched", "unit_created", "unit_destroyed", "combat_resolved", "victory",
    "weather_event", "climate_effect",
]
|
|
|
|
|
|
def find_game_dirs(results_dir: Path) -> tuple[list[tuple[int, Path]], list[int]]:
    """Locate ``game_<stamp>_seed<N>/`` directories directly under *results_dir*.

    Args:
        results_dir: Directory whose immediate children are scanned.

    Returns:
        ``(found, missing_seeds)``. ``found`` holds one ``(seed, path)`` pair
        per seed, ordered by seed; when a seed has several directories the
        greatest path wins (lexicographic max on the stamp). ``missing_seeds``
        lists every seed in ``1..max(seed)`` that has no directory. Both lists
        are empty when no matching directory exists.
    """
    latest: dict[int, Path] = {}
    for entry in results_dir.iterdir():
        if not entry.is_dir() or not entry.name.startswith("game_"):
            continue
        # Expected: game_<stamp>_seed<N>
        _, sep, suffix = entry.name.rpartition("_seed")
        # isdecimal() rather than isdigit(): isdigit() also accepts characters
        # such as "²" that int() rejects, which would crash on a stray name.
        if not sep or not suffix.isdecimal():
            continue
        seed = int(suffix)
        if seed not in latest or entry > latest[seed]:
            latest[seed] = entry

    if not latest:
        return [], []

    found = sorted(latest.items())
    # We report missing only if there are gaps in the seed sequence.
    max_seed = found[-1][0]
    missing = [s for s in range(1, max_seed + 1) if s not in latest]
    return found, missing
|
|
|
|
|
|
def _read_last_jsonl_line(path: Path) -> str | None:
    """Return the final non-blank line of *path*, stripped of surrounding whitespace.

    The whole file is read into memory and scanned from the end.

    Returns:
        The last line containing non-whitespace characters, or ``None`` when
        the file cannot be read (``OSError``) or contains only blank lines.
    """
    try:
        content = path.read_text()
    except OSError:
        return None
    stripped_from_end = (candidate.strip() for candidate in reversed(content.splitlines()))
    return next((candidate for candidate in stripped_from_end if candidate), None)
|
|
|
|
|
|
def _count_jsonl_lines(path: Path) -> int:
    """Return how many lines of *path* contain non-whitespace characters.

    An unreadable file (``OSError``) counts as zero lines.
    """
    try:
        content = path.read_text()
    except OSError:
        return 0
    non_blank = [entry for entry in content.splitlines() if entry.strip()]
    return len(non_blank)
|
|
|
|
|
|
def _count_events_by_type(path: Path) -> dict[str, int]:
    """Read events.jsonl and count occurrences per event type.

    Args:
        path: Path of the JSONL events file.

    Returns:
        ``{event_type: count}``. Empty when the file cannot be read. Lines that
        are blank, not valid JSON, or valid JSON but not an object are skipped.
        Objects whose ``"type"`` is missing or not a string are counted under
        ``"<unknown>"``.
    """
    counts: dict[str, int] = {}
    try:
        text = path.read_text()
    except OSError:
        return counts
    for raw in text.splitlines():
        raw = raw.strip()
        if not raw:
            continue
        try:
            obj = json.loads(raw)
        except json.JSONDecodeError:
            continue
        # A line such as `[1, 2]` or `42` parses fine but has no .get();
        # treat it like any other malformed line instead of crashing.
        if not isinstance(obj, dict):
            continue
        event_type = obj.get("type", "<unknown>")
        # Non-string types (null, lists, ...) would be unhashable or break the
        # declared dict[str, int] contract; bucket them with the unknowns.
        if not isinstance(event_type, str):
            event_type = "<unknown>"
        counts[event_type] = counts.get(event_type, 0) + 1
    return counts
|
|
|
|
|
|
def _load_player_clans(meta_path: Path) -> dict[str, str]:
    """Read meta.json and return the ``{player_index_str: clan_id}`` mapping.

    Returns an empty dict when meta.json is missing, undecodable, unparseable,
    not a JSON object, lacks the ``player_clans`` field, or records no clan
    entries. Older batches (pre-`player_clans`) hit this path cleanly —
    callers must tolerate missing data.

    Args:
        meta_path: Path of the game's meta.json.

    Returns:
        Mapping with keys and values coerced to ``str``; entries whose clan
        value is falsy (empty string, null) are dropped.
    """
    try:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        data = json.loads(meta_path.read_text())
    except (OSError, ValueError):
        return {}
    # A JSON root such as a list or string has no .get(); treat as "no data".
    if not isinstance(data, dict):
        return {}
    raw = data.get("player_clans", {})
    if not isinstance(raw, dict):
        return {}
    return {str(k): str(v) for k, v in raw.items() if v}
|
|
|
|
|
|
def _stats_at_turn(
    turn_stats_path: Path, target_turns: tuple[int, ...]
) -> dict[int, dict[str, Any]]:
    """Return ``{target_turn: turn_stats_line}`` for each target the game reached.

    For every target, the value is the last line in the file whose ``turn`` is
    at or before that target (so if the game ended at T216 we still get a T200
    reading). A target that lies beyond the highest turn recorded in the file
    is left out — T300 is only present when the game actually ran to T300.
    Callers treat absence as "no data for this snapshot".

    Args:
        turn_stats_path: Path of the game's turn_stats.jsonl.
        target_turns: Turn marks to sample.

    Returns:
        Mapping from target turn to the parsed JSON object of the chosen line.
        Empty when the file is missing or unreadable. Lines that are blank,
        invalid JSON, not objects, or carry a missing/non-numeric/negative
        ``turn`` are ignored.
    """
    try:
        # A missing file raises FileNotFoundError, which is an OSError.
        text = turn_stats_path.read_text()
    except OSError:
        return {}

    targets = sorted(set(target_turns))
    by_turn: dict[int, dict[str, Any]] = {}
    highest_turn = -1
    for raw in text.splitlines():
        raw = raw.strip()
        if not raw:
            continue
        try:
            line = json.loads(raw)
        except json.JSONDecodeError:
            continue
        if not isinstance(line, dict):
            continue
        try:
            t = int(line.get("turn", -1))
        except (TypeError, ValueError):
            continue
        if t < 0:
            continue
        highest_turn = max(highest_turn, t)
        # Later lines overwrite earlier ones, so each target ends up holding
        # the last line at or before it.
        for target in targets:
            if t <= target:
                by_turn[target] = line

    # Without this filter a game that ended early would report its final line
    # under every later target, polluting e.g. the T300 medians.
    return {target: line for target, line in by_turn.items() if target <= highest_turn}
|
|
|
|
|
|
# Clans that always appear in the per-clan stats and table, even when a batch
# contains no appearance for them.
KNOWN_CLAN_IDS = ["blackhammer", "deepforge", "goldvein", "ironhold", "runesmith"]

# Per-clan table samples per-player stats at these turn marks.
PER_CLAN_SNAPSHOT_TURNS = (100, 200, 300)
|
|
|
|
# Keys read from the final turn-stats line's "aggregate" object; each becomes
# an `agg_<key>` CSV column.
AGGREGATE_FIELDS = [
    "total_combats",
    "total_cities_founded",
    "total_cities_captured",
    "turn_first_combat",
    "turn_first_city_captured",
]

# Keys read from each player's entry in "player_stats"; each becomes a
# `p<pid>_<key>` CSV column for players "0" and "1".
PLAYER_FIELDS = [
    "pop", "pop_peak", "mil",
    "cities", "cities_captured", "cities_lost",
    "gold", "gold_peak", "gold_per_turn",
    "techs", "tiles", "buildings",
    "happiness",
    "food_total", "production_total",
    "kills", "units_lost",
    "turn_first_pop_3", "turn_first_pop_4",
    "tier_peak", "peak_unit_tier", "wonder_count",
]
|
|
|
|
# Sentinel the reporter emits when a quality-metric field is absent from
# the incoming jsonl (i.e. the batch pre-dates p0-25 instrumentation). Keeps
# the CSV/stdout pipeline non-crashing when ingesting historical data.
QUALITY_METRIC_ABSENT = -1
# The PLAYER_FIELDS entries that fall back to QUALITY_METRIC_ABSENT (instead
# of an empty string) when missing from a player's stats.
QUALITY_METRIC_FIELDS = ("tier_peak", "peak_unit_tier", "wonder_count")
|
|
|
|
|
|
def extract_row(
    seed: int,
    data: dict[str, Any],
    event_counts: dict[str, int],
    player_clans: dict[str, str] | None = None,
    snapshots: dict[int, dict[str, Any]] | None = None,
) -> dict[str, Any]:
    """Flatten one game's final turn-stats line into a report row.

    Args:
        seed: Seed number of the game.
        data: Parsed final line of turn_stats.jsonl.
        event_counts: Per-type event counts for the game.
        player_clans: Optional ``{player_index_str: clan_id}`` mapping.
        snapshots: Optional ``{turn: turn_stats_line}`` samples.

    Returns:
        A dict holding the CSV columns plus underscore-prefixed out-of-band
        entries (``_player_clans``, ``_snapshots``, canopy and weather
        telemetry) consumed by the summary printers.

    Raises:
        KeyError: if a required key (``outcome``, ``winner_index``,
            ``victory_type``, ``wall_clock_sec``, ``aggregate`` and its
            fields, ``player_stats``, ``invariant_violations``) is missing.
    """
    # turn-stats-line uses "turn" not "turns_played"
    turns = data.get("turn", data.get("turns_played", -1))
    n_events = sum(event_counts.values())

    row: dict[str, Any] = {
        "seed": seed,
        "outcome": data["outcome"],
        "turns_played": turns,
        "winner_index": data["winner_index"],
        "winner_personality": data.get("winner_personality", ""),
        "victory_type": data["victory_type"],
        "wall_clock_sec": round(float(data["wall_clock_sec"]), 2),
        "event_count": n_events,
    }
    row.update({f"evt_{name}": event_counts.get(name, 0) for name in EVENT_TYPES})
    row.update({f"agg_{name}": data["aggregate"][name] for name in AGGREGATE_FIELDS})

    all_player_stats: dict[str, Any] = data["player_stats"]
    for pid in ("0", "1"):
        stats = all_player_stats.get(pid, {})
        for name in PLAYER_FIELDS:
            # Backward compat: pre-p0-25 batches omit the quality metrics. They
            # get the absent sentinel so downstream medians can filter them out
            # rather than blend 0 into the distribution; other fields get "".
            fallback = QUALITY_METRIC_ABSENT if name in QUALITY_METRIC_FIELDS else ""
            row[f"p{pid}_{name}"] = stats.get(name, fallback)

    row["invariant_violations"] = len(data["invariant_violations"])

    # Out-of-band fields for the per-clan aggregator. Not emitted to CSV.
    row["_player_clans"] = dict(player_clans) if player_clans else {}
    row["_snapshots"] = snapshots or {}

    # Canopy telemetry (p0-35). Absent on pre-p0-35 batches — surfaced as
    # None so the summary pipeline can skip those rows cleanly.
    ecology = data.get("ecology") or {}
    row["_canopy_mean_final"] = ecology.get("flora_canopy_mean")
    row["_canopy_delta_final"] = ecology.get("flora_canopy_delta")
    row["_total_weather_events"] = data.get("aggregate", {}).get("total_weather_events")
    return row
|
|
|
|
|
|
def csv_fieldnames() -> list[str]:
    """Return the CSV column names in output order.

    Order: fixed game columns, one ``evt_*`` column per EVENT_TYPES entry, one
    ``agg_*`` column per AGGREGATE_FIELDS entry, the ``p0_*`` then ``p1_*``
    columns per PLAYER_FIELDS entry, and finally ``invariant_violations``.
    """
    columns = [
        "seed", "outcome", "turns_played", "winner_index", "winner_personality",
        "victory_type", "wall_clock_sec", "event_count",
    ]
    columns.extend(f"evt_{name}" for name in EVENT_TYPES)
    columns.extend(f"agg_{name}" for name in AGGREGATE_FIELDS)
    columns.extend(f"p{pid}_{name}" for pid in ("0", "1") for name in PLAYER_FIELDS)
    columns.append("invariant_violations")
    return columns
|
|
|
|
|
|
# Outcome values run_assertions() accepts; any other value is reported as a failure.
VALID_OUTCOMES = {"victory", "max_turns", "defeat", "in_progress"}
|
|
|
|
|
|
def run_assertions(
    rows: list[dict[str, Any]],
    missing_seeds: list[int],
    schema_errors: dict[Path, list[str]],
) -> list[str]:
    """Run the batch-level sanity checks and collect failure messages.

    Args:
        rows: Report rows, one per successfully parsed game.
        missing_seeds: Seeds for which no game directory was found.
        schema_errors: Validation errors keyed by the offending file path.

    Returns:
        Human-readable failure lines; an empty list means every check passed.
        When *rows* is empty the row-based checks are skipped.
    """
    problems: list[str] = []

    if missing_seeds:
        problems.append(f"Missing game directories for seeds: {missing_seeds}")

    if schema_errors:
        for path, errs in schema_errors.items():
            problems.append(f"Schema validation failed for {path}:")
            # Show at most five errors per file, then a count of the rest.
            problems.extend(f" {e}" for e in errs[:5])
            hidden = len(errs) - 5
            if hidden > 0:
                problems.append(f" ... ({hidden} more)")

    if not rows:
        problems.append("No valid result rows to analyze.")
        return problems

    n_bad_outcomes = sum(1 for r in rows if r["outcome"] not in VALID_OUTCOMES)
    if n_bad_outcomes:
        problems.append(f"{n_bad_outcomes} game(s) had invalid outcome values")

    total_violations = sum(r["invariant_violations"] for r in rows)
    if total_violations > 0:
        problems.append(f"Total invariant violations across games: {total_violations}")

    # "" marks a missing p0_pop_peak; leave those rows out of the max.
    recorded_peaks = (r["p0_pop_peak"] for r in rows if r["p0_pop_peak"] != "")
    max_p0_pop = max(recorded_peaks, default=0)
    if max_p0_pop < 4:
        problems.append(
            f"No game reached p0_pop_peak >= 4 (max was {max_p0_pop}). "
            "Growth system may be broken."
        )

    n_never_combat = sum(1 for r in rows if r["agg_turn_first_combat"] == -1)
    if n_never_combat:
        problems.append(
            f"{n_never_combat} game(s) never fought a single combat — "
            "AI may be pacifist or unreachable."
        )

    n_no_turns = sum(1 for r in rows if r["turns_played"] < 1)
    if n_no_turns:
        problems.append(
            f"{n_no_turns} game(s) have turns_played < 1 — "
            "game may have crashed before completing a turn."
        )

    return problems
|
|
|
|
|
|
def median_int(values: list[int | float]) -> int:
    """Return the median of the numeric entries in *values*, truncated to ``int``.

    Entries that are not ``int``/``float`` (for example the ``""`` placeholder)
    are ignored. Returns ``-1`` when no numeric entry remains.
    """
    numeric = [v for v in values if isinstance(v, (int, float))]
    return int(statistics.median(numeric)) if numeric else -1
|
|
|
|
|
|
def build_personality_win_table(rows: list[dict[str, Any]]) -> dict[str, dict[str, int]]:
    """Tally results per ``winner_personality``: clan_id → {wins, appearances, losses}.

    Only rows with a non-empty ``winner_personality`` are counted; rows where
    it is empty or absent are skipped. For each counted row the
    clan gains one appearance, plus one win when the row's ``outcome`` is
    ``"victory"``. ``losses`` is ``appearances - wins``.
    """
    table: dict[str, dict[str, int]] = {}
    for row in rows:
        clan = row.get("winner_personality", "")
        if clan:
            entry = table.setdefault(clan, {"wins": 0, "appearances": 0})
            entry["wins"] += int(row["outcome"] == "victory")
            entry["appearances"] += 1
    for entry in table.values():
        entry["losses"] = entry["appearances"] - entry["wins"]
    return table
|
|
|
|
|
|
def _safe_median(values: list[Any]) -> float | None:
    """Return the median of the ``int``/``float`` entries of *values* as floats.

    Non-numeric entries are ignored; ``None`` is returned when none remain.
    """
    numeric = [float(v) for v in values if isinstance(v, (int, float))]
    if not numeric:
        return None
    return statistics.median(numeric)
|
|
|
|
|
|
def _new_clan_bucket() -> dict[str, list[Any]]:
    """Return an empty sample accumulator for one clan (one list per metric)."""
    bucket: dict[str, list[Any]] = {
        "appearances": [],
        "wins": [],
        "turns_to_victory": [],
        "pop_peak": [],
        "buildings": [],
    }
    for t in PER_CLAN_SNAPSHOT_TURNS:
        bucket[f"unit_{t}"] = []
    for t in PER_CLAN_SNAPSHOT_TURNS:
        bucket[f"gold_{t}"] = []
    return bucket


def _winner_player_id(winner_idx: Any) -> str | None:
    """Normalize a winner index to the string player id, or None if unusable."""
    try:
        return str(int(winner_idx))
    except (TypeError, ValueError, OverflowError):
        # e.g. a null winner_index: there is no winner slot to match against.
        return None


def build_per_clan_stats(rows: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
    """Aggregate per-clan metrics across a batch.

    Data sources, in priority order:
      - `_player_clans` from meta.json → authoritative: every AI appearance is
        counted regardless of whether that clan won the game.
      - `winner_personality` fallback when `_player_clans` is empty for a row —
        credits the winning clan with an appearance + a win. Non-winning clans
        remain unseen under this fallback (older batches undercount apps). The
        fallback needs a usable `winner_index`; rows without one are skipped.

    For each clan we record:
      appearances, wins, win_rate, losses,
      median_turns_to_victory (only games this clan won),
      median_pop_peak, median_building_count,
      median_unit_count_<T> and median_gold_<T> for each T in
      PER_CLAN_SNAPSHOT_TURNS.
    All "median_*" fields are computed across the clan's appearances; `None`
    when no sample exists. Unit/gold snapshots pull from the per-turn lines
    recorded in `_snapshots`. Building count uses each game's final line.
    Every clan in KNOWN_CLAN_IDS appears in the result, even with zero data.

    Args:
        rows: Report rows as produced by extract_row().

    Returns:
        Mapping from clan id to its metrics dict.
    """
    acc: dict[str, dict[str, list[Any]]] = {
        cid: _new_clan_bucket() for cid in KNOWN_CLAN_IDS
    }

    for row in rows:
        clans_in_row: dict[str, str] = row.get("_player_clans") or {}
        # Computed once per row; None when winner_index is null/non-numeric.
        winner_pid = _winner_player_id(row.get("winner_index", -1))
        winner_clan = row.get("winner_personality", "")
        victory = row.get("outcome", "") == "victory"
        snapshots: dict[int, dict[str, Any]] = row.get("_snapshots") or {}

        if clans_in_row:
            # Authoritative path: one record per AI player in this game.
            entries = list(clans_in_row.items())
        elif winner_clan and winner_pid is not None:
            # Fallback: we only know about the winner. Synthesize a single
            # record keyed at the winner's player index so per-player stats
            # pull from the same slot in the final turn line.
            entries = [(winner_pid, winner_clan)]
        else:
            continue

        for pid, clan in entries:
            if not clan:
                continue
            pid_str = str(pid)
            if clan not in acc:
                acc[clan] = _new_clan_bucket()
            bucket = acc[clan]
            bucket["appearances"].append(1)
            if victory and winner_pid == pid_str:
                bucket["wins"].append(1)
                ttv = row.get("turns_played", -1)
                if isinstance(ttv, (int, float)) and ttv >= 0:
                    bucket["turns_to_victory"].append(float(ttv))

            # Per-player stats from the final turn line (already on row).
            pop_peak = row.get(f"p{pid_str}_pop_peak")
            buildings = row.get(f"p{pid_str}_buildings")
            if isinstance(pop_peak, (int, float)):
                bucket["pop_peak"].append(float(pop_peak))
            if isinstance(buildings, (int, float)):
                bucket["buildings"].append(float(buildings))

            # Per-turn snapshots at each configured turn mark.
            for t in PER_CLAN_SNAPSHOT_TURNS:
                snap = snapshots.get(t)
                if not snap:
                    continue
                pstat = snap.get("player_stats", {}).get(pid_str, {})
                mil = pstat.get("mil")
                gold = pstat.get("gold")
                if isinstance(mil, (int, float)):
                    bucket[f"unit_{t}"].append(float(mil))
                if isinstance(gold, (int, float)):
                    bucket[f"gold_{t}"].append(float(gold))

    out: dict[str, dict[str, Any]] = {}
    for clan, samples in acc.items():
        apps = len(samples["appearances"])
        wins = len(samples["wins"])
        summary: dict[str, Any] = {
            "appearances": apps,
            "wins": wins,
            "losses": apps - wins,
            "win_rate": (wins / apps) if apps else None,
            "median_turns_to_victory": _safe_median(samples["turns_to_victory"]),
            "median_pop_peak": _safe_median(samples["pop_peak"]),
            "median_building_count": _safe_median(samples["buildings"]),
        }
        for t in PER_CLAN_SNAPSHOT_TURNS:
            summary[f"median_unit_count_{t}"] = _safe_median(samples[f"unit_{t}"])
        for t in PER_CLAN_SNAPSHOT_TURNS:
            summary[f"median_gold_{t}"] = _safe_median(samples[f"gold_{t}"])
        out[clan] = summary
    return out
|
|
|
|
|
|
def _fmt(v: Any) -> str:
    """Format a table cell.

    ``None`` renders as an em dash. Floats render without decimals when they
    are whole numbers or at least 10, otherwise with one decimal. Everything
    else goes through ``str()``.
    """
    if v is None:
        return "—"
    if not isinstance(v, float):
        return str(v)
    drop_decimals = v.is_integer() or v >= 10
    return format(v, ".0f" if drop_decimals else ".1f")
|
|
|
|
|
|
def render_per_clan_table(
    rows: list[dict[str, Any]], out: Any = sys.stderr
) -> dict[str, dict[str, Any]]:
    """Write the per-clan metrics table to *out* and return the stats dict.

    Clans are listed in sorted order. A clan with zero appearances is only
    shown when it is in KNOWN_CLAN_IDS. A trailing note is printed when no
    listed clan has any appearance.
    """
    stats = build_per_clan_stats(rows)
    print("per-clan stats:", file=out)

    # Column widths after the left-aligned 12-wide clan column.
    widths = (5, 5, 6, 5, 5, 5, 5, 5, 5, 6, 6, 6)

    def emit(label: str, cells: tuple[Any, ...]) -> None:
        padded = [f"{label:<12}"]
        padded.extend(f"{cell:>{width}}" for cell, width in zip(cells, widths))
        print(" " + " ".join(padded), file=out)

    emit(
        "clan",
        ("apps", "wins", "win%", "ttv", "pop", "bldg",
         "u100", "u200", "u300", "g100", "g200", "g300"),
    )

    saw_data = False
    for clan in sorted(stats):
        s = stats[clan]
        appearances = s["appearances"]
        if appearances == 0 and clan not in KNOWN_CLAN_IDS:
            continue
        if appearances > 0:
            saw_data = True
        rate = s["win_rate"]
        rate_cell = "—" if rate is None else f"{int(round(rate * 100))}%"
        emit(
            clan,
            (
                appearances,
                s["wins"],
                rate_cell,
                _fmt(s["median_turns_to_victory"]),
                _fmt(s["median_pop_peak"]),
                _fmt(s["median_building_count"]),
                _fmt(s["median_unit_count_100"]),
                _fmt(s["median_unit_count_200"]),
                _fmt(s["median_unit_count_300"]),
                _fmt(s["median_gold_100"]),
                _fmt(s["median_gold_200"]),
                _fmt(s["median_gold_300"]),
            ),
        )

    if not saw_data:
        print(" (no clan data — meta.player_clans empty, no winning AIs)", file=out)
    return stats
|
|
|
|
|
|
def print_personality_summary(
    rows: list[dict[str, Any]], out: Any = sys.stderr
) -> None:
    """Print the winner-personality win-rate table to *out*.

    One line per clan (sorted) with wins, appearances and the floored integer
    win percentage. A clan is flagged as imbalanced when it won strictly more
    than half of its appearances. Prints a single "no data" line when the
    table is empty.

    Args:
        rows: Report rows for the batch.
        out: Writable text stream; defaults to stderr.
    """
    table = build_personality_win_table(rows)
    if not table:
        print("personality win-rate: no data (winner_personality missing from results)", file=out)
        return
    print("personality win-rate:", file=out)
    print(f" {'clan':<14} {'wins':>5} {'apps':>5} {'win%':>6}", file=out)
    for clan in sorted(table):
        entry = table[clan]
        apps = entry["appearances"]
        wins = entry["wins"]
        pct = 100 * wins // apps if apps else 0
        # Decide the flag on exact counts: the floored percentage would miss
        # rates between 50% and 51% (e.g. 101 wins out of 200).
        flag = " <-- IMBALANCED (>50%)" if 2 * wins > apps else ""
        print(f" {clan:<14} {wins:>5} {apps:>5} {pct:>5}%{flag}", file=out)
|
|
|
|
|
|
def build_quality_metrics(rows: list[dict[str, Any]]) -> dict[str, float | None]:
    """Aggregate state-at-end quality metrics across a batch (p0-25).

    Returns medians under the keys ``median_winner_tier_peak``,
    ``median_loser_tier_peak``, ``median_tier_peak_gap``,
    ``median_peak_unit_tier`` (all players) and
    ``median_wonder_count_per_player``.

    A value is ``None`` when it has no sample (e.g. every row pre-dates p0-25,
    so its fields hold the ``QUALITY_METRIC_ABSENT`` sentinel, which is
    filtered out). The gap is computed per game (winner tier minus loser tier
    of that game) before taking the batch median, so it reflects the typical
    within-game spread rather than the difference of two independent medians.
    """

    def recorded(value: Any) -> bool:
        return isinstance(value, (int, float)) and value != QUALITY_METRIC_ABSENT

    winner_tiers: list[float] = []
    loser_tiers: list[float] = []
    gaps: list[float] = []
    unit_tiers: list[float] = []
    wonder_counts: list[float] = []

    for row in rows:
        winner_idx = row.get("winner_index", -1)
        p0_tier = row.get("p0_tier_peak", QUALITY_METRIC_ABSENT)
        p1_tier = row.get("p1_tier_peak", QUALITY_METRIC_ABSENT)

        # Winner/loser tier_peak + per-game gap (only when both sides recorded).
        if winner_idx in (0, 1) and recorded(p0_tier) and recorded(p1_tier):
            if winner_idx == 0:
                won, lost = float(p0_tier), float(p1_tier)
            else:
                won, lost = float(p1_tier), float(p0_tier)
            winner_tiers.append(won)
            loser_tiers.append(lost)
            gaps.append(won - lost)

        for pid in ("0", "1"):
            unit_tier = row.get(f"p{pid}_peak_unit_tier", QUALITY_METRIC_ABSENT)
            if recorded(unit_tier):
                unit_tiers.append(float(unit_tier))
            wonders = row.get(f"p{pid}_wonder_count", QUALITY_METRIC_ABSENT)
            if recorded(wonders):
                wonder_counts.append(float(wonders))

    return {
        "median_winner_tier_peak": _safe_median(winner_tiers),
        "median_loser_tier_peak": _safe_median(loser_tiers),
        "median_tier_peak_gap": _safe_median(gaps),
        "median_peak_unit_tier": _safe_median(unit_tiers),
        "median_wonder_count_per_player": _safe_median(wonder_counts),
    }
|
|
|
|
|
|
def print_quality_metrics(
    rows: list[dict[str, Any]], out: Any = sys.stderr
) -> dict[str, float | None]:
    """Write the state-at-end quality metrics block to *out*.

    Prints a heading followed by one line per metric, or a single "no data"
    line when every metric is ``None``. Returns the metrics dict either way.
    """
    metrics = build_quality_metrics(rows)
    print("state-at-end quality metrics (p0-25):", file=out)
    if all(value is None for value in metrics.values()):
        print(" (no data — batch pre-dates p0-25 instrumentation)", file=out)
        return metrics
    for key in (
        "median_winner_tier_peak",
        "median_loser_tier_peak",
        "median_tier_peak_gap",
        "median_peak_unit_tier",
        "median_wonder_count_per_player",
    ):
        print(f" {key}: {_fmt(metrics[key])}", file=out)
    return metrics
|
|
|
|
|
|
def print_canopy_summary(
    rows: list[dict[str, Any]], out: Any = sys.stderr
) -> None:
    """Report the canopy trend across the batch (p0-35).

    Reads each row's ``_canopy_mean_final`` / ``_canopy_delta_final``. Rows
    where a value is not numeric (historical pre-p0-35 batches carry ``None``)
    are left out of that value's median instead of contributing 0.0. Prints a
    "no data" line when no row has a numeric canopy mean.
    """

    def numeric_column(key: str) -> list[Any]:
        return [r[key] for r in rows if isinstance(r.get(key), (int, float))]

    means = numeric_column("_canopy_mean_final")
    deltas = numeric_column("_canopy_delta_final")

    print("flora canopy (p0-35):", file=out)
    if not means:
        print(" (no data — batch pre-dates p0-35 instrumentation)", file=out)
        return

    print(f" median final canopy mean: {_fmt(statistics.median(means))}", file=out)
    evolving = sum(1 for delta in deltas if delta != 0.0)
    print(
        f" games with evolving canopy: {evolving} / {len(rows)} "
        f"(non-zero final delta)", file=out
    )
    if deltas:
        magnitudes = [abs(delta) for delta in deltas]
        print(f" median final |delta|: {_fmt(statistics.median(magnitudes))}", file=out)
|
|
|
|
|
|
def print_weather_summary(
    rows: list[dict[str, Any]], out: Any = sys.stderr
) -> None:
    """Report weather-event coverage across the batch (p0-36).

    Uses each row's numeric ``_total_weather_events`` for the coverage count
    and median, and sums ``evt_weather_event`` over all rows for the record
    total. Prints a "no data" line when no row has a numeric total.
    """
    totals = [
        r["_total_weather_events"]
        for r in rows
        if isinstance(r.get("_total_weather_events"), (int, float))
    ]
    print("weather events (p0-36):", file=out)
    if not totals:
        print(" (no data — batch pre-dates p0-36 instrumentation)", file=out)
        return

    covered = sum(1 for total in totals if total > 0)
    print(
        f" games with >=1 weather_event: {covered} / {len(rows)}",
        file=out
    )
    # totals is non-empty past the guard above, so the median always exists.
    print(f" median total_weather_events: {_fmt(statistics.median(totals))}", file=out)
    record_total = sum(r.get("evt_weather_event", 0) for r in rows)
    print(f" total weather_event records (events.jsonl): {record_total}", file=out)
|
|
|
|
|
|
def print_summary(rows: list[dict[str, Any]], out: Any = sys.stderr) -> None:
    """Write the human-readable batch report to *out*.

    Sections, in order: game count, outcome distribution, headline medians
    (only when there are rows), per-type event totals (non-zero only), total
    invariant violations, then the quality, canopy, weather, personality and
    per-clan blocks.
    """
    print("=== autoplay batch report ===", file=out)
    n_games = len(rows)
    print(f"games: {n_games}", file=out)

    tally: dict[str, int] = {}
    for row in rows:
        tally[row["outcome"]] = tally.get(row["outcome"], 0) + 1
    for outcome, n in sorted(tally.items()):
        share = 100 * n // n_games if rows else 0
        print(f" {outcome}: {n} ({share}%)", file=out)

    if rows:
        for column in (
            "turns_played",
            "p0_pop_peak",
            "p0_gold_peak",
            "agg_total_combats",
            "event_count",
        ):
            print(
                f"median {column}: {median_int([row[column] for row in rows])}",
                file=out,
            )

    print("event counts by type (total across all games):", file=out)
    for event_type in EVENT_TYPES:
        grand_total = sum(row.get(f"evt_{event_type}", 0) for row in rows)
        if grand_total > 0:
            print(f" {event_type}: {grand_total}", file=out)

    violations = sum(row["invariant_violations"] for row in rows)
    print(f"invariant violations (total): {violations}", file=out)

    print_quality_metrics(rows, out=out)
    print_canopy_summary(rows, out=out)
    print_weather_summary(rows, out=out)
    print_personality_summary(rows, out=out)
    render_per_clan_table(rows, out=out)
|
|
|
|
|
|
def main(argv: list[str]) -> int:
    """Aggregate one results directory: CSV to stdout, report to stderr.

    Args:
        argv: Full argument vector, program name at index 0. Accepts
            ``<results_dir> [--baseline PATH] [--update-baseline] [--deep]``.

    Returns:
        Process exit code: 0 when all games parsed, validated and passed the
        assertions; 1 on schema/assertion failures or when no game
        directories exist; 2 on a usage error.
    """
    usage = "usage: autoplay-report.py <results_dir> [--baseline PATH] [--update-baseline] [--deep]"
    positional: list[str] = []
    flags: set[str] = set()
    args = iter(argv[1:])
    for a in args:
        if a == "--baseline":
            # --baseline takes a value. Consume it here so PATH is never
            # mistaken for <results_dir> when the flag comes first. The value
            # itself is not used yet (baseline comparison is not implemented).
            if next(args, None) is None:
                print(usage, file=sys.stderr)
                return 2
            flags.add(a)
        elif a.startswith("-"):
            flags.add(a)
        else:
            positional.append(a)

    if not positional:
        print(usage, file=sys.stderr)
        return 2

    results_dir = Path(positional[0])
    deep = "--deep" in flags

    if not results_dir.is_dir():
        print(f"ERROR: {results_dir} is not a directory", file=sys.stderr)
        return 2

    found, missing = find_game_dirs(results_dir)
    if not found and not missing:
        print(f"ERROR: No game_*_seed*/ dirs found under {results_dir}", file=sys.stderr)
        return 1

    ts_schema = load_schema(TURN_STATS_SCHEMA_NAME)
    meta_schema = load_schema(META_SCHEMA_NAME)
    rows: list[dict[str, Any]] = []
    schema_errors: dict[Path, list[str]] = {}

    for seed, game_dir in found:
        meta_path = game_dir / "meta.json"
        turn_stats_path = game_dir / "turn_stats.jsonl"
        events_path = game_dir / "events.jsonl"

        # Validate meta.json. Problems are recorded but do not stop the game
        # from being reported; only turn_stats problems skip the game.
        if not meta_path.exists():
            schema_errors[meta_path] = ["meta.json missing"]
        else:
            try:
                meta_data = json.loads(meta_path.read_text())
                meta_errs = validate(meta_data, meta_schema)
                if meta_errs:
                    schema_errors[meta_path] = meta_errs
            except (OSError, json.JSONDecodeError) as e:
                schema_errors[meta_path] = [f"cannot load meta.json: {e}"]

        # Fast path: the last line of turn_stats.jsonl is the final game state.
        last_line = _read_last_jsonl_line(turn_stats_path)
        if last_line is None:
            schema_errors[turn_stats_path] = ["turn_stats.jsonl missing or empty"]
            continue

        try:
            data = json.loads(last_line)
        except json.JSONDecodeError as e:
            schema_errors[turn_stats_path] = [f"last line invalid JSON: {e}"]
            continue

        errs = validate(data, ts_schema)
        if errs:
            schema_errors[turn_stats_path] = errs
            continue

        event_counts = _count_events_by_type(events_path) if events_path.exists() else {}
        player_clans = _load_player_clans(meta_path) if meta_path.exists() else {}
        snapshots = _stats_at_turn(turn_stats_path, PER_CLAN_SNAPSHOT_TURNS)
        rows.append(
            extract_row(seed, data, event_counts, player_clans, snapshots)
        )

        if deep:
            # Read .save files only with --deep
            for save_file in sorted(game_dir.glob("*.save")):
                print(f"[deep] {save_file.name}: {save_file.stat().st_size} bytes", file=sys.stderr)

    # CSV to stdout — ignore out-of-band `_*` fields used by the per-clan
    # aggregator so the CSV schema stays stable.
    writer = csv.DictWriter(
        sys.stdout, fieldnames=csv_fieldnames(), extrasaction="ignore"
    )
    writer.writeheader()
    for r in rows:
        writer.writerow(r)

    print_summary(rows)
    failures = run_assertions(rows, missing, schema_errors)
    if failures:
        print("\n=== FAILURES ===", file=sys.stderr)
        for f in failures:
            print(f" {f}", file=sys.stderr)
        return 1

    if "--update-baseline" in flags:
        print("--update-baseline: not yet implemented (Phase 3b)", file=sys.stderr)

    print("\nAll assertions passed.", file=sys.stderr)
    return 0
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main(sys.argv))
|