281 lines
10 KiB
Python
281 lines
10 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import sys
|
|
import time
|
|
from abc import ABC, abstractmethod
|
|
from random import Random
|
|
from typing import IO, Any
|
|
from urllib.error import HTTPError, URLError
|
|
from urllib.request import Request, urlopen
|
|
|
|
from texas_holdem.human_io import clear_screen, prompt_action, render_observation
|
|
from texas_holdem.models import Observation, PlayerAction
|
|
|
|
|
|
class PokerAgent(ABC):
    """Abstract interface implemented by every agent in this module.

    Subclasses must provide :meth:`decide`; :meth:`on_game_update` is an
    optional notification hook with a do-nothing default.
    """

    @abstractmethod
    def decide(self, observation: Observation) -> PlayerAction:
        """Return the action to play for the given ``observation``."""
        raise NotImplementedError

    def on_game_update(self, game_state: dict[str, Any]) -> None:
        """Receive the table snapshot after every finished hand (optional).

        ``game_state`` is the same dict produced by :meth:`TableGame.to_dict`,
        carrying the full ``hands`` history (including any showdown hole
        cards). The base implementation deliberately does nothing, so agents
        that have no use for the snapshot (random, calling, in-process
        humans) do not need to override it.
        """
|
|
|
|
|
|
class RandomAgent(PokerAgent):
    """Agent that plays a randomly chosen legal action."""

    def __init__(self, rng: Random | None = None) -> None:
        """Store ``rng`` as the randomness source, or a fresh ``Random()``."""
        self._rng = rng if rng else Random()

    def decide(self, observation: Observation) -> PlayerAction:
        """Pick one entry of ``observation.legal_actions`` at random.

        For ``bet`` and ``raise`` the amount is drawn with ``randint`` from
        the inclusive ``min_amount``..``max_amount`` range of the chosen
        entry. Every other action uses the entry's ``amount`` value, with a
        missing or falsy amount treated as 0.
        """
        picked = self._rng.choice(observation.legal_actions)
        kind = str(picked["action"])

        # Fixed-amount actions (fold / check / call ...) need no second draw.
        if kind not in ("bet", "raise"):
            fixed_amount = int(picked.get("amount") or 0)
            return PlayerAction(kind, fixed_amount)

        lowest = int(picked["min_amount"])
        highest = int(picked["max_amount"])
        sized_amount = self._rng.randint(lowest, highest)
        return PlayerAction(kind, sized_amount)
|
|
|
|
|
|
class CallingStationAgent(PokerAgent):
    """Passive agent: checks when it can, otherwise calls, otherwise folds."""

    def decide(self, observation: Observation) -> PlayerAction:
        """Return ``check`` if legal, else the first legal ``call``, else ``fold``.

        A call uses the ``amount`` of the matching legal-action entry, with a
        missing or falsy amount treated as 0.
        """
        # Checking is preferred over everything else.
        if any(entry["action"] == "check" for entry in observation.legal_actions):
            return PlayerAction("check")

        call_entry = next(
            (entry for entry in observation.legal_actions if entry["action"] == "call"),
            None,
        )
        if call_entry is None:
            return PlayerAction("fold")
        return PlayerAction("call", int(call_entry.get("amount") or 0))
|
|
|
|
|
|
def normalise_http_agent_endpoint(raw: str) -> str:
    """Reduce an HTTP agent endpoint to its base URL.

    Trailing slashes are removed first; then a trailing ``/act`` and, after
    that, a trailing ``/game`` are each stripped at most once, in that order.
    """
    base = raw.rstrip("/")
    # Order matters: "/act" is examined before "/game", one pass each.
    for suffix in ("/act", "/game"):
        if base.endswith(suffix):
            base = base[: len(base) - len(suffix)]
    return base
|
|
|
|
|
|
def http_agent_endpoint_from_spec(spec: dict[str, Any]) -> str | None:
    """Return the normalised endpoint of an HTTP agent spec, else ``None``.

    The spec's ``type`` is compared case-insensitively and defaults to
    ``"calling"`` when absent. Specs of any type other than ``"http"``
    yield ``None``.

    Raises:
        ValueError: the spec is of type ``http`` but its ``endpoint`` is
            missing or falsy.
    """
    kind = str(spec.get("type", "calling")).lower()
    if kind == "http":
        raw_endpoint = spec.get("endpoint")
        if raw_endpoint:
            return normalise_http_agent_endpoint(str(raw_endpoint))
        raise ValueError("http agent requires an endpoint")
    return None
