from __future__ import annotations import json import sys import time from abc import ABC, abstractmethod from random import Random from typing import IO, Any from urllib.error import HTTPError, URLError from urllib.request import Request, urlopen from texas_holdem.human_io import clear_screen, prompt_action, render_observation from texas_holdem.models import Observation, PlayerAction class PokerAgent(ABC): @abstractmethod def decide(self, observation: Observation) -> PlayerAction: raise NotImplementedError def on_game_update(self, game_state: dict[str, Any]) -> None: """Optional hook called after every finished hand. ``game_state`` is the same dict produced by :meth:`TableGame.to_dict`, carrying the full ``hands`` history (including any showdown hole cards). Default implementation is a no-op so most agents (random, calling, in-process humans) need not care about it. """ return None class RandomAgent(PokerAgent): def __init__(self, rng: Random | None = None) -> None: self._rng = rng or Random() def decide(self, observation: Observation) -> PlayerAction: legal = observation.legal_actions choice = self._rng.choice(legal) action_type = str(choice["action"]) if action_type in {"bet", "raise"}: min_amount = int(choice["min_amount"]) max_amount = int(choice["max_amount"]) return PlayerAction(action_type, self._rng.randint(min_amount, max_amount)) return PlayerAction(action_type, int(choice.get("amount") or 0)) class CallingStationAgent(PokerAgent): def decide(self, observation: Observation) -> PlayerAction: for action in observation.legal_actions: if action["action"] == "check": return PlayerAction("check") for action in observation.legal_actions: if action["action"] == "call": return PlayerAction("call", int(action.get("amount") or 0)) return PlayerAction("fold") def normalise_http_agent_endpoint(raw: str) -> str: """Return the canonical base URL for an HTTP agent endpoint.""" url = raw.rstrip("/") if url.endswith("/act"): url = url[: -len("/act")] if url.endswith("/game"): url = url[: -len("/game")] return url def http_agent_endpoint_from_spec(spec: dict[str, Any]) -> str | None: """Extract the canonical HTTP endpoint from an agent spec, if present.""" agent_type = str(spec.get("type", "calling")).lower() if agent_type != "http": return None endpoint = spec.get("endpoint") if not endpoint: raise ValueError("http agent requires an endpoint") return normalise_http_agent_endpoint(str(endpoint)) class HttpAgent(PokerAgent): """Remote agent that talks to a base URL exposing ``/act`` and ``/game``. The constructor takes a *base* URL (e.g. ``http://host:9001``) and derives the per-purpose endpoints internally. This keeps the wire layout a server-side concern and lets us evolve the protocol (add ``/init``, ``/end``, ...) without touching every game spec. """ ACT_PATH = "/act" GAME_PATH = "/game" def __init__( self, endpoint: str, timeout_seconds: float = 10.0, player_id: str | None = None, game_update_timeout_seconds: float | None = None, retries: int = 2, retry_backoff_seconds: float = 0.25, ) -> None: self.base_url = normalise_http_agent_endpoint(endpoint) self.timeout_seconds = timeout_seconds self.player_id = player_id self.game_update_timeout_seconds = ( float(game_update_timeout_seconds) if game_update_timeout_seconds is not None else min(timeout_seconds, 3.0) ) self.retries = max(0, retries) self.retry_backoff_seconds = max(0.0, retry_backoff_seconds) def _url(self, path: str) -> str: """Compose a full URL by joining the base with a path component.""" return f"{self.base_url}{path}" def decide(self, observation: Observation) -> PlayerAction: payload = self._post_json( self.ACT_PATH, observation.to_dict(), timeout_seconds=self.timeout_seconds, ) if not isinstance(payload, dict): raise RuntimeError("agent endpoint must return a JSON object") return PlayerAction.from_dict(payload) def on_game_update(self, game_state: dict[str, Any]) -> None: """Push the post-hand snapshot to the remote ``/game`` endpoint. We swallow failures (the engine's broadcaster also catches them) so a single offline client cannot stall the table; failure is logged only by way of the raised exception bubbling to the engine guard. """ try: self._post_json( self.GAME_PATH, game_state, timeout_seconds=self.game_update_timeout_seconds, ) except RuntimeError: # ``/game`` is informational; treat any HTTP error as a benign # drop rather than reraising and aborting the hand loop. return None def _post_json( self, path: str, payload: dict[str, Any], timeout_seconds: float, ) -> Any: """POST ``payload`` as JSON to ``base_url + path`` and return parsed body. Extracted as a tiny helper so ``decide`` and ``on_game_update`` share identical transport semantics (timeout, error wrapping, content-type). """ body = json.dumps(payload).encode("utf-8") last_error: BaseException | None = None raw = "" for attempt in range(self.retries + 1): request = Request( self._url(path), data=body, headers=self._headers(), method="POST", ) try: with urlopen(request, timeout=timeout_seconds) as response: raw = response.read().decode("utf-8") break except HTTPError as exc: detail = exc.read().decode("utf-8", errors="replace") last_error = RuntimeError( f"agent endpoint failed with HTTP {exc.code}: " f"{self._url(path)} {detail}" ) except (OSError, URLError) as exc: last_error = exc if attempt < self.retries and self.retry_backoff_seconds > 0: time.sleep(self.retry_backoff_seconds * (2**attempt)) else: raise RuntimeError(f"agent endpoint failed: {self._url(path)}") from last_error if not raw: return None try: return json.loads(raw) except json.JSONDecodeError as exc: raise RuntimeError( f"agent endpoint returned invalid JSON: {self._url(path)}" ) from exc def _headers(self) -> dict[str, str]: headers = {"Content-Type": "application/json", "Connection": "close"} if self.player_id: headers["X-Player-Id"] = self.player_id return headers class HumanAgent(PokerAgent): """Interactive CLI agent for debugging and manual play. The agent renders the current observation in a human-friendly layout and drives an interactive menu so the operator can only emit legal actions. Streams are injected to keep the agent testable and to allow alternate consoles in the future (e.g. piping to a debug log). By default the terminal is wiped at the start of each ``decide`` call so every turn appears on a clean screen. Set ``keep_history=True`` to preserve the scrollback (useful when piping to a log file). """ def __init__( self, input_stream: IO[str] | None = None, output_stream: IO[str] | None = None, keep_history: bool = False, ) -> None: self._input = input_stream if input_stream is not None else sys.stdin self._output = output_stream if output_stream is not None else sys.stdout self._keep_history = keep_history def decide(self, observation: Observation) -> PlayerAction: # Convert to dict-form so the rendering/prompting code path is shared # with the standalone HTTP human client (see texas_holdem.human_io). if not self._keep_history: clear_screen(self._write) obs_dict = observation.to_dict() self._write(render_observation(obs_dict)) chosen = prompt_action( list(obs_dict.get("legal_actions") or []), self._read_line, self._write, ) return PlayerAction.from_dict(chosen) def _write(self, text: str) -> None: """Write to the configured output stream and flush eagerly.""" self._output.write(text) self._output.flush() def _read_line(self, prompt: str) -> str: """Display a prompt and read one line from the configured input. We avoid builtin ``input()`` to honour the injected streams, which also makes the agent unit-testable with StringIO. """ self._write(prompt) line = self._input.readline() if line == "": raise EOFError("input stream closed while waiting for human action") return line.rstrip("\n") def build_agent( spec: dict[str, Any], rng: Random | None = None, player_id: str | None = None, ) -> PokerAgent: agent_type = str(spec.get("type", "calling")).lower() if agent_type == "random": return RandomAgent(rng) if agent_type in {"calling", "call", "calling_station"}: return CallingStationAgent() if agent_type == "http": endpoint = spec.get("endpoint") if not endpoint: raise ValueError("http agent requires an endpoint") return HttpAgent( str(endpoint), timeout_seconds=float(spec.get("timeout_seconds", 10.0)), player_id=player_id, game_update_timeout_seconds=( float(spec["game_update_timeout_seconds"]) if "game_update_timeout_seconds" in spec else None ), retries=int(spec.get("retries", 2)), retry_backoff_seconds=float(spec.get("retry_backoff_seconds", 0.25)), ) if agent_type in {"human", "cli", "interactive"}: return HumanAgent() raise ValueError(f"unknown agent type: {agent_type}")