The wiring for per-clan trained AI. Each training episode samples a clan, stamps it
on the LEARNER slot so the obs one-hots it, and scales the SHAPING rewards by that
clan's overlay (the terminal win/loss rewards stay universal):
- player_api_main.gd: CP_LEARNER_CLAN stamps the learner slot's clan via
set_player_personality_json -> PlayerState.clan_id -> PlayerView.clan_index ->
obs clan one-hot. (Previously only non-learner slots got a clan.)
- reward_overlays.json: per-clan group multipliers (combat/expansion/production/
economy/tech) derived from ai_personalities.json strategic_axes, normalized per
clan to mean 1.0 (no fairness confound). Archetypes emerge: blackhammer combat 1.5,
goldvein economy 1.64, deepforge expansion 0.42.
- magic_civ_env.py: samples the clan per episode (seeded), passes CP_LEARNER_CLAN,
scales the 8 shaping reward terms by self._ov(group).
- harness_client.py: HarnessConfig.learner_clan -> CP_LEARNER_CLAN.
- train.py: --clan ('' generalist | 'all' samples every clan | comma list).
Local checks: py_compile clean; overlays cover all 6 clans. Next: fleet smoke
(clan_index in the learner view + a tiny training run) before scaling out.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
290 lines
12 KiB
Python
290 lines
12 KiB
Python
"""Reusable JSON-Lines client for `scripts/player-api-server.sh`.

One client = one subprocess = one game. The Gymnasium env in
`magic_civ_env.py` owns one of these per `reset()` cycle, and the
evaluator owns one per evaluation episode. Both share the same
protocol; pulling it out here lets them stay in sync with the wire
contract documented in `src/game/engine/docs/PLAYER_API.md`.

Strong types throughout — no string-typed errors leaking up from the
harness. Anything off-protocol raises `HarnessError`.
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import signal
|
|
import subprocess
|
|
import time
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
# Repository root, resolved as the third ancestor directory of this file.
REPO_ROOT = Path(__file__).resolve().parents[2]
# Shell entry point that boots the player-API harness.
HARNESS_SCRIPT = REPO_ROOT.joinpath("scripts", "player-api-server.sh")

# Upper bound on how many stdout lines are scanned for the response that
# matches a request. Any number of async notifications may precede a
# response, so the cap is deliberately generous (a busy turn can emit
# hundreds of `unit_moved`/`turn_started` notifications) while still being
# finite, so a wedged harness cannot hang the trainer.
MAX_LINES_PER_RESPONSE = 5_000
|
|
|
|
|
|
class HarnessError(RuntimeError):
    """Signals that the harness broke the wire protocol or died unexpectedly."""
|
|
|
|
|
|
@dataclass(frozen=True, slots=True)
|
|
class HarnessConfig:
|
|
"""Per-episode harness configuration. Mirrors the env-var contract in
|
|
`scripts/player-api-server.sh` so callers can override any axis without
|
|
knowing the env-var spelling."""
|
|
|
|
seed: int = 42
|
|
players: int = 2
|
|
player_slot: int = 0
|
|
# Stage 4 (multi-slot adapter) — externally-driven slots when this
|
|
# process is driving more than one slot (e.g. 5 learned slots in a
|
|
# 5v5 FFA). When this tuple has >1 entry, every wire `view` / `act`
|
|
# call MUST include a `slot` field naming which slot it targets;
|
|
# `HarnessClient.view`/`act`/`end_turn` accept an optional `slot`
|
|
# kwarg for this. Defaults to `(player_slot,)` so single-slot
|
|
# callers keep the existing wire shape unchanged.
|
|
player_slots: tuple[int, ...] = ()
|
|
map_size: str = "duel"
|
|
map_type: str = "continents"
|
|
omniscient: bool = False
|
|
timeout_sec: int = 60
|
|
# When set, the simulator's TurnProcessor uses a VictoryConfig that
|
|
# matches this mode (see mc-player-api/src/dispatch.rs
|
|
# `victory_config_from_env`). Default "domination" so RL episodes can
|
|
# terminate via capital capture / LastSurvivor — without this the
|
|
# simulator falls back to a city-count check that almost never fires
|
|
# in 1v1 duel play.
|
|
victory_mode: str = "domination"
|
|
# Stage 4 — per-AI-slot controller registry id, comma-joined in slot
|
|
# order over AI slots (i.e. excluding `player_slot`). Empty = every
|
|
# AI slot defaults to `"scripted:default"` (the MCTS+heuristic).
|
|
# Set this to mix learned + scripted opponents in one game, e.g.
|
|
# `("learned:duel-v1b", "", "")` puts learned on the first AI slot.
|
|
player_controllers: tuple[str, ...] = ()
|
|
# Clan-conditioned RL: stamp the LEARNER slot's clan id (an
|
|
# ai_personalities.json key, e.g. "blackhammer") so PlayerState.clan_id
|
|
# projects into PlayerView.clan_index and the observation one-hots it.
|
|
# Empty = generalist (clan_index = -1). See player_api_main.gd
|
|
# CP_LEARNER_CLAN.
|
|
learner_clan: str = ""
|
|
|
|
@property
|
|
def effective_player_slots(self) -> tuple[int, ...]:
|
|
"""Resolve the back-compat fallback: empty tuple → `(player_slot,)`."""
|
|
return self.player_slots if self.player_slots else (self.player_slot,)
|
|
|
|
def to_env(self) -> dict[str, str]:
|
|
slots = self.effective_player_slots
|
|
env: dict[str, str] = {
|
|
"CP_SEED": str(self.seed),
|
|
"CP_PLAYERS": str(self.players),
|
|
"CP_PLAYER_SLOT": str(slots[0]),
|
|
"CP_PLAYER_SLOTS": ",".join(str(s) for s in slots),
|
|
"CP_MAP_SIZE": self.map_size,
|
|
"CP_MAP_TYPE": self.map_type,
|
|
"CP_OMNISCIENT": "1" if self.omniscient else "0",
|
|
"CP_TIMEOUT_SEC": str(self.timeout_sec),
|
|
"CP_VICTORY_MODE": self.victory_mode,
|
|
}
|
|
if self.player_controllers:
|
|
env["CP_PLAYER_CONTROLLERS"] = ",".join(self.player_controllers)
|
|
if self.learner_clan:
|
|
env["CP_LEARNER_CLAN"] = self.learner_clan
|
|
return env
|
|
|
|
|
|
class HarnessClient:
    """One running harness instance (one subprocess, one game).

    Cheap to construct (sub-second on macOS once Godot's class cache is
    warm); destroy + recreate on each Gym `reset()` so episodes have
    independent simulator state.

    Usable as a context manager: leaving the `with` block calls
    `shutdown()`.
    """

    def __init__(self, config: HarnessConfig | None = None) -> None:
        """Spawn the harness script as the leader of its own process group.

        Args:
            config: per-episode settings, exported to the child as `CP_*`
                environment variables. Defaults to `HarnessConfig()`.
        """
        self._config = config or HarnessConfig()
        env = {**os.environ, **self._config.to_env()}
        # Run Godot as a DIRECT child (no per-process systemd scope) and put
        # the whole harness in its own process GROUP, so shutdown() can reap
        # the entire tree (bash → flatpak → godot) with os.killpg. The default
        # slice/scope path orphans the Godot into a separate scope cgroup that
        # the client cannot reap by killing the script PID — leaking one Godot
        # per episode reset (gen0 contention death, 2026-06-09). The RL trainer
        # contains resources at its own systemd unit level instead (the train
        # service's CPUWeight/MemoryHigh still gives sshd preemption). Only
        # this RL client opts in; interactive/batch callers keep the slice.
        env["CP_NO_SLICE"] = "1"
        # Capture the Godot subprocess stderr to a file when
        # MC_HARNESS_STDERR_DIR is set — otherwise DEVNULL (the shipping
        # default). Without this, a Godot boot/timeout failure under load
        # leaves only an opaque "stdout EOF" on the Python side with no
        # reason. Set MC_HARNESS_STDERR_DIR=<dir> for any long training run
        # so a harness death arrives diagnosed, not as a guess.
        self._stderr_file = None
        stderr_dir = os.environ.get("MC_HARNESS_STDERR_DIR", "")
        if stderr_dir:
            os.makedirs(stderr_dir, exist_ok=True)
            self._stderr_file = open(
                os.path.join(stderr_dir, f"harness_{os.getpid()}_{time.time_ns()}.err"),
                "w",
            )
            stderr_target = self._stderr_file
        else:
            stderr_target = subprocess.DEVNULL
        try:
            self._proc = subprocess.Popen(
                ["bash", str(HARNESS_SCRIPT)],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=stderr_target,
                cwd=str(REPO_ROOT),
                text=True,
                bufsize=1,
                # Own process group → shutdown() reaps bash + flatpak + godot
                # via os.killpg, so no Godot is left orphaned on respawn.
                start_new_session=True,
                env=env,
            )
        except BaseException:
            # The spawn failed, so nobody will ever call shutdown() on this
            # half-built object: release the capture file here.
            self._close_stderr_file()
            raise
        self._next_id = 1
        # True once the wire is unusable (EOF, broken pipe, or shutdown).
        self._closed = False
        # True once shutdown() has run. Tracked separately from `_closed`
        # so a harness that died mid-request (which sets `_closed`) still
        # gets its process group reaped and its stderr file closed.
        self._reaped = False
        # Async notifications that arrived while we were waiting for a
        # correlated response. The wire spec sends notifications without
        # an `id`; they carry the same Event payload as response.events
        # but fire for things outside our request (AI turns, opponent
        # eliminations, GameOver while we're mid-act). Drained by
        # `drain_notifications()` after each act/view.
        self._pending_notifications: list[dict[str, Any]] = []

    @property
    def config(self) -> HarnessConfig:
        """The configuration this harness was launched with."""
        return self._config

    def _close_stderr_file(self) -> None:
        """Close the stderr capture file, if one is open (best-effort)."""
        if self._stderr_file is None:
            return
        try:
            self._stderr_file.close()
        except OSError:
            pass
        self._stderr_file = None

    def _write_message(self, msg: dict[str, Any]) -> None:
        """Write `msg` to harness stdin as a single JSON line.

        Raises:
            HarnessError: if stdin is unavailable or the write fails (e.g.
                the harness has died). The client is marked closed.
        """
        payload = json.dumps(msg) + "\n"
        stdin = self._proc.stdin
        if stdin is None:
            self._closed = True
            raise HarnessError("harness stdin is not available")
        try:
            stdin.write(payload)
            stdin.flush()
        except (OSError, ValueError) as err:
            # BrokenPipeError (dead harness) or a write to a closed file.
            self._closed = True
            raise HarnessError(
                f"harness stdin write failed for id={msg.get('id')}: {err}"
            ) from err

    @staticmethod
    def _describe_error(response: dict[str, Any]) -> str:
        """Render a response's `error` payload as `"<code>: <message>"`."""
        err = response.get("error")
        if err is None:
            err = {}
        if isinstance(err, dict):
            return f"{err.get('code')}: {err.get('message')}"
        # Off-protocol error payload (not an object): show it verbatim
        # rather than crashing while building the exception message.
        return repr(err)

    def _send(self, msg: dict[str, Any]) -> dict[str, Any]:
        """Send one request and return the response carrying the same `id`.

        An `id` is stamped onto `msg` in place. Lines read while waiting
        are handled as follows: non-JSON or non-object lines are skipped,
        objects without an `id` are buffered as async notifications, and
        objects with a different `id` are ignored.

        Raises:
            HarnessError: if the client is closed, the write fails, stdout
                reaches EOF, or no matching response arrives within
                `MAX_LINES_PER_RESPONSE` lines.
        """
        if self._closed:
            raise HarnessError("harness already closed")
        msg["id"] = self._next_id
        self._next_id += 1
        self._write_message(msg)
        stdout = self._proc.stdout
        if stdout is None:
            self._closed = True
            raise HarnessError("harness stdout is not available")
        for _ in range(MAX_LINES_PER_RESPONSE):
            line = stdout.readline()
            if not line:
                self._closed = True
                raise HarnessError(
                    f"harness stdout EOF while waiting for id={msg['id']}"
                )
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                # Stray stderr on stdout — skip and keep reading.
                continue
            if not isinstance(obj, dict):
                # Valid JSON but not a wire object (e.g. a bare number
                # printed by the engine) — treat like any other stray line.
                continue
            if obj.get("id") == msg["id"]:
                return obj
            if "id" not in obj:
                # Async notification — buffer for the next drain.
                self._pending_notifications.append(obj)
        raise HarnessError(
            f"no correlated response for id={msg['id']} within {MAX_LINES_PER_RESPONSE} lines"
        )

    def view(self, slot: int | None = None) -> dict[str, Any]:
        """Request the current view and return its `view` payload.

        Args:
            slot: target slot; omitted from the wire message when `None`.

        Raises:
            HarnessError: if the harness reports failure or the response
                carries no `view` payload.
        """
        msg: dict[str, Any] = {"type": "view"}
        if slot is not None:
            msg["slot"] = slot
        r = self._send(msg)
        if not r.get("ok"):
            raise HarnessError(f"view failed: {r.get('error')}")
        if "view" not in r:
            raise HarnessError("view response is missing the 'view' payload")
        return r["view"]

    def act(
        self, action: dict[str, Any], slot: int | None = None
    ) -> dict[str, Any]:
        """Submit one action and return the full response object.

        Args:
            action: the action object, sent under the `action` key.
            slot: target slot; omitted from the wire message when `None`.

        Raises:
            HarnessError: if the harness reports failure.
        """
        msg: dict[str, Any] = {"type": "act", "action": action}
        if slot is not None:
            msg["slot"] = slot
        r = self._send(msg)
        if not r.get("ok"):
            raise HarnessError(
                f"act({action.get('type')!r}, slot={slot}) failed: "
                f"{self._describe_error(r)}"
            )
        return r

    def end_turn(self, slot: int | None = None) -> dict[str, Any]:
        """Submit an `end_turn` action; shorthand for `act()`."""
        return self.act({"type": "end_turn"}, slot=slot)

    def suggest(self, slot: int | None = None) -> list[dict[str, Any]]:
        """Ask the harness what the scripted controller would play for
        `slot` this turn, WITHOUT applying anything or advancing the turn.

        Backs the behavioural-cloning recorder (`record_expert.py`): the
        returned `actions` list uses the same `PlayerAction` JSON shape
        `act()` accepts, so the recorder replays each action straight
        back. Read-only on the wire — two `suggest` calls in a row return
        identical results and leave `view()` unchanged.

        Raises:
            HarnessError: if the harness reports failure.
        """
        msg: dict[str, Any] = {"type": "suggest"}
        if slot is not None:
            msg["slot"] = slot
        r = self._send(msg)
        if not r.get("ok"):
            raise HarnessError(
                f"suggest(slot={slot}) failed: {self._describe_error(r)}"
            )
        return list(r.get("actions", []))

    def drain_notifications(self) -> list[dict[str, Any]]:
        """Pop and return all async notifications that arrived since the
        last drain. Each entry is the raw wire object; callers typically
        extract `Notification::Event` payloads by looking at the `type`
        field and the carried event tag."""
        drained = self._pending_notifications
        self._pending_notifications = []
        return drained

    def shutdown(self) -> None:
        """Stop the harness and release every resource it holds.

        Idempotent. Sends a wire `shutdown` if the wire is still usable,
        waits briefly for a clean exit, then SIGKILLs the whole process
        group and closes the pipes and the stderr capture file. The reap
        also runs when the wire is already dead (e.g. after a stdout EOF),
        so a crashed harness cannot leave an orphaned Godot behind.
        """
        if self._reaped:
            return
        self._reaped = True
        if not self._closed:
            # The request must be written BEFORE the client is marked
            # closed, otherwise it is never sent. It is fire-and-forget:
            # waiting for a reply here could block forever on a wedged
            # harness, and the bounded wait + killpg below covers that case.
            try:
                self._write_message({"type": "shutdown", "id": self._next_id})
                self._next_id += 1
            except HarnessError:
                pass
        self._closed = True
        try:
            self._proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            pass
        # Reap the ENTIRE process group (bash → flatpak → godot), not just the
        # script PID. With `start_new_session=True` the harness is its own
        # group leader (pgid == self._proc.pid); SIGKILL to the group
        # guarantees the Godot child cannot survive as an orphan — the leak
        # that sank gen0. Best-effort: the group may already be gone after a
        # graceful wire-shutdown.
        try:
            os.killpg(self._proc.pid, signal.SIGKILL)
        except (ProcessLookupError, PermissionError):
            pass
        try:
            self._proc.wait(timeout=2)
        except subprocess.TimeoutExpired:
            pass
        # Release our ends of the pipes; closing stdin may try to flush
        # into a dead process, hence the OSError guard.
        for pipe in (self._proc.stdin, self._proc.stdout):
            if pipe is None:
                continue
            try:
                pipe.close()
            except OSError:
                pass
        self._close_stderr_file()

    def __enter__(self) -> HarnessClient:
        """Return the client itself for use in a `with` block."""
        return self

    def __exit__(self, *exc: object) -> None:
        """Shut the harness down when the `with` block exits."""
        self.shutdown()
|