|
|
|
|
|
|
class HttpAgent(PokerAgent):
    """Remote agent that talks to a base URL exposing ``/act`` and ``/game``.

    The constructor takes a *base* URL (e.g. ``http://host:9001``) and
    derives the per-purpose endpoints internally. This keeps the wire layout
    a server-side concern and lets us evolve the protocol (add ``/init``,
    ``/end``, ...) without touching every game spec.
    """

    ACT_PATH = "/act"
    GAME_PATH = "/game"

    def __init__(
        self,
        endpoint: str,
        timeout_seconds: float = 10.0,
        player_id: str | None = None,
        game_update_timeout_seconds: float | None = None,
        retries: int = 2,
        retry_backoff_seconds: float = 0.25,
    ) -> None:
        """Configure the remote endpoint and its transport policy.

        Args:
            endpoint: Agent URL; passed through
                ``normalise_http_agent_endpoint`` to obtain the base URL.
            timeout_seconds: Per-request timeout used for ``/act``.
            player_id: When truthy, sent as the ``X-Player-Id`` header.
            game_update_timeout_seconds: Per-request timeout for ``/game``.
                ``None`` means ``min(timeout_seconds, 3.0)``.
            retries: Extra attempts after the first failure; negative
                values are clamped to 0.
            retry_backoff_seconds: Base delay between attempts, doubled
                after each failed attempt; negative values are clamped to 0.
        """
        self.base_url = normalise_http_agent_endpoint(endpoint)
        self.timeout_seconds = timeout_seconds
        self.player_id = player_id
        self.game_update_timeout_seconds = (
            float(game_update_timeout_seconds)
            if game_update_timeout_seconds is not None
            else min(timeout_seconds, 3.0)
        )
        self.retries = max(0, retries)
        self.retry_backoff_seconds = max(0.0, retry_backoff_seconds)

    def _url(self, path: str) -> str:
        """Compose a full URL by joining the base with a path component."""
        return f"{self.base_url}{path}"

    def decide(self, observation: Observation) -> PlayerAction:
        """POST the observation to ``/act`` and parse the reply as an action.

        Raises:
            RuntimeError: the request failed after all attempts, the body
                was not valid JSON, or the decoded JSON was not an object
                (this includes an empty response body).
        """
        payload = self._post_json(
            self.ACT_PATH,
            observation.to_dict(),
            timeout_seconds=self.timeout_seconds,
        )
        if not isinstance(payload, dict):
            raise RuntimeError("agent endpoint must return a JSON object")
        return PlayerAction.from_dict(payload)

    def on_game_update(self, game_state: dict[str, Any]) -> None:
        """Push the post-hand snapshot to the remote ``/game`` endpoint.

        ``/game`` is informational, so delivery is best-effort: every
        transport or response failure is reported by ``_post_json`` as a
        ``RuntimeError`` and silently dropped here, so a single offline or
        misbehaving client cannot stall the table. Errors that are not
        transport failures (for example a ``game_state`` that cannot be
        serialised to JSON) are not swallowed and propagate to the caller.
        """
        try:
            self._post_json(
                self.GAME_PATH,
                game_state,
                timeout_seconds=self.game_update_timeout_seconds,
            )
        except RuntimeError:
            # Benign drop rather than reraising and aborting the hand loop.
            return None

    def _post_json(
        self,
        path: str,
        payload: dict[str, Any],
        timeout_seconds: float,
    ) -> Any:
        """POST ``payload`` as JSON to ``base_url + path`` and return parsed body.

        Shared by ``decide`` and ``on_game_update`` so both get identical
        transport semantics (timeout, retries, error wrapping, headers).

        The request is attempted up to ``retries + 1`` times, sleeping
        ``retry_backoff_seconds * 2**attempt`` between attempts. Every
        failure raised by the HTTP exchange, including protocol-level
        ``http.client.HTTPException`` errors such as a truncated body or a
        malformed status line, counts as a failed attempt.

        Returns:
            The decoded JSON value, or ``None`` when the response body is
            empty.

        Raises:
            RuntimeError: all attempts failed (the last underlying error is
                chained as ``__cause__``), or the response body is not
                valid UTF-8 encoded JSON.
        """
        # Imported locally because the module's import block does not bring
        # in http.client; these errors are NOT OSError subclasses and would
        # otherwise escape both the retry loop and the RuntimeError wrapping.
        from http.client import HTTPException

        url = self._url(path)
        headers = self._headers()
        body = json.dumps(payload).encode("utf-8")
        last_error: BaseException | None = None
        raw = b""
        for attempt in range(self.retries + 1):
            # A fresh Request per attempt: urlopen may mutate the object.
            request = Request(url, data=body, headers=headers, method="POST")
            try:
                with urlopen(request, timeout=timeout_seconds) as response:
                    raw = response.read()
                break
            except HTTPError as exc:
                # Keep the server's error body; it usually explains the code.
                detail = exc.read().decode("utf-8", errors="replace")
                last_error = RuntimeError(
                    f"agent endpoint failed with HTTP {exc.code}: "
                    f"{url} {detail}"
                )
            except (OSError, URLError, HTTPException) as exc:
                # URLError is already an OSError; listed for explicitness.
                last_error = exc
            if attempt < self.retries and self.retry_backoff_seconds > 0:
                time.sleep(self.retry_backoff_seconds * (2**attempt))
        else:
            # Loop ended without ``break``: every attempt failed.
            raise RuntimeError(f"agent endpoint failed: {url}") from last_error

        if not raw:
            return None
        # Decoding happens outside the retry loop: a malformed body is not a
        # transient fault, and it must surface as RuntimeError like any other
        # bad response instead of leaking a raw UnicodeDecodeError.
        try:
            return json.loads(raw.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError) as exc:
            raise RuntimeError(
                f"agent endpoint returned invalid JSON: {url}"
            ) from exc

    def _headers(self) -> dict[str, str]:
        """Build request headers, adding ``X-Player-Id`` when one is set."""
        headers = {"Content-Type": "application/json", "Connection": "close"}
        if self.player_id:
            headers["X-Player-Id"] = self.player_id
        return headers
