293 lines
10 KiB
Python
293 lines
10 KiB
Python
#!/usr/bin/env python3
|
||
"""Cross-reference validator for the resource graph (Gap A from
|
||
plans/what-are-all-the-gleaming-flurry.md).
|
||
|
||
Walks the Game 1 Age-of-Dwarves data pack and asserts the deposit ↔ unit/building
|
||
↔ tech graph is internally consistent. Designed to be run in CI.
|
||
|
||
Modes:
|
||
--report (default) Print every issue, exit 0. Use during cleanup.
|
||
--strict Exit non-zero on any error. Use in CI after cleanup.
|
||
|
||
Checks:
|
||
M1 Manifest IDs must each have a matching deposit file.
|
||
D1 Deposit gates_units / gates_buildings entries must reference real units/buildings.
|
||
D2 Bidirectional: if deposit D lists unit U in gates_units, U.requires_resource == D.
|
||
U1 Every unit/building with requires_resource: R → R must be in the manifest.
|
||
U2 Every unit/building with requires_resource: R → file R.json must exist with the
|
||
reverse back-pointer (this unit/building must appear in R's gates_* list).
|
||
T1 Tech reachability: for unit U with tech_required: T and requires_resource: R,
|
||
depth(R.yield_gate) <= depth(T). I.e. by the time you unlock U, R is usable.
|
||
S1 Game-1 scope: flag deposits whose name or description carries strong magic-school
|
||
markers when they appear in the Game-1 manifest. Warning, not error.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import sys
|
||
from collections import deque
|
||
from pathlib import Path
|
||
|
||
REPO_ROOT = Path(__file__).resolve().parent.parent
|
||
DEPOSITS_DIR = REPO_ROOT / "public" / "resources" / "deposits"
|
||
UNITS_DIR = REPO_ROOT / "public" / "resources" / "units"
|
||
BUILDINGS_DIR = REPO_ROOT / "public" / "resources" / "buildings"
|
||
# The Age-of-Dwarves tech tree is split across two source dirs: the shared
|
||
# `resources/techs/` and the game-specific `games/age-of-dwarves/data/techs/`.
|
||
# The runtime DataLoader (and validate-game-data.py) read BOTH; this validator
|
||
# must too, or it cannot resolve tech_required values authored in the
|
||
# game-specific dir (gunpowder, rifling, armoured_warfare, …) and falsely
|
||
# reports their units' tech depth as uncomputable.
|
||
TECHS_DIRS = (
|
||
REPO_ROOT / "public" / "resources" / "techs",
|
||
REPO_ROOT / "public" / "games" / "age-of-dwarves" / "data" / "techs",
|
||
)
|
||
MANIFEST = (
|
||
REPO_ROOT
|
||
/ "public"
|
||
/ "games"
|
||
/ "age-of-dwarves"
|
||
/ "data"
|
||
/ "deposits"
|
||
/ "manifest.json"
|
||
)
|
||
|
||
MAGIC_NAME_MARKERS = (
|
||
"magesteel",
|
||
"deep_crystal",
|
||
"dragon_bone",
|
||
"mithril",
|
||
"chaos_",
|
||
"ghost_orchid",
|
||
"sacred_grove",
|
||
"sunstone",
|
||
"merfolk",
|
||
"leviathan",
|
||
"kraken",
|
||
"sea_serpent",
|
||
)
|
||
|
||
|
||
class Report:
|
||
def __init__(self) -> None:
|
||
self.errors: list[str] = []
|
||
self.warnings: list[str] = []
|
||
|
||
def err(self, code: str, msg: str) -> None:
|
||
self.errors.append(f"[{code}] {msg}")
|
||
|
||
def warn(self, code: str, msg: str) -> None:
|
||
self.warnings.append(f"[{code}] {msg}")
|
||
|
||
def print(self) -> None:
|
||
for line in self.errors:
|
||
print(f"ERROR {line}")
|
||
for line in self.warnings:
|
||
print(f"WARN {line}")
|
||
print(
|
||
f"\n{len(self.errors)} errors, {len(self.warnings)} warnings"
|
||
)
|
||
|
||
|
||
def _load_array(path: Path) -> list[dict]:
|
||
with path.open() as f:
|
||
data = json.load(f)
|
||
if isinstance(data, list):
|
||
return data
|
||
if isinstance(data, dict):
|
||
return [data]
|
||
raise ValueError(f"{path} is neither list nor object")
|
||
|
||
|
||
def _load_manifest(path: Path) -> list[str]:
|
||
with path.open() as f:
|
||
data = json.load(f)
|
||
includes = data.get("includes", [])
|
||
if not isinstance(includes, list):
|
||
raise ValueError(f"{path}: 'includes' is not a list")
|
||
return [str(x) for x in includes]
|
||
|
||
|
||
def _scan_dir(directory: Path) -> dict[str, dict]:
|
||
"""Return id -> entry for every entry in every .json file in the directory.
|
||
Skips schema files and category-definition files."""
|
||
out: dict[str, dict] = {}
|
||
if not directory.is_dir():
|
||
return out
|
||
for path in sorted(directory.iterdir()):
|
||
if path.suffix != ".json":
|
||
continue
|
||
if path.name.endswith(".schema.json"):
|
||
continue
|
||
if "categories" in path.name or path.name in {
|
||
"improvements.json",
|
||
"resources.json",
|
||
"episodes.json",
|
||
}:
|
||
continue
|
||
try:
|
||
for entry in _load_array(path):
|
||
if isinstance(entry, dict) and "id" in entry:
|
||
out[entry["id"]] = entry
|
||
except Exception as exc:
|
||
print(f"WARN failed to read {path}: {exc}", file=sys.stderr)
|
||
return out
|
||
|
||
|
||
def _build_tech_depth(techs: dict[str, dict]) -> dict[str, int]:
|
||
"""BFS from techs with no `requires` to give each tech an integer depth."""
|
||
depth: dict[str, int] = {}
|
||
queue: deque[str] = deque()
|
||
for tid, t in techs.items():
|
||
if not t.get("requires"):
|
||
depth[tid] = 0
|
||
queue.append(tid)
|
||
while queue:
|
||
tid = queue.popleft()
|
||
d = depth[tid]
|
||
for other_id, other in techs.items():
|
||
if other_id in depth:
|
||
continue
|
||
reqs = other.get("requires", []) or []
|
||
if all(r in depth for r in reqs):
|
||
depth[other_id] = max((depth[r] for r in reqs), default=0) + 1
|
||
queue.append(other_id)
|
||
# Anything left got pruned by a cycle or missing dep.
|
||
return depth
|
||
|
||
|
||
def validate(rep: Report) -> None:
|
||
manifest = _load_manifest(MANIFEST)
|
||
manifest_set = set(manifest)
|
||
|
||
deposits = _scan_dir(DEPOSITS_DIR)
|
||
units = _scan_dir(UNITS_DIR)
|
||
buildings = _scan_dir(BUILDINGS_DIR)
|
||
techs: dict[str, dict] = {}
|
||
for tdir in TECHS_DIRS:
|
||
techs.update(_scan_dir(tdir))
|
||
tech_depth = _build_tech_depth(techs)
|
||
|
||
# M1 — manifest IDs each have a matching deposit file.
|
||
for did in manifest:
|
||
if did not in deposits:
|
||
rep.err("M1", f"manifest lists '{did}' but no deposits/{did}.json exists")
|
||
|
||
# D1 + D2 — deposits' gates_* point at real units/buildings with reciprocal requires_resource.
|
||
for did, dep in deposits.items():
|
||
if did not in manifest_set:
|
||
continue # only validate deposits that ship in Game 1.
|
||
for uid in dep.get("gates_units", []) or []:
|
||
unit = units.get(uid)
|
||
if not unit:
|
||
rep.err("D1", f"deposit '{did}'.gates_units lists '{uid}' but no unit found")
|
||
continue
|
||
if unit.get("requires_resource") != did:
|
||
rep.err(
|
||
"D2",
|
||
f"deposit '{did}' says it gates unit '{uid}', but {uid}.requires_resource = "
|
||
f"{unit.get('requires_resource')!r}",
|
||
)
|
||
for bid in dep.get("gates_buildings", []) or []:
|
||
bld = buildings.get(bid)
|
||
if not bld:
|
||
rep.err("D1", f"deposit '{did}'.gates_buildings lists '{bid}' but no building found")
|
||
continue
|
||
if bld.get("requires_resource") != did:
|
||
rep.err(
|
||
"D2",
|
||
f"deposit '{did}' says it gates building '{bid}', but {bid}.requires_resource = "
|
||
f"{bld.get('requires_resource')!r}",
|
||
)
|
||
|
||
# U1 + U2 — units/buildings with requires_resource point at a manifest deposit
|
||
# whose gates_* list contains them.
|
||
for kind, table in (("unit", units), ("building", buildings)):
|
||
for iid, entry in table.items():
|
||
rres = entry.get("requires_resource")
|
||
if not rres:
|
||
continue
|
||
if rres not in manifest_set:
|
||
rep.err(
|
||
"U1",
|
||
f"{kind} '{iid}' requires resource '{rres}' but '{rres}' is not in the Game 1 manifest",
|
||
)
|
||
continue
|
||
dep = deposits.get(rres)
|
||
if not dep:
|
||
continue # already flagged by M1.
|
||
back = dep.get(
|
||
"gates_units" if kind == "unit" else "gates_buildings"
|
||
) or []
|
||
if iid not in back:
|
||
rep.err(
|
||
"U2",
|
||
f"{kind} '{iid}'.requires_resource = '{rres}' but '{rres}'.gates_{kind}s does not contain '{iid}'",
|
||
)
|
||
|
||
# T1 — tech reachability.
|
||
for kind, table in (("unit", units), ("building", buildings)):
|
||
for iid, entry in table.items():
|
||
rres = entry.get("requires_resource")
|
||
if not rres or rres not in deposits:
|
||
continue
|
||
t_unlock = entry.get("tech_required")
|
||
yield_gate = deposits[rres].get("yield_gate")
|
||
if not t_unlock or not yield_gate:
|
||
continue # one or both unconstrained → fine.
|
||
du = tech_depth.get(t_unlock)
|
||
dr = tech_depth.get(yield_gate)
|
||
if du is None or dr is None:
|
||
rep.warn(
|
||
"T1",
|
||
f"{kind} '{iid}': cannot compute tech depth for "
|
||
f"unlock={t_unlock!r} or yield_gate={yield_gate!r}",
|
||
)
|
||
continue
|
||
if dr > du:
|
||
rep.err(
|
||
"T1",
|
||
f"{kind} '{iid}' unlocks at tech '{t_unlock}' (depth {du}) but its "
|
||
f"resource '{rres}' needs yield_gate '{yield_gate}' (depth {dr}) — "
|
||
f"player unlocks the {kind} before they can use the resource",
|
||
)
|
||
|
||
# S1 — flag magic-flavor deposit names that ship in Game 1 but are still
|
||
# authored as Game-2 content. A magic-marker name on a deposit explicitly
|
||
# scoped `game_1` is intentional (the scope doc's deliberate "T8–T10
|
||
# mundane Game-2-teaser content in Game 1", e.g. mithril_vein). Only warn
|
||
# when the deposit either carries no Game-1 scope or is still tagged
|
||
# `game_2` — those are genuine unreviewed leaks.
|
||
for did in manifest:
|
||
if not any(marker in did for marker in MAGIC_NAME_MARKERS):
|
||
continue
|
||
dep = deposits.get(did)
|
||
if dep is not None and dep.get("scope") == "game_1":
|
||
continue
|
||
rep.warn(
|
||
"S1",
|
||
f"manifest entry '{did}' has a magic-school name marker; should it ship in Game 1?",
|
||
)
|
||
|
||
|
||
def main() -> int:
|
||
ap = argparse.ArgumentParser(description=__doc__)
|
||
ap.add_argument(
|
||
"--strict",
|
||
action="store_true",
|
||
help="Exit non-zero on any error. Default: report and exit 0.",
|
||
)
|
||
args = ap.parse_args()
|
||
|
||
rep = Report()
|
||
validate(rep)
|
||
rep.print()
|
||
if args.strict and rep.errors:
|
||
return 1
|
||
return 0
|
||
|
||
|
||
if __name__ == "__main__":
|
||
sys.exit(main())
|