From 79dccde9631c84b85ed4efc1f1d8d3585616f007 Mon Sep 17 00:00:00 2001 From: mamamiyear Date: Wed, 13 May 2026 21:26:46 +0800 Subject: [PATCH] fix: game service api block when a game is running --- README.md | 18 +++++- tests/test_agents.py | 60 ++++++++++++++++++ tests/test_service.py | 75 +++++++++++++++++++++- texas_holdem/agents.py | 137 +++++++++++++++++++++++++++++++--------- texas_holdem/engine.py | 81 ++++++++++++++++++------ texas_holdem/server.py | 16 ++--- texas_holdem/service.py | 64 +++++++++++++++---- 7 files changed, 378 insertions(+), 73 deletions(-) create mode 100644 tests/test_agents.py diff --git a/README.md b/README.md index 210fac5..5675072 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,7 @@ - 支持盲注、四条街下注、弃牌、过牌、跟注、下注、加注、全下、边池和摊牌结算。 - 支持本地 Agent 和 HTTP Agent。 - 支持 Human Agent 和 OpenAI-compatible AI Agent 的终端过程输出。 +- 游戏运行中可以并发查询状态;查询返回上一手完成后的稳定快照。 ## 运行服务 @@ -51,6 +52,12 @@ curl -X POST http://127.0.0.1:8000/games/demo/hands/run \ curl http://127.0.0.1:8000/games/demo ``` +也可以使用单数别名: + +```bash +curl http://127.0.0.1:8000/game/demo +``` + ## HTTP Agent 协议 玩家配置可以使用远程 HTTP Agent: @@ -62,12 +69,19 @@ curl http://127.0.0.1:8000/games/demo "agent": { "type": "http", "endpoint": "http://127.0.0.1:9101", - "timeout_seconds": 10 + "timeout_seconds": 10, + "game_update_timeout_seconds": 3, + "retries": 2, + "retry_backoff_seconds": 0.25 } } ``` -服务会向 `endpoint` 发送当前行动玩家的观察 JSON。Agent 返回: +服务会向 `endpoint + /game` 推送每手开始时的游戏快照,向 `endpoint + /act` 发送当前行动玩家的观察 JSON。`endpoint` 也可以传入历史形式的 `/act` 或 `/game` 后缀,服务会归一化为 base URL。 + +同一个 HTTP Agent endpoint 不能同时被不同游戏占用;后创建的游戏会返回错误。服务会给 HTTP Agent 请求自动重试,`/act` 重试仍失败时,规则引擎会按 `check > call > fold` 选择默认动作,避免整桌卡死。 + +Agent 返回: ```json {"action": "call"} diff --git a/tests/test_agents.py b/tests/test_agents.py new file mode 100644 index 0000000..e3c6381 --- /dev/null +++ b/tests/test_agents.py @@ -0,0 +1,60 @@ +import json +import unittest +from unittest.mock import patch +from urllib.error import URLError + +from texas_holdem.agents import HttpAgent, normalise_http_agent_endpoint + + +class FakeResponse: + def __init__(self, payload: dict[str, object]) -> None: + self.payload = payload + + def read(self) -> bytes: + return json.dumps(self.payload).encode("utf-8") + + def __enter__(self) -> "FakeResponse": + return self + + def __exit__(self, *args: object) -> None: + return None + + +class AgentTests(unittest.TestCase): + def test_normalise_http_agent_endpoint_accepts_action_or_game_paths(self) -> None: + self.assertEqual( + normalise_http_agent_endpoint("http://127.0.0.1:9101/act"), + "http://127.0.0.1:9101", + ) + self.assertEqual( + normalise_http_agent_endpoint("http://127.0.0.1:9101/game/"), + "http://127.0.0.1:9101", + ) + + def test_http_agent_post_retries_and_sets_player_header(self) -> None: + calls = [] + + def fake_urlopen(request, timeout): # type: ignore[no-untyped-def] + calls.append((request, timeout)) + if len(calls) == 1: + raise URLError("temporary") + return FakeResponse({"ok": True}) + + agent = HttpAgent( + "http://agent.test/act", + player_id="p1", + retries=1, + retry_backoff_seconds=0, + ) + + with patch("texas_holdem.agents.urlopen", fake_urlopen): + payload = agent._post_json("/game", {"game_id": "g1"}, timeout_seconds=2) + + self.assertEqual(payload, {"ok": True}) + self.assertEqual(len(calls), 2) + self.assertEqual(calls[1][0].headers["X-player-id"], "p1") + self.assertEqual(calls[1][1], 2) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_service.py b/tests/test_service.py index c9b83ba..1b5f60a 100644 --- a/tests/test_service.py +++ b/tests/test_service.py @@ -1,8 +1,26 @@ import unittest +from threading import Event, Thread +from texas_holdem.agents import PokerAgent +from texas_holdem.models import Observation, PlayerAction from texas_holdem.service import GameManager +class BlockingAgent(PokerAgent): + def __init__(self, entered: Event, release: Event) -> None: + self.entered = entered + self.release = release + + def decide(self, observation: Observation) -> PlayerAction: + self.entered.set() + if not self.release.wait(timeout=5): + raise RuntimeError("test timed out waiting to release blocking agent") + for action in observation.legal_actions: + if action["action"] == "check": + return PlayerAction("check") + return PlayerAction("call") + + class ServiceTests(unittest.TestCase): def test_create_and_run_game(self) -> None: manager = GameManager() @@ -23,7 +41,62 @@ class ServiceTests(unittest.TestCase): hands = manager.run_hands(game.game_id, count=1) self.assertEqual(len(hands), 1) - self.assertEqual(manager.get_game("demo").to_dict()["hand_number"], 1) + self.assertEqual(manager.get_game_state("demo")["hand_number"], 1) + + def test_get_game_state_does_not_block_during_run(self) -> None: + manager = GameManager() + entered = Event() + release = Event() + game = manager.create_game( + { + "game_id": "blocking", + "seed": 13, + "starting_stack": 200, + "small_blind": 5, + "big_blind": 10, + "players": [ + {"id": "a", "type": "calling"}, + {"id": "b", "type": "calling"}, + ], + } + ) + manager.run_hands("blocking", count=1) + game.agents["a"] = BlockingAgent(entered, release) + + thread = Thread(target=lambda: manager.run_hands("blocking", count=1)) + thread.start() + self.assertTrue(entered.wait(timeout=2)) + + state = manager.get_game_state("blocking") + + release.set() + thread.join(timeout=2) + self.assertFalse(thread.is_alive()) + self.assertEqual(state["hand_number"], 1) + self.assertEqual(len(state["hands"]), 1) + + def test_duplicate_http_agent_endpoint_is_rejected_across_active_games(self) -> None: + manager = GameManager() + payload = { + "starting_stack": 200, + "small_blind": 5, + "big_blind": 10, + "players": [ + { + "id": "ai", + "agent": { + "type": "http", + "endpoint": "http://127.0.0.1:9101/act", + }, + }, + {"id": "b", "type": "calling"}, + ], + } + + manager.create_game({"game_id": "g1", **payload}) + + with self.assertRaisesRegex(ValueError, "already belongs to game g1"): + manager.create_game({"game_id": "g2", **payload}) if __name__ == "__main__": diff --git a/texas_holdem/agents.py b/texas_holdem/agents.py index e8592d1..55e70fa 100644 --- a/texas_holdem/agents.py +++ b/texas_holdem/agents.py @@ -2,10 +2,11 @@ from __future__ import annotations import json import sys +import time from abc import ABC, abstractmethod from random import Random from typing import IO, Any -from urllib.error import URLError +from urllib.error import HTTPError, URLError from urllib.request import Request, urlopen from texas_holdem.human_io import clear_screen, prompt_action, render_observation @@ -54,6 +55,27 @@ class CallingStationAgent(PokerAgent): return PlayerAction("fold") +def normalise_http_agent_endpoint(raw: str) -> str: + """Return the canonical base URL for an HTTP agent endpoint.""" + url = raw.rstrip("/") + if url.endswith("/act"): + url = url[: -len("/act")] + if url.endswith("/game"): + url = url[: -len("/game")] + return url + + +def http_agent_endpoint_from_spec(spec: dict[str, Any]) -> str | None: + """Extract the canonical HTTP endpoint from an agent spec, if present.""" + agent_type = str(spec.get("type", "calling")).lower() + if agent_type != "http": + return None + endpoint = spec.get("endpoint") + if not endpoint: + raise ValueError("http agent requires an endpoint") + return normalise_http_agent_endpoint(str(endpoint)) + + class HttpAgent(PokerAgent): """Remote agent that talks to a base URL exposing ``/act`` and ``/game``. @@ -66,28 +88,36 @@ class HttpAgent(PokerAgent): ACT_PATH = "/act" GAME_PATH = "/game" - def __init__(self, endpoint: str, timeout_seconds: float = 10.0) -> None: - self.base_url = self._normalise_base_url(endpoint) + def __init__( + self, + endpoint: str, + timeout_seconds: float = 10.0, + player_id: str | None = None, + game_update_timeout_seconds: float | None = None, + retries: int = 2, + retry_backoff_seconds: float = 0.25, + ) -> None: + self.base_url = normalise_http_agent_endpoint(endpoint) self.timeout_seconds = timeout_seconds - - @staticmethod - def _normalise_base_url(raw: str) -> str: - """Strip a trailing slash so URL joins do not produce double slashes. - - Centralising this also tolerates the legacy "endpoint already points - at /act" mistake by chopping off a redundant ``/act`` suffix. - """ - url = raw.rstrip("/") - if url.endswith("/act"): - url = url[: -len("/act")] - return url + self.player_id = player_id + self.game_update_timeout_seconds = ( + float(game_update_timeout_seconds) + if game_update_timeout_seconds is not None + else min(timeout_seconds, 3.0) + ) + self.retries = max(0, retries) + self.retry_backoff_seconds = max(0.0, retry_backoff_seconds) def _url(self, path: str) -> str: """Compose a full URL by joining the base with a path component.""" return f"{self.base_url}{path}" def decide(self, observation: Observation) -> PlayerAction: - payload = self._post_json(self.ACT_PATH, observation.to_dict()) + payload = self._post_json( + self.ACT_PATH, + observation.to_dict(), + timeout_seconds=self.timeout_seconds, + ) if not isinstance(payload, dict): raise RuntimeError("agent endpoint must return a JSON object") return PlayerAction.from_dict(payload) @@ -100,30 +130,54 @@ class HttpAgent(PokerAgent): only by way of the raised exception bubbling to the engine guard. """ try: - self._post_json(self.GAME_PATH, game_state) + self._post_json( + self.GAME_PATH, + game_state, + timeout_seconds=self.game_update_timeout_seconds, + ) except RuntimeError: # ``/game`` is informational; treat any HTTP error as a benign # drop rather than reraising and aborting the hand loop. return None - def _post_json(self, path: str, payload: dict[str, Any]) -> Any: + def _post_json( + self, + path: str, + payload: dict[str, Any], + timeout_seconds: float, + ) -> Any: """POST ``payload`` as JSON to ``base_url + path`` and return parsed body. Extracted as a tiny helper so ``decide`` and ``on_game_update`` share identical transport semantics (timeout, error wrapping, content-type). """ body = json.dumps(payload).encode("utf-8") - request = Request( - self._url(path), - data=body, - headers={"Content-Type": "application/json"}, - method="POST", - ) - try: - with urlopen(request, timeout=self.timeout_seconds) as response: - raw = response.read().decode("utf-8") - except (OSError, URLError) as exc: - raise RuntimeError(f"agent endpoint failed: {self._url(path)}") from exc + last_error: BaseException | None = None + raw = "" + for attempt in range(self.retries + 1): + request = Request( + self._url(path), + data=body, + headers=self._headers(), + method="POST", + ) + try: + with urlopen(request, timeout=timeout_seconds) as response: + raw = response.read().decode("utf-8") + break + except HTTPError as exc: + detail = exc.read().decode("utf-8", errors="replace") + last_error = RuntimeError( + f"agent endpoint failed with HTTP {exc.code}: " + f"{self._url(path)} {detail}" + ) + except (OSError, URLError) as exc: + last_error = exc + if attempt < self.retries and self.retry_backoff_seconds > 0: + time.sleep(self.retry_backoff_seconds * (2**attempt)) + else: + raise RuntimeError(f"agent endpoint failed: {self._url(path)}") from last_error + if not raw: return None try: @@ -133,6 +187,12 @@ class HttpAgent(PokerAgent): f"agent endpoint returned invalid JSON: {self._url(path)}" ) from exc + def _headers(self) -> dict[str, str]: + headers = {"Content-Type": "application/json", "Connection": "close"} + if self.player_id: + headers["X-Player-Id"] = self.player_id + return headers + class HumanAgent(PokerAgent): """Interactive CLI agent for debugging and manual play. @@ -189,7 +249,11 @@ class HumanAgent(PokerAgent): return line.rstrip("\n") -def build_agent(spec: dict[str, Any], rng: Random | None = None) -> PokerAgent: +def build_agent( + spec: dict[str, Any], + rng: Random | None = None, + player_id: str | None = None, +) -> PokerAgent: agent_type = str(spec.get("type", "calling")).lower() if agent_type == "random": return RandomAgent(rng) @@ -199,7 +263,18 @@ def build_agent(spec: dict[str, Any], rng: Random | None = None) -> PokerAgent: endpoint = spec.get("endpoint") if not endpoint: raise ValueError("http agent requires an endpoint") - return HttpAgent(str(endpoint), float(spec.get("timeout_seconds", 10.0))) + return HttpAgent( + str(endpoint), + timeout_seconds=float(spec.get("timeout_seconds", 10.0)), + player_id=player_id, + game_update_timeout_seconds=( + float(spec["game_update_timeout_seconds"]) + if "game_update_timeout_seconds" in spec + else None + ), + retries=int(spec.get("retries", 2)), + retry_backoff_seconds=float(spec.get("retry_backoff_seconds", 0.25)), + ) if agent_type in {"human", "cli", "interactive"}: return HumanAgent() raise ValueError(f"unknown agent type: {agent_type}") diff --git a/texas_holdem/engine.py b/texas_holdem/engine.py index 560c661..ba4a16a 100644 --- a/texas_holdem/engine.py +++ b/texas_holdem/engine.py @@ -1,6 +1,8 @@ from __future__ import annotations +from copy import deepcopy from random import Random +from threading import RLock from time import time from texas_holdem.agents import PokerAgent @@ -54,6 +56,7 @@ class TableGame: self.small_blind = small_blind self.big_blind = big_blind self.rng = rng or Random() + self.lock = RLock() self.hand_number = 0 self.button_index: int | None = None self.board = [] @@ -64,6 +67,7 @@ class TableGame: # first hand that played under those stakes, which makes it trivial # to reconstruct the schedule from the outside. self.blind_history: list[BlindLevel] = [] + self._completed_snapshot: dict[str, object] = self._to_dict_unlocked() @property def is_complete(self) -> bool: @@ -83,6 +87,14 @@ class TableGame: :attr:`blind_history` whenever it changes (including the very first hand) so external observers can replay the schedule. """ + with self.lock: + return self._run_hand_locked(small_blind=small_blind, big_blind=big_blind) + + def _run_hand_locked( + self, + small_blind: int | None = None, + big_blind: int | None = None, + ) -> HandSummary: if self.is_complete: raise GameComplete("game is complete") @@ -151,6 +163,7 @@ class TableGame: finished_at=time(), ) self.hand_summaries.append(summary) + self._completed_snapshot = deepcopy(self._to_dict_unlocked()) return summary def run_hands( @@ -166,24 +179,47 @@ class TableGame: with the first hand of this call; subsequent calls can raise them again. Leaving them ``None`` keeps the current level unchanged. """ - if max_hands <= 0: - raise ValueError("max_hands must be positive") - summaries = [] - for _ in range(max_hands): - if self.is_complete: - break - # Only the first hand of the batch needs to apply the blind - # override; after that the engine reuses the stored values. - summaries.append( - self.run_hand(small_blind=small_blind, big_blind=big_blind) - ) - small_blind = None - big_blind = None - if until_one_left and self.is_complete: - break - return summaries + with self.lock: + if max_hands <= 0: + raise ValueError("max_hands must be positive") + summaries = [] + for _ in range(max_hands): + if self.is_complete: + break + # Only the first hand of the batch needs to apply the blind + # override; after that the engine reuses the stored values. + summaries.append( + self._run_hand_locked( + small_blind=small_blind, + big_blind=big_blind, + ) + ) + small_blind = None + big_blind = None + if until_one_left and self.is_complete: + break + return summaries def to_dict(self) -> dict[str, object]: + with self.lock: + return self._to_dict_unlocked() + + def snapshot_completed(self) -> dict[str, object]: + """Return a stable snapshot from the latest completed hand boundary. + + If a hand is currently running under ``self.lock``, this method does + not block. It returns the most recent completed hand summary and + stacks captured in memory, which is exactly what status endpoints + need while a long-running HTTP-agent decision is in progress. + """ + if self.lock.acquire(blocking=False): + try: + return deepcopy(self._to_dict_unlocked()) + finally: + self.lock.release() + return deepcopy(self._completed_snapshot) + + def _to_dict_unlocked(self) -> dict[str, object]: return { "game_id": self.game_id, "status": "complete" if self.is_complete else "running", @@ -449,9 +485,18 @@ class TableGame: try: requested = agent.decide(observation) except Exception: - requested = PlayerAction("fold") + requested = self._default_action(observation.legal_actions) return self._coerce_action(requested, observation.legal_actions) + def _default_action(self, legal_actions: list[dict[str, object]]) -> PlayerAction: + by_action = {str(action["action"]): action for action in legal_actions} + for action_type in ("check", "call", "fold"): + if action_type in by_action: + legal = by_action[action_type] + return PlayerAction(action_type, int(legal.get("amount") or 0)) + legal = legal_actions[0] + return PlayerAction(str(legal["action"]), int(legal.get("amount") or 0)) + def _coerce_action( self, requested: PlayerAction, @@ -581,7 +626,7 @@ class TableGame: swallow individual exceptions so a flaky remote endpoint cannot break the table flow. """ - snapshot = self.to_dict() + snapshot = self._to_dict_unlocked() for agent in self.agents.values(): try: agent.on_game_update(snapshot) diff --git a/texas_holdem/server.py b/texas_holdem/server.py index d6f1544..2e8c405 100644 --- a/texas_holdem/server.py +++ b/texas_holdem/server.py @@ -25,8 +25,8 @@ class PokerRequestHandler(BaseHTTPRequestHandler): if path == ["games"]: self._json({"games": MANAGER.list_games()}) return - if len(path) == 2 and path[0] == "games": - self._json(MANAGER.get_game(path[1]).to_dict()) + if len(path) == 2 and path[0] in {"game", "games"}: + self._json(MANAGER.get_game_state(path[1])) return self._json({"error": "not found"}, HTTPStatus.NOT_FOUND) except KeyError as exc: @@ -35,11 +35,11 @@ class PokerRequestHandler(BaseHTTPRequestHandler): def do_POST(self) -> None: path = self._path_parts() try: - if path == ["games"]: + if path in (["game"], ["games"]): game = MANAGER.create_game(self._read_json()) - self._json(game.to_dict(), HTTPStatus.CREATED) + self._json(game.snapshot_completed(), HTTPStatus.CREATED) return - if len(path) == 3 and path[0] == "games" and path[2] == "hands": + if len(path) == 3 and path[0] in {"game", "games"} and path[2] == "hands": body = self._read_json() count = int(body.get("count", 1)) until_one_left = bool(body.get("until_one_left", False)) @@ -51,9 +51,9 @@ class PokerRequestHandler(BaseHTTPRequestHandler): small_blind=small_blind, big_blind=big_blind, ) - self._json({"hands": summaries, "game": MANAGER.get_game(path[1]).to_dict()}) + self._json({"hands": summaries, "game": MANAGER.get_game_state(path[1])}) return - if len(path) == 4 and path[0] == "games" and path[2] == "hands" and path[3] == "run": + if len(path) == 4 and path[0] in {"game", "games"} and path[2] == "hands" and path[3] == "run": body = self._read_json() count = int(body.get("count", 1)) until_one_left = bool(body.get("until_one_left", False)) @@ -65,7 +65,7 @@ class PokerRequestHandler(BaseHTTPRequestHandler): small_blind=small_blind, big_blind=big_blind, ) - self._json({"hands": summaries, "game": MANAGER.get_game(path[1]).to_dict()}) + self._json({"hands": summaries, "game": MANAGER.get_game_state(path[1])}) return self._json({"error": "not found"}, HTTPStatus.NOT_FOUND) except KeyError as exc: diff --git a/texas_holdem/service.py b/texas_holdem/service.py index 4623161..e8c2acd 100644 --- a/texas_holdem/service.py +++ b/texas_holdem/service.py @@ -5,13 +5,14 @@ from threading import RLock from typing import Any from uuid import uuid4 -from texas_holdem.agents import build_agent +from texas_holdem.agents import build_agent, http_agent_endpoint_from_spec from texas_holdem.engine import TableGame class GameManager: def __init__(self) -> None: self._games: dict[str, TableGame] = {} + self._http_endpoint_owners: dict[str, str] = {} self._lock = RLock() def create_game(self, payload: dict[str, Any]) -> TableGame: @@ -29,12 +30,19 @@ class GameManager: big_blind = int(payload.get("big_blind", 10)) specs = [] + http_endpoints: set[str] = set() for seat, raw_spec in enumerate(players): if not isinstance(raw_spec, dict): raise ValueError("each player must be an object") player_id = str(raw_spec.get("id") or raw_spec.get("player_id") or f"p{seat + 1}") name = str(raw_spec.get("name") or player_id) - agent = build_agent(raw_spec.get("agent", raw_spec), rng) + agent_spec = raw_spec.get("agent", raw_spec) + if not isinstance(agent_spec, dict): + raise ValueError("agent spec must be an object") + endpoint = http_agent_endpoint_from_spec(agent_spec) + if endpoint is not None: + http_endpoints.add(endpoint) + agent = build_agent(agent_spec, rng, player_id=player_id) specs.append((player_id, name, agent)) game = TableGame( @@ -46,9 +54,18 @@ class GameManager: rng=rng, ) with self._lock: + self._release_completed_http_endpoints_locked() if game_id in self._games: raise ValueError(f"game already exists: {game_id}") + for endpoint in http_endpoints: + owner = self._http_endpoint_owners.get(endpoint) + if owner is not None and owner != game_id: + raise ValueError( + f"http agent endpoint already belongs to game {owner}: {endpoint}" + ) self._games[game_id] = game + for endpoint in http_endpoints: + self._http_endpoint_owners[endpoint] = game_id return game def get_game(self, game_id: str) -> TableGame: @@ -58,9 +75,13 @@ class GameManager: except KeyError as exc: raise KeyError(f"game not found: {game_id}") from exc + def get_game_state(self, game_id: str) -> dict[str, object]: + return self.get_game(game_id).snapshot_completed() + def list_games(self) -> list[dict[str, object]]: with self._lock: - return [game.to_dict() for game in self._games.values()] + games = list(self._games.values()) + return [game.snapshot_completed() for game in games] def run_hands( self, @@ -78,13 +99,30 @@ class GameManager: no-argument behaviour. """ game = self.get_game(game_id) - with self._lock: - return [ - summary.to_dict() - for summary in game.run_hands( - count, - until_one_left=until_one_left, - small_blind=small_blind, - big_blind=big_blind, - ) - ] + summaries = [ + summary.to_dict() + for summary in game.run_hands( + count, + until_one_left=until_one_left, + small_blind=small_blind, + big_blind=big_blind, + ) + ] + if game.is_complete: + with self._lock: + self._release_http_endpoints_for_game_locked(game_id) + return summaries + + def _release_completed_http_endpoints_locked(self) -> None: + for game_id, game in list(self._games.items()): + if game.lock.acquire(blocking=False): + try: + if game.is_complete: + self._release_http_endpoints_for_game_locked(game_id) + finally: + game.lock.release() + + def _release_http_endpoints_for_game_locked(self, game_id: str) -> None: + for endpoint, owner in list(self._http_endpoint_owners.items()): + if owner == game_id: + del self._http_endpoint_owners[endpoint]