7 Commits

Author SHA1 Message Date
mamamiyear 5899ea0b89 Revert "feat: add replay server and web client"
This reverts commit 3c027eae0b.
2026-05-21 09:22:04 +08:00
mamamiyear 1ee963ce2e chore: add .codex to .gitignore 2026-05-17 11:23:21 +08:00
mamamiyear 351cac7734 docs: add AGENTS.md 2026-05-15 14:58:54 +08:00
mamamiyear 79dccde963 fix: game service api block when a game is running 2026-05-13 21:42:53 +08:00
qianrui.mmmy 3c027eae0b feat: add replay server and web client 2026-05-13 17:35:46 +08:00
qianrui.mmmy 09c42e9fa3 feat: set blind bet by run hands 2026-05-13 14:30:51 +08:00
qianrui.mmmy e22586aa2f feat: add --hide-reasoning for ai agent to hide reasoning info 2026-05-12 20:42:38 +08:00
11 changed files with 758 additions and 70 deletions
+7
View File
@@ -32,5 +32,12 @@ htmlcov/
# IDE and editor files
.idea/
.vscode/
.codex/
*.swp
*.swo
# debug resources
debug/
# Node dependencies for browser automation tooling
node_modules/
+32
View File
@@ -0,0 +1,32 @@
# Repository Guidelines
## Project Structure & Module Organization
Core poker service code lives in `texas_holdem/`. Important modules include `engine.py` for Texas Hold'em rules, `service.py` for game management, `server.py` for the HTTP API, `agents.py` for local/HTTP agents, and `ai_client.py` / `human_client.py` for standalone agents. Prompt templates live in `texas_holdem/prompts/`.
Replay UI code lives in `texas_holdem_replay/`, with static browser assets under `texas_holdem_replay/static/`. Tests are in `tests/`, named by feature area such as `test_engine.py`, `test_service.py`, and `test_replay_server.py`. Design notes belong in `docs/`.
## Build, Test, and Development Commands
- `python -m unittest discover -v` runs the full test suite.
- `python -m compileall texas_holdem texas_holdem_replay tests` checks import and syntax validity.
- `python -m texas_holdem.server --host 127.0.0.1 --port 8000` starts the game service.
- `python -m texas_holdem.human_client --port 9001 --keep-history` starts an interactive human HTTP agent.
- `python -m texas_holdem.ai_client --port 9101 --api-key "$OPENAI_API_KEY" --model gpt-4o-mini` starts an OpenAI-compatible AI agent.
- `python -m texas_holdem_replay.server --port 8088` starts the replay viewer.
## Coding Style & Naming Conventions
Use Python 3.11+ standard-library APIs unless a dependency is intentionally added to `pyproject.toml`. Keep modules focused and prefer explicit dataclasses for wire/state models. Use 4-space indentation, type hints, `snake_case` for functions and variables, `PascalCase` for classes, and concise comments only where logic is non-obvious.
## Testing Guidelines
Use `unittest`. Add tests near the behavior changed: engine rules in `tests/test_engine.py`, HTTP/service behavior in `tests/test_service.py`, agent transport in `tests/test_agents.py`, and replay UI server helpers in `tests/test_replay_server.py`. New bug fixes should include a regression test. Avoid tests that require external network access or real LLM calls.
## Commit & Pull Request Guidelines
History uses short Conventional Commit-style subjects, for example `feat: add replay server and web client` and `fix: game service api block when a game is running`. Keep commits scoped to one behavior change. Pull requests should include a short summary, test commands run, linked issue or motivation, and screenshots only for replay UI or visible terminal-output changes.
## Security & Configuration Tips
Do not commit API keys. Pass LLM credentials through `OPENAI_API_KEY` or CLI flags in local shells only. HTTP Agent endpoints are exclusive per active game; preserve this invariant when changing service concurrency.
+16 -2
View File
@@ -11,6 +11,7 @@
- 支持盲注、四条街下注、弃牌、过牌、跟注、下注、加注、全下、边池和摊牌结算。
- 支持本地 Agent 和 HTTP Agent。
- 支持 Human Agent 和 OpenAI-compatible AI Agent 的终端过程输出。
- 游戏运行中可以并发查询状态;查询返回上一手完成后的稳定快照。
## 运行服务
@@ -51,6 +52,12 @@ curl -X POST http://127.0.0.1:8000/games/demo/hands/run \
curl http://127.0.0.1:8000/games/demo
```
也可以使用单数别名:
```bash
curl http://127.0.0.1:8000/game/demo
```
## HTTP Agent 协议
玩家配置可以使用远程 HTTP Agent:
@@ -62,12 +69,19 @@ curl http://127.0.0.1:8000/games/demo
"agent": {
"type": "http",
"endpoint": "http://127.0.0.1:9101",
"timeout_seconds": 10
"timeout_seconds": 10,
"game_update_timeout_seconds": 3,
"retries": 2,
"retry_backoff_seconds": 0.25
}
}
```
服务会向 `endpoint` 发送当前行动玩家的观察 JSON。Agent 返回:
服务会向 `endpoint + /game` 推送每手开始时的游戏快照,向 `endpoint + /act` 发送当前行动玩家的观察 JSON。`endpoint` 也可以传入历史形式的 `/act``/game` 后缀,服务会归一化为 base URL。
同一个 HTTP Agent endpoint 不能同时被不同游戏占用;后创建的游戏会返回错误。服务会给 HTTP Agent 请求自动重试,`/act` 重试仍失败时,规则引擎会按 `check > call > fold` 选择默认动作,避免整桌卡死。
Agent 返回:
```json
{"action": "call"}
+60
View File
@@ -0,0 +1,60 @@
import json
import unittest
from unittest.mock import patch
from urllib.error import URLError
from texas_holdem.agents import HttpAgent, normalise_http_agent_endpoint
class FakeResponse:
def __init__(self, payload: dict[str, object]) -> None:
self.payload = payload
def read(self) -> bytes:
return json.dumps(self.payload).encode("utf-8")
def __enter__(self) -> "FakeResponse":
return self
def __exit__(self, *args: object) -> None:
return None
class AgentTests(unittest.TestCase):
def test_normalise_http_agent_endpoint_accepts_action_or_game_paths(self) -> None:
self.assertEqual(
normalise_http_agent_endpoint("http://127.0.0.1:9101/act"),
"http://127.0.0.1:9101",
)
self.assertEqual(
normalise_http_agent_endpoint("http://127.0.0.1:9101/game/"),
"http://127.0.0.1:9101",
)
def test_http_agent_post_retries_and_sets_player_header(self) -> None:
calls = []
def fake_urlopen(request, timeout): # type: ignore[no-untyped-def]
calls.append((request, timeout))
if len(calls) == 1:
raise URLError("temporary")
return FakeResponse({"ok": True})
agent = HttpAgent(
"http://agent.test/act",
player_id="p1",
retries=1,
retry_backoff_seconds=0,
)
with patch("texas_holdem.agents.urlopen", fake_urlopen):
payload = agent._post_json("/game", {"game_id": "g1"}, timeout_seconds=2)
self.assertEqual(payload, {"ok": True})
self.assertEqual(len(calls), 2)
self.assertEqual(calls[1][0].headers["X-player-id"], "p1")
self.assertEqual(calls[1][1], 2)
if __name__ == "__main__":
unittest.main()
+74 -1
View File
@@ -1,8 +1,26 @@
import unittest
from threading import Event, Thread
from texas_holdem.agents import PokerAgent
from texas_holdem.models import Observation, PlayerAction
from texas_holdem.service import GameManager
class BlockingAgent(PokerAgent):
def __init__(self, entered: Event, release: Event) -> None:
self.entered = entered
self.release = release
def decide(self, observation: Observation) -> PlayerAction:
self.entered.set()
if not self.release.wait(timeout=5):
raise RuntimeError("test timed out waiting to release blocking agent")
for action in observation.legal_actions:
if action["action"] == "check":
return PlayerAction("check")
return PlayerAction("call")
class ServiceTests(unittest.TestCase):
def test_create_and_run_game(self) -> None:
manager = GameManager()
@@ -23,7 +41,62 @@ class ServiceTests(unittest.TestCase):
hands = manager.run_hands(game.game_id, count=1)
self.assertEqual(len(hands), 1)
self.assertEqual(manager.get_game("demo").to_dict()["hand_number"], 1)
self.assertEqual(manager.get_game_state("demo")["hand_number"], 1)
def test_get_game_state_does_not_block_during_run(self) -> None:
manager = GameManager()
entered = Event()
release = Event()
game = manager.create_game(
{
"game_id": "blocking",
"seed": 13,
"starting_stack": 200,
"small_blind": 5,
"big_blind": 10,
"players": [
{"id": "a", "type": "calling"},
{"id": "b", "type": "calling"},
],
}
)
manager.run_hands("blocking", count=1)
game.agents["a"] = BlockingAgent(entered, release)
thread = Thread(target=lambda: manager.run_hands("blocking", count=1))
thread.start()
self.assertTrue(entered.wait(timeout=2))
state = manager.get_game_state("blocking")
release.set()
thread.join(timeout=2)
self.assertFalse(thread.is_alive())
self.assertEqual(state["hand_number"], 1)
self.assertEqual(len(state["hands"]), 1)
def test_duplicate_http_agent_endpoint_is_rejected_across_active_games(self) -> None:
manager = GameManager()
payload = {
"starting_stack": 200,
"small_blind": 5,
"big_blind": 10,
"players": [
{
"id": "ai",
"agent": {
"type": "http",
"endpoint": "http://127.0.0.1:9101/act",
},
},
{"id": "b", "type": "calling"},
],
}
manager.create_game({"game_id": "g1", **payload})
with self.assertRaisesRegex(ValueError, "already belongs to game g1"):
manager.create_game({"game_id": "g2", **payload})
if __name__ == "__main__":
+106 -31
View File
@@ -2,10 +2,11 @@ from __future__ import annotations
import json
import sys
import time
from abc import ABC, abstractmethod
from random import Random
from typing import IO, Any
from urllib.error import URLError
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from texas_holdem.human_io import clear_screen, prompt_action, render_observation
@@ -54,6 +55,27 @@ class CallingStationAgent(PokerAgent):
return PlayerAction("fold")
def normalise_http_agent_endpoint(raw: str) -> str:
"""Return the canonical base URL for an HTTP agent endpoint."""
url = raw.rstrip("/")
if url.endswith("/act"):
url = url[: -len("/act")]
if url.endswith("/game"):
url = url[: -len("/game")]
return url
def http_agent_endpoint_from_spec(spec: dict[str, Any]) -> str | None:
"""Extract the canonical HTTP endpoint from an agent spec, if present."""
agent_type = str(spec.get("type", "calling")).lower()
if agent_type != "http":
return None
endpoint = spec.get("endpoint")
if not endpoint:
raise ValueError("http agent requires an endpoint")
return normalise_http_agent_endpoint(str(endpoint))
class HttpAgent(PokerAgent):
"""Remote agent that talks to a base URL exposing ``/act`` and ``/game``.
@@ -66,28 +88,36 @@ class HttpAgent(PokerAgent):
ACT_PATH = "/act"
GAME_PATH = "/game"
def __init__(self, endpoint: str, timeout_seconds: float = 10.0) -> None:
self.base_url = self._normalise_base_url(endpoint)
def __init__(
self,
endpoint: str,
timeout_seconds: float = 10.0,
player_id: str | None = None,
game_update_timeout_seconds: float | None = None,
retries: int = 2,
retry_backoff_seconds: float = 0.25,
) -> None:
self.base_url = normalise_http_agent_endpoint(endpoint)
self.timeout_seconds = timeout_seconds
@staticmethod
def _normalise_base_url(raw: str) -> str:
"""Strip a trailing slash so URL joins do not produce double slashes.
Centralising this also tolerates the legacy "endpoint already points
at /act" mistake by chopping off a redundant ``/act`` suffix.
"""
url = raw.rstrip("/")
if url.endswith("/act"):
url = url[: -len("/act")]
return url
self.player_id = player_id
self.game_update_timeout_seconds = (
float(game_update_timeout_seconds)
if game_update_timeout_seconds is not None
else min(timeout_seconds, 3.0)
)
self.retries = max(0, retries)
self.retry_backoff_seconds = max(0.0, retry_backoff_seconds)
def _url(self, path: str) -> str:
"""Compose a full URL by joining the base with a path component."""
return f"{self.base_url}{path}"
def decide(self, observation: Observation) -> PlayerAction:
payload = self._post_json(self.ACT_PATH, observation.to_dict())
payload = self._post_json(
self.ACT_PATH,
observation.to_dict(),
timeout_seconds=self.timeout_seconds,
)
if not isinstance(payload, dict):
raise RuntimeError("agent endpoint must return a JSON object")
return PlayerAction.from_dict(payload)
@@ -100,30 +130,54 @@ class HttpAgent(PokerAgent):
only by way of the raised exception bubbling to the engine guard.
"""
try:
self._post_json(self.GAME_PATH, game_state)
self._post_json(
self.GAME_PATH,
game_state,
timeout_seconds=self.game_update_timeout_seconds,
)
except RuntimeError:
# ``/game`` is informational; treat any HTTP error as a benign
# drop rather than reraising and aborting the hand loop.
return None
def _post_json(self, path: str, payload: dict[str, Any]) -> Any:
def _post_json(
self,
path: str,
payload: dict[str, Any],
timeout_seconds: float,
) -> Any:
"""POST ``payload`` as JSON to ``base_url + path`` and return parsed body.
Extracted as a tiny helper so ``decide`` and ``on_game_update`` share
identical transport semantics (timeout, error wrapping, content-type).
"""
body = json.dumps(payload).encode("utf-8")
request = Request(
self._url(path),
data=body,
headers={"Content-Type": "application/json"},
method="POST",
)
try:
with urlopen(request, timeout=self.timeout_seconds) as response:
raw = response.read().decode("utf-8")
except (OSError, URLError) as exc:
raise RuntimeError(f"agent endpoint failed: {self._url(path)}") from exc
last_error: BaseException | None = None
raw = ""
for attempt in range(self.retries + 1):
request = Request(
self._url(path),
data=body,
headers=self._headers(),
method="POST",
)
try:
with urlopen(request, timeout=timeout_seconds) as response:
raw = response.read().decode("utf-8")
break
except HTTPError as exc:
detail = exc.read().decode("utf-8", errors="replace")
last_error = RuntimeError(
f"agent endpoint failed with HTTP {exc.code}: "
f"{self._url(path)} {detail}"
)
except (OSError, URLError) as exc:
last_error = exc
if attempt < self.retries and self.retry_backoff_seconds > 0:
time.sleep(self.retry_backoff_seconds * (2**attempt))
else:
raise RuntimeError(f"agent endpoint failed: {self._url(path)}") from last_error
if not raw:
return None
try:
@@ -133,6 +187,12 @@ class HttpAgent(PokerAgent):
f"agent endpoint returned invalid JSON: {self._url(path)}"
) from exc
def _headers(self) -> dict[str, str]:
headers = {"Content-Type": "application/json", "Connection": "close"}
if self.player_id:
headers["X-Player-Id"] = self.player_id
return headers
class HumanAgent(PokerAgent):
"""Interactive CLI agent for debugging and manual play.
@@ -189,7 +249,11 @@ class HumanAgent(PokerAgent):
return line.rstrip("\n")
def build_agent(spec: dict[str, Any], rng: Random | None = None) -> PokerAgent:
def build_agent(
spec: dict[str, Any],
rng: Random | None = None,
player_id: str | None = None,
) -> PokerAgent:
agent_type = str(spec.get("type", "calling")).lower()
if agent_type == "random":
return RandomAgent(rng)
@@ -199,7 +263,18 @@ def build_agent(spec: dict[str, Any], rng: Random | None = None) -> PokerAgent:
endpoint = spec.get("endpoint")
if not endpoint:
raise ValueError("http agent requires an endpoint")
return HttpAgent(str(endpoint), float(spec.get("timeout_seconds", 10.0)))
return HttpAgent(
str(endpoint),
timeout_seconds=float(spec.get("timeout_seconds", 10.0)),
player_id=player_id,
game_update_timeout_seconds=(
float(spec["game_update_timeout_seconds"])
if "game_update_timeout_seconds" in spec
else None
),
retries=int(spec.get("retries", 2)),
retry_backoff_seconds=float(spec.get("retry_backoff_seconds", 0.25)),
)
if agent_type in {"human", "cli", "interactive"}:
return HumanAgent()
raise ValueError(f"unknown agent type: {agent_type}")
+169 -3
View File
@@ -69,6 +69,109 @@ ANSI_RESET = "\x1b[0m"
# ---------------------------------------------------------------------------
class _ThinkingIndicator:
"""Animated "thinking..." marquee for the AI agent console.
Design rationale:
- Encapsulated as its own class so the animation lifecycle (timer
thread, frame state, screen erase sequence) does not pollute the
surrounding console class.
- Runs in a daemon background thread driven by ``threading.Event`` so
``stop`` returns promptly even if the current frame is mid-sleep.
- Uses ANSI ``\\r`` plus a clearing escape sequence to overwrite the
previous frame in place, avoiding scrollback noise. The frames
cycle through 0/1/2/3 dots every 0.5s as requested.
- ``start``/``stop`` are idempotent so the higher-level console can
call ``stop`` defensively (e.g. on the fallback path) without
tracking whether a marquee is actually running.
"""
# Frame interval in seconds; matches the user-visible cadence.
_FRAME_INTERVAL = 0.5
# 0..3 dots, looping.
_FRAMES = ("thinking", "thinking.", "thinking..", "thinking...")
# ANSI escape that clears from the cursor to the end of the line; we
# combine it with a leading carriage return to redraw the frame in
# place.
_ERASE_LINE = "\r\x1b[K"
def __init__(
self,
write_fn: Callable[[str], None],
gray_fn: Callable[[str], str],
) -> None:
self._write = write_fn
self._gray = gray_fn
self._stop_event = threading.Event()
self._thread: threading.Thread | None = None
# ``_active`` reflects whether a frame is currently visible on
# screen; ``stop`` uses it to decide whether to emit the final
# erase sequence.
self._active = False
# Guard against concurrent start/stop calls from different
# threads (e.g. content-delta handler vs. end_llm_stream).
self._lifecycle_lock = threading.Lock()
def start(self) -> None:
"""Begin the marquee in a background thread.
Calling ``start`` while already running is a no-op.
"""
with self._lifecycle_lock:
if self._thread is not None and self._thread.is_alive():
return
self._stop_event.clear()
self._active = True
thread = threading.Thread(
target=self._run,
name="ai-thinking-indicator",
daemon=True,
)
self._thread = thread
thread.start()
def stop(self) -> None:
"""Stop the marquee and erase the current frame from the screen.
Safe to call when not running.
"""
with self._lifecycle_lock:
thread = self._thread
if thread is None:
return
self._stop_event.set()
self._thread = None
# Wait for the worker outside the lifecycle lock so an in-flight
# ``_render_frame`` cannot deadlock against ``start`` from
# another thread.
thread.join()
if self._active:
# Wipe the last frame so the model's actual content begins on
# a clean line.
self._write(self._ERASE_LINE)
self._active = False
def _run(self) -> None:
"""Background loop: redraw the next frame every ``_FRAME_INTERVAL``."""
index = 0
while not self._stop_event.is_set():
self._render_frame(self._FRAMES[index % len(self._FRAMES)])
index += 1
# ``Event.wait`` returns immediately when ``set`` is called,
# so ``stop`` is responsive even mid-frame.
if self._stop_event.wait(self._FRAME_INTERVAL):
return
def _render_frame(self, label: str) -> None:
"""Emit one frame in place using carriage-return + erase-EOL."""
self._write(f"{self._ERASE_LINE}{self._gray(label)}")
# ---------------------------------------------------------------------------
# AI agent console
# ---------------------------------------------------------------------------
class AIAgentConsole:
"""Serialised terminal output for the standalone AI agent.
@@ -84,11 +187,30 @@ class AIAgentConsole:
output_stream: IO[str] | None = None,
keep_history: bool = False,
use_color: bool = True,
show_reasoning: bool = True,
) -> None:
self._output = output_stream if output_stream is not None else sys.stdout
self._keep_history = keep_history
self._use_color = use_color
# ``show_reasoning`` controls whether the LLM's chain-of-thought
# ("reasoning") deltas are printed to the terminal. The final
# answer ("content") is always printed so operators can still see
# the action being chosen.
self._show_reasoning = show_reasoning
# ``_lock`` serialises whole act/game render blocks (coarse grain).
# ``_io_lock`` is a finer-grained mutex protecting just the
# ``self._output.write`` calls so the thinking-indicator background
# thread can interleave safely with the main rendering thread
# without being blocked by the coarse lock.
self._lock = threading.Lock()
self._io_lock = threading.Lock()
# Animated "thinking..." marquee shown while reasoning output is
# suppressed. Created up-front so callers can ``start``/``stop``
# idempotently regardless of the show_reasoning flag.
self._thinking = _ThinkingIndicator(
write_fn=self._write,
gray_fn=self._gray,
)
@contextmanager
def act_log(self, observation: dict[str, Any]) -> Iterator[None]:
@@ -106,13 +228,32 @@ class AIAgentConsole:
def begin_llm_stream(self) -> None:
self._write(self._gray("AI MODEL STREAM\n"))
# When reasoning output is hidden, immediately start the marquee
# so the user sees liveness while the model is "thinking" before
# any content delta arrives.
if not self._show_reasoning:
self._thinking.start()
def write_llm_delta(self, kind: str, text: str) -> None:
if not text:
return
# Skip "reasoning" deltas entirely when reasoning output is hidden;
# this keeps the terminal focused on the final answer for users
# who do not care about chain-of-thought traces.
if kind == "reasoning" and not self._show_reasoning:
return
# First non-reasoning delta means the model has started speaking
# the actual answer; tear down the marquee before printing so the
# animation does not collide with the content stream.
if kind == "content" and not self._show_reasoning:
self._thinking.stop()
self._write(self._gray(text))
def end_llm_stream(self) -> None:
# Defensive stop in case the request finished without ever
# producing a content delta (e.g. fallback path / error).
if not self._show_reasoning:
self._thinking.stop()
self._write(self._gray("\n"))
def announce_action(
@@ -120,11 +261,17 @@ class AIAgentConsole:
action: dict[str, Any],
source: str = "model",
) -> None:
# Defensive stop: error / fallback paths bypass end_llm_stream, so
# we ensure the marquee never leaks into action / warning output.
self._thinking.stop()
body = json.dumps(action, ensure_ascii=False)
self._write(f"\nAI ACTION ({source}) -> {body}\n")
self._write("~" * 60 + "\n\n")
def announce_warning(self, message: str) -> None:
# Same defensive stop as ``announce_action`` - warnings can fire
# before the LLM stream closes (HTTP error, JSON parse error...).
self._thinking.stop()
self._write(f"\nAI WARNING -> {message}\n")
def _gray(self, text: str) -> str:
@@ -133,8 +280,12 @@ class AIAgentConsole:
return f"{ANSI_GRAY}{text}{ANSI_RESET}"
def _write(self, text: str) -> None:
self._output.write(text)
self._output.flush()
# The thinking-indicator background thread writes from a different
# thread than the main /act handler; the fine-grained ``_io_lock``
# avoids tearing of escape sequences and keeps stdout consistent.
with self._io_lock:
self._output.write(text)
self._output.flush()
# ---------------------------------------------------------------------------
@@ -296,8 +447,11 @@ def _format_action_history(history: list[dict[str, Any]]) -> str:
return "(no actions yet)"
# The engine never produces unbounded history within a single hand, but
# we cap defensively so a malformed payload cannot blow up token usage.
# The cap is sized to comfortably cover the worst realistic case (a
# 12-handed table running ~10 betting rounds within one hand) so the
# LLM never sees a silently truncated history at full ring tables.
rows = []
for record in history[-32:]:
for record in history[-128:]:
rows.append(
f"- [{record.get('street')}] {record.get('player_id')} -> "
f"{record.get('action')} amount={record.get('amount', 0)}"
@@ -927,6 +1081,16 @@ def main() -> None:
action="store_true",
help="Disable ANSI gray coloring for streamed LLM output.",
)
parser.add_argument(
"--hide-reasoning",
action="store_true",
help=(
"Hide the LLM's reasoning/chain-of-thought stream from the "
"terminal. The model still performs reasoning; only its "
"terminal output is suppressed. The final answer (content) "
"is still printed so operators can see the chosen action."
),
)
args = parser.parse_args()
if not args.api_key:
@@ -944,6 +1108,7 @@ def main() -> None:
console = AIAgentConsole(
keep_history=args.keep_history,
use_color=not args.no_color,
show_reasoning=not args.hide_reasoning,
)
service = AIAgentService(LLMClient(config), prompts, console=console)
server = create_server(args.host, args.port, service, default_player_id=args.player_id)
@@ -956,6 +1121,7 @@ def main() -> None:
f" base_url : {config.base_url}\n"
f" player_id : {args.player_id}\n"
f" stream : {'on' if config.stream else 'off'}\n"
f" reasoning : {'hidden (output suppressed)' if args.hide_reasoning else 'visible'}\n"
f" clear-screen: {'off (keep history)' if args.keep_history else 'on'}",
file=sys.stderr,
flush=True,
+158 -14
View File
@@ -1,6 +1,8 @@
from __future__ import annotations
from copy import deepcopy
from random import Random
from threading import RLock
from time import time
from texas_holdem.agents import PokerAgent
@@ -8,6 +10,7 @@ from texas_holdem.cards import Deck
from texas_holdem.evaluator import evaluate
from texas_holdem.models import (
ActionRecord,
BlindLevel,
HandSummary,
Observation,
PlayerAction,
@@ -53,21 +56,59 @@ class TableGame:
self.small_blind = small_blind
self.big_blind = big_blind
self.rng = rng or Random()
self.lock = RLock()
self.hand_number = 0
self.button_index: int | None = None
self.board = []
self.action_history: list[ActionRecord] = []
self.hand_summaries: list[HandSummary] = []
# ``blind_history`` is an append-only log of every blind level change
# (including the initial one). Each entry's ``hand_number`` is the
# first hand that played under those stakes, which makes it trivial
# to reconstruct the schedule from the outside.
self.blind_history: list[BlindLevel] = []
self._completed_snapshot: dict[str, object] = self._to_dict_unlocked()
@property
def is_complete(self) -> bool:
return len([player for player in self.players if player.stack > 0]) < 2
def run_hand(self) -> HandSummary:
def run_hand(
self,
small_blind: int | None = None,
big_blind: int | None = None,
) -> HandSummary:
"""Play a single hand.
``small_blind`` / ``big_blind`` allow callers to bump the stakes
between hands without rebuilding the table. Either both must be
provided or both omitted (in which case the previously configured
blinds carry over). The resolved blind level is appended to
:attr:`blind_history` whenever it changes (including the very first
hand) so external observers can replay the schedule.
"""
with self.lock:
return self._run_hand_locked(small_blind=small_blind, big_blind=big_blind)
def _run_hand_locked(
self,
small_blind: int | None = None,
big_blind: int | None = None,
) -> HandSummary:
if self.is_complete:
raise GameComplete("game is complete")
self._apply_blinds_for_hand(small_blind, big_blind)
self.hand_number += 1
# Stamp the active blind level onto the upcoming summary so a hand
# remains self-describing even after the blinds change later on.
active_blinds = BlindLevel(
hand_number=self.hand_number,
small_blind=self.small_blind,
big_blind=self.big_blind,
)
self._record_blind_level_if_new(active_blinds)
started_at = time()
self.board = []
self.action_history = []
@@ -116,26 +157,69 @@ class TableGame:
board=list(self.board),
actions=list(self.action_history),
awards=awards,
blinds=active_blinds,
showdown_hands=self._collect_showdown_hands(),
started_at=started_at,
finished_at=time(),
)
self.hand_summaries.append(summary)
self._completed_snapshot = deepcopy(self._to_dict_unlocked())
return summary
def run_hands(self, max_hands: int, until_one_left: bool = False) -> list[HandSummary]:
if max_hands <= 0:
raise ValueError("max_hands must be positive")
summaries = []
for _ in range(max_hands):
if self.is_complete:
break
summaries.append(self.run_hand())
if until_one_left and self.is_complete:
break
return summaries
def run_hands(
self,
max_hands: int,
until_one_left: bool = False,
small_blind: int | None = None,
big_blind: int | None = None,
) -> list[HandSummary]:
"""Play up to ``max_hands`` hands using a single blind configuration.
Passing ``small_blind`` / ``big_blind`` bumps the stakes starting
with the first hand of this call; subsequent calls can raise them
again. Leaving them ``None`` keeps the current level unchanged.
"""
with self.lock:
if max_hands <= 0:
raise ValueError("max_hands must be positive")
summaries = []
for _ in range(max_hands):
if self.is_complete:
break
# Only the first hand of the batch needs to apply the blind
# override; after that the engine reuses the stored values.
summaries.append(
self._run_hand_locked(
small_blind=small_blind,
big_blind=big_blind,
)
)
small_blind = None
big_blind = None
if until_one_left and self.is_complete:
break
return summaries
def to_dict(self) -> dict[str, object]:
with self.lock:
return self._to_dict_unlocked()
def snapshot_completed(self) -> dict[str, object]:
"""Return a stable snapshot from the latest completed hand boundary.
If a hand is currently running under ``self.lock``, this method does
not block. It returns the most recent completed hand summary and
stacks captured in memory, which is exactly what status endpoints
need while a long-running HTTP-agent decision is in progress.
"""
if self.lock.acquire(blocking=False):
try:
return deepcopy(self._to_dict_unlocked())
finally:
self.lock.release()
return deepcopy(self._completed_snapshot)
def _to_dict_unlocked(self) -> dict[str, object]:
return {
"game_id": self.game_id,
"status": "complete" if self.is_complete else "running",
@@ -143,8 +227,18 @@ class TableGame:
"button_seat": None
if self.button_index is None
else self.players[self.button_index].seat,
# ``small_blind`` / ``big_blind`` mirror the *current* level so
# legacy callers keep working. New consumers should prefer the
# structured ``blinds`` block which carries the full schedule.
"small_blind": self.small_blind,
"big_blind": self.big_blind,
"blinds": {
"current": {
"small_blind": self.small_blind,
"big_blind": self.big_blind,
},
"history": [level.to_dict() for level in self.blind_history],
},
"starting_stack": self.starting_stack,
"players": [player.public_dict() for player in self.players],
# ``hands`` exposes every finished hand (each entry is the same
@@ -153,6 +247,47 @@ class TableGame:
"hands": [summary.to_dict() for summary in self.hand_summaries],
}
def _apply_blinds_for_hand(
self,
small_blind: int | None,
big_blind: int | None,
) -> None:
"""Validate and apply optional per-hand blind overrides.
Splitting this out keeps :meth:`run_hand` focused on the table flow
while letting us reuse the validation rules originally enforced by
``__init__``. We require both values to be supplied together so the
configuration cannot drift into an inconsistent half-update.
"""
if small_blind is None and big_blind is None:
return
if small_blind is None or big_blind is None:
raise ValueError(
"small_blind and big_blind must be provided together"
)
if small_blind <= 0 or big_blind <= 0 or small_blind > big_blind:
raise ValueError("blinds must satisfy 0 < small_blind <= big_blind")
self.small_blind = int(small_blind)
self.big_blind = int(big_blind)
def _record_blind_level_if_new(self, level: BlindLevel) -> None:
"""Append ``level`` to :attr:`blind_history` when it differs.
Comparing against the latest entry (rather than blindly appending)
keeps the log compact: stretches of unchanged stakes only contribute
a single record. The very first hand always seeds an entry because
the history starts empty.
"""
if not self.blind_history:
self.blind_history.append(level)
return
latest = self.blind_history[-1]
if (
latest.small_blind != level.small_blind
or latest.big_blind != level.big_blind
):
self.blind_history.append(level)
def _advance_button(self) -> None:
if self.button_index is None:
self.button_index = self._next_index(0, lambda index: self.players[index].stack > 0)
@@ -350,9 +485,18 @@ class TableGame:
try:
requested = agent.decide(observation)
except Exception:
requested = PlayerAction("fold")
requested = self._default_action(observation.legal_actions)
return self._coerce_action(requested, observation.legal_actions)
def _default_action(self, legal_actions: list[dict[str, object]]) -> PlayerAction:
by_action = {str(action["action"]): action for action in legal_actions}
for action_type in ("check", "call", "fold"):
if action_type in by_action:
legal = by_action[action_type]
return PlayerAction(action_type, int(legal.get("amount") or 0))
legal = legal_actions[0]
return PlayerAction(str(legal["action"]), int(legal.get("amount") or 0))
def _coerce_action(
self,
requested: PlayerAction,
@@ -482,7 +626,7 @@ class TableGame:
swallow individual exceptions so a flaky remote endpoint cannot
break the table flow.
"""
snapshot = self.to_dict()
snapshot = self._to_dict_unlocked()
for agent in self.agents.values():
try:
agent.on_game_update(snapshot)
+27
View File
@@ -130,6 +130,28 @@ class Observation:
}
@dataclass(slots=True)
class BlindLevel:
"""A snapshot of the blind configuration that took effect at a given hand.
The structure is intentionally append-only: every time the blinds change
(or the very first hand seeds the initial values) we push a new
``BlindLevel`` so callers can reconstruct how the stakes evolved over the
course of the game without losing any prior state.
"""
hand_number: int
small_blind: int
big_blind: int
def to_dict(self) -> dict[str, object]:
return {
"hand_number": self.hand_number,
"small_blind": self.small_blind,
"big_blind": self.big_blind,
}
@dataclass(slots=True)
class PotAward:
amount: int
@@ -152,6 +174,10 @@ class HandSummary:
board: list[Card]
actions: list[ActionRecord]
awards: list[PotAward]
# ``blinds`` records the exact blind level used by this hand. Storing it
# on the summary (rather than only on the game) guarantees historical
# hands remain self-describing even after the blinds are raised later.
blinds: BlindLevel | None = None
showdown_hands: dict[str, list[Card]] = field(default_factory=dict)
started_at: float = field(default_factory=time)
finished_at: float = field(default_factory=time)
@@ -161,6 +187,7 @@ class HandSummary:
"game_id": self.game_id,
"hand_number": self.hand_number,
"button_seat": self.button_seat,
"blinds": self.blinds.to_dict() if self.blinds else None,
"board": [str(card) for card in self.board],
"actions": [record.to_dict() for record in self.actions],
"awards": [award.to_dict() for award in self.awards],
+43 -10
View File
@@ -25,8 +25,8 @@ class PokerRequestHandler(BaseHTTPRequestHandler):
if path == ["games"]:
self._json({"games": MANAGER.list_games()})
return
if len(path) == 2 and path[0] == "games":
self._json(MANAGER.get_game(path[1]).to_dict())
if len(path) == 2 and path[0] in {"game", "games"}:
self._json(MANAGER.get_game_state(path[1]))
return
self._json({"error": "not found"}, HTTPStatus.NOT_FOUND)
except KeyError as exc:
@@ -35,23 +35,37 @@ class PokerRequestHandler(BaseHTTPRequestHandler):
def do_POST(self) -> None:
path = self._path_parts()
try:
if path == ["games"]:
if path in (["game"], ["games"]):
game = MANAGER.create_game(self._read_json())
self._json(game.to_dict(), HTTPStatus.CREATED)
self._json(game.snapshot_completed(), HTTPStatus.CREATED)
return
if len(path) == 3 and path[0] == "games" and path[2] == "hands":
if len(path) == 3 and path[0] in {"game", "games"} and path[2] == "hands":
body = self._read_json()
count = int(body.get("count", 1))
until_one_left = bool(body.get("until_one_left", False))
summaries = MANAGER.run_hands(path[1], count, until_one_left)
self._json({"hands": summaries, "game": MANAGER.get_game(path[1]).to_dict()})
small_blind, big_blind = self._extract_blinds(body)
summaries = MANAGER.run_hands(
path[1],
count,
until_one_left,
small_blind=small_blind,
big_blind=big_blind,
)
self._json({"hands": summaries, "game": MANAGER.get_game_state(path[1])})
return
if len(path) == 4 and path[0] == "games" and path[2] == "hands" and path[3] == "run":
if len(path) == 4 and path[0] in {"game", "games"} and path[2] == "hands" and path[3] == "run":
body = self._read_json()
count = int(body.get("count", 1))
until_one_left = bool(body.get("until_one_left", False))
summaries = MANAGER.run_hands(path[1], count, until_one_left)
self._json({"hands": summaries, "game": MANAGER.get_game(path[1]).to_dict()})
small_blind, big_blind = self._extract_blinds(body)
summaries = MANAGER.run_hands(
path[1],
count,
until_one_left,
small_blind=small_blind,
big_blind=big_blind,
)
self._json({"hands": summaries, "game": MANAGER.get_game_state(path[1])})
return
self._json({"error": "not found"}, HTTPStatus.NOT_FOUND)
except KeyError as exc:
@@ -78,6 +92,25 @@ class PokerRequestHandler(BaseHTTPRequestHandler):
raise ValueError("request body must be a JSON object")
return payload
@staticmethod
def _extract_blinds(body: dict[str, Any]) -> tuple[int | None, int | None]:
"""Parse optional blind overrides from a /hands POST body.
Callers may omit both keys (keep current level), or supply both to
raise the blinds for the upcoming batch. Providing only one is
treated as a client error and surfaced via ``ValueError`` so the
handler can reply with 400.
"""
raw_small = body.get("small_blind")
raw_big = body.get("big_blind")
if raw_small is None and raw_big is None:
return None, None
if raw_small is None or raw_big is None:
raise ValueError(
"small_blind and big_blind must be provided together"
)
return int(raw_small), int(raw_big)
def _json(self, payload: dict[str, Any], status: HTTPStatus = HTTPStatus.OK) -> None:
body = json.dumps(payload, ensure_ascii=True).encode("utf-8")
self.send_response(status)
+66 -9
View File
@@ -5,13 +5,14 @@ from threading import RLock
from typing import Any
from uuid import uuid4
from texas_holdem.agents import build_agent
from texas_holdem.agents import build_agent, http_agent_endpoint_from_spec
from texas_holdem.engine import TableGame
class GameManager:
def __init__(self) -> None:
self._games: dict[str, TableGame] = {}
self._http_endpoint_owners: dict[str, str] = {}
self._lock = RLock()
def create_game(self, payload: dict[str, Any]) -> TableGame:
@@ -29,12 +30,19 @@ class GameManager:
big_blind = int(payload.get("big_blind", 10))
specs = []
http_endpoints: set[str] = set()
for seat, raw_spec in enumerate(players):
if not isinstance(raw_spec, dict):
raise ValueError("each player must be an object")
player_id = str(raw_spec.get("id") or raw_spec.get("player_id") or f"p{seat + 1}")
name = str(raw_spec.get("name") or player_id)
agent = build_agent(raw_spec.get("agent", raw_spec), rng)
agent_spec = raw_spec.get("agent", raw_spec)
if not isinstance(agent_spec, dict):
raise ValueError("agent spec must be an object")
endpoint = http_agent_endpoint_from_spec(agent_spec)
if endpoint is not None:
http_endpoints.add(endpoint)
agent = build_agent(agent_spec, rng, player_id=player_id)
specs.append((player_id, name, agent))
game = TableGame(
@@ -46,9 +54,18 @@ class GameManager:
rng=rng,
)
with self._lock:
self._release_completed_http_endpoints_locked()
if game_id in self._games:
raise ValueError(f"game already exists: {game_id}")
for endpoint in http_endpoints:
owner = self._http_endpoint_owners.get(endpoint)
if owner is not None and owner != game_id:
raise ValueError(
f"http agent endpoint already belongs to game {owner}: {endpoint}"
)
self._games[game_id] = game
for endpoint in http_endpoints:
self._http_endpoint_owners[endpoint] = game_id
return game
def get_game(self, game_id: str) -> TableGame:
@@ -58,14 +75,54 @@ class GameManager:
except KeyError as exc:
raise KeyError(f"game not found: {game_id}") from exc
def get_game_state(self, game_id: str) -> dict[str, object]:
return self.get_game(game_id).snapshot_completed()
def list_games(self) -> list[dict[str, object]]:
with self._lock:
return [game.to_dict() for game in self._games.values()]
games = list(self._games.values())
return [game.snapshot_completed() for game in games]
def run_hands(self, game_id: str, count: int = 1, until_one_left: bool = False) -> list[dict[str, object]]:
def run_hands(
self,
game_id: str,
count: int = 1,
until_one_left: bool = False,
small_blind: int | None = None,
big_blind: int | None = None,
) -> list[dict[str, object]]:
"""Run ``count`` hands, optionally raising the blinds first.
``small_blind`` / ``big_blind`` are forwarded to the engine so the
blinds can change between batches. Leaving them as ``None`` keeps
the previously configured level, which preserves the original
no-argument behaviour.
"""
game = self.get_game(game_id)
with self._lock:
return [
summary.to_dict()
for summary in game.run_hands(count, until_one_left=until_one_left)
]
summaries = [
summary.to_dict()
for summary in game.run_hands(
count,
until_one_left=until_one_left,
small_blind=small_blind,
big_blind=big_blind,
)
]
if game.is_complete:
with self._lock:
self._release_http_endpoints_for_game_locked(game_id)
return summaries
def _release_completed_http_endpoints_locked(self) -> None:
for game_id, game in list(self._games.items()):
if game.lock.acquire(blocking=False):
try:
if game.is_complete:
self._release_http_endpoints_for_game_locked(game_id)
finally:
game.lock.release()
def _release_http_endpoints_for_game_locked(self, game_id: str) -> None:
for endpoint, owner in list(self._http_endpoint_owners.items()):
if owner == game_id:
del self._http_endpoint_owners[endpoint]