From 6014ec0707ea2b53084083f716c01484269ea420 Mon Sep 17 00:00:00 2001 From: "qianrui.mmmy" Date: Mon, 11 May 2026 19:53:40 +0800 Subject: [PATCH] feat: add human http agent --- texas_holdem/agents.py | 95 +++++++++++++++++++++++--- texas_holdem/engine.py | 40 ++++++++++- texas_holdem/human_client.py | 126 +++++++++++++++++++++++++++-------- texas_holdem/human_io.py | 107 ++++++++++++++++++++++++++++- texas_holdem/models.py | 8 +++ 5 files changed, 335 insertions(+), 41 deletions(-) diff --git a/texas_holdem/agents.py b/texas_holdem/agents.py index 310be19..e8592d1 100644 --- a/texas_holdem/agents.py +++ b/texas_holdem/agents.py @@ -8,7 +8,7 @@ from typing import IO, Any from urllib.error import URLError from urllib.request import Request, urlopen -from texas_holdem.human_io import prompt_action, render_observation +from texas_holdem.human_io import clear_screen, prompt_action, render_observation from texas_holdem.models import Observation, PlayerAction @@ -17,6 +17,16 @@ class PokerAgent(ABC): def decide(self, observation: Observation) -> PlayerAction: raise NotImplementedError + def on_game_update(self, game_state: dict[str, Any]) -> None: + """Optional hook called after every finished hand. + + ``game_state`` is the same dict produced by :meth:`TableGame.to_dict`, + carrying the full ``hands`` history (including any showdown hole + cards). Default implementation is a no-op so most agents (random, + calling, in-process humans) need not care about it. + """ + return None + class RandomAgent(PokerAgent): def __init__(self, rng: Random | None = None) -> None: @@ -45,26 +55,83 @@ class CallingStationAgent(PokerAgent): class HttpAgent(PokerAgent): + """Remote agent that talks to a base URL exposing ``/act`` and ``/game``. + + The constructor takes a *base* URL (e.g. ``http://host:9001``) and + derives the per-purpose endpoints internally. This keeps the wire layout + a server-side concern and lets us evolve the protocol (add ``/init``, + ``/end``, ...) without touching every game spec. + """ + + ACT_PATH = "/act" + GAME_PATH = "/game" + def __init__(self, endpoint: str, timeout_seconds: float = 10.0) -> None: - self.endpoint = endpoint + self.base_url = self._normalise_base_url(endpoint) self.timeout_seconds = timeout_seconds + @staticmethod + def _normalise_base_url(raw: str) -> str: + """Strip a trailing slash so URL joins do not produce double slashes. + + Centralising this also tolerates the legacy "endpoint already points + at /act" mistake by chopping off a redundant ``/act`` suffix. + """ + url = raw.rstrip("/") + if url.endswith("/act"): + url = url[: -len("/act")] + return url + + def _url(self, path: str) -> str: + """Compose a full URL by joining the base with a path component.""" + return f"{self.base_url}{path}" + def decide(self, observation: Observation) -> PlayerAction: - body = json.dumps(observation.to_dict()).encode("utf-8") + payload = self._post_json(self.ACT_PATH, observation.to_dict()) + if not isinstance(payload, dict): + raise RuntimeError("agent endpoint must return a JSON object") + return PlayerAction.from_dict(payload) + + def on_game_update(self, game_state: dict[str, Any]) -> None: + """Push the post-hand snapshot to the remote ``/game`` endpoint. + + We swallow failures (the engine's broadcaster also catches them) so + a single offline client cannot stall the table; failure is logged + only by way of the raised exception bubbling to the engine guard. + """ + try: + self._post_json(self.GAME_PATH, game_state) + except RuntimeError: + # ``/game`` is informational; treat any HTTP error as a benign + # drop rather than reraising and aborting the hand loop. + return None + + def _post_json(self, path: str, payload: dict[str, Any]) -> Any: + """POST ``payload`` as JSON to ``base_url + path`` and return parsed body. + + Extracted as a tiny helper so ``decide`` and ``on_game_update`` share + identical transport semantics (timeout, error wrapping, content-type). + """ + body = json.dumps(payload).encode("utf-8") request = Request( - self.endpoint, + self._url(path), data=body, headers={"Content-Type": "application/json"}, method="POST", ) try: with urlopen(request, timeout=self.timeout_seconds) as response: - payload: Any = json.loads(response.read().decode("utf-8")) - except (OSError, URLError, json.JSONDecodeError) as exc: - raise RuntimeError(f"agent endpoint failed: {self.endpoint}") from exc - if not isinstance(payload, dict): - raise RuntimeError("agent endpoint must return a JSON object") - return PlayerAction.from_dict(payload) + raw = response.read().decode("utf-8") + except (OSError, URLError) as exc: + raise RuntimeError(f"agent endpoint failed: {self._url(path)}") from exc + if not raw: + return None + try: + return json.loads(raw) + except json.JSONDecodeError as exc: + raise RuntimeError( + f"agent endpoint returned invalid JSON: {self._url(path)}" + ) from exc class HumanAgent(PokerAgent): @@ -74,19 +141,27 @@ class HumanAgent(PokerAgent): drives an interactive menu so the operator can only emit legal actions. Streams are injected to keep the agent testable and to allow alternate consoles in the future (e.g. piping to a debug log). + + By default the terminal is wiped at the start of each ``decide`` call so + every turn appears on a clean screen. Set ``keep_history=True`` to + preserve the scrollback (useful when piping to a log file). """ def __init__( self, input_stream: IO[str] | None = None, output_stream: IO[str] | None = None, + keep_history: bool = False, ) -> None: self._input = input_stream if input_stream is not None else sys.stdin self._output = output_stream if output_stream is not None else sys.stdout + self._keep_history = keep_history def decide(self, observation: Observation) -> PlayerAction: # Convert to dict-form so the rendering/prompting code path is shared # with the standalone HTTP human client (see texas_holdem.human_io). + if not self._keep_history: + clear_screen(self._write) obs_dict = observation.to_dict() self._write(render_observation(obs_dict)) chosen = prompt_action( diff --git a/texas_holdem/engine.py b/texas_holdem/engine.py index 1dc91c0..16eb7ef 100644 --- a/texas_holdem/engine.py +++ b/texas_holdem/engine.py @@ -110,10 +110,14 @@ class TableGame: board=list(self.board), actions=list(self.action_history), awards=awards, + showdown_hands=self._collect_showdown_hands(), started_at=started_at, finished_at=time(), ) self.hand_summaries.append(summary) + # Notify every agent so HTTP-backed clients can render the just + # finished hand. Failures here must never abort the table. + self._broadcast_game_update() return summary def run_hands(self, max_hands: int, until_one_left: bool = False) -> list[HandSummary]: @@ -140,7 +144,10 @@ class TableGame: "big_blind": self.big_blind, "starting_stack": self.starting_stack, "players": [player.public_dict() for player in self.players], - "last_hand": self.hand_summaries[-1].to_dict() if self.hand_summaries else None, + # ``hands`` exposes every finished hand (each entry is the same + # dict that was previously returned as ``last_hand``). Callers + # that only want the most recent one can do ``hands[-1]``. + "hands": [summary.to_dict() for summary in self.hand_summaries], } def _advance_button(self) -> None: @@ -448,6 +455,37 @@ class TableGame: ) return awards + def _collect_showdown_hands(self) -> dict[str, list]: + """Snapshot hole cards of every player still eligible at showdown. + + We treat a hand as having reached showdown iff at least two players + remain ``in_hand`` and unfolded after the river. Returning an empty + dict for the one-player-left case keeps the wire format compact and + avoids leaking hole cards when there was no real comparison. + """ + live_players = [player for player in self.players if self._is_live(player)] + if len(live_players) < 2: + return {} + return { + player.player_id: list(player.hole_cards) for player in live_players + } + + def _broadcast_game_update(self) -> None: + """Push the post-hand game snapshot to every agent's optional hook. + + Agents may opt into receiving game updates by overriding + :meth:`PokerAgent.on_game_update`. The default implementation is a + no-op, so this loop is essentially free for non-HTTP agents. We + swallow individual exceptions so a flaky remote endpoint cannot + break the table flow. + """ + snapshot = self.to_dict() + for agent in self.agents.values(): + try: + agent.on_game_update(snapshot) + except Exception: + continue + def _record_action( self, player: PlayerState, diff --git a/texas_holdem/human_client.py b/texas_holdem/human_client.py index ef05d77..1133507 100644 --- a/texas_holdem/human_client.py +++ b/texas_holdem/human_client.py @@ -1,33 +1,37 @@ """Standalone interactive HTTP Human Agent. -Run this as a process on the operator's machine to expose a single -``POST /act`` endpoint that the Texas Hold'em service can call when it is -that operator's turn to act: +Run this as a process on the operator's machine to expose: + +* ``POST /act`` - the server posts the current observation; we render it + on the local terminal and block on stdin until the human picks a legal + action, then return ``{"action": ..., "amount": N}``. +* ``POST /game`` - the server posts the full game snapshot at the end of + every hand (same shape as ``GET /games/{id}``) so the operator sees how + the table is evolving. The body of the response is empty. +* ``GET /health`` - liveness probe. + +Start the client: python -m texas_holdem.human_client --host 127.0.0.1 --port 9001 -Then create a game on the server with this player spec:: +Hook it up by passing the *base* URL when creating the game:: { "id": "alice", "name": "Alice", "agent": { "type": "http", - "endpoint": "http://127.0.0.1:9001/act", + "endpoint": "http://127.0.0.1:9001", "timeout_seconds": 600 } } -Every time the server posts an observation, this client renders it on the -local terminal and blocks on stdin until the human chooses a legal action, -then returns ``{"action": "...", "amount": N}`` as JSON. - Design notes: - The HTTP layer reuses :mod:`texas_holdem.human_io` so rendering and menu validation stay consistent with the in-process :class:`HumanAgent`. -- A module-level :class:`threading.Lock` serialises terminal access. This is - necessary because the (rare) case of multiple overlapping requests from - the server must not interleave prompts on the same TTY. +- A :class:`threading.Lock` inside :class:`HumanClientConsole` serialises + terminal access so concurrent ``/act`` and ``/game`` callbacks never + interleave on the same TTY. """ from __future__ import annotations @@ -40,7 +44,12 @@ from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from threading import Lock from typing import IO, Any -from texas_holdem.human_io import prompt_action, render_observation +from texas_holdem.human_io import ( + clear_screen, + prompt_action, + render_game_state, + render_observation, +) class HumanClientConsole: @@ -49,23 +58,32 @@ class HumanClientConsole: Wrapping the streams in a tiny class keeps stream injection (handy for tests) and concurrency control in one place, instead of leaking through free functions. + + ``keep_history`` defaults to ``False`` so every ``/act`` callback wipes + the terminal first; pass ``True`` to retain previous output (e.g. for + log-style debugging or when the terminal does not support ANSI codes). """ def __init__( self, input_stream: IO[str] | None = None, output_stream: IO[str] | None = None, + keep_history: bool = False, ) -> None: self._input = input_stream if input_stream is not None else sys.stdin self._output = output_stream if output_stream is not None else sys.stdout - # The lock guards both the printed observation block and the prompt - # loop so two concurrent /act calls would never interleave on the - # same TTY. + # The lock guards every ``decide`` and ``announce_game`` call so two + # concurrent server callbacks never interleave on the same TTY. self._lock = Lock() + self._keep_history = keep_history def decide(self, observation: dict[str, Any]) -> dict[str, Any]: """Render an observation and return the operator's action dict.""" with self._lock: + # Clear-by-default keeps the focus on the current decision; only + # opt-out callers see the entire history scrolling upwards. + if not self._keep_history: + clear_screen(self._write) self._write(render_observation(observation)) return prompt_action( list(observation.get("legal_actions") or []), @@ -73,6 +91,16 @@ class HumanClientConsole: self._write, ) + def announce_game(self, game_state: dict[str, Any]) -> None: + """Render an end-of-hand game snapshot to the operator's terminal. + + Separated from :meth:`decide` because it is purely informational and + must never block on input; it just writes a digest under the same + lock to avoid corrupting an in-progress prompt. + """ + with self._lock: + self._write(render_game_state(game_state)) + def _write(self, text: str) -> None: self._output.write(text) self._output.flush() @@ -88,14 +116,17 @@ class HumanClientConsole: class HumanRequestHandler(BaseHTTPRequestHandler): """HTTP entry point for the standalone human agent. - Only ``POST /act`` is meaningful; ``GET /health`` is provided so deploys - can quickly probe whether the client is alive before hooking it up. + Routes: + + * ``GET /health`` - liveness probe. + * ``POST /act`` - decision request (blocks on stdin). + * ``POST /game`` - end-of-hand snapshot (non-blocking). """ - server_version = "TexasHoldemHumanClient/0.1" + server_version = "TexasHoldemHumanClient/0.2" - # Injected by :func:`create_server` on the underlying server instance so - # every handler shares the same terminal console. + # Injected by :func:`create_server` on the underlying server class so + # every handler shares the same terminal console (and lock). console: HumanClientConsole # type: ignore[assignment] def do_GET(self) -> None: @@ -105,7 +136,14 @@ class HumanRequestHandler(BaseHTTPRequestHandler): self._json({"error": "not found"}, HTTPStatus.NOT_FOUND) def do_POST(self) -> None: - if self.path != "/act": + # Dispatch table keeps add/remove of routes mechanical and avoids + # the deeply-nested if/elif ladder common in BaseHTTPRequestHandler. + routes = { + "/act": self._handle_act, + "/game": self._handle_game, + } + handler = routes.get(self.path) + if handler is None: self._json({"error": "not found"}, HTTPStatus.NOT_FOUND) return @@ -116,18 +154,24 @@ class HumanRequestHandler(BaseHTTPRequestHandler): return try: - action = self.console.decide(payload) + handler(payload) except EOFError as exc: # The operator closed stdin (Ctrl-D); surface as 503 so the # server can fall back to its default coercion (fold). self._json({"error": str(exc)}, HTTPStatus.SERVICE_UNAVAILABLE) - return except Exception as exc: # pragma: no cover - defensive guard self._json({"error": str(exc)}, HTTPStatus.INTERNAL_SERVER_ERROR) - return + def _handle_act(self, payload: dict[str, Any]) -> None: + action = self.console.decide(payload) self._json(action) + def _handle_game(self, payload: dict[str, Any]) -> None: + # The /game callback is informational; reply with an empty 204 so + # the calling engine knows we received it but does not parse a body. + self.console.announce_game(payload) + self._empty(HTTPStatus.NO_CONTENT) + # Silence the default access log so it does not interleave with prompts. def log_message(self, format: str, *args: Any) -> None: # noqa: A002 return @@ -156,6 +200,12 @@ class HumanRequestHandler(BaseHTTPRequestHandler): self.end_headers() self.wfile.write(body) + def _empty(self, status: HTTPStatus) -> None: + """Write a header-only response (used for ``204 No Content``).""" + self.send_response(status) + self.send_header("Content-Length", "0") + self.end_headers() + def create_server( host: str, @@ -174,16 +224,34 @@ def create_server( def main() -> None: parser = argparse.ArgumentParser( - description="Run an interactive HTTP Human Agent that exposes POST /act.", + description=( + "Run an interactive HTTP Human Agent that exposes " + "POST /act and POST /game." + ), ) parser.add_argument("--host", default="127.0.0.1") parser.add_argument("--port", default=9001, type=int) + # Default behaviour clears the terminal on every /act so the operator + # always sees a fresh view. Opt-in flag restores the historical + # "append forever" behaviour for log-style debugging. + parser.add_argument( + "--keep-history", + action="store_true", + help=( + "Keep previous terminal output when a new /act request arrives " + "instead of clearing the screen." + ), + ) args = parser.parse_args() - server = create_server(args.host, args.port) + console = HumanClientConsole(keep_history=args.keep_history) + server = create_server(args.host, args.port, console=console) print( - f"Human HTTP agent listening on http://{args.host}:{args.port}/act\n" - "Use this URL as the 'endpoint' field of a 'http' agent spec.", + f"Human HTTP agent listening on http://{args.host}:{args.port}\n" + f" POST /act - decision request\n" + f" POST /game - end-of-hand snapshot\n" + f" clear-screen: {'off (keep history)' if args.keep_history else 'on'}\n" + "Pass the base URL above as the 'endpoint' field of an 'http' agent spec.", file=sys.stderr, flush=True, ) diff --git a/texas_holdem/human_io.py b/texas_holdem/human_io.py index 2aca282..3b5afde 100644 --- a/texas_holdem/human_io.py +++ b/texas_holdem/human_io.py @@ -193,7 +193,112 @@ def _emit_turn_separator(writer: Writer) -> None: Centralised so the exact glyph/length of the separator can be changed in one place if the visual style ever needs tweaking. """ - writer("=====\n\n") + line = "~" * 60 + writer(line + "\n\n") + + +# ANSI control sequence: ``ESC[2J`` clears the entire screen and ``ESC[H`` +# moves the cursor back to the top-left. Kept as a module constant so any +# caller can reuse the exact same sequence and tests can monkey-patch it. +CLEAR_SCREEN_SEQUENCE = "\x1b[2J\x1b[H" + + +def clear_screen(writer: Writer) -> None: + """Wipe the terminal via ANSI control sequences. + + Implemented as a tiny helper rather than each caller inlining the escape + code so we have a single location to swap in alternative strategies + (e.g. printing many newlines on terminals that ignore ANSI). + """ + writer(CLEAR_SCREEN_SEQUENCE) + + +def render_game_state(game_state: dict[str, Any]) -> str: + """Render a full ``GameManager.to_dict()`` snapshot for terminal display. + + The resulting block is intended for the standalone HTTP human client's + ``POST /game`` callback so the operator sees the up-to-date table state + plus a per-hand digest (winners, awards, showdown hole cards). + """ + lines: list[str] = [] + lines.append("#" * 60) + lines.append( + f"GAME UPDATE game_id={game_state.get('game_id')} " + f"status={game_state.get('status')} hand={game_state.get('hand_number')}" + ) + lines.append( + f"Blinds {game_state.get('small_blind')}/{game_state.get('big_blind')} " + f"| Button seat: {game_state.get('button_seat')} " + f"| Starting stack: {game_state.get('starting_stack')}" + ) + + lines.append("-" * 60) + lines.append("Stacks:") + for player in game_state.get("players", []): + flags = _player_flags(player) + lines.append( + f" seat {int(player.get('seat', 0)):>2} " + f"| {str(player.get('name', '')):<16} " + f"| stack {int(player.get('stack', 0)):>6} " + f"| {flags}" + ) + + hands = game_state.get("hands") or [] + lines.append("-" * 60) + lines.append(f"Hands played: {len(hands)}") + for hand in hands: + lines.extend(_render_hand_digest(hand)) + + lines.append("#" * 60) + return "\n".join(lines) + "\n" + + +def _player_flags(player: dict[str, Any]) -> str: + """Render the boolean state of a player as a compact tag list.""" + tags: list[str] = [] + if player.get("folded"): + tags.append("folded") + if player.get("all_in"): + tags.append("all_in") + if not player.get("in_hand"): + tags.append("out") + return ",".join(tags) if tags else "active" + + +def _render_hand_digest(hand: dict[str, Any]) -> list[str]: + """Render a single hand summary as a compact, multi-line digest. + + Kept separate from :func:`render_game_state` so the per-hand format can + be reused or extended (e.g. detailed action log) without entangling + with the table-level header layout. + """ + lines: list[str] = [] + lines.append( + f" Hand #{hand.get('hand_number')} " + f"| button_seat={hand.get('button_seat')} " + f"| board: {_format_cards(hand.get('board') or [], '(folded out)')}" + ) + + awards = hand.get("awards") or [] + if not awards: + lines.append(" (no awards recorded)") + for award in awards: + winners = ", ".join(str(w) for w in award.get("winners") or []) + hand_value = award.get("hand_value") or {} + value_label = hand_value.get("name") or "-" + lines.append( + f" pot {int(award.get('amount', 0)):>6} -> " + f"{winners or '(no winner)'} ({value_label})" + ) + + showdown = hand.get("showdown_hands") or {} + if showdown: + lines.append(" showdown:") + for player_id, cards in showdown.items(): + lines.append( + f" {player_id}: {_format_cards(cards, '(empty)')}" + ) + return lines def format_legal_action(action: dict[str, Any]) -> str: diff --git a/texas_holdem/models.py b/texas_holdem/models.py index a51bae7..49bad67 100644 --- a/texas_holdem/models.py +++ b/texas_holdem/models.py @@ -152,6 +152,7 @@ class HandSummary: board: list[Card] actions: list[ActionRecord] awards: list[PotAward] + showdown_hands: dict[str, list[Card]] = field(default_factory=dict) started_at: float = field(default_factory=time) finished_at: float = field(default_factory=time) @@ -163,6 +164,13 @@ class HandSummary: "board": [str(card) for card in self.board], "actions": [record.to_dict() for record in self.actions], "awards": [award.to_dict() for award in self.awards], + # ``showdown_hands`` is only populated when more than one player + # remained eligible for a pot; empty dict means the hand ended + # without a showdown (e.g. everyone folded but the winner). + "showdown_hands": { + player_id: [str(card) for card in cards] + for player_id, cards in self.showdown_hands.items() + }, "started_at": self.started_at, "finished_at": self.finished_at, }