8 Commits

Author SHA1 Message Date
mamamiyear 2062f917b0 feat(web): add web control client 2026-05-17 18:04:33 +08:00
mamamiyear 01c7176b1c refactor(replay): remove replay server and web client 2026-05-17 11:27:39 +08:00
mamamiyear 1ee963ce2e chore: add .codex to .gitignore 2026-05-17 11:23:21 +08:00
mamamiyear 351cac7734 docs: add AGENTS.md 2026-05-15 14:58:54 +08:00
mamamiyear 79dccde963 fix: game service api block when a game is running 2026-05-13 21:42:53 +08:00
qianrui.mmmy 3c027eae0b feat: add replay server and web client 2026-05-13 17:35:46 +08:00
qianrui.mmmy 09c42e9fa3 feat: set blind bet by run hands 2026-05-13 14:30:51 +08:00
qianrui.mmmy e22586aa2f feat: add --hide-reasoning for ai agent to hide reasoning info 2026-05-12 20:42:38 +08:00
19 changed files with 2664 additions and 70 deletions
+7
View File
@@ -32,5 +32,12 @@ htmlcov/
# IDE and editor files
.idea/
.vscode/
.codex/
*.swp
*.swo
# debug resources
debug/
# Node dependencies for browser automation tooling
node_modules/
+28
View File
@@ -0,0 +1,28 @@
# Repository Guidelines
## Project Structure & Module Organization
Core poker service code lives in `texas_holdem/`. Important modules include `engine.py` for Texas Hold'em rules, `service.py` for game management, `server.py` for the HTTP API, `agents.py` for local/HTTP agents, and `ai_client.py` / `human_client.py` for standalone agents. Prompt templates live in `texas_holdem/prompts/`.
## Build, Test, and Development Commands
- `python -m unittest discover -v` runs the full test suite.
- `python -m texas_holdem.server --host 127.0.0.1 --port 8000` starts the game service.
- `python -m texas_holdem.human_client --port 9001 --keep-history` starts an interactive human HTTP agent.
- `python -m texas_holdem.ai_client --port 9101 --api-key "$OPENAI_API_KEY" --model gpt-4o-mini` starts an OpenAI-compatible AI agent.
## Coding Style & Naming Conventions
Use Python 3.11+ standard-library APIs unless a dependency is intentionally added to `pyproject.toml`. Keep modules focused and prefer explicit dataclasses for wire/state models. Use 4-space indentation, type hints, `snake_case` for functions and variables, `PascalCase` for classes, and concise comments only where logic is non-obvious.
## Testing Guidelines
Use `unittest`. Add tests near the behavior changed: engine rules in `tests/test_engine.py`, HTTP/service behavior in `tests/test_service.py`, agent transport in `tests/test_agents.py`. New bug fixes should include a regression test. Avoid tests that require external network access or real LLM calls.
## Commit & Pull Request Guidelines
History uses short Conventional Commit-style subjects, for example `feat: add server and agent client` and `fix: game service api block when a game is running`. Keep commits scoped to one behavior change. Pull requests should include a short summary, test commands run, linked issue or motivation, and screenshots only for visible terminal-output changes.
## Security & Configuration Tips
Do not commit API keys. Pass LLM credentials through `OPENAI_API_KEY` or CLI flags in local shells only. HTTP Agent endpoints are exclusive per active game; preserve this invariant when changing service concurrency.
+35 -2
View File
@@ -11,6 +11,7 @@
- 支持盲注、四条街下注、弃牌、过牌、跟注、下注、加注、全下、边池和摊牌结算。
- 支持本地 Agent 和 HTTP Agent。
- 支持 Human Agent 和 OpenAI-compatible AI Agent 的终端过程输出。
- 游戏运行中可以并发查询状态;查询返回上一手完成后的稳定快照。
## 运行服务
@@ -51,6 +52,12 @@ curl -X POST http://127.0.0.1:8000/games/demo/hands/run \
curl http://127.0.0.1:8000/games/demo
```
也可以使用单数别名:
```bash
curl http://127.0.0.1:8000/game/demo
```
## HTTP Agent 协议
玩家配置可以使用远程 HTTP Agent:
@@ -62,12 +69,19 @@ curl http://127.0.0.1:8000/games/demo
"agent": {
"type": "http",
"endpoint": "http://127.0.0.1:9101",
"timeout_seconds": 10
"timeout_seconds": 10,
"game_update_timeout_seconds": 3,
"retries": 2,
"retry_backoff_seconds": 0.25
}
}
```
服务会向 `endpoint` 发送当前行动玩家的观察 JSON。Agent 返回:
服务会向 `endpoint + /game` 推送每手开始时的游戏快照,向 `endpoint + /act` 发送当前行动玩家的观察 JSON。`endpoint` 也可以传入历史形式的 `/act``/game` 后缀,服务会归一化为 base URL。
同一个 HTTP Agent endpoint 不能同时被不同游戏占用;后创建的游戏会返回错误。服务会给 HTTP Agent 请求自动重试,`/act` 重试仍失败时,规则引擎会按 `check > call > fold` 选择默认动作,避免整桌卡死。
Agent 返回:
```json
{"action": "call"}
@@ -112,3 +126,22 @@ AI Agent 会在终端输出:
```bash
python -m unittest discover -v
```
## Web 回放与控制台
启动核心游戏服务后,可以单独启动 Web 回放服务:
```bash
python -m texas_holdem_replay.server --host 127.0.0.1 --port 8088
```
打开 `http://127.0.0.1:8088`。页面通过自身的代理接口访问核心服务,
避免浏览器跨域限制;它不会导入或耦合 `texas_holdem.engine` 内部代码。
页面支持:
- 拉取 `GET /games/{game_id}` 快照并按 `hands[].actions` 生成逐帧回放。
- 通过代理调用核心服务运行指定数量手牌。
- 可选覆盖下一批手牌的大小盲。
- 上传或粘贴静态 JSON 快照进行离线回放。
- 自动轮询正在运行的游戏,保留当前历史查看位置。
+81
View File
@@ -0,0 +1,81 @@
# Texas Hold X 回放视图设计方案
## 目标
构建一个与核心游戏服务和 Agent 解耦的独立 Web 服务,用于读取游戏详情 JSON 并以动画方式回放 Texas Hold'em 对局。它可以部署在任意能运行 Python 标准库 HTTP 服务的环境中,不要求核心服务增加前端路由,也不改变 Human HTTP Agent / AI HTTP Agent 协议。
## 架构
新增 `texas_holdem_replay` 包:
- `texas_holdem_replay.server`:标准库 HTTP 服务,负责托管静态前端文件,并提供 `/api/fetch-game` 抓取代理。
- `texas_holdem_replay/static/index.html`:独立页面入口。
- `texas_holdem_replay/static/styles.css`:像素风牌桌、卡牌、座位和响应式布局。
- `texas_holdem_replay/static/app.js`:数据归一化、手牌时间轴生成、动画播放、上传 JSON、手动抓取和自动轮询。
运行方式:
```bash
python -m texas_holdem_replay.server --host 127.0.0.1 --port 8088
```
也可以通过安装后的脚本启动:
```bash
texas-holdem-replay --host 127.0.0.1 --port 8088
```
## 数据输入
视图支持三种输入方式:
1. 填写核心游戏服务地址和 `game_id`,点击“获取”。
前端请求自身的 `/api/fetch-game?base_url=...&game_id=...`,由回放服务去访问核心服务的 `/games/{game_id}`,避免浏览器跨域限制。
2. 上传静态 JSON 文件。
文件在浏览器本地解析,不依赖核心服务。
3. 开启自动获取。
按指定秒数轮询同一个核心服务和 `game_id`,用于观察正在运行的游戏快照。新快照会先尝试和当前回放位置合并;如果当前手牌追加了 action、showdown 或 award,回放会接续到新增 frame,而不是重头播放。
`/api/fetch-game` 也支持传入完整 `url`,便于未来接入网关或静态 JSON 服务。
## 数据模型与归一化
当前游戏详情返回结构包含:
- `players`:玩家最终状态。
- `hands`:历史手牌列表。
- 每手 `actions`:行动记录,包含 `street``player_id``action``amount`、行动后 `street_bet``stack`
- `awards`:底池分配。
- `showdown_hands`:摊牌玩家手牌。
前端不会依赖核心服务内部对象,只读取 JSON 字段并做归一化:
- 根据 `players``actions` 得到座位顺序。
-`starting_stack` 开始,按历史 `actions.stack``awards` 推演每手牌开始时的筹码。
- 每手牌生成一组离散 frame:开局、跨街发公共牌、玩家行动、摊牌、结算。
- 每个 frame 都有稳定 key。上传 JSON、手动获取和自动获取都会复用同一套合并逻辑:同一 `game_id` 且能找到当前手牌时,保留当前 frame;如果用户停在最新进度末尾,且当前手牌或后续手牌出现新增 frame,则从当前位置接续播放增量。用户正在查看历史手牌时,不会被新轮询强制跳走。
- 非 showdown 玩家手牌显示卡背。
- 已预留 `hand.hole_cards[player_id]``hand.private_hands[player_id]` 兼容点,后续核心服务返回非 showdown 手牌时可直接展示。
## 动画与交互
牌桌采用卡通像素风格:
- 椭圆绿色牌桌、木质像素边框、像素筹码状态条。
- 玩家围绕牌桌分布,当前行动玩家高亮并显示冒泡文字。
- 公共牌按 flop / turn / river 分阶段发出。
- 动作之间默认保留约 1.1-1.5 秒间隔,用户可用“节奏”滑杆调慢或调快。
- 支持上一帧、下一帧、播放/暂停、重置、选择指定手牌。
## 响应式设计
桌面端为三栏布局:数据控制、牌桌、事件日志。中等屏幕下事件日志下移,手机和平板窄屏下改为单列,牌桌高度固定到可观看的移动端比例,座位尺寸通过 CSS clamp 控制,避免文字和控件溢出。
## 解耦边界
回放服务只消费核心服务的公开 HTTP JSON,不导入 `texas_holdem.engine``service` 或 Agent 代码,也不要求游戏服务开放 CORS。未来可以单独部署在 CDN + 轻量代理、容器或任意 Python 运行环境中。
## 后续适配
- 核心服务返回非 showdown 玩家手牌后,只需要让每手 JSON 包含 `hole_cards``private_hands` 映射,前端现有归一化会优先读取。
- 如果服务端未来提供增量事件流,可以在 `app.js` 增加一个 source adapter,把事件流转成同样的 frame 列表,动画层无需重写。
+4
View File
@@ -9,6 +9,10 @@ dependencies = []
texas-holdem-server = "texas_holdem.server:main"
texas-holdem-human = "texas_holdem.human_client:main"
texas-holdem-ai = "texas_holdem.ai_client:main"
texas-holdem-replay = "texas_holdem_replay.server:main"
[tool.setuptools.package-data]
texas_holdem_replay = ["static/*"]
[tool.pytest.ini_options]
testpaths = ["tests"]
+60
View File
@@ -0,0 +1,60 @@
import json
import unittest
from unittest.mock import patch
from urllib.error import URLError
from texas_holdem.agents import HttpAgent, normalise_http_agent_endpoint
class FakeResponse:
def __init__(self, payload: dict[str, object]) -> None:
self.payload = payload
def read(self) -> bytes:
return json.dumps(self.payload).encode("utf-8")
def __enter__(self) -> "FakeResponse":
return self
def __exit__(self, *args: object) -> None:
return None
class AgentTests(unittest.TestCase):
def test_normalise_http_agent_endpoint_accepts_action_or_game_paths(self) -> None:
self.assertEqual(
normalise_http_agent_endpoint("http://127.0.0.1:9101/act"),
"http://127.0.0.1:9101",
)
self.assertEqual(
normalise_http_agent_endpoint("http://127.0.0.1:9101/game/"),
"http://127.0.0.1:9101",
)
def test_http_agent_post_retries_and_sets_player_header(self) -> None:
calls = []
def fake_urlopen(request, timeout): # type: ignore[no-untyped-def]
calls.append((request, timeout))
if len(calls) == 1:
raise URLError("temporary")
return FakeResponse({"ok": True})
agent = HttpAgent(
"http://agent.test/act",
player_id="p1",
retries=1,
retry_backoff_seconds=0,
)
with patch("texas_holdem.agents.urlopen", fake_urlopen):
payload = agent._post_json("/game", {"game_id": "g1"}, timeout_seconds=2)
self.assertEqual(payload, {"ok": True})
self.assertEqual(len(calls), 2)
self.assertEqual(calls[1][0].headers["X-player-id"], "p1")
self.assertEqual(calls[1][1], 2)
if __name__ == "__main__":
unittest.main()
+33
View File
@@ -0,0 +1,33 @@
from __future__ import annotations
import unittest
from texas_holdem_replay.server import build_core_url, build_game_url
class ReplayServerTests(unittest.TestCase):
def test_build_game_url_from_base_and_game_id(self) -> None:
self.assertEqual(
build_game_url({"base_url": ["http://127.0.0.1:8000/"], "game_id": ["game 1"]}),
"http://127.0.0.1:8000/games/game%201",
)
def test_build_game_url_accepts_full_url(self) -> None:
self.assertEqual(
build_game_url({"url": ["https://example.test/games/demo"]}),
"https://example.test/games/demo",
)
def test_build_game_url_rejects_non_http_url(self) -> None:
with self.assertRaises(ValueError):
build_game_url({"url": ["file:///tmp/game.json"]})
def test_build_core_url_preserves_base_path(self) -> None:
self.assertEqual(
build_core_url("http://127.0.0.1:8000/api/", "/games/demo/hands/run"),
"http://127.0.0.1:8000/api/games/demo/hands/run",
)
if __name__ == "__main__":
unittest.main()
+74 -1
View File
@@ -1,8 +1,26 @@
import unittest
from threading import Event, Thread
from texas_holdem.agents import PokerAgent
from texas_holdem.models import Observation, PlayerAction
from texas_holdem.service import GameManager
class BlockingAgent(PokerAgent):
def __init__(self, entered: Event, release: Event) -> None:
self.entered = entered
self.release = release
def decide(self, observation: Observation) -> PlayerAction:
self.entered.set()
if not self.release.wait(timeout=5):
raise RuntimeError("test timed out waiting to release blocking agent")
for action in observation.legal_actions:
if action["action"] == "check":
return PlayerAction("check")
return PlayerAction("call")
class ServiceTests(unittest.TestCase):
def test_create_and_run_game(self) -> None:
manager = GameManager()
@@ -23,7 +41,62 @@ class ServiceTests(unittest.TestCase):
hands = manager.run_hands(game.game_id, count=1)
self.assertEqual(len(hands), 1)
self.assertEqual(manager.get_game("demo").to_dict()["hand_number"], 1)
self.assertEqual(manager.get_game_state("demo")["hand_number"], 1)
def test_get_game_state_does_not_block_during_run(self) -> None:
manager = GameManager()
entered = Event()
release = Event()
game = manager.create_game(
{
"game_id": "blocking",
"seed": 13,
"starting_stack": 200,
"small_blind": 5,
"big_blind": 10,
"players": [
{"id": "a", "type": "calling"},
{"id": "b", "type": "calling"},
],
}
)
manager.run_hands("blocking", count=1)
game.agents["a"] = BlockingAgent(entered, release)
thread = Thread(target=lambda: manager.run_hands("blocking", count=1))
thread.start()
self.assertTrue(entered.wait(timeout=2))
state = manager.get_game_state("blocking")
release.set()
thread.join(timeout=2)
self.assertFalse(thread.is_alive())
self.assertEqual(state["hand_number"], 1)
self.assertEqual(len(state["hands"]), 1)
def test_duplicate_http_agent_endpoint_is_rejected_across_active_games(self) -> None:
manager = GameManager()
payload = {
"starting_stack": 200,
"small_blind": 5,
"big_blind": 10,
"players": [
{
"id": "ai",
"agent": {
"type": "http",
"endpoint": "http://127.0.0.1:9101/act",
},
},
{"id": "b", "type": "calling"},
],
}
manager.create_game({"game_id": "g1", **payload})
with self.assertRaisesRegex(ValueError, "already belongs to game g1"):
manager.create_game({"game_id": "g2", **payload})
if __name__ == "__main__":
+106 -31
View File
@@ -2,10 +2,11 @@ from __future__ import annotations
import json
import sys
import time
from abc import ABC, abstractmethod
from random import Random
from typing import IO, Any
from urllib.error import URLError
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from texas_holdem.human_io import clear_screen, prompt_action, render_observation
@@ -54,6 +55,27 @@ class CallingStationAgent(PokerAgent):
return PlayerAction("fold")
def normalise_http_agent_endpoint(raw: str) -> str:
"""Return the canonical base URL for an HTTP agent endpoint."""
url = raw.rstrip("/")
if url.endswith("/act"):
url = url[: -len("/act")]
if url.endswith("/game"):
url = url[: -len("/game")]
return url
def http_agent_endpoint_from_spec(spec: dict[str, Any]) -> str | None:
"""Extract the canonical HTTP endpoint from an agent spec, if present."""
agent_type = str(spec.get("type", "calling")).lower()
if agent_type != "http":
return None
endpoint = spec.get("endpoint")
if not endpoint:
raise ValueError("http agent requires an endpoint")
return normalise_http_agent_endpoint(str(endpoint))
class HttpAgent(PokerAgent):
"""Remote agent that talks to a base URL exposing ``/act`` and ``/game``.
@@ -66,28 +88,36 @@ class HttpAgent(PokerAgent):
ACT_PATH = "/act"
GAME_PATH = "/game"
def __init__(self, endpoint: str, timeout_seconds: float = 10.0) -> None:
self.base_url = self._normalise_base_url(endpoint)
def __init__(
self,
endpoint: str,
timeout_seconds: float = 10.0,
player_id: str | None = None,
game_update_timeout_seconds: float | None = None,
retries: int = 2,
retry_backoff_seconds: float = 0.25,
) -> None:
self.base_url = normalise_http_agent_endpoint(endpoint)
self.timeout_seconds = timeout_seconds
@staticmethod
def _normalise_base_url(raw: str) -> str:
"""Strip a trailing slash so URL joins do not produce double slashes.
Centralising this also tolerates the legacy "endpoint already points
at /act" mistake by chopping off a redundant ``/act`` suffix.
"""
url = raw.rstrip("/")
if url.endswith("/act"):
url = url[: -len("/act")]
return url
self.player_id = player_id
self.game_update_timeout_seconds = (
float(game_update_timeout_seconds)
if game_update_timeout_seconds is not None
else min(timeout_seconds, 3.0)
)
self.retries = max(0, retries)
self.retry_backoff_seconds = max(0.0, retry_backoff_seconds)
def _url(self, path: str) -> str:
"""Compose a full URL by joining the base with a path component."""
return f"{self.base_url}{path}"
def decide(self, observation: Observation) -> PlayerAction:
payload = self._post_json(self.ACT_PATH, observation.to_dict())
payload = self._post_json(
self.ACT_PATH,
observation.to_dict(),
timeout_seconds=self.timeout_seconds,
)
if not isinstance(payload, dict):
raise RuntimeError("agent endpoint must return a JSON object")
return PlayerAction.from_dict(payload)
@@ -100,30 +130,54 @@ class HttpAgent(PokerAgent):
only by way of the raised exception bubbling to the engine guard.
"""
try:
self._post_json(self.GAME_PATH, game_state)
self._post_json(
self.GAME_PATH,
game_state,
timeout_seconds=self.game_update_timeout_seconds,
)
except RuntimeError:
# ``/game`` is informational; treat any HTTP error as a benign
# drop rather than reraising and aborting the hand loop.
return None
def _post_json(self, path: str, payload: dict[str, Any]) -> Any:
def _post_json(
self,
path: str,
payload: dict[str, Any],
timeout_seconds: float,
) -> Any:
"""POST ``payload`` as JSON to ``base_url + path`` and return parsed body.
Extracted as a tiny helper so ``decide`` and ``on_game_update`` share
identical transport semantics (timeout, error wrapping, content-type).
"""
body = json.dumps(payload).encode("utf-8")
request = Request(
self._url(path),
data=body,
headers={"Content-Type": "application/json"},
method="POST",
)
try:
with urlopen(request, timeout=self.timeout_seconds) as response:
raw = response.read().decode("utf-8")
except (OSError, URLError) as exc:
raise RuntimeError(f"agent endpoint failed: {self._url(path)}") from exc
last_error: BaseException | None = None
raw = ""
for attempt in range(self.retries + 1):
request = Request(
self._url(path),
data=body,
headers=self._headers(),
method="POST",
)
try:
with urlopen(request, timeout=timeout_seconds) as response:
raw = response.read().decode("utf-8")
break
except HTTPError as exc:
detail = exc.read().decode("utf-8", errors="replace")
last_error = RuntimeError(
f"agent endpoint failed with HTTP {exc.code}: "
f"{self._url(path)} {detail}"
)
except (OSError, URLError) as exc:
last_error = exc
if attempt < self.retries and self.retry_backoff_seconds > 0:
time.sleep(self.retry_backoff_seconds * (2**attempt))
else:
raise RuntimeError(f"agent endpoint failed: {self._url(path)}") from last_error
if not raw:
return None
try:
@@ -133,6 +187,12 @@ class HttpAgent(PokerAgent):
f"agent endpoint returned invalid JSON: {self._url(path)}"
) from exc
def _headers(self) -> dict[str, str]:
headers = {"Content-Type": "application/json", "Connection": "close"}
if self.player_id:
headers["X-Player-Id"] = self.player_id
return headers
class HumanAgent(PokerAgent):
"""Interactive CLI agent for debugging and manual play.
@@ -189,7 +249,11 @@ class HumanAgent(PokerAgent):
return line.rstrip("\n")
def build_agent(spec: dict[str, Any], rng: Random | None = None) -> PokerAgent:
def build_agent(
spec: dict[str, Any],
rng: Random | None = None,
player_id: str | None = None,
) -> PokerAgent:
agent_type = str(spec.get("type", "calling")).lower()
if agent_type == "random":
return RandomAgent(rng)
@@ -199,7 +263,18 @@ def build_agent(spec: dict[str, Any], rng: Random | None = None) -> PokerAgent:
endpoint = spec.get("endpoint")
if not endpoint:
raise ValueError("http agent requires an endpoint")
return HttpAgent(str(endpoint), float(spec.get("timeout_seconds", 10.0)))
return HttpAgent(
str(endpoint),
timeout_seconds=float(spec.get("timeout_seconds", 10.0)),
player_id=player_id,
game_update_timeout_seconds=(
float(spec["game_update_timeout_seconds"])
if "game_update_timeout_seconds" in spec
else None
),
retries=int(spec.get("retries", 2)),
retry_backoff_seconds=float(spec.get("retry_backoff_seconds", 0.25)),
)
if agent_type in {"human", "cli", "interactive"}:
return HumanAgent()
raise ValueError(f"unknown agent type: {agent_type}")
+169 -3
View File
@@ -69,6 +69,109 @@ ANSI_RESET = "\x1b[0m"
# ---------------------------------------------------------------------------
class _ThinkingIndicator:
"""Animated "thinking..." marquee for the AI agent console.
Design rationale:
- Encapsulated as its own class so the animation lifecycle (timer
thread, frame state, screen erase sequence) does not pollute the
surrounding console class.
- Runs in a daemon background thread driven by ``threading.Event`` so
``stop`` returns promptly even if the current frame is mid-sleep.
- Uses ANSI ``\\r`` plus a clearing escape sequence to overwrite the
previous frame in place, avoiding scrollback noise. The frames
cycle through 0/1/2/3 dots every 0.5s as requested.
- ``start``/``stop`` are idempotent so the higher-level console can
call ``stop`` defensively (e.g. on the fallback path) without
tracking whether a marquee is actually running.
"""
# Frame interval in seconds; matches the user-visible cadence.
_FRAME_INTERVAL = 0.5
# 0..3 dots, looping.
_FRAMES = ("thinking", "thinking.", "thinking..", "thinking...")
# ANSI escape that clears from the cursor to the end of the line; we
# combine it with a leading carriage return to redraw the frame in
# place.
_ERASE_LINE = "\r\x1b[K"
def __init__(
self,
write_fn: Callable[[str], None],
gray_fn: Callable[[str], str],
) -> None:
self._write = write_fn
self._gray = gray_fn
self._stop_event = threading.Event()
self._thread: threading.Thread | None = None
# ``_active`` reflects whether a frame is currently visible on
# screen; ``stop`` uses it to decide whether to emit the final
# erase sequence.
self._active = False
# Guard against concurrent start/stop calls from different
# threads (e.g. content-delta handler vs. end_llm_stream).
self._lifecycle_lock = threading.Lock()
def start(self) -> None:
"""Begin the marquee in a background thread.
Calling ``start`` while already running is a no-op.
"""
with self._lifecycle_lock:
if self._thread is not None and self._thread.is_alive():
return
self._stop_event.clear()
self._active = True
thread = threading.Thread(
target=self._run,
name="ai-thinking-indicator",
daemon=True,
)
self._thread = thread
thread.start()
def stop(self) -> None:
"""Stop the marquee and erase the current frame from the screen.
Safe to call when not running.
"""
with self._lifecycle_lock:
thread = self._thread
if thread is None:
return
self._stop_event.set()
self._thread = None
# Wait for the worker outside the lifecycle lock so an in-flight
# ``_render_frame`` cannot deadlock against ``start`` from
# another thread.
thread.join()
if self._active:
# Wipe the last frame so the model's actual content begins on
# a clean line.
self._write(self._ERASE_LINE)
self._active = False
def _run(self) -> None:
"""Background loop: redraw the next frame every ``_FRAME_INTERVAL``."""
index = 0
while not self._stop_event.is_set():
self._render_frame(self._FRAMES[index % len(self._FRAMES)])
index += 1
# ``Event.wait`` returns immediately when ``set`` is called,
# so ``stop`` is responsive even mid-frame.
if self._stop_event.wait(self._FRAME_INTERVAL):
return
def _render_frame(self, label: str) -> None:
"""Emit one frame in place using carriage-return + erase-EOL."""
self._write(f"{self._ERASE_LINE}{self._gray(label)}")
# ---------------------------------------------------------------------------
# AI agent console
# ---------------------------------------------------------------------------
class AIAgentConsole:
"""Serialised terminal output for the standalone AI agent.
@@ -84,11 +187,30 @@ class AIAgentConsole:
output_stream: IO[str] | None = None,
keep_history: bool = False,
use_color: bool = True,
show_reasoning: bool = True,
) -> None:
self._output = output_stream if output_stream is not None else sys.stdout
self._keep_history = keep_history
self._use_color = use_color
# ``show_reasoning`` controls whether the LLM's chain-of-thought
# ("reasoning") deltas are printed to the terminal. The final
# answer ("content") is always printed so operators can still see
# the action being chosen.
self._show_reasoning = show_reasoning
# ``_lock`` serialises whole act/game render blocks (coarse grain).
# ``_io_lock`` is a finer-grained mutex protecting just the
# ``self._output.write`` calls so the thinking-indicator background
# thread can interleave safely with the main rendering thread
# without being blocked by the coarse lock.
self._lock = threading.Lock()
self._io_lock = threading.Lock()
# Animated "thinking..." marquee shown while reasoning output is
# suppressed. Created up-front so callers can ``start``/``stop``
# idempotently regardless of the show_reasoning flag.
self._thinking = _ThinkingIndicator(
write_fn=self._write,
gray_fn=self._gray,
)
@contextmanager
def act_log(self, observation: dict[str, Any]) -> Iterator[None]:
@@ -106,13 +228,32 @@ class AIAgentConsole:
def begin_llm_stream(self) -> None:
self._write(self._gray("AI MODEL STREAM\n"))
# When reasoning output is hidden, immediately start the marquee
# so the user sees liveness while the model is "thinking" before
# any content delta arrives.
if not self._show_reasoning:
self._thinking.start()
def write_llm_delta(self, kind: str, text: str) -> None:
if not text:
return
# Skip "reasoning" deltas entirely when reasoning output is hidden;
# this keeps the terminal focused on the final answer for users
# who do not care about chain-of-thought traces.
if kind == "reasoning" and not self._show_reasoning:
return
# First non-reasoning delta means the model has started speaking
# the actual answer; tear down the marquee before printing so the
# animation does not collide with the content stream.
if kind == "content" and not self._show_reasoning:
self._thinking.stop()
self._write(self._gray(text))
def end_llm_stream(self) -> None:
# Defensive stop in case the request finished without ever
# producing a content delta (e.g. fallback path / error).
if not self._show_reasoning:
self._thinking.stop()
self._write(self._gray("\n"))
def announce_action(
@@ -120,11 +261,17 @@ class AIAgentConsole:
action: dict[str, Any],
source: str = "model",
) -> None:
# Defensive stop: error / fallback paths bypass end_llm_stream, so
# we ensure the marquee never leaks into action / warning output.
self._thinking.stop()
body = json.dumps(action, ensure_ascii=False)
self._write(f"\nAI ACTION ({source}) -> {body}\n")
self._write("~" * 60 + "\n\n")
def announce_warning(self, message: str) -> None:
# Same defensive stop as ``announce_action`` - warnings can fire
# before the LLM stream closes (HTTP error, JSON parse error...).
self._thinking.stop()
self._write(f"\nAI WARNING -> {message}\n")
def _gray(self, text: str) -> str:
@@ -133,8 +280,12 @@ class AIAgentConsole:
return f"{ANSI_GRAY}{text}{ANSI_RESET}"
def _write(self, text: str) -> None:
self._output.write(text)
self._output.flush()
# The thinking-indicator background thread writes from a different
# thread than the main /act handler; the fine-grained ``_io_lock``
# avoids tearing of escape sequences and keeps stdout consistent.
with self._io_lock:
self._output.write(text)
self._output.flush()
# ---------------------------------------------------------------------------
@@ -296,8 +447,11 @@ def _format_action_history(history: list[dict[str, Any]]) -> str:
return "(no actions yet)"
# The engine never produces unbounded history within a single hand, but
# we cap defensively so a malformed payload cannot blow up token usage.
# The cap is sized to comfortably cover the worst realistic case (a
# 12-handed table running ~10 betting rounds within one hand) so the
# LLM never sees a silently truncated history at full ring tables.
rows = []
for record in history[-32:]:
for record in history[-128:]:
rows.append(
f"- [{record.get('street')}] {record.get('player_id')} -> "
f"{record.get('action')} amount={record.get('amount', 0)}"
@@ -927,6 +1081,16 @@ def main() -> None:
action="store_true",
help="Disable ANSI gray coloring for streamed LLM output.",
)
parser.add_argument(
"--hide-reasoning",
action="store_true",
help=(
"Hide the LLM's reasoning/chain-of-thought stream from the "
"terminal. The model still performs reasoning; only its "
"terminal output is suppressed. The final answer (content) "
"is still printed so operators can see the chosen action."
),
)
args = parser.parse_args()
if not args.api_key:
@@ -944,6 +1108,7 @@ def main() -> None:
console = AIAgentConsole(
keep_history=args.keep_history,
use_color=not args.no_color,
show_reasoning=not args.hide_reasoning,
)
service = AIAgentService(LLMClient(config), prompts, console=console)
server = create_server(args.host, args.port, service, default_player_id=args.player_id)
@@ -956,6 +1121,7 @@ def main() -> None:
f" base_url : {config.base_url}\n"
f" player_id : {args.player_id}\n"
f" stream : {'on' if config.stream else 'off'}\n"
f" reasoning : {'hidden (output suppressed)' if args.hide_reasoning else 'visible'}\n"
f" clear-screen: {'off (keep history)' if args.keep_history else 'on'}",
file=sys.stderr,
flush=True,
+158 -14
View File
@@ -1,6 +1,8 @@
from __future__ import annotations
from copy import deepcopy
from random import Random
from threading import RLock
from time import time
from texas_holdem.agents import PokerAgent
@@ -8,6 +10,7 @@ from texas_holdem.cards import Deck
from texas_holdem.evaluator import evaluate
from texas_holdem.models import (
ActionRecord,
BlindLevel,
HandSummary,
Observation,
PlayerAction,
@@ -53,21 +56,59 @@ class TableGame:
self.small_blind = small_blind
self.big_blind = big_blind
self.rng = rng or Random()
self.lock = RLock()
self.hand_number = 0
self.button_index: int | None = None
self.board = []
self.action_history: list[ActionRecord] = []
self.hand_summaries: list[HandSummary] = []
# ``blind_history`` is an append-only log of every blind level change
# (including the initial one). Each entry's ``hand_number`` is the
# first hand that played under those stakes, which makes it trivial
# to reconstruct the schedule from the outside.
self.blind_history: list[BlindLevel] = []
self._completed_snapshot: dict[str, object] = self._to_dict_unlocked()
@property
def is_complete(self) -> bool:
return len([player for player in self.players if player.stack > 0]) < 2
def run_hand(self) -> HandSummary:
def run_hand(
self,
small_blind: int | None = None,
big_blind: int | None = None,
) -> HandSummary:
"""Play a single hand.
``small_blind`` / ``big_blind`` allow callers to bump the stakes
between hands without rebuilding the table. Either both must be
provided or both omitted (in which case the previously configured
blinds carry over). The resolved blind level is appended to
:attr:`blind_history` whenever it changes (including the very first
hand) so external observers can replay the schedule.
"""
with self.lock:
return self._run_hand_locked(small_blind=small_blind, big_blind=big_blind)
def _run_hand_locked(
self,
small_blind: int | None = None,
big_blind: int | None = None,
) -> HandSummary:
if self.is_complete:
raise GameComplete("game is complete")
self._apply_blinds_for_hand(small_blind, big_blind)
self.hand_number += 1
# Stamp the active blind level onto the upcoming summary so a hand
# remains self-describing even after the blinds change later on.
active_blinds = BlindLevel(
hand_number=self.hand_number,
small_blind=self.small_blind,
big_blind=self.big_blind,
)
self._record_blind_level_if_new(active_blinds)
started_at = time()
self.board = []
self.action_history = []
@@ -116,26 +157,69 @@ class TableGame:
board=list(self.board),
actions=list(self.action_history),
awards=awards,
blinds=active_blinds,
showdown_hands=self._collect_showdown_hands(),
started_at=started_at,
finished_at=time(),
)
self.hand_summaries.append(summary)
self._completed_snapshot = deepcopy(self._to_dict_unlocked())
return summary
def run_hands(self, max_hands: int, until_one_left: bool = False) -> list[HandSummary]:
if max_hands <= 0:
raise ValueError("max_hands must be positive")
summaries = []
for _ in range(max_hands):
if self.is_complete:
break
summaries.append(self.run_hand())
if until_one_left and self.is_complete:
break
return summaries
def run_hands(
self,
max_hands: int,
until_one_left: bool = False,
small_blind: int | None = None,
big_blind: int | None = None,
) -> list[HandSummary]:
"""Play up to ``max_hands`` hands using a single blind configuration.
Passing ``small_blind`` / ``big_blind`` bumps the stakes starting
with the first hand of this call; subsequent calls can raise them
again. Leaving them ``None`` keeps the current level unchanged.
"""
with self.lock:
if max_hands <= 0:
raise ValueError("max_hands must be positive")
summaries = []
for _ in range(max_hands):
if self.is_complete:
break
# Only the first hand of the batch needs to apply the blind
# override; after that the engine reuses the stored values.
summaries.append(
self._run_hand_locked(
small_blind=small_blind,
big_blind=big_blind,
)
)
small_blind = None
big_blind = None
if until_one_left and self.is_complete:
break
return summaries
def to_dict(self) -> dict[str, object]:
with self.lock:
return self._to_dict_unlocked()
def snapshot_completed(self) -> dict[str, object]:
"""Return a stable snapshot from the latest completed hand boundary.
If a hand is currently running under ``self.lock``, this method does
not block. It returns the most recent completed hand summary and
stacks captured in memory, which is exactly what status endpoints
need while a long-running HTTP-agent decision is in progress.
"""
if self.lock.acquire(blocking=False):
try:
return deepcopy(self._to_dict_unlocked())
finally:
self.lock.release()
return deepcopy(self._completed_snapshot)
def _to_dict_unlocked(self) -> dict[str, object]:
return {
"game_id": self.game_id,
"status": "complete" if self.is_complete else "running",
@@ -143,8 +227,18 @@ class TableGame:
"button_seat": None
if self.button_index is None
else self.players[self.button_index].seat,
# ``small_blind`` / ``big_blind`` mirror the *current* level so
# legacy callers keep working. New consumers should prefer the
# structured ``blinds`` block which carries the full schedule.
"small_blind": self.small_blind,
"big_blind": self.big_blind,
"blinds": {
"current": {
"small_blind": self.small_blind,
"big_blind": self.big_blind,
},
"history": [level.to_dict() for level in self.blind_history],
},
"starting_stack": self.starting_stack,
"players": [player.public_dict() for player in self.players],
# ``hands`` exposes every finished hand (each entry is the same
@@ -153,6 +247,47 @@ class TableGame:
"hands": [summary.to_dict() for summary in self.hand_summaries],
}
def _apply_blinds_for_hand(
self,
small_blind: int | None,
big_blind: int | None,
) -> None:
"""Validate and apply optional per-hand blind overrides.
Splitting this out keeps :meth:`run_hand` focused on the table flow
while letting us reuse the validation rules originally enforced by
``__init__``. We require both values to be supplied together so the
configuration cannot drift into an inconsistent half-update.
"""
if small_blind is None and big_blind is None:
return
if small_blind is None or big_blind is None:
raise ValueError(
"small_blind and big_blind must be provided together"
)
if small_blind <= 0 or big_blind <= 0 or small_blind > big_blind:
raise ValueError("blinds must satisfy 0 < small_blind <= big_blind")
self.small_blind = int(small_blind)
self.big_blind = int(big_blind)
def _record_blind_level_if_new(self, level: BlindLevel) -> None:
"""Append ``level`` to :attr:`blind_history` when it differs.
Comparing against the latest entry (rather than blindly appending)
keeps the log compact: stretches of unchanged stakes only contribute
a single record. The very first hand always seeds an entry because
the history starts empty.
"""
if not self.blind_history:
self.blind_history.append(level)
return
latest = self.blind_history[-1]
if (
latest.small_blind != level.small_blind
or latest.big_blind != level.big_blind
):
self.blind_history.append(level)
def _advance_button(self) -> None:
if self.button_index is None:
self.button_index = self._next_index(0, lambda index: self.players[index].stack > 0)
@@ -350,9 +485,18 @@ class TableGame:
try:
requested = agent.decide(observation)
except Exception:
requested = PlayerAction("fold")
requested = self._default_action(observation.legal_actions)
return self._coerce_action(requested, observation.legal_actions)
def _default_action(self, legal_actions: list[dict[str, object]]) -> PlayerAction:
by_action = {str(action["action"]): action for action in legal_actions}
for action_type in ("check", "call", "fold"):
if action_type in by_action:
legal = by_action[action_type]
return PlayerAction(action_type, int(legal.get("amount") or 0))
legal = legal_actions[0]
return PlayerAction(str(legal["action"]), int(legal.get("amount") or 0))
def _coerce_action(
self,
requested: PlayerAction,
@@ -482,7 +626,7 @@ class TableGame:
swallow individual exceptions so a flaky remote endpoint cannot
break the table flow.
"""
snapshot = self.to_dict()
snapshot = self._to_dict_unlocked()
for agent in self.agents.values():
try:
agent.on_game_update(snapshot)
+27
View File
@@ -130,6 +130,28 @@ class Observation:
}
@dataclass(slots=True)
class BlindLevel:
"""A snapshot of the blind configuration that took effect at a given hand.
The structure is intentionally append-only: every time the blinds change
(or the very first hand seeds the initial values) we push a new
``BlindLevel`` so callers can reconstruct how the stakes evolved over the
course of the game without losing any prior state.
"""
hand_number: int
small_blind: int
big_blind: int
def to_dict(self) -> dict[str, object]:
return {
"hand_number": self.hand_number,
"small_blind": self.small_blind,
"big_blind": self.big_blind,
}
@dataclass(slots=True)
class PotAward:
amount: int
@@ -152,6 +174,10 @@ class HandSummary:
board: list[Card]
actions: list[ActionRecord]
awards: list[PotAward]
# ``blinds`` records the exact blind level used by this hand. Storing it
# on the summary (rather than only on the game) guarantees historical
# hands remain self-describing even after the blinds are raised later.
blinds: BlindLevel | None = None
showdown_hands: dict[str, list[Card]] = field(default_factory=dict)
started_at: float = field(default_factory=time)
finished_at: float = field(default_factory=time)
@@ -161,6 +187,7 @@ class HandSummary:
"game_id": self.game_id,
"hand_number": self.hand_number,
"button_seat": self.button_seat,
"blinds": self.blinds.to_dict() if self.blinds else None,
"board": [str(card) for card in self.board],
"actions": [record.to_dict() for record in self.actions],
"awards": [award.to_dict() for award in self.awards],
+43 -10
View File
@@ -25,8 +25,8 @@ class PokerRequestHandler(BaseHTTPRequestHandler):
if path == ["games"]:
self._json({"games": MANAGER.list_games()})
return
if len(path) == 2 and path[0] == "games":
self._json(MANAGER.get_game(path[1]).to_dict())
if len(path) == 2 and path[0] in {"game", "games"}:
self._json(MANAGER.get_game_state(path[1]))
return
self._json({"error": "not found"}, HTTPStatus.NOT_FOUND)
except KeyError as exc:
@@ -35,23 +35,37 @@ class PokerRequestHandler(BaseHTTPRequestHandler):
def do_POST(self) -> None:
path = self._path_parts()
try:
if path == ["games"]:
if path in (["game"], ["games"]):
game = MANAGER.create_game(self._read_json())
self._json(game.to_dict(), HTTPStatus.CREATED)
self._json(game.snapshot_completed(), HTTPStatus.CREATED)
return
if len(path) == 3 and path[0] == "games" and path[2] == "hands":
if len(path) == 3 and path[0] in {"game", "games"} and path[2] == "hands":
body = self._read_json()
count = int(body.get("count", 1))
until_one_left = bool(body.get("until_one_left", False))
summaries = MANAGER.run_hands(path[1], count, until_one_left)
self._json({"hands": summaries, "game": MANAGER.get_game(path[1]).to_dict()})
small_blind, big_blind = self._extract_blinds(body)
summaries = MANAGER.run_hands(
path[1],
count,
until_one_left,
small_blind=small_blind,
big_blind=big_blind,
)
self._json({"hands": summaries, "game": MANAGER.get_game_state(path[1])})
return
if len(path) == 4 and path[0] == "games" and path[2] == "hands" and path[3] == "run":
if len(path) == 4 and path[0] in {"game", "games"} and path[2] == "hands" and path[3] == "run":
body = self._read_json()
count = int(body.get("count", 1))
until_one_left = bool(body.get("until_one_left", False))
summaries = MANAGER.run_hands(path[1], count, until_one_left)
self._json({"hands": summaries, "game": MANAGER.get_game(path[1]).to_dict()})
small_blind, big_blind = self._extract_blinds(body)
summaries = MANAGER.run_hands(
path[1],
count,
until_one_left,
small_blind=small_blind,
big_blind=big_blind,
)
self._json({"hands": summaries, "game": MANAGER.get_game_state(path[1])})
return
self._json({"error": "not found"}, HTTPStatus.NOT_FOUND)
except KeyError as exc:
@@ -78,6 +92,25 @@ class PokerRequestHandler(BaseHTTPRequestHandler):
raise ValueError("request body must be a JSON object")
return payload
@staticmethod
def _extract_blinds(body: dict[str, Any]) -> tuple[int | None, int | None]:
"""Parse optional blind overrides from a /hands POST body.
Callers may omit both keys (keep current level), or supply both to
raise the blinds for the upcoming batch. Providing only one is
treated as a client error and surfaced via ``ValueError`` so the
handler can reply with 400.
"""
raw_small = body.get("small_blind")
raw_big = body.get("big_blind")
if raw_small is None and raw_big is None:
return None, None
if raw_small is None or raw_big is None:
raise ValueError(
"small_blind and big_blind must be provided together"
)
return int(raw_small), int(raw_big)
def _json(self, payload: dict[str, Any], status: HTTPStatus = HTTPStatus.OK) -> None:
body = json.dumps(payload, ensure_ascii=True).encode("utf-8")
self.send_response(status)
+66 -9
View File
@@ -5,13 +5,14 @@ from threading import RLock
from typing import Any
from uuid import uuid4
from texas_holdem.agents import build_agent
from texas_holdem.agents import build_agent, http_agent_endpoint_from_spec
from texas_holdem.engine import TableGame
class GameManager:
def __init__(self) -> None:
self._games: dict[str, TableGame] = {}
self._http_endpoint_owners: dict[str, str] = {}
self._lock = RLock()
def create_game(self, payload: dict[str, Any]) -> TableGame:
@@ -29,12 +30,19 @@ class GameManager:
big_blind = int(payload.get("big_blind", 10))
specs = []
http_endpoints: set[str] = set()
for seat, raw_spec in enumerate(players):
if not isinstance(raw_spec, dict):
raise ValueError("each player must be an object")
player_id = str(raw_spec.get("id") or raw_spec.get("player_id") or f"p{seat + 1}")
name = str(raw_spec.get("name") or player_id)
agent = build_agent(raw_spec.get("agent", raw_spec), rng)
agent_spec = raw_spec.get("agent", raw_spec)
if not isinstance(agent_spec, dict):
raise ValueError("agent spec must be an object")
endpoint = http_agent_endpoint_from_spec(agent_spec)
if endpoint is not None:
http_endpoints.add(endpoint)
agent = build_agent(agent_spec, rng, player_id=player_id)
specs.append((player_id, name, agent))
game = TableGame(
@@ -46,9 +54,18 @@ class GameManager:
rng=rng,
)
with self._lock:
self._release_completed_http_endpoints_locked()
if game_id in self._games:
raise ValueError(f"game already exists: {game_id}")
for endpoint in http_endpoints:
owner = self._http_endpoint_owners.get(endpoint)
if owner is not None and owner != game_id:
raise ValueError(
f"http agent endpoint already belongs to game {owner}: {endpoint}"
)
self._games[game_id] = game
for endpoint in http_endpoints:
self._http_endpoint_owners[endpoint] = game_id
return game
def get_game(self, game_id: str) -> TableGame:
@@ -58,14 +75,54 @@ class GameManager:
except KeyError as exc:
raise KeyError(f"game not found: {game_id}") from exc
def get_game_state(self, game_id: str) -> dict[str, object]:
return self.get_game(game_id).snapshot_completed()
def list_games(self) -> list[dict[str, object]]:
with self._lock:
return [game.to_dict() for game in self._games.values()]
games = list(self._games.values())
return [game.snapshot_completed() for game in games]
def run_hands(self, game_id: str, count: int = 1, until_one_left: bool = False) -> list[dict[str, object]]:
def run_hands(
self,
game_id: str,
count: int = 1,
until_one_left: bool = False,
small_blind: int | None = None,
big_blind: int | None = None,
) -> list[dict[str, object]]:
"""Run ``count`` hands, optionally raising the blinds first.
``small_blind`` / ``big_blind`` are forwarded to the engine so the
blinds can change between batches. Leaving them as ``None`` keeps
the previously configured level, which preserves the original
no-argument behaviour.
"""
game = self.get_game(game_id)
with self._lock:
return [
summary.to_dict()
for summary in game.run_hands(count, until_one_left=until_one_left)
]
summaries = [
summary.to_dict()
for summary in game.run_hands(
count,
until_one_left=until_one_left,
small_blind=small_blind,
big_blind=big_blind,
)
]
if game.is_complete:
with self._lock:
self._release_http_endpoints_for_game_locked(game_id)
return summaries
def _release_completed_http_endpoints_locked(self) -> None:
for game_id, game in list(self._games.items()):
if game.lock.acquire(blocking=False):
try:
if game.is_complete:
self._release_http_endpoints_for_game_locked(game_id)
finally:
game.lock.release()
def _release_http_endpoints_for_game_locked(self, game_id: str) -> None:
for endpoint, owner in list(self._http_endpoint_owners.items()):
if owner == game_id:
del self._http_endpoint_owners[endpoint]
+2
View File
@@ -0,0 +1,2 @@
"""Standalone web replay viewer for Texas Hold X."""
+222
View File
@@ -0,0 +1,222 @@
from __future__ import annotations
import argparse
import json
import mimetypes
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.parse import parse_qs, quote, urlencode, urlparse
from urllib.request import ProxyHandler, Request, build_opener
STATIC_DIR = Path(__file__).resolve().parent / "static"
DEFAULT_CORE_BASE_URL = "http://127.0.0.1:8000"
NO_PROXY_OPENER = build_opener(ProxyHandler({}))
def build_game_url(params: dict[str, list[str]]) -> str:
"""Build a core-service game URL from query parameters.
Supported forms:
- ``url=https://host/games/demo`` for callers that already have a full URL.
- ``base_url=http://host&game_id=demo`` for the common local service case.
"""
raw_url = _first(params, "url")
if raw_url:
return _validate_http_url(raw_url)
base_url = _first(params, "base_url") or DEFAULT_CORE_BASE_URL
game_id = _first(params, "game_id")
if not game_id:
raise ValueError("game_id is required when url is not provided")
parsed = urlparse(_validate_http_url(base_url.rstrip("/")))
base_path = parsed.path.rstrip("/")
game_path = f"{base_path}/games/{quote(game_id, safe='')}"
return parsed._replace(path=game_path, query="", fragment="").geturl()
def build_core_url(base_url: str, path: str) -> str:
parsed = urlparse(_validate_http_url(base_url.rstrip("/")))
base_path = parsed.path.rstrip("/")
target_path = f"{base_path}/{path.lstrip('/')}"
return parsed._replace(path=target_path, query="", fragment="").geturl()
def _first(params: dict[str, list[str]], key: str) -> str | None:
values = params.get(key)
if not values:
return None
return values[0]
def _validate_http_url(value: str) -> str:
parsed = urlparse(value)
if parsed.scheme not in {"http", "https"} or not parsed.netloc:
raise ValueError("url must be an absolute http(s) URL")
return value
class ReplayRequestHandler(BaseHTTPRequestHandler):
server_version = "TexasHoldemReplay/0.1"
def do_GET(self) -> None:
parsed = urlparse(self.path)
if parsed.path == "/health":
self._json({"ok": True})
return
if parsed.path == "/api/fetch-game":
self._handle_fetch_game(parse_qs(parsed.query))
return
self._serve_static(parsed.path)
def do_POST(self) -> None:
parsed = urlparse(self.path)
if parsed.path == "/api/create-game":
self._proxy_json_request("POST", "/games")
return
if parsed.path == "/api/run-hands":
try:
payload = self._read_json()
base_url = str(payload.pop("base_url", DEFAULT_CORE_BASE_URL))
game_id = str(payload.pop("game_id", "")).strip()
if not game_id:
raise ValueError("game_id is required")
path = f"/games/{quote(game_id, safe='')}/hands/run"
self._proxy_json_request("POST", path, base_url=base_url, payload=payload)
except ValueError as exc:
self._json({"error": str(exc)}, HTTPStatus.BAD_REQUEST)
return
self._json({"error": "not found"}, HTTPStatus.NOT_FOUND)
def log_message(self, format: str, *args: Any) -> None:
return
def _handle_fetch_game(self, params: dict[str, list[str]]) -> None:
try:
target_url = build_game_url(params)
payload, status = self._request_json("GET", target_url)
self._json(payload, HTTPStatus(status))
except ValueError as exc:
self._json({"error": str(exc)}, HTTPStatus.BAD_REQUEST)
except RuntimeError as exc:
self._json({"error": str(exc)}, HTTPStatus.BAD_GATEWAY)
def _proxy_json_request(
self,
method: str,
path: str,
base_url: str | None = None,
payload: dict[str, Any] | None = None,
) -> None:
try:
if payload is None:
payload = self._read_json()
base_url = str(payload.pop("base_url", DEFAULT_CORE_BASE_URL))
target_url = build_core_url(base_url or DEFAULT_CORE_BASE_URL, path)
response_payload, status = self._request_json(method, target_url, payload)
self._json(response_payload, HTTPStatus(status))
except ValueError as exc:
self._json({"error": str(exc)}, HTTPStatus.BAD_REQUEST)
except RuntimeError as exc:
self._json({"error": str(exc)}, HTTPStatus.BAD_GATEWAY)
def _request_json(
self,
method: str,
url: str,
payload: dict[str, Any] | None = None,
) -> tuple[dict[str, Any], int]:
data = None
headers = {"Accept": "application/json"}
if payload is not None:
data = json.dumps(payload).encode("utf-8")
headers["Content-Type"] = "application/json"
request = Request(url, data=data, headers=headers, method=method)
try:
with NO_PROXY_OPENER.open(request, timeout=20) as response:
raw = response.read().decode("utf-8")
status = response.status
except HTTPError as exc:
raw = exc.read().decode("utf-8", errors="replace")
status = exc.code
except (OSError, URLError) as exc:
raise RuntimeError(f"core service request failed: {url}") from exc
try:
parsed = json.loads(raw) if raw else {}
except json.JSONDecodeError as exc:
raise RuntimeError(f"core service returned invalid JSON: {url}") from exc
if not isinstance(parsed, dict):
raise RuntimeError("core service response must be a JSON object")
return parsed, status
def _serve_static(self, path: str) -> None:
relative = "index.html" if path in {"", "/"} else path.lstrip("/")
if relative.startswith("static/"):
relative = relative[len("static/") :]
if "/" in relative:
safe_parts = [part for part in relative.split("/") if part not in {"", ".", ".."}]
relative = "/".join(safe_parts)
target = STATIC_DIR / relative
try:
target.relative_to(STATIC_DIR)
except ValueError:
self._json({"error": "not found"}, HTTPStatus.NOT_FOUND)
return
if not target.is_file():
self._json({"error": "not found"}, HTTPStatus.NOT_FOUND)
return
content_type = mimetypes.guess_type(str(target))[0] or "application/octet-stream"
body = target.read_bytes()
self.send_response(HTTPStatus.OK)
self.send_header("Content-Type", content_type)
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _read_json(self) -> dict[str, Any]:
length = int(self.headers.get("Content-Length", "0"))
if length <= 0:
return {}
try:
payload = json.loads(self.rfile.read(length).decode("utf-8"))
except json.JSONDecodeError as exc:
raise ValueError("request body must be valid JSON") from exc
if not isinstance(payload, dict):
raise ValueError("request body must be a JSON object")
return payload
def _json(self, payload: dict[str, Any], status: HTTPStatus = HTTPStatus.OK) -> None:
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def create_server(host: str, port: int) -> ThreadingHTTPServer:
return ThreadingHTTPServer((host, port), ReplayRequestHandler)
def main() -> None:
parser = argparse.ArgumentParser(description="Run the Texas Hold X web replay viewer.")
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", default=8088, type=int)
args = parser.parse_args()
server = create_server(args.host, args.port)
query = urlencode({"base_url": DEFAULT_CORE_BASE_URL})
print(f"Texas Hold X replay viewer listening on http://{args.host}:{args.port}/?{query}")
try:
server.serve_forever()
except KeyboardInterrupt:
pass
finally:
server.server_close()
if __name__ == "__main__":
main()
+796
View File
@@ -0,0 +1,796 @@
const STREET_ORDER = ["preflop", "flop", "turn", "river"];
const STREET_BOARD_COUNTS = { preflop: 0, flop: 3, turn: 4, river: 5 };
const SUITS = { c: "♣", d: "♦", h: "♥", s: "♠" };
const state = {
rawGame: null,
replay: null,
handIndex: 0,
frameIndex: 0,
playing: false,
timer: null,
polling: false,
pollTimer: null,
};
const els = {
connectionStatus: document.getElementById("connectionStatus"),
gameStatus: document.getElementById("gameStatus"),
handCounter: document.getElementById("handCounter"),
baseUrlInput: document.getElementById("baseUrlInput"),
gameIdInput: document.getElementById("gameIdInput"),
fetchButton: document.getElementById("fetchButton"),
togglePollButton: document.getElementById("togglePollButton"),
runCountInput: document.getElementById("runCountInput"),
pollSecondsInput: document.getElementById("pollSecondsInput"),
smallBlindInput: document.getElementById("smallBlindInput"),
bigBlindInput: document.getElementById("bigBlindInput"),
untilOneLeftInput: document.getElementById("untilOneLeftInput"),
runButton: document.getElementById("runButton"),
createGameInput: document.getElementById("createGameInput"),
createGameButton: document.getElementById("createGameButton"),
fileInput: document.getElementById("fileInput"),
jsonInput: document.getElementById("jsonInput"),
loadJsonButton: document.getElementById("loadJsonButton"),
handSelect: document.getElementById("handSelect"),
speedInput: document.getElementById("speedInput"),
tableFelt: document.getElementById("tableFelt"),
seatLayer: document.getElementById("seatLayer"),
potDisplay: document.getElementById("potDisplay"),
boardCards: document.getElementById("boardCards"),
frameCaption: document.getElementById("frameCaption"),
resetButton: document.getElementById("resetButton"),
prevButton: document.getElementById("prevButton"),
playButton: document.getElementById("playButton"),
nextButton: document.getElementById("nextButton"),
frameCounter: document.getElementById("frameCounter"),
tableStats: document.getElementById("tableStats"),
playerList: document.getElementById("playerList"),
eventLog: document.getElementById("eventLog"),
};
function init() {
const params = new URLSearchParams(window.location.search);
if (params.get("base_url")) els.baseUrlInput.value = params.get("base_url");
if (params.get("game_id")) els.gameIdInput.value = params.get("game_id");
els.fetchButton.addEventListener("click", () => fetchGame());
els.togglePollButton.addEventListener("click", togglePolling);
els.runButton.addEventListener("click", runHands);
els.createGameButton.addEventListener("click", createGame);
els.loadJsonButton.addEventListener("click", loadJsonText);
els.fileInput.addEventListener("change", loadJsonFile);
els.handSelect.addEventListener("change", () => {
state.handIndex = Number(els.handSelect.value || 0);
state.frameIndex = 0;
stopPlayback();
render();
});
els.resetButton.addEventListener("click", () => setFrame(0));
els.prevButton.addEventListener("click", () => setFrame(state.frameIndex - 1));
els.nextButton.addEventListener("click", () => setFrame(state.frameIndex + 1));
els.playButton.addEventListener("click", togglePlayback);
els.createGameInput.value = JSON.stringify(defaultGameSpec(), null, 2);
renderEmpty();
}
async function fetchGame(options = {}) {
const baseUrl = els.baseUrlInput.value.trim();
const gameId = els.gameIdInput.value.trim();
if (!gameId) {
setStatus("error", "Missing game id");
return;
}
setStatus("neutral", "Fetching");
try {
const params = new URLSearchParams({ base_url: baseUrl, game_id: gameId });
const response = await fetch(`/api/fetch-game?${params.toString()}`);
const payload = await readJsonResponse(response);
loadGame(payload, { preserveTail: Boolean(options.preserveTail) });
setStatus("ok", "Connected");
} catch (error) {
setStatus("error", error.message);
}
}
async function runHands() {
const gameId = els.gameIdInput.value.trim();
if (!gameId) {
setStatus("error", "Missing game id");
return;
}
const payload = {
base_url: els.baseUrlInput.value.trim(),
game_id: gameId,
count: Number(els.runCountInput.value || 1),
until_one_left: els.untilOneLeftInput.checked,
};
const smallBlind = els.smallBlindInput.value.trim();
const bigBlind = els.bigBlindInput.value.trim();
if (smallBlind || bigBlind) {
payload.small_blind = Number(smallBlind);
payload.big_blind = Number(bigBlind);
}
setStatus("neutral", "Running");
els.runButton.disabled = true;
try {
const response = await fetch("/api/run-hands", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(payload),
});
const result = await readJsonResponse(response);
loadGame(result.game || result, { preserveTail: true });
setStatus("ok", "Updated");
} catch (error) {
setStatus("error", error.message);
} finally {
els.runButton.disabled = false;
}
}
async function createGame() {
let gameSpec;
try {
gameSpec = JSON.parse(els.createGameInput.value);
} catch (error) {
setStatus("error", error.message);
return;
}
if (!gameSpec || typeof gameSpec !== "object" || Array.isArray(gameSpec)) {
setStatus("error", "Game spec must be a JSON object");
return;
}
const payload = { ...gameSpec, base_url: els.baseUrlInput.value.trim() };
setStatus("neutral", "Creating");
els.createGameButton.disabled = true;
try {
const response = await fetch("/api/create-game", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(payload),
});
const created = await readJsonResponse(response);
loadGame(created);
setStatus("ok", "Created");
} catch (error) {
setStatus("error", error.message);
} finally {
els.createGameButton.disabled = false;
}
}
function togglePolling() {
state.polling = !state.polling;
els.togglePollButton.classList.toggle("primary", state.polling);
els.togglePollButton.textContent = state.polling ? "Stop" : "Auto";
if (state.polling) {
fetchGame({ preserveTail: true });
schedulePoll();
} else if (state.pollTimer) {
clearTimeout(state.pollTimer);
state.pollTimer = null;
}
}
function schedulePoll() {
if (!state.polling) return;
const delay = Math.max(1, Number(els.pollSecondsInput.value || 3)) * 1000;
state.pollTimer = setTimeout(async () => {
await fetchGame({ preserveTail: true });
schedulePoll();
}, delay);
}
async function readJsonResponse(response) {
const payload = await response.json().catch(() => ({}));
if (!response.ok) {
throw new Error(payload.error || `HTTP ${response.status}`);
}
return payload;
}
function loadJsonText() {
try {
const payload = JSON.parse(els.jsonInput.value);
loadGame(payload);
setStatus("ok", "Loaded JSON");
} catch (error) {
setStatus("error", error.message);
}
}
function loadJsonFile() {
const file = els.fileInput.files && els.fileInput.files[0];
if (!file) return;
const reader = new FileReader();
reader.onload = () => {
try {
const payload = JSON.parse(String(reader.result || "{}"));
els.jsonInput.value = JSON.stringify(payload, null, 2);
loadGame(payload);
setStatus("ok", "Loaded file");
} catch (error) {
setStatus("error", error.message);
}
};
reader.readAsText(file);
}
function loadGame(rawGame, options = {}) {
const wasAtTail = isAtReplayTail();
const currentHandNumber = currentHand()?.hand_number;
const currentFrameKey = currentFrame()?.key;
state.rawGame = rawGame;
state.replay = buildReplay(rawGame);
syncInputsFromGame(rawGame);
populateHandSelect();
if (options.preserveTail && wasAtTail && state.replay.hands.length) {
state.handIndex = state.replay.hands.length - 1;
state.frameIndex = Math.max(0, currentHand().frames.length - 1);
} else if (currentHandNumber) {
const handIndex = state.replay.hands.findIndex((hand) => hand.hand_number === currentHandNumber);
state.handIndex = handIndex >= 0 ? handIndex : Math.max(0, state.replay.hands.length - 1);
const frameIndex = currentFrameKey
? currentHand().frames.findIndex((frame) => frame.key === currentFrameKey)
: 0;
state.frameIndex = frameIndex >= 0 ? frameIndex : 0;
} else {
state.handIndex = Math.max(0, state.replay.hands.length - 1);
state.frameIndex = 0;
}
render();
}
function buildReplay(game) {
const players = normalizePlayers(game.players || []);
const hands = [];
let stacks = Object.fromEntries(players.map((player) => [player.player_id, numberOr(player.stack, game.starting_stack || 0)]));
const historicalHands = Array.isArray(game.hands) ? game.hands : [];
if (historicalHands.length > 0) {
stacks = Object.fromEntries(players.map((player) => [player.player_id, numberOr(game.starting_stack, player.stack)]));
}
for (const hand of historicalHands) {
const startStacks = { ...stacks };
const replayHand = buildHandReplay(game, players, hand, startStacks);
hands.push(replayHand);
stacks = { ...replayHand.endingStacks };
}
const currentFrame = buildCurrentTableFrame(game, players);
return { game, players, hands, currentFrame };
}
function normalizePlayers(players) {
return players
.map((player, index) => ({
player_id: String(player.player_id || player.id || `p${index + 1}`),
name: String(player.name || player.player_id || player.id || `Player ${index + 1}`),
seat: numberOr(player.seat, index),
stack: numberOr(player.stack, 0),
street_bet: numberOr(player.street_bet, 0),
total_bet: numberOr(player.total_bet, 0),
folded: Boolean(player.folded),
all_in: Boolean(player.all_in),
in_hand: Boolean(player.in_hand),
}))
.sort((a, b) => a.seat - b.seat);
}
function buildHandReplay(game, players, hand, startStacks) {
const handPlayers = players.map((player) => ({
...player,
stack: numberOr(startStacks[player.player_id], player.stack),
street_bet: 0,
total_bet: 0,
folded: false,
all_in: false,
in_hand: numberOr(startStacks[player.player_id], player.stack) > 0,
hole_cards: holeCardsFor(hand, player.player_id),
}));
const playerState = new Map(handPlayers.map((player) => [player.player_id, player]));
const frames = [];
const actions = Array.isArray(hand.actions) ? hand.actions : [];
const blinds = hand.blinds || {};
frames.push(snapshotFrame({
key: `h${hand.hand_number}:start`,
type: "start",
hand,
players: handPlayers,
board: [],
pot: 0,
activePlayerId: null,
caption: `Hand ${hand.hand_number} starts. Blinds ${blinds.small_blind || game.small_blind}/${blinds.big_blind || game.big_blind}.`,
event: `Hand ${hand.hand_number} started`,
}));
let currentStreet = "preflop";
let boardCount = 0;
actions.forEach((action, index) => {
const street = String(action.street || currentStreet);
if (street !== currentStreet) {
currentStreet = street;
boardCount = STREET_BOARD_COUNTS[street] || boardCount;
frames.push(snapshotFrame({
key: `h${hand.hand_number}:${street}:deal`,
type: "street",
hand,
players: handPlayers,
board: (hand.board || []).slice(0, boardCount),
pot: totalPot(handPlayers),
activePlayerId: null,
caption: `${streetLabel(street)} dealt.`,
event: `${streetLabel(street)} board`,
}));
resetStreetBets(handPlayers);
}
const player = playerState.get(String(action.player_id));
if (player) {
const amount = numberOr(action.amount, 0);
player.stack = numberOr(action.stack, player.stack - amount);
player.street_bet = numberOr(action.street_bet, player.street_bet + amount);
player.total_bet += amount;
if (action.action === "fold") player.folded = true;
if (player.stack <= 0 && !player.folded) player.all_in = true;
}
frames.push(snapshotFrame({
key: `h${hand.hand_number}:a${index}`,
type: "action",
hand,
players: handPlayers,
board: (hand.board || []).slice(0, STREET_BOARD_COUNTS[street] || boardCount),
pot: totalPot(handPlayers),
activePlayerId: String(action.player_id),
caption: formatAction(action, player),
event: formatEvent(action, player),
action,
}));
});
const finalBoard = hand.board || [];
if (finalBoard.length && boardCount < finalBoard.length) {
frames.push(snapshotFrame({
key: `h${hand.hand_number}:board-final`,
type: "street",
hand,
players: handPlayers,
board: finalBoard,
pot: totalPot(handPlayers),
activePlayerId: null,
caption: "Final board is visible.",
event: "Final board",
}));
}
const showdown = hand.showdown_hands || {};
if (Object.keys(showdown).length > 0) {
for (const [playerId, cards] of Object.entries(showdown)) {
const player = playerState.get(playerId);
if (player) player.hole_cards = Array.isArray(cards) ? cards : [];
}
frames.push(snapshotFrame({
key: `h${hand.hand_number}:showdown`,
type: "showdown",
hand,
players: handPlayers,
board: finalBoard,
pot: totalPot(handPlayers),
activePlayerId: null,
caption: "Showdown.",
event: "Showdown",
}));
}
const awards = Array.isArray(hand.awards) ? hand.awards : [];
for (const award of awards) {
const amount = numberOr(award.amount, 0);
const winners = Array.isArray(award.winners) ? award.winners : [];
const share = winners.length ? Math.floor(amount / winners.length) : 0;
let remainder = winners.length ? amount % winners.length : 0;
winners.forEach((winnerId) => {
const winner = playerState.get(String(winnerId));
if (!winner) return;
winner.stack += share + (remainder > 0 ? 1 : 0);
remainder -= 1;
});
frames.push(snapshotFrame({
key: `h${hand.hand_number}:award:${frames.length}`,
type: "award",
hand,
players: handPlayers,
board: finalBoard,
pot: 0,
activePlayerId: winners[0] ? String(winners[0]) : null,
caption: formatAward(award),
event: formatAward(award),
}));
}
const endingStacks = Object.fromEntries(handPlayers.map((player) => [player.player_id, player.stack]));
if (frames.length === 1) {
frames.push(snapshotFrame({
key: `h${hand.hand_number}:empty`,
type: "empty",
hand,
players: handPlayers,
board: finalBoard,
pot: 0,
activePlayerId: null,
caption: "No actions recorded for this hand.",
event: "No actions",
}));
}
return { hand_number: hand.hand_number, hand, frames, endingStacks };
}
function buildCurrentTableFrame(game, players) {
return {
key: "current-table",
type: "current",
hand_number: game.hand_number || 0,
button_seat: game.button_seat,
players: players.map((player) => ({
...player,
street_bet: numberOr(player.street_bet, 0),
total_bet: numberOr(player.total_bet, 0),
hole_cards: [],
})),
board: [],
pot: players.reduce((sum, player) => sum + numberOr(player.total_bet, 0), 0),
activePlayerId: null,
caption: "Current table snapshot. Run hands or load history to replay actions.",
event: "Current table snapshot",
action: null,
};
}
function snapshotFrame({ key, type, hand, players, board, pot, activePlayerId, caption, event, action }) {
return {
key,
type,
hand_number: hand.hand_number,
button_seat: hand.button_seat,
players: players.map((player) => ({ ...player, hole_cards: [...(player.hole_cards || [])] })),
board: [...(board || [])],
pot,
activePlayerId,
caption,
event,
action: action ? { ...action } : null,
};
}
function holeCardsFor(hand, playerId) {
const explicit = hand.hole_cards || hand.private_hands || {};
const showdown = hand.showdown_hands || {};
const cards = explicit[playerId] || showdown[playerId] || [];
return Array.isArray(cards) ? cards : [];
}
function totalPot(players) {
return players.reduce((sum, player) => sum + numberOr(player.total_bet, 0), 0);
}
function resetStreetBets(players) {
players.forEach((player) => {
player.street_bet = 0;
});
}
function syncInputsFromGame(game) {
if (game.game_id) els.gameIdInput.value = game.game_id;
}
function populateHandSelect() {
els.handSelect.innerHTML = "";
const hands = state.replay ? state.replay.hands : [];
if (!hands.length) {
const option = document.createElement("option");
option.value = "0";
option.textContent = "No hands";
els.handSelect.append(option);
return;
}
hands.forEach((hand, index) => {
const option = document.createElement("option");
option.value = String(index);
option.textContent = `Hand ${hand.hand_number}`;
els.handSelect.append(option);
});
}
function render() {
if (!state.replay) {
renderEmpty();
return;
}
const hand = currentHand();
const frame = currentFrame();
els.handSelect.value = state.replay.hands.length ? String(state.handIndex) : "0";
renderHeader();
renderStats();
renderPlayers(frame);
renderSeats(frame);
renderBoard(frame);
renderEvents(hand, frame);
renderTransport(hand, frame);
}
function renderEmpty() {
els.gameStatus.textContent = "No game";
els.handCounter.textContent = "Hand 0";
els.tableStats.innerHTML = "";
els.playerList.innerHTML = "";
els.seatLayer.innerHTML = "";
els.boardCards.innerHTML = "";
els.potDisplay.textContent = "Pot 0";
els.frameCaption.textContent = "Load a game snapshot";
els.frameCounter.textContent = "0 / 0";
els.eventLog.innerHTML = "";
}
function renderHeader() {
const game = state.replay.game;
els.gameStatus.textContent = `${game.game_id || "game"} · ${game.status || "unknown"}`;
els.gameStatus.className = `pill ${game.status === "complete" ? "ok" : ""}`;
els.handCounter.textContent = `Hand ${game.hand_number || 0}`;
}
function renderStats() {
const game = state.replay.game;
const hands = state.replay.hands.length;
const current = currentHand();
const stats = [
["Players", state.replay.players.length],
["Hands", hands],
["Blinds", `${game.small_blind || 0}/${game.big_blind || 0}`],
["Button", current && current.hand ? `Seat ${current.hand.button_seat}` : game.button_seat == null ? "-" : `Seat ${game.button_seat}`],
];
els.tableStats.innerHTML = stats
.map(([label, value]) => `<div class="stat"><div class="label">${escapeHtml(label)}</div><div class="value">${escapeHtml(value)}</div></div>`)
.join("");
}
function renderPlayers(frame) {
const players = frame ? frame.players : state.replay.players;
els.playerList.innerHTML = players
.map((player) => `
<div class="player-row">
<span class="seat-num">${player.seat}</span>
<span class="name">${escapeHtml(player.name)}</span>
<span class="stack">${formatChips(player.stack)}</span>
</div>
`)
.join("");
}
function renderSeats(frame) {
if (!frame) {
els.seatLayer.innerHTML = "";
return;
}
const count = Math.max(2, frame.players.length);
els.seatLayer.innerHTML = frame.players
.map((player, index) => {
const pos = seatPosition(index, count);
const classes = ["seat"];
if (frame.activePlayerId === player.player_id) classes.push("active");
if (player.folded) classes.push("folded");
const tags = [];
if (player.seat === frame.button_seat) tags.push("BTN");
if (player.folded) tags.push("FOLD");
if (player.all_in) tags.push("ALL IN");
return `
<article class="${classes.join(" ")}" style="left:${pos.x}%;top:${pos.y}%">
<div class="name-row">
<span class="name">${escapeHtml(player.name)}</span>
<span class="tag">${escapeHtml(tags[0] || `S${player.seat}`)}</span>
</div>
<div class="seat-stats">
<span>stk ${formatChips(player.stack)}</span>
<span>bet ${formatChips(player.street_bet)}</span>
<span>tot ${formatChips(player.total_bet)}</span>
<span>${player.in_hand ? "in" : "out"}</span>
</div>
<div class="cards hole-cards">${renderHoleCards(player)}</div>
</article>
`;
})
.join("");
}
function seatPosition(index, count) {
const angle = -90 + (360 / count) * index;
const rad = (angle * Math.PI) / 180;
return {
x: 50 + 42 * Math.cos(rad),
y: 50 + 39 * Math.sin(rad),
};
}
function renderBoard(frame) {
if (!frame) return;
els.potDisplay.textContent = `Pot ${formatChips(frame.pot)}`;
els.boardCards.innerHTML = renderCards(frame.board, 5);
els.frameCaption.textContent = frame.caption || "";
}
function renderTransport(hand, frame) {
const frames = hand ? hand.frames : [];
els.frameCounter.textContent = frames.length ? `${state.frameIndex + 1} / ${frames.length}` : "0 / 0";
els.prevButton.disabled = state.frameIndex <= 0;
els.resetButton.disabled = state.frameIndex <= 0;
els.nextButton.disabled = !frames.length || state.frameIndex >= frames.length - 1;
els.playButton.disabled = frames.length <= 1;
els.playButton.textContent = state.playing ? "Pause" : "Play";
if (!frame) els.frameCaption.textContent = "No hand selected";
}
function renderEvents(hand, frame) {
const frames = hand ? hand.frames : [];
els.eventLog.innerHTML = frames
.map((item, index) => `<li class="${frame && item.key === frame.key ? "current" : ""}">${escapeHtml(item.event || item.caption || `Frame ${index + 1}`)}</li>`)
.join("");
}
function setFrame(index) {
const hand = currentHand();
if (!hand) return;
state.frameIndex = clamp(index, 0, hand.frames.length - 1);
render();
}
function togglePlayback() {
if (state.playing) {
stopPlayback();
render();
} else {
state.playing = true;
render();
scheduleNextFrame();
}
}
function stopPlayback() {
state.playing = false;
if (state.timer) {
clearTimeout(state.timer);
state.timer = null;
}
}
function scheduleNextFrame() {
if (!state.playing) return;
const hand = currentHand();
if (!hand || state.frameIndex >= hand.frames.length - 1) {
stopPlayback();
render();
return;
}
const pace = Number(els.speedInput.value || 1);
const delay = Math.round(1100 / pace);
state.timer = setTimeout(() => {
state.frameIndex += 1;
render();
scheduleNextFrame();
}, delay);
}
function currentHand() {
if (!state.replay || !state.replay.hands.length) return null;
return state.replay.hands[clamp(state.handIndex, 0, state.replay.hands.length - 1)];
}
function currentFrame() {
const hand = currentHand();
if (!hand || !hand.frames.length) return state.replay ? state.replay.currentFrame : null;
return hand.frames[clamp(state.frameIndex, 0, hand.frames.length - 1)];
}
function isAtReplayTail() {
const hand = currentHand();
if (!state.replay || !hand) return true;
return state.handIndex === state.replay.hands.length - 1 && state.frameIndex >= hand.frames.length - 1;
}
function renderHoleCards(player) {
if (player.hole_cards && player.hole_cards.length) return renderCards(player.hole_cards, 2);
if (player.in_hand && !player.folded) return `${renderCardBack()}${renderCardBack()}`;
return "";
}
function renderCards(cards, padTo = 0) {
const rendered = (cards || []).map(renderCard).join("");
const missing = Math.max(0, padTo - (cards || []).length);
return rendered + Array.from({ length: missing }, () => `<span class="card back"></span>`).join("");
}
function renderCardBack() {
return `<span class="card back"></span>`;
}
function renderCard(label) {
const value = String(label || "");
const rank = value.slice(0, 1).toUpperCase();
const suit = value.slice(1, 2).toLowerCase();
const red = suit === "h" || suit === "d";
return `<span class="card ${red ? "red" : ""}">${escapeHtml(rank + (SUITS[suit] || suit))}</span>`;
}
function formatAction(action, player) {
const name = player ? player.name : action.player_id;
const verb = String(action.action || "").replace("_", " ");
const amount = numberOr(action.amount, 0);
if (amount > 0) return `${name} ${verb} ${formatChips(amount)}.`;
return `${name} ${verb}.`;
}
function formatEvent(action, player) {
const street = streetLabel(action.street || "preflop");
return `${street}: ${formatAction(action, player)}`;
}
function formatAward(award) {
const winners = (award.winners || []).join(", ") || "unknown";
const handValue = award.hand_value && award.hand_value.name ? ` with ${award.hand_value.name}` : "";
return `${winners} win ${formatChips(award.amount)}${handValue}.`;
}
function streetLabel(street) {
const index = STREET_ORDER.indexOf(street);
if (index < 0) return String(street || "").toUpperCase();
return STREET_ORDER[index].toUpperCase();
}
function numberOr(value, fallback) {
const num = Number(value);
return Number.isFinite(num) ? num : fallback;
}
function formatChips(value) {
return new Intl.NumberFormat("en-US", { maximumFractionDigits: 0 }).format(numberOr(value, 0));
}
function clamp(value, min, max) {
return Math.min(max, Math.max(min, value));
}
function escapeHtml(value) {
return String(value)
.replaceAll("&", "&amp;")
.replaceAll("<", "&lt;")
.replaceAll(">", "&gt;")
.replaceAll('"', "&quot;")
.replaceAll("'", "&#039;");
}
function setStatus(kind, text) {
els.connectionStatus.className = `pill ${kind === "neutral" ? "neutral" : kind}`;
els.connectionStatus.textContent = text;
}
function defaultGameSpec() {
return {
game_id: "demo",
seed: 42,
starting_stack: 1000,
small_blind: 5,
big_blind: 10,
players: [
{ id: "agent_1", name: "Agent 1", type: "calling" },
{ id: "agent_2", name: "Agent 2", type: "random" },
{ id: "agent_3", name: "Agent 3", type: "calling" },
],
};
}
init();
+144
View File
@@ -0,0 +1,144 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Texas Hold X Replay</title>
<link rel="stylesheet" href="/styles.css">
</head>
<body>
<main class="shell">
<header class="topbar" aria-label="Application header">
<div>
<p class="eyebrow">Texas Hold X</p>
<h1>Game Replay Control</h1>
</div>
<div class="status-strip" aria-live="polite">
<span id="connectionStatus" class="pill neutral">Idle</span>
<span id="gameStatus" class="pill">No game</span>
<span id="handCounter" class="metric">Hand 0</span>
</div>
</header>
<section class="workspace">
<aside class="panel controls" aria-label="Game controls">
<div class="panel-header">
<h2>Source</h2>
</div>
<label class="field">
<span>Core service</span>
<input id="baseUrlInput" type="url" value="http://127.0.0.1:8000" spellcheck="false">
</label>
<label class="field">
<span>Game ID</span>
<input id="gameIdInput" type="text" value="demo" spellcheck="false">
</label>
<div class="button-row">
<button id="fetchButton" type="button" class="primary">Fetch</button>
<button id="togglePollButton" type="button">Auto</button>
</div>
<div class="divider"></div>
<div class="panel-header compact">
<h2>Run Hands</h2>
</div>
<div class="form-grid">
<label class="field">
<span>Count</span>
<input id="runCountInput" type="number" min="1" max="100" value="1">
</label>
<label class="field">
<span>Poll sec</span>
<input id="pollSecondsInput" type="number" min="1" max="60" value="3">
</label>
</div>
<div class="form-grid">
<label class="field">
<span>Small blind</span>
<input id="smallBlindInput" type="number" min="1" placeholder="keep">
</label>
<label class="field">
<span>Big blind</span>
<input id="bigBlindInput" type="number" min="1" placeholder="keep">
</label>
</div>
<label class="check-field">
<input id="untilOneLeftInput" type="checkbox">
<span>Run until one player remains</span>
</label>
<button id="runButton" type="button" class="wide">Run</button>
<div class="divider"></div>
<div class="panel-header compact">
<h2>Create Game</h2>
</div>
<textarea id="createGameInput" rows="10" spellcheck="false"></textarea>
<button id="createGameButton" type="button" class="wide">Create</button>
<div class="divider"></div>
<div class="panel-header compact">
<h2>Load JSON</h2>
</div>
<input id="fileInput" class="file-input" type="file" accept="application/json,.json">
<textarea id="jsonInput" rows="8" spellcheck="false" placeholder='Paste a GET /games/{id} response'></textarea>
<button id="loadJsonButton" type="button" class="wide">Load Snapshot</button>
</aside>
<section class="table-zone" aria-label="Poker table replay">
<div class="table-toolbar">
<div class="select-wrap">
<label for="handSelect">Hand</label>
<select id="handSelect"></select>
</div>
<div class="select-wrap">
<label for="speedInput">Pace</label>
<input id="speedInput" type="range" min="0.5" max="2" step="0.1" value="1">
</div>
</div>
<div class="felt-stage">
<div class="table-felt" id="tableFelt">
<div id="seatLayer" class="seat-layer"></div>
<div class="board-zone">
<div id="potDisplay" class="pot-display">Pot 0</div>
<div id="boardCards" class="cards board-cards"></div>
<div id="frameCaption" class="frame-caption">Load a game snapshot</div>
</div>
</div>
</div>
<div class="transport" aria-label="Replay transport">
<button id="resetButton" type="button" title="Reset">|&lt;</button>
<button id="prevButton" type="button" title="Previous">&lt;</button>
<button id="playButton" type="button" class="primary">Play</button>
<button id="nextButton" type="button" title="Next">&gt;</button>
<span id="frameCounter" class="metric">0 / 0</span>
</div>
</section>
<aside class="panel log-panel" aria-label="Game details">
<div class="panel-header">
<h2>Table State</h2>
</div>
<div id="tableStats" class="stats-grid"></div>
<div class="panel-header compact">
<h2>Players</h2>
</div>
<div id="playerList" class="player-list"></div>
<div class="panel-header compact">
<h2>Timeline</h2>
</div>
<ol id="eventLog" class="event-log"></ol>
</aside>
</section>
</main>
<script src="/app.js"></script>
</body>
</html>
+609
View File
@@ -0,0 +1,609 @@
:root {
color-scheme: dark;
--bg: #07100e;
--surface: #101b18;
--surface-2: #15231f;
--line: #294038;
--text: #f2f7f4;
--muted: #9fb3aa;
--green: #22c55e;
--green-2: #15803d;
--amber: #f59e0b;
--red: #ef4444;
--blue: #38bdf8;
--felt: #0f6b42;
--felt-dark: #06452c;
--wood: #7c4a1e;
--shadow: 0 16px 40px rgba(0, 0, 0, 0.34);
--radius: 8px;
--font: ui-sans-serif, system-ui, -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif;
--mono: "SFMono-Regular", Consolas, "Liberation Mono", monospace;
}
* {
box-sizing: border-box;
}
body {
margin: 0;
min-height: 100vh;
background:
linear-gradient(90deg, rgba(255, 255, 255, 0.025) 1px, transparent 1px),
linear-gradient(rgba(255, 255, 255, 0.02) 1px, transparent 1px),
var(--bg);
background-size: 24px 24px;
color: var(--text);
font-family: var(--font);
}
button,
input,
select,
textarea {
font: inherit;
}
button {
border: 1px solid var(--line);
border-radius: var(--radius);
min-height: 38px;
padding: 8px 12px;
background: #172621;
color: var(--text);
cursor: pointer;
transition: border-color 160ms ease, background 160ms ease, transform 160ms ease;
}
button:hover {
border-color: var(--green);
background: #1e322b;
}
button:active {
transform: translateY(1px);
}
button:disabled {
cursor: not-allowed;
opacity: 0.5;
}
button.primary {
border-color: #35d772;
background: linear-gradient(180deg, #28c864, #179249);
color: #04100a;
font-weight: 700;
}
button.wide {
width: 100%;
}
input,
select,
textarea {
width: 100%;
border: 1px solid var(--line);
border-radius: var(--radius);
background: #0a1411;
color: var(--text);
min-height: 38px;
padding: 8px 10px;
}
textarea {
resize: vertical;
min-height: 120px;
font-family: var(--mono);
font-size: 12px;
line-height: 1.45;
}
input:focus,
select:focus,
textarea:focus,
button:focus-visible {
outline: 2px solid rgba(34, 197, 94, 0.75);
outline-offset: 2px;
}
.shell {
width: min(1720px, calc(100vw - 32px));
margin: 0 auto;
padding: 18px 0 24px;
}
.topbar {
display: flex;
align-items: center;
justify-content: space-between;
gap: 18px;
margin-bottom: 16px;
}
.eyebrow {
margin: 0 0 4px;
color: var(--green);
font-family: var(--mono);
font-size: 12px;
text-transform: uppercase;
}
h1,
h2 {
margin: 0;
letter-spacing: 0;
}
h1 {
font-size: 26px;
}
h2 {
font-size: 14px;
}
.status-strip {
display: flex;
flex-wrap: wrap;
justify-content: flex-end;
gap: 8px;
}
.pill,
.metric {
display: inline-flex;
align-items: center;
min-height: 30px;
border: 1px solid var(--line);
border-radius: 999px;
padding: 5px 10px;
background: rgba(16, 27, 24, 0.9);
color: var(--muted);
font-family: var(--mono);
font-size: 12px;
white-space: nowrap;
}
.pill.ok {
border-color: rgba(34, 197, 94, 0.6);
color: #86efac;
}
.pill.error {
border-color: rgba(239, 68, 68, 0.7);
color: #fecaca;
}
.workspace {
display: grid;
grid-template-columns: 300px minmax(520px, 1fr) 340px;
gap: 14px;
align-items: start;
}
.panel {
border: 1px solid var(--line);
border-radius: var(--radius);
background: rgba(16, 27, 24, 0.94);
box-shadow: var(--shadow);
padding: 14px;
}
.panel-header {
display: flex;
align-items: center;
justify-content: space-between;
margin-bottom: 12px;
}
.panel-header.compact {
margin-top: 4px;
}
.field {
display: grid;
gap: 6px;
margin-bottom: 10px;
}
.field span,
.select-wrap label {
color: var(--muted);
font-size: 12px;
}
.form-grid,
.button-row {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 8px;
}
.check-field {
display: flex;
align-items: center;
gap: 8px;
color: var(--muted);
font-size: 13px;
margin: 4px 0 10px;
}
.check-field input {
width: 16px;
min-height: 16px;
}
.divider {
height: 1px;
background: var(--line);
margin: 14px 0;
}
.file-input {
margin-bottom: 10px;
}
.table-zone {
min-width: 0;
}
.table-toolbar,
.transport {
display: flex;
align-items: center;
justify-content: space-between;
gap: 10px;
margin-bottom: 10px;
}
.select-wrap {
display: grid;
grid-template-columns: auto minmax(120px, 1fr);
align-items: center;
gap: 8px;
}
.felt-stage {
min-height: 560px;
border: 1px solid var(--line);
border-radius: var(--radius);
background: radial-gradient(circle at 50% 45%, rgba(56, 189, 248, 0.12), transparent 46%), #08120f;
box-shadow: var(--shadow);
padding: 20px;
}
.table-felt {
position: relative;
height: 520px;
min-height: 520px;
border: 16px solid var(--wood);
border-radius: 48%;
background:
radial-gradient(ellipse at center, rgba(255, 255, 255, 0.1), transparent 58%),
radial-gradient(ellipse at center, var(--felt), var(--felt-dark));
box-shadow: inset 0 0 0 6px rgba(255, 255, 255, 0.08), inset 0 0 32px rgba(0, 0, 0, 0.45);
overflow: visible;
}
.table-felt::after {
content: "";
position: absolute;
inset: 58px 80px;
border: 1px dashed rgba(255, 255, 255, 0.18);
border-radius: 50%;
pointer-events: none;
}
.seat-layer {
position: absolute;
inset: 0;
}
.seat {
position: absolute;
width: 154px;
min-height: 106px;
transform: translate(-50%, -50%);
border: 1px solid rgba(148, 163, 184, 0.34);
border-radius: var(--radius);
background: rgba(6, 16, 12, 0.9);
box-shadow: 0 10px 24px rgba(0, 0, 0, 0.36);
padding: 8px;
transition: border-color 160ms ease, box-shadow 160ms ease, transform 160ms ease;
}
.seat.active {
border-color: var(--green);
box-shadow: 0 0 0 2px rgba(34, 197, 94, 0.24), 0 0 24px rgba(34, 197, 94, 0.28);
}
.seat.folded {
opacity: 0.66;
}
.seat .name-row {
display: flex;
align-items: center;
justify-content: space-between;
gap: 8px;
margin-bottom: 6px;
}
.seat .name {
min-width: 0;
overflow: hidden;
color: var(--text);
font-size: 13px;
font-weight: 700;
text-overflow: ellipsis;
white-space: nowrap;
}
.tag {
border-radius: 999px;
padding: 2px 6px;
background: rgba(34, 197, 94, 0.16);
color: #bbf7d0;
font-family: var(--mono);
font-size: 10px;
white-space: nowrap;
}
.seat-stats {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 4px;
color: var(--muted);
font-family: var(--mono);
font-size: 11px;
}
.hole-cards {
margin-top: 7px;
}
.cards {
display: flex;
justify-content: center;
gap: 6px;
}
.card {
display: inline-flex;
align-items: center;
justify-content: center;
width: 42px;
height: 58px;
border: 1px solid #d7dee3;
border-radius: 6px;
background: #f8fafc;
color: #111827;
font-family: var(--mono);
font-size: 18px;
font-weight: 800;
box-shadow: 0 5px 10px rgba(0, 0, 0, 0.22);
}
.card.red {
color: #b91c1c;
}
.card.back {
border-color: #38bdf8;
background:
linear-gradient(45deg, rgba(255, 255, 255, 0.16) 25%, transparent 25%),
linear-gradient(-45deg, rgba(255, 255, 255, 0.16) 25%, transparent 25%),
#123a52;
background-size: 10px 10px;
color: transparent;
}
.board-zone {
position: absolute;
left: 50%;
top: 50%;
display: grid;
width: min(420px, 52%);
transform: translate(-50%, -50%);
justify-items: center;
gap: 12px;
z-index: 3;
}
.pot-display {
border: 1px solid rgba(245, 158, 11, 0.55);
border-radius: 999px;
padding: 6px 12px;
background: rgba(31, 20, 5, 0.7);
color: #fde68a;
font-family: var(--mono);
font-size: 13px;
}
.board-cards {
min-height: 58px;
}
.frame-caption {
width: 100%;
min-height: 42px;
border: 1px solid rgba(255, 255, 255, 0.16);
border-radius: var(--radius);
padding: 9px 12px;
background: rgba(3, 7, 18, 0.62);
color: var(--text);
text-align: center;
}
.transport {
justify-content: center;
margin: 10px 0 0;
}
.transport button {
min-width: 48px;
}
.stats-grid {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 8px;
}
.stat {
border: 1px solid var(--line);
border-radius: var(--radius);
background: #0a1411;
padding: 9px;
}
.stat .label {
color: var(--muted);
font-size: 11px;
}
.stat .value {
margin-top: 4px;
font-family: var(--mono);
font-size: 16px;
}
.player-list {
display: grid;
gap: 8px;
}
.player-row {
display: grid;
grid-template-columns: 28px 1fr auto;
gap: 8px;
align-items: center;
border: 1px solid var(--line);
border-radius: var(--radius);
background: #0a1411;
padding: 8px;
}
.player-row .seat-num {
display: inline-flex;
align-items: center;
justify-content: center;
width: 28px;
height: 28px;
border-radius: 50%;
background: rgba(34, 197, 94, 0.14);
color: #bbf7d0;
font-family: var(--mono);
font-size: 12px;
}
.player-row .name {
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
}
.player-row .stack {
color: var(--muted);
font-family: var(--mono);
font-size: 12px;
}
.event-log {
display: grid;
max-height: 392px;
margin: 0;
padding: 0 0 0 20px;
gap: 7px;
overflow: auto;
}
.event-log li {
border-left: 2px solid rgba(34, 197, 94, 0.35);
padding: 2px 0 2px 8px;
color: var(--muted);
font-size: 13px;
}
.event-log li.current {
color: var(--text);
border-left-color: var(--green);
}
@media (max-width: 1180px) {
.workspace {
grid-template-columns: 300px minmax(0, 1fr);
}
.log-panel {
grid-column: 1 / -1;
}
}
@media (max-width: 820px) {
.shell {
width: min(100vw - 20px, 760px);
padding-top: 12px;
}
.topbar {
align-items: flex-start;
flex-direction: column;
}
.status-strip {
justify-content: flex-start;
}
.workspace {
grid-template-columns: 1fr;
}
.felt-stage {
min-height: 430px;
padding: 12px;
}
.table-felt {
height: 390px;
min-height: 390px;
border-width: 10px;
}
.seat {
width: 124px;
min-height: 94px;
padding: 7px;
}
.seat-stats {
grid-template-columns: 1fr;
}
.card {
width: 34px;
height: 48px;
font-size: 15px;
}
.board-zone {
width: 62%;
}
.table-toolbar {
align-items: stretch;
flex-direction: column;
}
}
@media (prefers-reduced-motion: reduce) {
*,
*::before,
*::after {
scroll-behavior: auto !important;
transition-duration: 0.01ms !important;
animation-duration: 0.01ms !important;
animation-iteration-count: 1 !important;
}
}