Files

281 lines
10 KiB
Python

from __future__ import annotations
import json
import sys
import time
from abc import ABC, abstractmethod
from random import Random
from typing import IO, Any
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from texas_holdem.human_io import clear_screen, prompt_action, render_observation
from texas_holdem.models import Observation, PlayerAction
class PokerAgent(ABC):
@abstractmethod
def decide(self, observation: Observation) -> PlayerAction:
raise NotImplementedError
def on_game_update(self, game_state: dict[str, Any]) -> None:
"""Optional hook called after every finished hand.
``game_state`` is the same dict produced by :meth:`TableGame.to_dict`,
carrying the full ``hands`` history (including any showdown hole
cards). Default implementation is a no-op so most agents (random,
calling, in-process humans) need not care about it.
"""
return None
class RandomAgent(PokerAgent):
def __init__(self, rng: Random | None = None) -> None:
self._rng = rng or Random()
def decide(self, observation: Observation) -> PlayerAction:
legal = observation.legal_actions
choice = self._rng.choice(legal)
action_type = str(choice["action"])
if action_type in {"bet", "raise"}:
min_amount = int(choice["min_amount"])
max_amount = int(choice["max_amount"])
return PlayerAction(action_type, self._rng.randint(min_amount, max_amount))
return PlayerAction(action_type, int(choice.get("amount") or 0))
class CallingStationAgent(PokerAgent):
def decide(self, observation: Observation) -> PlayerAction:
for action in observation.legal_actions:
if action["action"] == "check":
return PlayerAction("check")
for action in observation.legal_actions:
if action["action"] == "call":
return PlayerAction("call", int(action.get("amount") or 0))
return PlayerAction("fold")
def normalise_http_agent_endpoint(raw: str) -> str:
"""Return the canonical base URL for an HTTP agent endpoint."""
url = raw.rstrip("/")
if url.endswith("/act"):
url = url[: -len("/act")]
if url.endswith("/game"):
url = url[: -len("/game")]
return url
def http_agent_endpoint_from_spec(spec: dict[str, Any]) -> str | None:
"""Extract the canonical HTTP endpoint from an agent spec, if present."""
agent_type = str(spec.get("type", "calling")).lower()
if agent_type != "http":
return None
endpoint = spec.get("endpoint")
if not endpoint:
raise ValueError("http agent requires an endpoint")
return normalise_http_agent_endpoint(str(endpoint))
class HttpAgent(PokerAgent):
"""Remote agent that talks to a base URL exposing ``/act`` and ``/game``.
The constructor takes a *base* URL (e.g. ``http://host:9001``) and
derives the per-purpose endpoints internally. This keeps the wire layout
a server-side concern and lets us evolve the protocol (add ``/init``,
``/end``, ...) without touching every game spec.
"""
ACT_PATH = "/act"
GAME_PATH = "/game"
def __init__(
self,
endpoint: str,
timeout_seconds: float = 10.0,
player_id: str | None = None,
game_update_timeout_seconds: float | None = None,
retries: int = 2,
retry_backoff_seconds: float = 0.25,
) -> None:
self.base_url = normalise_http_agent_endpoint(endpoint)
self.timeout_seconds = timeout_seconds
self.player_id = player_id
self.game_update_timeout_seconds = (
float(game_update_timeout_seconds)
if game_update_timeout_seconds is not None
else min(timeout_seconds, 3.0)
)
self.retries = max(0, retries)
self.retry_backoff_seconds = max(0.0, retry_backoff_seconds)
def _url(self, path: str) -> str:
"""Compose a full URL by joining the base with a path component."""
return f"{self.base_url}{path}"
def decide(self, observation: Observation) -> PlayerAction:
payload = self._post_json(
self.ACT_PATH,
observation.to_dict(),
timeout_seconds=self.timeout_seconds,
)
if not isinstance(payload, dict):
raise RuntimeError("agent endpoint must return a JSON object")
return PlayerAction.from_dict(payload)
def on_game_update(self, game_state: dict[str, Any]) -> None:
"""Push the post-hand snapshot to the remote ``/game`` endpoint.
We swallow failures (the engine's broadcaster also catches them) so
a single offline client cannot stall the table; failure is logged
only by way of the raised exception bubbling to the engine guard.
"""
try:
self._post_json(
self.GAME_PATH,
game_state,
timeout_seconds=self.game_update_timeout_seconds,
)
except RuntimeError:
# ``/game`` is informational; treat any HTTP error as a benign
# drop rather than reraising and aborting the hand loop.
return None
def _post_json(
self,
path: str,
payload: dict[str, Any],
timeout_seconds: float,
) -> Any:
"""POST ``payload`` as JSON to ``base_url + path`` and return parsed body.
Extracted as a tiny helper so ``decide`` and ``on_game_update`` share
identical transport semantics (timeout, error wrapping, content-type).
"""
body = json.dumps(payload).encode("utf-8")
last_error: BaseException | None = None
raw = ""
for attempt in range(self.retries + 1):
request = Request(
self._url(path),
data=body,
headers=self._headers(),
method="POST",
)
try:
with urlopen(request, timeout=timeout_seconds) as response:
raw = response.read().decode("utf-8")
break
except HTTPError as exc:
detail = exc.read().decode("utf-8", errors="replace")
last_error = RuntimeError(
f"agent endpoint failed with HTTP {exc.code}: "
f"{self._url(path)} {detail}"
)
except (OSError, URLError) as exc:
last_error = exc
if attempt < self.retries and self.retry_backoff_seconds > 0:
time.sleep(self.retry_backoff_seconds * (2**attempt))
else:
raise RuntimeError(f"agent endpoint failed: {self._url(path)}") from last_error
if not raw:
return None
try:
return json.loads(raw)
except json.JSONDecodeError as exc:
raise RuntimeError(
f"agent endpoint returned invalid JSON: {self._url(path)}"
) from exc
def _headers(self) -> dict[str, str]:
headers = {"Content-Type": "application/json", "Connection": "close"}
if self.player_id:
headers["X-Player-Id"] = self.player_id
return headers
class HumanAgent(PokerAgent):
"""Interactive CLI agent for debugging and manual play.
The agent renders the current observation in a human-friendly layout and
drives an interactive menu so the operator can only emit legal actions.
Streams are injected to keep the agent testable and to allow alternate
consoles in the future (e.g. piping to a debug log).
By default the terminal is wiped at the start of each ``decide`` call so
every turn appears on a clean screen. Set ``keep_history=True`` to
preserve the scrollback (useful when piping to a log file).
"""
def __init__(
self,
input_stream: IO[str] | None = None,
output_stream: IO[str] | None = None,
keep_history: bool = False,
) -> None:
self._input = input_stream if input_stream is not None else sys.stdin
self._output = output_stream if output_stream is not None else sys.stdout
self._keep_history = keep_history
def decide(self, observation: Observation) -> PlayerAction:
# Convert to dict-form so the rendering/prompting code path is shared
# with the standalone HTTP human client (see texas_holdem.human_io).
if not self._keep_history:
clear_screen(self._write)
obs_dict = observation.to_dict()
self._write(render_observation(obs_dict))
chosen = prompt_action(
list(obs_dict.get("legal_actions") or []),
self._read_line,
self._write,
)
return PlayerAction.from_dict(chosen)
def _write(self, text: str) -> None:
"""Write to the configured output stream and flush eagerly."""
self._output.write(text)
self._output.flush()
def _read_line(self, prompt: str) -> str:
"""Display a prompt and read one line from the configured input.
We avoid builtin ``input()`` to honour the injected streams, which
also makes the agent unit-testable with StringIO.
"""
self._write(prompt)
line = self._input.readline()
if line == "":
raise EOFError("input stream closed while waiting for human action")
return line.rstrip("\n")
def build_agent(
spec: dict[str, Any],
rng: Random | None = None,
player_id: str | None = None,
) -> PokerAgent:
agent_type = str(spec.get("type", "calling")).lower()
if agent_type == "random":
return RandomAgent(rng)
if agent_type in {"calling", "call", "calling_station"}:
return CallingStationAgent()
if agent_type == "http":
endpoint = spec.get("endpoint")
if not endpoint:
raise ValueError("http agent requires an endpoint")
return HttpAgent(
str(endpoint),
timeout_seconds=float(spec.get("timeout_seconds", 10.0)),
player_id=player_id,
game_update_timeout_seconds=(
float(spec["game_update_timeout_seconds"])
if "game_update_timeout_seconds" in spec
else None
),
retries=int(spec.get("retries", 2)),
retry_backoff_seconds=float(spec.get("retry_backoff_seconds", 0.25)),
)
if agent_type in {"human", "cli", "interactive"}:
return HumanAgent()
raise ValueError(f"unknown agent type: {agent_type}")