|
|
|
|
|
|
class HumanAgent(PokerAgent):
    """Command-line agent operated by a person, for debugging and manual play.

    Each turn the observation is rendered in a readable layout and an
    interactive menu is shown, so the operator can only pick legal actions.
    Input and output streams are injectable, which keeps the agent testable
    and leaves room for alternate consoles (e.g. piping to a debug log).

    Unless ``keep_history=True`` is given, the terminal is cleared at the
    start of every ``decide`` call so each turn starts on a clean screen;
    keeping the history preserves the scrollback (useful when piping to a
    log file).
    """

    def __init__(
        self,
        input_stream: IO[str] | None = None,
        output_stream: IO[str] | None = None,
        keep_history: bool = False,
    ) -> None:
        """Bind the streams, falling back to ``sys.stdin`` / ``sys.stdout``."""
        self._input = sys.stdin if input_stream is None else input_stream
        self._output = sys.stdout if output_stream is None else output_stream
        self._keep_history = keep_history

    def decide(self, observation: Observation) -> PlayerAction:
        """Show the observation and prompt the operator for a legal action."""
        if not self._keep_history:
            clear_screen(self._write)

        # Work on the dict form so rendering and prompting share a code path
        # with the standalone HTTP human client (see texas_holdem.human_io).
        snapshot = observation.to_dict()
        self._write(render_observation(snapshot))

        legal_actions = list(snapshot.get("legal_actions") or [])
        selection = prompt_action(legal_actions, self._read_line, self._write)
        return PlayerAction.from_dict(selection)

    def _write(self, text: str) -> None:
        """Emit ``text`` on the output stream and flush immediately."""
        stream = self._output
        stream.write(text)
        stream.flush()

    def _read_line(self, prompt: str) -> str:
        """Show ``prompt`` and return the next input line without its newline.

        The builtin ``input()`` is avoided so the injected streams are
        honoured, which also lets tests drive the agent with StringIO.

        Raises:
            EOFError: the input stream is exhausted.
        """
        self._write(prompt)
        line = self._input.readline()
        if line != "":
            return line.rstrip("\n")
        raise EOFError("input stream closed while waiting for human action")
|
|
|
|
|
|
def build_agent(
    spec: dict[str, Any],
    rng: Random | None = None,
    player_id: str | None = None,
) -> PokerAgent:
    """Instantiate the agent described by ``spec``.

    The spec's ``type`` is matched case-insensitively and defaults to
    ``"calling"``. Recognised values:

    * ``random`` -> :class:`RandomAgent`, seeded with ``rng``
    * ``calling`` / ``call`` / ``calling_station`` -> :class:`CallingStationAgent`
    * ``human`` / ``cli`` / ``interactive`` -> :class:`HumanAgent`
    * ``http`` -> :class:`HttpAgent`, configured from the spec's
      ``endpoint``, ``timeout_seconds``, ``game_update_timeout_seconds``,
      ``retries`` and ``retry_backoff_seconds`` keys, plus ``player_id``

    Raises:
        ValueError: the type is unknown, or an ``http`` spec has a missing
            or falsy ``endpoint``.
    """
    kind = str(spec.get("type", "calling")).lower()

    if kind == "random":
        return RandomAgent(rng)
    if kind in ("calling", "call", "calling_station"):
        return CallingStationAgent()
    if kind in ("human", "cli", "interactive"):
        return HumanAgent()
    if kind != "http":
        raise ValueError(f"unknown agent type: {kind}")

    # Only the HTTP agent remains; validate and convert its options.
    endpoint = spec.get("endpoint")
    if not endpoint:
        raise ValueError("http agent requires an endpoint")

    base = str(endpoint)
    act_timeout = float(spec.get("timeout_seconds", 10.0))
    # An absent key lets HttpAgent derive the /game timeout on its own.
    update_timeout = None
    if "game_update_timeout_seconds" in spec:
        update_timeout = float(spec["game_update_timeout_seconds"])
    attempts = int(spec.get("retries", 2))
    backoff = float(spec.get("retry_backoff_seconds", 0.25))

    return HttpAgent(
        base,
        timeout_seconds=act_timeout,
        player_id=player_id,
        game_update_timeout_seconds=update_timeout,
        retries=attempts,
        retry_backoff_seconds=backoff,
    )
|