Files
texas_hold_x/texas_holdem/agents.py
T
2026-05-11 20:23:28 +08:00

206 lines
8.0 KiB
Python

from __future__ import annotations
import json
import sys
from abc import ABC, abstractmethod
from random import Random
from typing import IO, Any
from urllib.error import URLError
from urllib.request import Request, urlopen
from texas_holdem.human_io import clear_screen, prompt_action, render_observation
from texas_holdem.models import Observation, PlayerAction
class PokerAgent(ABC):
@abstractmethod
def decide(self, observation: Observation) -> PlayerAction:
raise NotImplementedError
def on_game_update(self, game_state: dict[str, Any]) -> None:
"""Optional hook called after every finished hand.
``game_state`` is the same dict produced by :meth:`TableGame.to_dict`,
carrying the full ``hands`` history (including any showdown hole
cards). Default implementation is a no-op so most agents (random,
calling, in-process humans) need not care about it.
"""
return None
class RandomAgent(PokerAgent):
def __init__(self, rng: Random | None = None) -> None:
self._rng = rng or Random()
def decide(self, observation: Observation) -> PlayerAction:
legal = observation.legal_actions
choice = self._rng.choice(legal)
action_type = str(choice["action"])
if action_type in {"bet", "raise"}:
min_amount = int(choice["min_amount"])
max_amount = int(choice["max_amount"])
return PlayerAction(action_type, self._rng.randint(min_amount, max_amount))
return PlayerAction(action_type, int(choice.get("amount") or 0))
class CallingStationAgent(PokerAgent):
def decide(self, observation: Observation) -> PlayerAction:
for action in observation.legal_actions:
if action["action"] == "check":
return PlayerAction("check")
for action in observation.legal_actions:
if action["action"] == "call":
return PlayerAction("call", int(action.get("amount") or 0))
return PlayerAction("fold")
class HttpAgent(PokerAgent):
"""Remote agent that talks to a base URL exposing ``/act`` and ``/game``.
The constructor takes a *base* URL (e.g. ``http://host:9001``) and
derives the per-purpose endpoints internally. This keeps the wire layout
a server-side concern and lets us evolve the protocol (add ``/init``,
``/end``, ...) without touching every game spec.
"""
ACT_PATH = "/act"
GAME_PATH = "/game"
def __init__(self, endpoint: str, timeout_seconds: float = 10.0) -> None:
self.base_url = self._normalise_base_url(endpoint)
self.timeout_seconds = timeout_seconds
@staticmethod
def _normalise_base_url(raw: str) -> str:
"""Strip a trailing slash so URL joins do not produce double slashes.
Centralising this also tolerates the legacy "endpoint already points
at /act" mistake by chopping off a redundant ``/act`` suffix.
"""
url = raw.rstrip("/")
if url.endswith("/act"):
url = url[: -len("/act")]
return url
def _url(self, path: str) -> str:
"""Compose a full URL by joining the base with a path component."""
return f"{self.base_url}{path}"
def decide(self, observation: Observation) -> PlayerAction:
payload = self._post_json(self.ACT_PATH, observation.to_dict())
if not isinstance(payload, dict):
raise RuntimeError("agent endpoint must return a JSON object")
return PlayerAction.from_dict(payload)
def on_game_update(self, game_state: dict[str, Any]) -> None:
"""Push the post-hand snapshot to the remote ``/game`` endpoint.
We swallow failures (the engine's broadcaster also catches them) so
a single offline client cannot stall the table; failure is logged
only by way of the raised exception bubbling to the engine guard.
"""
try:
self._post_json(self.GAME_PATH, game_state)
except RuntimeError:
# ``/game`` is informational; treat any HTTP error as a benign
# drop rather than reraising and aborting the hand loop.
return None
def _post_json(self, path: str, payload: dict[str, Any]) -> Any:
"""POST ``payload`` as JSON to ``base_url + path`` and return parsed body.
Extracted as a tiny helper so ``decide`` and ``on_game_update`` share
identical transport semantics (timeout, error wrapping, content-type).
"""
body = json.dumps(payload).encode("utf-8")
request = Request(
self._url(path),
data=body,
headers={"Content-Type": "application/json"},
method="POST",
)
try:
with urlopen(request, timeout=self.timeout_seconds) as response:
raw = response.read().decode("utf-8")
except (OSError, URLError) as exc:
raise RuntimeError(f"agent endpoint failed: {self._url(path)}") from exc
if not raw:
return None
try:
return json.loads(raw)
except json.JSONDecodeError as exc:
raise RuntimeError(
f"agent endpoint returned invalid JSON: {self._url(path)}"
) from exc
class HumanAgent(PokerAgent):
"""Interactive CLI agent for debugging and manual play.
The agent renders the current observation in a human-friendly layout and
drives an interactive menu so the operator can only emit legal actions.
Streams are injected to keep the agent testable and to allow alternate
consoles in the future (e.g. piping to a debug log).
By default the terminal is wiped at the start of each ``decide`` call so
every turn appears on a clean screen. Set ``keep_history=True`` to
preserve the scrollback (useful when piping to a log file).
"""
def __init__(
self,
input_stream: IO[str] | None = None,
output_stream: IO[str] | None = None,
keep_history: bool = False,
) -> None:
self._input = input_stream if input_stream is not None else sys.stdin
self._output = output_stream if output_stream is not None else sys.stdout
self._keep_history = keep_history
def decide(self, observation: Observation) -> PlayerAction:
# Convert to dict-form so the rendering/prompting code path is shared
# with the standalone HTTP human client (see texas_holdem.human_io).
if not self._keep_history:
clear_screen(self._write)
obs_dict = observation.to_dict()
self._write(render_observation(obs_dict))
chosen = prompt_action(
list(obs_dict.get("legal_actions") or []),
self._read_line,
self._write,
)
return PlayerAction.from_dict(chosen)
def _write(self, text: str) -> None:
"""Write to the configured output stream and flush eagerly."""
self._output.write(text)
self._output.flush()
def _read_line(self, prompt: str) -> str:
"""Display a prompt and read one line from the configured input.
We avoid builtin ``input()`` to honour the injected streams, which
also makes the agent unit-testable with StringIO.
"""
self._write(prompt)
line = self._input.readline()
if line == "":
raise EOFError("input stream closed while waiting for human action")
return line.rstrip("\n")
def build_agent(spec: dict[str, Any], rng: Random | None = None) -> PokerAgent:
agent_type = str(spec.get("type", "calling")).lower()
if agent_type == "random":
return RandomAgent(rng)
if agent_type in {"calling", "call", "calling_station"}:
return CallingStationAgent()
if agent_type == "http":
endpoint = spec.get("endpoint")
if not endpoint:
raise ValueError("http agent requires an endpoint")
return HttpAgent(str(endpoint), float(spec.get("timeout_seconds", 10.0)))
if agent_type in {"human", "cli", "interactive"}:
return HumanAgent()
raise ValueError(f"unknown agent type: {agent_type}")