8 Commits

Author SHA1 Message Date
mamamiyear c0bc5384f4 feat: add hand detail API and enrich hand summary fields
- HandSummary: add hole_cards, starting_stacks, ending_stacks, pot_contributions
- Engine: capture all players' hole cards (not just showdown), pre/post hand stacks, per-level pot contributions
- Server: new GET /game/<game_id>/hands/<hand_number> route
- Service: add get_hand_state() method
- Tests: add ServerTests for new endpoint, update existing tests
- Existing GET /game/<game_id> auto-inherits new fields via shared to_dict()
2026-05-23 22:11:45 +08:00
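The "per-level pot contributions" named in the commit message above can be illustrated with a minimal sketch. The function name `pot_contributions` and the plain-dict input are assumptions for illustration only; the engine's own logic lives in `_award_pots` further down in this diff and additionally records winners and hand values. The sample bets are chosen so the result mirrors the `[150, 100]` amounts asserted in the engine test.

```python
def pot_contributions(total_bets: dict[str, int]) -> list[dict[str, object]]:
    """Split total bets into one pot per distinct contribution level (hypothetical helper)."""
    levels = sorted({bet for bet in total_bets.values() if bet > 0})
    pots: list[dict[str, object]] = []
    previous_level = 0
    for level in levels:
        # Everyone who bet at least this level pays the slice between levels.
        contributors = [pid for pid, bet in total_bets.items() if bet >= level]
        slice_amount = level - previous_level
        pots.append(
            {
                "amount": slice_amount * len(contributors),
                "contributors": {pid: slice_amount for pid in contributors},
            }
        )
        previous_level = level
    return pots


# p1 is all-in for 50; p2 and p3 each put in 100.
example = pot_contributions({"p1": 50, "p2": 100, "p3": 100})
```

Each pot's `amount` equals the sum of its `contributors` values, which is the invariant the new `test_hand_summary_includes_full_hand_snapshots` test checks.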
mamamiyear 5899ea0b89 Revert "feat: add replay server and web client"
This reverts commit 3c027eae0b.
2026-05-21 09:22:04 +08:00
mamamiyear 1ee963ce2e chore: add .codex to .gitignore 2026-05-17 11:23:21 +08:00
mamamiyear 351cac7734 docs: add AGENTS.md 2026-05-15 14:58:54 +08:00
mamamiyear 79dccde963 fix: game service api block when a game is running 2026-05-13 21:42:53 +08:00
qianrui.mmmy 3c027eae0b feat: add replay server and web client 2026-05-13 17:35:46 +08:00
qianrui.mmmy 09c42e9fa3 feat: set blind bet by run hands 2026-05-13 14:30:51 +08:00
qianrui.mmmy e22586aa2f feat: add --hide-reasoning for ai agent to hide reasoning info 2026-05-12 20:42:38 +08:00
13 changed files with 827 additions and 77 deletions
+7
@@ -32,5 +32,12 @@ htmlcov/
# IDE and editor files
.idea/
.vscode/
.codex/
*.swp
*.swo
# debug resources
debug/
# Node dependencies for browser automation tooling
node_modules/
+32
@@ -0,0 +1,32 @@
# Repository Guidelines
## Project Structure & Module Organization
Core poker service code lives in `texas_holdem/`. Important modules include `engine.py` for Texas Hold'em rules, `service.py` for game management, `server.py` for the HTTP API, `agents.py` for local/HTTP agents, and `ai_client.py` / `human_client.py` for standalone agents. Prompt templates live in `texas_holdem/prompts/`.
Replay UI code lives in `texas_holdem_replay/`, with static browser assets under `texas_holdem_replay/static/`. Tests are in `tests/`, named by feature area such as `test_engine.py`, `test_service.py`, and `test_replay_server.py`. Design notes belong in `docs/`.
## Build, Test, and Development Commands
- `python -m unittest discover -v` runs the full test suite.
- `python -m compileall texas_holdem texas_holdem_replay tests` checks import and syntax validity.
- `python -m texas_holdem.server --host 127.0.0.1 --port 8000` starts the game service.
- `python -m texas_holdem.human_client --port 9001 --keep-history` starts an interactive human HTTP agent.
- `python -m texas_holdem.ai_client --port 9101 --api-key "$OPENAI_API_KEY" --model gpt-4o-mini` starts an OpenAI-compatible AI agent.
- `python -m texas_holdem_replay.server --port 8088` starts the replay viewer.
## Coding Style & Naming Conventions
Use Python 3.11+ standard-library APIs unless a dependency is intentionally added to `pyproject.toml`. Keep modules focused and prefer explicit dataclasses for wire/state models. Use 4-space indentation, type hints, `snake_case` for functions and variables, `PascalCase` for classes, and concise comments only where logic is non-obvious.
## Testing Guidelines
Use `unittest`. Add tests near the behavior changed: engine rules in `tests/test_engine.py`, HTTP/service behavior in `tests/test_service.py`, agent transport in `tests/test_agents.py`, and replay UI server helpers in `tests/test_replay_server.py`. New bug fixes should include a regression test. Avoid tests that require external network access or real LLM calls.
## Commit & Pull Request Guidelines
History uses short Conventional Commit-style subjects, for example `feat: add replay server and web client` and `fix: game service api block when a game is running`. Keep commits scoped to one behavior change. Pull requests should include a short summary, test commands run, linked issue or motivation, and screenshots only for replay UI or visible terminal-output changes.
## Security & Configuration Tips
Do not commit API keys. Pass LLM credentials through `OPENAI_API_KEY` or CLI flags in local shells only. HTTP Agent endpoints are exclusive per active game; preserve this invariant when changing service concurrency.
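As a rough illustration of the exclusivity invariant described above, the sketch below tracks which game owns each HTTP Agent endpoint. `EndpointRegistry`, `claim`, and `release_game` are hypothetical names, not the service's actual API; the error wording is borrowed from the regex asserted in `tests/test_service.py` and the real message may differ.

```python
from threading import Lock


class EndpointRegistry:
    """Hypothetical registry: one active game per HTTP Agent endpoint."""

    def __init__(self) -> None:
        self._lock = Lock()
        self._owners: dict[str, str] = {}

    def claim(self, endpoint: str, game_id: str) -> None:
        # Check and record ownership under one lock so two concurrent
        # game creations cannot both claim the same endpoint.
        with self._lock:
            owner = self._owners.get(endpoint)
            if owner is not None and owner != game_id:
                raise ValueError(
                    f"endpoint {endpoint} already belongs to game {owner}"
                )
            self._owners[endpoint] = game_id

    def release_game(self, game_id: str) -> None:
        # Free every endpoint held by a game once it is no longer active.
        with self._lock:
            self._owners = {
                endpoint: owner
                for endpoint, owner in self._owners.items()
                if owner != game_id
            }
```

Holding a single lock around the check and the write is the design point: a check followed by a separate write would let two games race for the same endpoint.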
+16 -2
@@ -11,6 +11,7 @@
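- Supports blinds, four betting streets, fold, check, call, bet, raise, all-in, side pots, and showdown settlement.
- Supports local agents and HTTP agents.
- Supports terminal progress output for the Human Agent and the OpenAI-compatible AI Agent.
- Game state can be queried concurrently while a game is running; queries return a stable snapshot taken after the last completed hand.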
## Running the service
@@ -51,6 +52,12 @@ curl -X POST http://127.0.0.1:8000/games/demo/hands/run \
curl http://127.0.0.1:8000/games/demo
```
The singular alias also works:
```bash
curl http://127.0.0.1:8000/game/demo
```
## HTTP Agent protocol
A player configuration can use a remote HTTP Agent:
@@ -62,12 +69,19 @@ curl http://127.0.0.1:8000/games/demo
"agent": {
"type": "http",
"endpoint": "http://127.0.0.1:9101",
"timeout_seconds": 10
"timeout_seconds": 10,
"game_update_timeout_seconds": 3,
"retries": 2,
"retry_backoff_seconds": 0.25
}
}
```
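
The `endpoint` value above is treated as a base URL, and legacy values that already end in `/act` or `/game` are accepted. A minimal sketch of that normalisation, assuming the same suffix rules as `normalise_http_agent_endpoint` in the agents diff below (`normalise_endpoint` and `agent_urls` are illustrative names, not part of the package):

```python
def normalise_endpoint(raw: str) -> str:
    """Strip a trailing slash and a legacy /act or /game suffix."""
    url = raw.rstrip("/")
    for suffix in ("/act", "/game"):
        if url.endswith(suffix):
            url = url[: -len(suffix)]
    return url


def agent_urls(endpoint: str) -> dict[str, str]:
    """Derive the two URLs the service posts to for one agent."""
    base = normalise_endpoint(endpoint)
    return {"act": f"{base}/act", "game": f"{base}/game"}
```

Normalising first means two configurations that differ only by a legacy suffix or trailing slash resolve to the same base URL, which also makes endpoint comparison across games straightforward.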
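The service sends the observation JSON for the player to act to `endpoint`. The agent returns:
The service pushes the game snapshot at the start of each hand to `endpoint + /game`, and sends the observation JSON for the player to act to `endpoint + /act`. `endpoint` may also be given in the legacy form with an `/act` or `/game` suffix; the service normalises it to the base URL.
The same HTTP Agent endpoint cannot be used by different games at the same time; the game created later gets an error. The service automatically retries HTTP Agent requests. If `/act` still fails after the retries, the rules engine picks a default action in the order `check > call > fold` so the table does not stall.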
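
To make the retry and fallback rules concrete, here is a small sketch. It assumes the exponential backoff used by `HttpAgent._post_json` in this change (`retry_backoff_seconds * 2**attempt`, with no sleep after the final attempt) and the `check > call > fold` preference; `backoff_delays` and `default_action` are hypothetical helper names.

```python
def backoff_delays(retries: int, retry_backoff_seconds: float) -> list[float]:
    """Sleep durations between attempts; there are retries + 1 attempts in total."""
    return [
        retry_backoff_seconds * (2 ** attempt)
        for attempt in range(max(0, retries))
    ]


def default_action(legal_actions: list[dict[str, object]]) -> str:
    """Pick the fallback action name in the order check > call > fold."""
    available = {str(action["action"]) for action in legal_actions}
    for name in ("check", "call", "fold"):
        if name in available:
            return name
    # None of the three is legal: fall back to the first legal action.
    return str(legal_actions[0]["action"])


# With the example config (retries=2, retry_backoff_seconds=0.25):
delays = backoff_delays(2, 0.25)
```

With those defaults a failing `/act` call is attempted three times, pausing 0.25 s and then 0.5 s between attempts, before the engine falls back to the default action.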
The agent returns:
```json
{"action": "call"}
+60
@@ -0,0 +1,60 @@
import json
import unittest
from unittest.mock import patch
from urllib.error import URLError
from texas_holdem.agents import HttpAgent, normalise_http_agent_endpoint
class FakeResponse:
def __init__(self, payload: dict[str, object]) -> None:
self.payload = payload
def read(self) -> bytes:
return json.dumps(self.payload).encode("utf-8")
def __enter__(self) -> "FakeResponse":
return self
def __exit__(self, *args: object) -> None:
return None
class AgentTests(unittest.TestCase):
def test_normalise_http_agent_endpoint_accepts_action_or_game_paths(self) -> None:
self.assertEqual(
normalise_http_agent_endpoint("http://127.0.0.1:9101/act"),
"http://127.0.0.1:9101",
)
self.assertEqual(
normalise_http_agent_endpoint("http://127.0.0.1:9101/game/"),
"http://127.0.0.1:9101",
)
def test_http_agent_post_retries_and_sets_player_header(self) -> None:
calls = []
def fake_urlopen(request, timeout): # type: ignore[no-untyped-def]
calls.append((request, timeout))
if len(calls) == 1:
raise URLError("temporary")
return FakeResponse({"ok": True})
agent = HttpAgent(
"http://agent.test/act",
player_id="p1",
retries=1,
retry_backoff_seconds=0,
)
with patch("texas_holdem.agents.urlopen", fake_urlopen):
payload = agent._post_json("/game", {"game_id": "g1"}, timeout_seconds=2)
self.assertEqual(payload, {"ok": True})
self.assertEqual(len(calls), 2)
self.assertEqual(calls[1][0].headers["X-player-id"], "p1")
self.assertEqual(calls[1][1], 2)
if __name__ == "__main__":
unittest.main()
+40
@@ -100,10 +100,50 @@ class EngineTests(unittest.TestCase):
awards = game._award_pots()
self.assertEqual([award.amount for award in awards], [150, 100])
self.assertEqual(
[contribution["amount"] for contribution in game._last_pot_contributions],
[150, 100],
)
self.assertEqual(
game._last_pot_contributions[0]["contributors"],
{"p1": 50, "p2": 50, "p3": 50},
)
self.assertEqual(
game._last_pot_contributions[1]["contributors"],
{"p2": 50, "p3": 50},
)
self.assertEqual(game.players[0].stack, 150)
self.assertEqual(game.players[1].stack, 100)
self.assertEqual(game.players[2].stack, 0)
def test_hand_summary_includes_full_hand_snapshots(self) -> None:
players = [
("p1", "Player 1", CallingStationAgent()),
("p2", "Player 2", CallingStationAgent()),
("p3", "Player 3", CallingStationAgent()),
]
game = TableGame("g5", players, starting_stack=100, small_blind=5, big_blind=10, rng=Random(23))
summary = game.run_hand()
payload = summary.to_dict()
self.assertEqual(set(summary.hole_cards), {"p1", "p2", "p3"})
self.assertTrue(all(len(cards) == 2 for cards in summary.hole_cards.values()))
self.assertEqual(summary.starting_stacks, {"p1": 100, "p2": 100, "p3": 100})
self.assertEqual(set(summary.ending_stacks), {"p1", "p2", "p3"})
self.assertEqual(sum(summary.starting_stacks.values()), sum(summary.ending_stacks.values()))
self.assertGreaterEqual(len(summary.pot_contributions), 1)
self.assertTrue(
all(
contribution["amount"] == sum(contribution["contributors"].values())
for contribution in summary.pot_contributions
)
)
self.assertEqual(set(payload["hole_cards"]), {"p1", "p2", "p3"})
self.assertEqual(payload["starting_stacks"], {"p1": 100, "p2": 100, "p3": 100})
self.assertIn("ending_stacks", payload)
self.assertIn("pot_contributions", payload)
def test_short_all_in_does_not_reopen_raising_to_prior_actor(self) -> None:
seen: list[tuple[str, str, list[str]]] = []
players = [
+76
@@ -0,0 +1,76 @@
import json
import unittest
from threading import Thread
from urllib.request import Request, urlopen
from texas_holdem import server as poker_server
from texas_holdem.service import GameManager
class ServerTests(unittest.TestCase):
def setUp(self) -> None:
self.previous_manager = poker_server.MANAGER
poker_server.MANAGER = GameManager()
self.server = poker_server.create_server("127.0.0.1", 0)
self.thread = Thread(target=self.server.serve_forever, daemon=True)
self.thread.start()
host, port = self.server.server_address
self.base_url = f"http://{host}:{port}"
def tearDown(self) -> None:
self.server.shutdown()
self.server.server_close()
self.thread.join(timeout=2)
poker_server.MANAGER = self.previous_manager
def request_json(
self,
method: str,
path: str,
payload: dict[str, object] | None = None,
) -> dict[str, object]:
data = None
headers = {}
if payload is not None:
data = json.dumps(payload).encode("utf-8")
headers["Content-Type"] = "application/json"
request = Request(
f"{self.base_url}{path}",
data=data,
headers=headers,
method=method,
)
with urlopen(request, timeout=5) as response:
return json.loads(response.read().decode("utf-8"))
def test_get_hand_route_returns_expanded_hand_summary(self) -> None:
self.request_json(
"POST",
"/game",
{
"game_id": "route-demo",
"seed": 17,
"starting_stack": 200,
"small_blind": 5,
"big_blind": 10,
"players": [
{"id": "a", "type": "calling"},
{"id": "b", "type": "calling"},
],
},
)
self.request_json("POST", "/game/route-demo/hands", {"count": 1})
hand = self.request_json("GET", "/game/route-demo/hands/1")
game = self.request_json("GET", "/game/route-demo")
self.assertEqual(hand["hand_number"], 1)
self.assertEqual(set(hand["hole_cards"]), {"a", "b"})
self.assertEqual(hand["starting_stacks"], {"a": 200, "b": 200})
self.assertIn("ending_stacks", hand)
self.assertIn("pot_contributions", hand)
self.assertEqual(game["hands"][0], hand)
if __name__ == "__main__":
unittest.main()
+82 -1
@@ -1,8 +1,26 @@
import unittest
from threading import Event, Thread
from texas_holdem.agents import PokerAgent
from texas_holdem.models import Observation, PlayerAction
from texas_holdem.service import GameManager
class BlockingAgent(PokerAgent):
def __init__(self, entered: Event, release: Event) -> None:
self.entered = entered
self.release = release
def decide(self, observation: Observation) -> PlayerAction:
self.entered.set()
if not self.release.wait(timeout=5):
raise RuntimeError("test timed out waiting to release blocking agent")
for action in observation.legal_actions:
if action["action"] == "check":
return PlayerAction("check")
return PlayerAction("call")
class ServiceTests(unittest.TestCase):
def test_create_and_run_game(self) -> None:
manager = GameManager()
@@ -22,8 +40,71 @@ class ServiceTests(unittest.TestCase):
hands = manager.run_hands(game.game_id, count=1)
state = manager.get_game_state("demo")
hand = manager.get_hand_state("demo", 1)
self.assertEqual(len(hands), 1)
self.assertEqual(manager.get_game("demo").to_dict()["hand_number"], 1)
self.assertEqual(state["hand_number"], 1)
self.assertEqual(hand, state["hands"][0])
self.assertIn("hole_cards", hand)
self.assertIn("starting_stacks", hand)
self.assertIn("ending_stacks", hand)
self.assertIn("pot_contributions", hand)
def test_get_game_state_does_not_block_during_run(self) -> None:
manager = GameManager()
entered = Event()
release = Event()
game = manager.create_game(
{
"game_id": "blocking",
"seed": 13,
"starting_stack": 200,
"small_blind": 5,
"big_blind": 10,
"players": [
{"id": "a", "type": "calling"},
{"id": "b", "type": "calling"},
],
}
)
manager.run_hands("blocking", count=1)
game.agents["a"] = BlockingAgent(entered, release)
thread = Thread(target=lambda: manager.run_hands("blocking", count=1))
thread.start()
self.assertTrue(entered.wait(timeout=2))
state = manager.get_game_state("blocking")
release.set()
thread.join(timeout=2)
self.assertFalse(thread.is_alive())
self.assertEqual(state["hand_number"], 1)
self.assertEqual(len(state["hands"]), 1)
def test_duplicate_http_agent_endpoint_is_rejected_across_active_games(self) -> None:
manager = GameManager()
payload = {
"starting_stack": 200,
"small_blind": 5,
"big_blind": 10,
"players": [
{
"id": "ai",
"agent": {
"type": "http",
"endpoint": "http://127.0.0.1:9101/act",
},
},
{"id": "b", "type": "calling"},
],
}
manager.create_game({"game_id": "g1", **payload})
with self.assertRaisesRegex(ValueError, "already belongs to game g1"):
manager.create_game({"game_id": "g2", **payload})
if __name__ == "__main__":
+106 -31
@@ -2,10 +2,11 @@ from __future__ import annotations
import json
import sys
import time
from abc import ABC, abstractmethod
from random import Random
from typing import IO, Any
from urllib.error import URLError
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from texas_holdem.human_io import clear_screen, prompt_action, render_observation
@@ -54,6 +55,27 @@ class CallingStationAgent(PokerAgent):
return PlayerAction("fold")
def normalise_http_agent_endpoint(raw: str) -> str:
"""Return the canonical base URL for an HTTP agent endpoint."""
url = raw.rstrip("/")
if url.endswith("/act"):
url = url[: -len("/act")]
if url.endswith("/game"):
url = url[: -len("/game")]
return url
def http_agent_endpoint_from_spec(spec: dict[str, Any]) -> str | None:
"""Extract the canonical HTTP endpoint from an agent spec, if present."""
agent_type = str(spec.get("type", "calling")).lower()
if agent_type != "http":
return None
endpoint = spec.get("endpoint")
if not endpoint:
raise ValueError("http agent requires an endpoint")
return normalise_http_agent_endpoint(str(endpoint))
class HttpAgent(PokerAgent):
"""Remote agent that talks to a base URL exposing ``/act`` and ``/game``.
@@ -66,28 +88,36 @@ class HttpAgent(PokerAgent):
ACT_PATH = "/act"
GAME_PATH = "/game"
def __init__(self, endpoint: str, timeout_seconds: float = 10.0) -> None:
self.base_url = self._normalise_base_url(endpoint)
def __init__(
self,
endpoint: str,
timeout_seconds: float = 10.0,
player_id: str | None = None,
game_update_timeout_seconds: float | None = None,
retries: int = 2,
retry_backoff_seconds: float = 0.25,
) -> None:
self.base_url = normalise_http_agent_endpoint(endpoint)
self.timeout_seconds = timeout_seconds
@staticmethod
def _normalise_base_url(raw: str) -> str:
"""Strip a trailing slash so URL joins do not produce double slashes.
Centralising this also tolerates the legacy "endpoint already points
at /act" mistake by chopping off a redundant ``/act`` suffix.
"""
url = raw.rstrip("/")
if url.endswith("/act"):
url = url[: -len("/act")]
return url
self.player_id = player_id
self.game_update_timeout_seconds = (
float(game_update_timeout_seconds)
if game_update_timeout_seconds is not None
else min(timeout_seconds, 3.0)
)
self.retries = max(0, retries)
self.retry_backoff_seconds = max(0.0, retry_backoff_seconds)
def _url(self, path: str) -> str:
"""Compose a full URL by joining the base with a path component."""
return f"{self.base_url}{path}"
def decide(self, observation: Observation) -> PlayerAction:
payload = self._post_json(self.ACT_PATH, observation.to_dict())
payload = self._post_json(
self.ACT_PATH,
observation.to_dict(),
timeout_seconds=self.timeout_seconds,
)
if not isinstance(payload, dict):
raise RuntimeError("agent endpoint must return a JSON object")
return PlayerAction.from_dict(payload)
@@ -100,30 +130,54 @@ class HttpAgent(PokerAgent):
only by way of the raised exception bubbling to the engine guard.
"""
try:
self._post_json(self.GAME_PATH, game_state)
self._post_json(
self.GAME_PATH,
game_state,
timeout_seconds=self.game_update_timeout_seconds,
)
except RuntimeError:
# ``/game`` is informational; treat any HTTP error as a benign
# drop rather than reraising and aborting the hand loop.
return None
def _post_json(self, path: str, payload: dict[str, Any]) -> Any:
def _post_json(
self,
path: str,
payload: dict[str, Any],
timeout_seconds: float,
) -> Any:
"""POST ``payload`` as JSON to ``base_url + path`` and return parsed body.
Extracted as a tiny helper so ``decide`` and ``on_game_update`` share
identical transport semantics (timeout, error wrapping, content-type).
"""
body = json.dumps(payload).encode("utf-8")
request = Request(
self._url(path),
data=body,
headers={"Content-Type": "application/json"},
method="POST",
)
try:
with urlopen(request, timeout=self.timeout_seconds) as response:
raw = response.read().decode("utf-8")
except (OSError, URLError) as exc:
raise RuntimeError(f"agent endpoint failed: {self._url(path)}") from exc
last_error: BaseException | None = None
raw = ""
for attempt in range(self.retries + 1):
request = Request(
self._url(path),
data=body,
headers=self._headers(),
method="POST",
)
try:
with urlopen(request, timeout=timeout_seconds) as response:
raw = response.read().decode("utf-8")
break
except HTTPError as exc:
detail = exc.read().decode("utf-8", errors="replace")
last_error = RuntimeError(
f"agent endpoint failed with HTTP {exc.code}: "
f"{self._url(path)} {detail}"
)
except (OSError, URLError) as exc:
last_error = exc
if attempt < self.retries and self.retry_backoff_seconds > 0:
time.sleep(self.retry_backoff_seconds * (2**attempt))
else:
raise RuntimeError(f"agent endpoint failed: {self._url(path)}") from last_error
if not raw:
return None
try:
@@ -133,6 +187,12 @@ class HttpAgent(PokerAgent):
f"agent endpoint returned invalid JSON: {self._url(path)}"
) from exc
def _headers(self) -> dict[str, str]:
headers = {"Content-Type": "application/json", "Connection": "close"}
if self.player_id:
headers["X-Player-Id"] = self.player_id
return headers
class HumanAgent(PokerAgent):
"""Interactive CLI agent for debugging and manual play.
@@ -189,7 +249,11 @@ class HumanAgent(PokerAgent):
return line.rstrip("\n")
def build_agent(spec: dict[str, Any], rng: Random | None = None) -> PokerAgent:
def build_agent(
spec: dict[str, Any],
rng: Random | None = None,
player_id: str | None = None,
) -> PokerAgent:
agent_type = str(spec.get("type", "calling")).lower()
if agent_type == "random":
return RandomAgent(rng)
@@ -199,7 +263,18 @@ def build_agent(spec: dict[str, Any], rng: Random | None = None) -> PokerAgent:
endpoint = spec.get("endpoint")
if not endpoint:
raise ValueError("http agent requires an endpoint")
return HttpAgent(str(endpoint), float(spec.get("timeout_seconds", 10.0)))
return HttpAgent(
str(endpoint),
timeout_seconds=float(spec.get("timeout_seconds", 10.0)),
player_id=player_id,
game_update_timeout_seconds=(
float(spec["game_update_timeout_seconds"])
if "game_update_timeout_seconds" in spec
else None
),
retries=int(spec.get("retries", 2)),
retry_backoff_seconds=float(spec.get("retry_backoff_seconds", 0.25)),
)
if agent_type in {"human", "cli", "interactive"}:
return HumanAgent()
raise ValueError(f"unknown agent type: {agent_type}")
+6 -5
@@ -1082,12 +1082,13 @@ def main() -> None:
help="Disable ANSI gray coloring for streamed LLM output.",
)
parser.add_argument(
"--no-reasoning",
"--hide-reasoning",
action="store_true",
help=(
"Hide the LLM's reasoning/chain-of-thought stream from the "
"terminal. The final answer (content) is still printed so "
"operators can see the chosen action."
"terminal. The model still performs reasoning; only its "
"terminal output is suppressed. The final answer (content) "
"is still printed so operators can see the chosen action."
),
)
args = parser.parse_args()
@@ -1107,7 +1108,7 @@ def main() -> None:
console = AIAgentConsole(
keep_history=args.keep_history,
use_color=not args.no_color,
show_reasoning=not args.no_reasoning,
show_reasoning=not args.hide_reasoning,
)
service = AIAgentService(LLMClient(config), prompts, console=console)
server = create_server(args.host, args.port, service, default_player_id=args.player_id)
@@ -1120,7 +1121,7 @@ def main() -> None:
f" base_url : {config.base_url}\n"
f" player_id : {args.player_id}\n"
f" stream : {'on' if config.stream else 'off'}\n"
f" reasoning : {'off (hidden)' if args.no_reasoning else 'on'}\n"
f" reasoning : {'hidden (output suppressed)' if args.hide_reasoning else 'visible'}\n"
f" clear-screen: {'off (keep history)' if args.keep_history else 'on'}",
file=sys.stderr,
flush=True,
+214 -19
@@ -1,6 +1,8 @@
from __future__ import annotations
from copy import deepcopy
from random import Random
from threading import RLock
from time import time
from texas_holdem.agents import PokerAgent
@@ -8,6 +10,7 @@ from texas_holdem.cards import Deck
from texas_holdem.evaluator import evaluate
from texas_holdem.models import (
ActionRecord,
BlindLevel,
HandSummary,
Observation,
PlayerAction,
@@ -53,21 +56,60 @@ class TableGame:
self.small_blind = small_blind
self.big_blind = big_blind
self.rng = rng or Random()
self.lock = RLock()
self.hand_number = 0
self.button_index: int | None = None
self.board = []
self.action_history: list[ActionRecord] = []
self.hand_summaries: list[HandSummary] = []
self._last_pot_contributions: list[dict[str, object]] = []
# ``blind_history`` is an append-only log of every blind level change
# (including the initial one). Each entry's ``hand_number`` is the
# first hand that played under those stakes, which makes it trivial
# to reconstruct the schedule from the outside.
self.blind_history: list[BlindLevel] = []
self._completed_snapshot: dict[str, object] = self._to_dict_unlocked()
@property
def is_complete(self) -> bool:
return len([player for player in self.players if player.stack > 0]) < 2
def run_hand(self) -> HandSummary:
def run_hand(
self,
small_blind: int | None = None,
big_blind: int | None = None,
) -> HandSummary:
"""Play a single hand.
``small_blind`` / ``big_blind`` allow callers to bump the stakes
between hands without rebuilding the table. Either both must be
provided or both omitted (in which case the previously configured
blinds carry over). The resolved blind level is appended to
:attr:`blind_history` whenever it changes (including the very first
hand) so external observers can replay the schedule.
"""
with self.lock:
return self._run_hand_locked(small_blind=small_blind, big_blind=big_blind)
def _run_hand_locked(
self,
small_blind: int | None = None,
big_blind: int | None = None,
) -> HandSummary:
if self.is_complete:
raise GameComplete("game is complete")
self._apply_blinds_for_hand(small_blind, big_blind)
self.hand_number += 1
# Stamp the active blind level onto the upcoming summary so a hand
# remains self-describing even after the blinds change later on.
active_blinds = BlindLevel(
hand_number=self.hand_number,
small_blind=self.small_blind,
big_blind=self.big_blind,
)
self._record_blind_level_if_new(active_blinds)
started_at = time()
self.board = []
self.action_history = []
@@ -75,6 +117,11 @@ class TableGame:
for player in self.players:
player.reset_for_hand()
starting_stacks = {
player.player_id: player.stack
for player in self.players
if player.in_hand
}
self._advance_button()
assert self.button_index is not None
@@ -86,6 +133,11 @@ class TableGame:
self._broadcast_game_update()
self._deal_hole_cards(deck)
hole_cards = {
player.player_id: list(player.hole_cards)
for player in self.players
if player.in_hand
}
small_blind_index, big_blind_index = self._blind_indexes()
self._post_blind(small_blind_index, "small_blind", self.small_blind)
self._post_blind(big_blind_index, "big_blind", self.big_blind)
@@ -109,6 +161,11 @@ class TableGame:
self._betting_round(street, start_index, self.big_blind)
awards = self._award_pots()
ending_stacks = {
player.player_id: player.stack
for player in self.players
if player.player_id in starting_stacks
}
summary = HandSummary(
game_id=self.game_id,
hand_number=self.hand_number,
@@ -116,26 +173,73 @@ class TableGame:
board=list(self.board),
actions=list(self.action_history),
awards=awards,
blinds=active_blinds,
hole_cards=hole_cards,
starting_stacks=starting_stacks,
ending_stacks=ending_stacks,
pot_contributions=deepcopy(self._last_pot_contributions),
showdown_hands=self._collect_showdown_hands(),
started_at=started_at,
finished_at=time(),
)
self.hand_summaries.append(summary)
self._completed_snapshot = deepcopy(self._to_dict_unlocked())
return summary
def run_hands(self, max_hands: int, until_one_left: bool = False) -> list[HandSummary]:
if max_hands <= 0:
raise ValueError("max_hands must be positive")
summaries = []
for _ in range(max_hands):
if self.is_complete:
break
summaries.append(self.run_hand())
if until_one_left and self.is_complete:
break
return summaries
def run_hands(
self,
max_hands: int,
until_one_left: bool = False,
small_blind: int | None = None,
big_blind: int | None = None,
) -> list[HandSummary]:
"""Play up to ``max_hands`` hands using a single blind configuration.
Passing ``small_blind`` / ``big_blind`` bumps the stakes starting
with the first hand of this call; subsequent calls can raise them
again. Leaving them ``None`` keeps the current level unchanged.
"""
with self.lock:
if max_hands <= 0:
raise ValueError("max_hands must be positive")
summaries = []
for _ in range(max_hands):
if self.is_complete:
break
# Only the first hand of the batch needs to apply the blind
# override; after that the engine reuses the stored values.
summaries.append(
self._run_hand_locked(
small_blind=small_blind,
big_blind=big_blind,
)
)
small_blind = None
big_blind = None
if until_one_left and self.is_complete:
break
return summaries
def to_dict(self) -> dict[str, object]:
with self.lock:
return self._to_dict_unlocked()
def snapshot_completed(self) -> dict[str, object]:
"""Return a stable snapshot from the latest completed hand boundary.
If a hand is currently running under ``self.lock``, this method does
not block. It returns the most recent completed hand summary and
stacks captured in memory, which is exactly what status endpoints
need while a long-running HTTP-agent decision is in progress.
"""
if self.lock.acquire(blocking=False):
try:
return deepcopy(self._to_dict_unlocked())
finally:
self.lock.release()
return deepcopy(self._completed_snapshot)
def _to_dict_unlocked(self) -> dict[str, object]:
return {
"game_id": self.game_id,
"status": "complete" if self.is_complete else "running",
@@ -143,8 +247,18 @@ class TableGame:
"button_seat": None
if self.button_index is None
else self.players[self.button_index].seat,
# ``small_blind`` / ``big_blind`` mirror the *current* level so
# legacy callers keep working. New consumers should prefer the
# structured ``blinds`` block which carries the full schedule.
"small_blind": self.small_blind,
"big_blind": self.big_blind,
"blinds": {
"current": {
"small_blind": self.small_blind,
"big_blind": self.big_blind,
},
"history": [level.to_dict() for level in self.blind_history],
},
"starting_stack": self.starting_stack,
"players": [player.public_dict() for player in self.players],
# ``hands`` exposes every finished hand (each entry is the same
@@ -153,6 +267,47 @@ class TableGame:
"hands": [summary.to_dict() for summary in self.hand_summaries],
}
def _apply_blinds_for_hand(
self,
small_blind: int | None,
big_blind: int | None,
) -> None:
"""Validate and apply optional per-hand blind overrides.
Splitting this out keeps :meth:`run_hand` focused on the table flow
while letting us reuse the validation rules originally enforced by
``__init__``. We require both values to be supplied together so the
configuration cannot drift into an inconsistent half-update.
"""
if small_blind is None and big_blind is None:
return
if small_blind is None or big_blind is None:
raise ValueError(
"small_blind and big_blind must be provided together"
)
if small_blind <= 0 or big_blind <= 0 or small_blind > big_blind:
raise ValueError("blinds must satisfy 0 < small_blind <= big_blind")
self.small_blind = int(small_blind)
self.big_blind = int(big_blind)
def _record_blind_level_if_new(self, level: BlindLevel) -> None:
"""Append ``level`` to :attr:`blind_history` when it differs.
Comparing against the latest entry (rather than blindly appending)
keeps the log compact: stretches of unchanged stakes only contribute
a single record. The very first hand always seeds an entry because
the history starts empty.
"""
if not self.blind_history:
self.blind_history.append(level)
return
latest = self.blind_history[-1]
if (
latest.small_blind != level.small_blind
or latest.big_blind != level.big_blind
):
self.blind_history.append(level)
def _advance_button(self) -> None:
if self.button_index is None:
self.button_index = self._next_index(0, lambda index: self.players[index].stack > 0)
@@ -350,9 +505,18 @@ class TableGame:
try:
requested = agent.decide(observation)
except Exception:
requested = PlayerAction("fold")
requested = self._default_action(observation.legal_actions)
return self._coerce_action(requested, observation.legal_actions)
def _default_action(self, legal_actions: list[dict[str, object]]) -> PlayerAction:
by_action = {str(action["action"]): action for action in legal_actions}
for action_type in ("check", "call", "fold"):
if action_type in by_action:
legal = by_action[action_type]
return PlayerAction(action_type, int(legal.get("amount") or 0))
legal = legal_actions[0]
return PlayerAction(str(legal["action"]), int(legal.get("amount") or 0))
def _coerce_action(
self,
requested: PlayerAction,
@@ -413,21 +577,43 @@ class TableGame:
return current_bet, min_raise, full_raise
def _award_pots(self) -> list[PotAward]:
self._last_pot_contributions = []
total_pot = sum(player.total_bet for player in self.players)
live_players = [player for player in self.players if self._is_live(player)]
if not live_players or total_pot <= 0:
return []
if len(live_players) == 1:
live_players[0].stack += total_pot
return [PotAward(total_pot, [live_players[0].player_id], None)]
levels = sorted({player.total_bet for player in self.players if player.total_bet > 0})
if len(live_players) == 1:
winner = live_players[0]
winner.stack += total_pot
previous_level = 0
for level in levels:
contributors = [player for player in self.players if player.total_bet >= level]
pot_amount = (level - previous_level) * len(contributors)
self._last_pot_contributions.append(
{
"amount": pot_amount,
"contributors": {
player.player_id: level - previous_level
for player in contributors
},
"winners": [winner.player_id],
"hand_value": None,
}
)
previous_level = level
return [PotAward(total_pot, [winner.player_id], None)]
previous_level = 0
awards: list[PotAward] = []
for level in levels:
contributors = [player for player in self.players if player.total_bet >= level]
pot_amount = (level - previous_level) * len(contributors)
level_contributions = {
player.player_id: level - previous_level
for player in contributors
}
previous_level = level
contenders = [player for player in contributors if self._is_live(player)]
if not contenders or pot_amount <= 0:
@@ -449,13 +635,22 @@ class TableGame:
winner.stack += share
for winner in ordered_winners[:remainder]:
winner.stack += 1
winner_ids = [winner.player_id for winner in ordered_winners]
awards.append(
PotAward(
amount=pot_amount,
winners=[winner.player_id for winner in ordered_winners],
winners=winner_ids,
hand_value=best_value,
)
)
self._last_pot_contributions.append(
{
"amount": pot_amount,
"contributors": level_contributions,
"winners": winner_ids,
"hand_value": best_value,
}
)
return awards
def _collect_showdown_hands(self) -> dict[str, list]:
@@ -482,7 +677,7 @@ class TableGame:
swallow individual exceptions so a flaky remote endpoint cannot
break the table flow.
"""
snapshot = self.to_dict()
snapshot = self._to_dict_unlocked()
for agent in self.agents.values():
try:
agent.on_game_update(snapshot)
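The per-level pot split in the award hunk above can be illustrated in isolation. The following is a minimal sketch, not the engine's code: `Seat` and `split_into_levels` are hypothetical stand-ins, and only the slicing by distinct `total_bet` values is taken from the diff.

```python
from dataclasses import dataclass


@dataclass
class Seat:
    """Hypothetical stand-in for the engine's player object."""
    player_id: str
    total_bet: int
    live: bool = True  # False means the player folded


def split_into_levels(seats: list[Seat]) -> list[dict]:
    """Slice total bets into pot levels, one per distinct bet size."""
    levels = sorted({s.total_bet for s in seats if s.total_bet > 0})
    pots = []
    previous_level = 0
    for level in levels:
        # Everyone who bet at least `level` pays the slice between levels.
        contributors = [s for s in seats if s.total_bet >= level]
        slice_size = level - previous_level
        pots.append(
            {
                "amount": slice_size * len(contributors),
                "contributors": {s.player_id: slice_size for s in contributors},
                # Folded players pay into the pot but cannot win it.
                "contenders": [s.player_id for s in contributors if s.live],
            }
        )
        previous_level = level
    return pots


# p1 is all-in for 50, p2 and p3 bet 100, p3 folded later.
pots = split_into_levels(
    [Seat("p1", 50), Seat("p2", 100), Seat("p3", 100, live=False)]
)
for pot in pots:
    print(pot)
```

In this example the main pot holds 150 chips contested by p1 and p2, and the side pot holds 100 chips that only p2 can win, so the level amounts always sum to the total of all bets.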
+64
@@ -130,6 +130,28 @@ class Observation:
}
@dataclass(slots=True)
class BlindLevel:
"""A snapshot of the blind configuration that took effect at a given hand.
The structure is intentionally append-only: every time the blinds change
(or the very first hand seeds the initial values) we push a new
``BlindLevel`` so callers can reconstruct how the stakes evolved over the
course of the game without losing any prior state.
"""
hand_number: int
small_blind: int
big_blind: int
def to_dict(self) -> dict[str, object]:
return {
"hand_number": self.hand_number,
"small_blind": self.small_blind,
"big_blind": self.big_blind,
}
@dataclass(slots=True)
class PotAward:
amount: int
@@ -152,6 +174,14 @@ class HandSummary:
board: list[Card]
actions: list[ActionRecord]
awards: list[PotAward]
# ``blinds`` records the exact blind level used by this hand. Storing it
# on the summary (rather than only on the game) guarantees historical
# hands remain self-describing even after the blinds are raised later.
blinds: BlindLevel | None = None
hole_cards: dict[str, list[Card]] = field(default_factory=dict)
starting_stacks: dict[str, int] = field(default_factory=dict)
ending_stacks: dict[str, int] = field(default_factory=dict)
pot_contributions: list[dict[str, Any]] = field(default_factory=list)
showdown_hands: dict[str, list[Card]] = field(default_factory=dict)
started_at: float = field(default_factory=time)
finished_at: float = field(default_factory=time)
@@ -161,9 +191,20 @@ class HandSummary:
"game_id": self.game_id,
"hand_number": self.hand_number,
"button_seat": self.button_seat,
"blinds": self.blinds.to_dict() if self.blinds else None,
"board": [str(card) for card in self.board],
"actions": [record.to_dict() for record in self.actions],
"awards": [award.to_dict() for award in self.awards],
"hole_cards": {
player_id: [str(card) for card in cards]
for player_id, cards in self.hole_cards.items()
},
"starting_stacks": dict(self.starting_stacks),
"ending_stacks": dict(self.ending_stacks),
"pot_contributions": [
self._pot_contribution_to_dict(contribution)
for contribution in self.pot_contributions
],
# ``showdown_hands`` is only populated when more than one player
# remained eligible for a pot; empty dict means the hand ended
# without a showdown (e.g. everyone folded but the winner).
@@ -174,3 +215,26 @@ class HandSummary:
"started_at": self.started_at,
"finished_at": self.finished_at,
}
@staticmethod
def _pot_contribution_to_dict(contribution: dict[str, Any]) -> dict[str, object]:
hand_value = contribution.get("hand_value")
if isinstance(hand_value, HandValue):
hand_value = hand_value.to_dict()
elif isinstance(hand_value, dict):
hand_value = dict(hand_value)
raw_contributors = contribution.get("contributors") or {}
contributors = {
str(player_id): int(amount)
for player_id, amount in dict(raw_contributors).items()
}
return {
"amount": int(contribution.get("amount") or 0),
"contributors": contributors,
"winners": [
str(player_id)
for player_id in contribution.get("winners", [])
],
"hand_value": hand_value,
}
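The `BlindLevel` docstring above describes an append-only history. How the engine maintains that list is not shown in this diff, so the following is only a sketch under assumptions: `record_blinds` is a hypothetical helper, and the `BlindLevel` here is a simplified copy of the dataclass in the hunk.

```python
from dataclasses import dataclass


@dataclass
class BlindLevel:
    """Simplified copy of the dataclass from the diff."""
    hand_number: int
    small_blind: int
    big_blind: int


def record_blinds(
    history: list[BlindLevel], hand_number: int, small_blind: int, big_blind: int
) -> BlindLevel:
    """Append a new level only when the blinds differ from the latest entry.

    Existing entries are never mutated or removed, so earlier hands can
    still be matched to the level that was active when they were played.
    """
    latest = history[-1] if history else None
    changed = latest is None or (latest.small_blind, latest.big_blind) != (
        small_blind,
        big_blind,
    )
    if changed:
        history.append(BlindLevel(hand_number, small_blind, big_blind))
    return history[-1]


history: list[BlindLevel] = []
record_blinds(history, 1, 5, 10)   # first hand seeds the initial level
record_blinds(history, 2, 5, 10)   # unchanged blinds, nothing appended
record_blinds(history, 3, 10, 20)  # blinds raised, new entry appended
print(history)
```

After these three calls the history holds two entries, one starting at hand 1 and one starting at hand 3.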
+51 -10
@@ -25,8 +25,16 @@ class PokerRequestHandler(BaseHTTPRequestHandler):
if path == ["games"]:
self._json({"games": MANAGER.list_games()})
return
if len(path) == 2 and path[0] == "games":
self._json(MANAGER.get_game(path[1]).to_dict())
if len(path) == 2 and path[0] in {"game", "games"}:
self._json(MANAGER.get_game_state(path[1]))
return
if len(path) == 4 and path[0] in {"game", "games"} and path[2] == "hands":
try:
hand_number = int(path[3])
except ValueError:
self._json({"error": "not found"}, HTTPStatus.NOT_FOUND)
return
self._json(MANAGER.get_hand_state(path[1], hand_number))
return
self._json({"error": "not found"}, HTTPStatus.NOT_FOUND)
except KeyError as exc:
@@ -35,23 +43,37 @@ class PokerRequestHandler(BaseHTTPRequestHandler):
def do_POST(self) -> None:
path = self._path_parts()
try:
if path == ["games"]:
if path in (["game"], ["games"]):
game = MANAGER.create_game(self._read_json())
self._json(game.to_dict(), HTTPStatus.CREATED)
self._json(game.snapshot_completed(), HTTPStatus.CREATED)
return
if len(path) == 3 and path[0] == "games" and path[2] == "hands":
if len(path) == 3 and path[0] in {"game", "games"} and path[2] == "hands":
body = self._read_json()
count = int(body.get("count", 1))
until_one_left = bool(body.get("until_one_left", False))
summaries = MANAGER.run_hands(path[1], count, until_one_left)
self._json({"hands": summaries, "game": MANAGER.get_game(path[1]).to_dict()})
small_blind, big_blind = self._extract_blinds(body)
summaries = MANAGER.run_hands(
path[1],
count,
until_one_left,
small_blind=small_blind,
big_blind=big_blind,
)
self._json({"hands": summaries, "game": MANAGER.get_game_state(path[1])})
return
if len(path) == 4 and path[0] == "games" and path[2] == "hands" and path[3] == "run":
if len(path) == 4 and path[0] in {"game", "games"} and path[2] == "hands" and path[3] == "run":
body = self._read_json()
count = int(body.get("count", 1))
until_one_left = bool(body.get("until_one_left", False))
summaries = MANAGER.run_hands(path[1], count, until_one_left)
self._json({"hands": summaries, "game": MANAGER.get_game(path[1]).to_dict()})
small_blind, big_blind = self._extract_blinds(body)
summaries = MANAGER.run_hands(
path[1],
count,
until_one_left,
small_blind=small_blind,
big_blind=big_blind,
)
self._json({"hands": summaries, "game": MANAGER.get_game_state(path[1])})
return
self._json({"error": "not found"}, HTTPStatus.NOT_FOUND)
except KeyError as exc:
@@ -78,6 +100,25 @@ class PokerRequestHandler(BaseHTTPRequestHandler):
raise ValueError("request body must be a JSON object")
return payload
@staticmethod
def _extract_blinds(body: dict[str, Any]) -> tuple[int | None, int | None]:
"""Parse optional blind overrides from a /hands POST body.
Callers may omit both keys (keep current level), or supply both to
raise the blinds for the upcoming batch. Providing only one is
treated as a client error and surfaced via ``ValueError`` so the
handler can reply with 400.
"""
raw_small = body.get("small_blind")
raw_big = body.get("big_blind")
if raw_small is None and raw_big is None:
return None, None
if raw_small is None or raw_big is None:
raise ValueError(
"small_blind and big_blind must be provided together"
)
return int(raw_small), int(raw_big)
def _json(self, payload: dict[str, Any], status: HTTPStatus = HTTPStatus.OK) -> None:
body = json.dumps(payload, ensure_ascii=True).encode("utf-8")
self.send_response(status)
+73 -9
@@ -5,13 +5,14 @@ from threading import RLock
from typing import Any
from uuid import uuid4
from texas_holdem.agents import build_agent
from texas_holdem.agents import build_agent, http_agent_endpoint_from_spec
from texas_holdem.engine import TableGame
class GameManager:
def __init__(self) -> None:
self._games: dict[str, TableGame] = {}
self._http_endpoint_owners: dict[str, str] = {}
self._lock = RLock()
def create_game(self, payload: dict[str, Any]) -> TableGame:
@@ -29,12 +30,19 @@ class GameManager:
big_blind = int(payload.get("big_blind", 10))
specs = []
http_endpoints: set[str] = set()
for seat, raw_spec in enumerate(players):
if not isinstance(raw_spec, dict):
raise ValueError("each player must be an object")
player_id = str(raw_spec.get("id") or raw_spec.get("player_id") or f"p{seat + 1}")
name = str(raw_spec.get("name") or player_id)
agent = build_agent(raw_spec.get("agent", raw_spec), rng)
agent_spec = raw_spec.get("agent", raw_spec)
if not isinstance(agent_spec, dict):
raise ValueError("agent spec must be an object")
endpoint = http_agent_endpoint_from_spec(agent_spec)
if endpoint is not None:
http_endpoints.add(endpoint)
agent = build_agent(agent_spec, rng, player_id=player_id)
specs.append((player_id, name, agent))
game = TableGame(
@@ -46,9 +54,18 @@ class GameManager:
rng=rng,
)
with self._lock:
self._release_completed_http_endpoints_locked()
if game_id in self._games:
raise ValueError(f"game already exists: {game_id}")
for endpoint in http_endpoints:
owner = self._http_endpoint_owners.get(endpoint)
if owner is not None and owner != game_id:
raise ValueError(
f"http agent endpoint already belongs to game {owner}: {endpoint}"
)
self._games[game_id] = game
for endpoint in http_endpoints:
self._http_endpoint_owners[endpoint] = game_id
return game
def get_game(self, game_id: str) -> TableGame:
@@ -58,14 +75,61 @@ class GameManager:
except KeyError as exc:
raise KeyError(f"game not found: {game_id}") from exc
def get_game_state(self, game_id: str) -> dict[str, object]:
return self.get_game(game_id).snapshot_completed()
def get_hand_state(self, game_id: str, hand_number: int) -> dict[str, object]:
state = self.get_game_state(game_id)
for hand in state.get("hands", []):
if hand.get("hand_number") == hand_number:
return hand
raise KeyError(f"hand not found: {game_id} #{hand_number}")
def list_games(self) -> list[dict[str, object]]:
with self._lock:
return [game.to_dict() for game in self._games.values()]
games = list(self._games.values())
return [game.snapshot_completed() for game in games]
def run_hands(self, game_id: str, count: int = 1, until_one_left: bool = False) -> list[dict[str, object]]:
def run_hands(
self,
game_id: str,
count: int = 1,
until_one_left: bool = False,
small_blind: int | None = None,
big_blind: int | None = None,
) -> list[dict[str, object]]:
"""Run ``count`` hands, optionally raising the blinds first.
``small_blind`` / ``big_blind`` are forwarded to the engine so the
blinds can change between batches. Leaving them as ``None`` keeps
the previously configured level, which preserves the original
no-argument behaviour.
"""
game = self.get_game(game_id)
with self._lock:
return [
summary.to_dict()
for summary in game.run_hands(count, until_one_left=until_one_left)
]
summaries = [
summary.to_dict()
for summary in game.run_hands(
count,
until_one_left=until_one_left,
small_blind=small_blind,
big_blind=big_blind,
)
]
if game.is_complete:
with self._lock:
self._release_http_endpoints_for_game_locked(game_id)
return summaries
def _release_completed_http_endpoints_locked(self) -> None:
for game_id, game in list(self._games.items()):
if game.lock.acquire(blocking=False):
try:
if game.is_complete:
self._release_http_endpoints_for_game_locked(game_id)
finally:
game.lock.release()
def _release_http_endpoints_for_game_locked(self, game_id: str) -> None:
for endpoint, owner in list(self._http_endpoint_owners.items()):
if owner == game_id:
del self._http_endpoint_owners[endpoint]