diff --git a/tools/p1-convergence-lens.py b/tools/p1-convergence-lens.py new file mode 100644 index 00000000..72698e45 --- /dev/null +++ b/tools/p1-convergence-lens.py @@ -0,0 +1,181 @@ +#!/usr/bin/env python3 +"""p1-convergence-lens.py — re-score a batch for p1-29d gate D1 under two +readings of "stalled", measured AT TURN<=100 rather than end-of-game. + +p1-29d gate D1 (convergence): the trailing AI (P1, slot 1) must be +"eliminated <=T100 OR stalled" in 10/10 seeds. The existing +tools/p1-survival-score.py reads mil/kills/cities from the FINAL line only, +and treats "stalled" literally as end-of-game tier_peak<=1. Two problems: + + 1. tier_peak is research-drift, not policy choice (p1-29e F1, p1-29g note): + an inert zombie (mil=0, 1 city, 0 kills, builds nothing) still drifts to + tp 5/6, so the tp<=1 letter fails seeds the objective's own D3 analysis + already calls "non-factor". + 2. The pacing intent is "fate decided BY T100", so P1's state should be + sampled at the last turn <=100, not at end-of-game (T300). + +This scorer prints, per seed, P1's status sampled at the last turn <=100, and +both gate verdicts: + D1-literal : elim<=T100 OR (alive & tier_peak<=1 at T100) + D1-nonfactor : elim<=T100 OR (alive & mil==0 & cities<=1 & kills==0 at T100) + +stdlib only. Usage: p1-convergence-lens.py +""" +from __future__ import annotations + +import json +import re +import sys +from pathlib import Path + +TRAILING_SLOT = "1" +LEADER_SLOT = "0" +T100 = 100 + + +def _player(stats: dict, slot: str) -> dict | None: + if not isinstance(stats, dict): + return None + return stats.get(slot) + + +def scan_seed(game_dir: Path) -> dict | None: + ts = game_dir / "turn_stats.jsonl" + if not ts.exists(): + return None + lines = [ln for ln in ts.read_text().splitlines() if ln.strip()] + if not lines: + return None + + m = re.search(r"seed(\d+)", game_dir.name) + seed = int(m.group(1)) if m else -1 + + records = [] + for ln in lines: + try: + records.append(json.loads(ln)) + except json.JSONDecodeError: + continue + + last = records[-1] + final_turn = last.get("turn") + outcome = last.get("outcome") + + # End-of-game P1 snapshot. + p1_end = _player(last.get("player_stats", {}), TRAILING_SLOT) or {} + p1_cities_end = p1_end.get("cities", 0) or 0 + p1_alive_end = p1_cities_end >= 1 + + # Elimination turn: first turn cities hit 0 and stay 0. + p1_elim_turn = None + for rec in records: + p1t = _player(rec.get("player_stats", {}), TRAILING_SLOT) + if p1t is None: + continue + turn = rec.get("turn") + cities = p1t.get("cities", 0) or 0 + if cities == 0 and not p1t.get("cities_founded_pending"): + if p1_elim_turn is None: + p1_elim_turn = turn + else: + p1_elim_turn = None + if p1_alive_end: + p1_elim_turn = None + + # P1 snapshot at the LAST record with turn <= 100. + at100 = None + for rec in records: + turn = rec.get("turn") + if isinstance(turn, int) and turn <= T100: + p1t = _player(rec.get("player_stats", {}), TRAILING_SLOT) + if p1t is not None: + at100 = (turn, p1t) + snap_turn = at100[0] if at100 else None + snap = at100[1] if at100 else {} + + return { + "seed": seed, + "final_turn": final_turn, + "outcome": outcome, + "p1_elim_turn": p1_elim_turn, + "p1_alive_end": p1_alive_end, + "p1_tp_end": p1_end.get("tier_peak", 0) or 0, + # at-T100 sample: + "snap_turn": snap_turn, + "p1_cities_100": snap.get("cities", 0) or 0, + "p1_mil_100": snap.get("mil", 0) or 0, + "p1_kills_100": snap.get("kills", 0) or 0, + "p1_tp_100": snap.get("tier_peak", 0) or 0, + "p1_pop_100": snap.get("pop", 0) or 0, + } + + +def main(argv: list[str]) -> int: + if len(argv) != 2: + print(__doc__) + return 2 + root = Path(argv[1]) + if not root.exists(): + print(f"no such dir: {root}", file=sys.stderr) + return 2 + + def _seed_of(d: Path) -> int: + m = re.search(r"seed(\d+)", d.name) + return int(m.group(1)) if m else -1 + + game_dirs = sorted( + {p.parent for p in root.rglob("turn_stats.jsonl")}, key=_seed_of) + rows = [r for d in game_dirs if (r := scan_seed(d))] + if not rows: + print(f"no parseable seeds under {root}", file=sys.stderr) + return 1 + rows.sort(key=lambda r: r["seed"]) + + print(f"\n=== p1-29d D1 convergence re-score (state @ turn<=100): {root} ===") + print(f"{len(rows)} seeds\n") + hdr = ("seed", "endT", "elimT", "aliveEnd", "@T", "c@100", + "mil@100", "kil@100", "tp@100", "tpEnd") + print("{:>4} {:>5} {:>5} {:>8} {:>4} {:>5} {:>7} {:>7} {:>6} {:>5}".format(*hdr)) + for r in rows: + print("{:>4} {:>5} {:>5} {:>8} {:>4} {:>5} {:>7} {:>7} {:>6} {:>5}".format( + r["seed"], str(r["final_turn"]), + str(r["p1_elim_turn"]) if r["p1_elim_turn"] is not None else "-", + "yes" if r["p1_alive_end"] else "no", + str(r["snap_turn"]), r["p1_cities_100"], r["p1_mil_100"], + r["p1_kills_100"], r["p1_tp_100"], r["p1_tp_end"])) + + n = len(rows) + + def elim_by_t100(r) -> bool: + return (not r["p1_alive_end"]) and ( + r["p1_elim_turn"] is not None and r["p1_elim_turn"] <= T100) + + lit, nf = [], [] + print("\n--- per-seed D1 verdicts ---") + for r in rows: + e100 = elim_by_t100(r) + stalled_lit = (not e100) and r["p1_alive_end"] and r["p1_tp_100"] <= 1 \ + and r["p1_elim_turn"] is None + # alive-at-100 (had a city at the T100 sample) is required for "stalled" + alive_at_100 = r["p1_cities_100"] >= 1 + stalled_nf = (not e100) and alive_at_100 and r["p1_mil_100"] == 0 \ + and r["p1_cities_100"] <= 1 and r["p1_kills_100"] == 0 + ok_lit = e100 or stalled_lit + ok_nf = e100 or stalled_nf + lit.append(ok_lit) + nf.append(ok_nf) + reason = "elim<=T100" if e100 else ( + f"alive@100 c={r['p1_cities_100']} mil={r['p1_mil_100']} " + f"kills={r['p1_kills_100']} tp@100={r['p1_tp_100']}") + print(f" s{r['seed']:<2} literal={'OK' if ok_lit else 'NO'} " + f"nonfactor={'OK' if ok_nf else 'NO'} ({reason})") + + print(f"\nD1-literal (elim<=T100 OR alive&tp@100<=1): " + f"{sum(lit)}/{n} -> {'PASS' if sum(lit)==n else 'FAIL'}") + print(f"D1-nonfactor (elim<=T100 OR alive&mil=0&cit<=1&kills=0 @T100): " + f"{sum(nf)}/{n} -> {'PASS' if sum(nf)==n else 'FAIL'}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main(sys.argv))