From e46b2b84c5f7aa60291b1b44eef08048c99ecdb7 Mon Sep 17 00:00:00 2001 From: mamamiyear Date: Mon, 11 May 2026 00:44:15 +0800 Subject: [PATCH] feat: basic function --- .gitignore | 36 +++ README.md | 90 +++++++ pyproject.toml | 14 + tests/__init__.py | 1 + tests/test_engine.py | 128 +++++++++ tests/test_evaluator.py | 33 +++ tests/test_service.py | 30 +++ texas_holdem/__init__.py | 21 ++ texas_holdem/agents.py | 130 +++++++++ texas_holdem/cards.py | 66 +++++ texas_holdem/engine.py | 510 +++++++++++++++++++++++++++++++++++ texas_holdem/evaluator.py | 91 +++++++ texas_holdem/human_client.py | 199 ++++++++++++++ texas_holdem/human_io.py | 247 +++++++++++++++++ texas_holdem/models.py | 168 ++++++++++++ texas_holdem/server.py | 111 ++++++++ texas_holdem/service.py | 71 +++++ 17 files changed, 1946 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 pyproject.toml create mode 100644 tests/__init__.py create mode 100644 tests/test_engine.py create mode 100644 tests/test_evaluator.py create mode 100644 tests/test_service.py create mode 100644 texas_holdem/__init__.py create mode 100644 texas_holdem/agents.py create mode 100644 texas_holdem/cards.py create mode 100644 texas_holdem/engine.py create mode 100644 texas_holdem/evaluator.py create mode 100644 texas_holdem/human_client.py create mode 100644 texas_holdem/human_io.py create mode 100644 texas_holdem/models.py create mode 100644 texas_holdem/server.py create mode 100644 texas_holdem/service.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c28ca4b --- /dev/null +++ b/.gitignore @@ -0,0 +1,36 @@ +.DS_Store + +# Python bytecode and caches +__pycache__/ +*.py[cod] +*$py.class + +# Python packaging artifacts +*.egg-info/ +.eggs/ +build/ +dist/ +pip-wheel-metadata/ + +# Virtual environments +.venv/ +venv/ +env/ +ENV/ + +# Test and coverage outputs +.pytest_cache/ +.coverage +.coverage.* +htmlcov/ + +# Type checker and linter caches +.mypy_cache/ 
+.ruff_cache/ +.pyre/ + +# IDE and editor files +.idea/ +.vscode/ +*.swp +*.swo diff --git a/README.md b/README.md new file mode 100644 index 0000000..da0ce20 --- /dev/null +++ b/README.md @@ -0,0 +1,90 @@ +# Texas Hold X + +一个标准库实现的多 AI Agent 德州扑克服务。核心代码不依赖第三方包,便于先验证规则和 Agent 协议,再接入 LLM、远程 Agent 或更完整的前端。 + +## 已实现能力 + +- 一盘游戏支持 2-12 个 Agent,开局筹码相同。 +- 一盘游戏可以连续运行多局 Texas Hold'em。 +- 服务按真实牌局顺序向当前行动 Agent 发送观察信息。 +- 观察信息包含玩家筹码、公共牌、当前玩家手牌、底池、历史动作、可用动作和跟注/加注边界。 +- 支持盲注、四条街下注、弃牌、过牌、跟注、下注、加注、全下、边池和摊牌结算。 +- 支持本地 Agent 和 HTTP Agent。 + +## 运行服务 + +```bash +python -m texas_holdem.server --host 127.0.0.1 --port 8000 +``` + +创建一盘 3 人游戏: + +```bash +curl -X POST http://127.0.0.1:8000/games \ + -H 'Content-Type: application/json' \ + -d '{ + "game_id": "demo", + "seed": 42, + "starting_stack": 1000, + "small_blind": 5, + "big_blind": 10, + "players": [ + {"id": "agent_1", "name": "Agent 1", "type": "calling"}, + {"id": "agent_2", "name": "Agent 2", "type": "random"}, + {"id": "agent_3", "name": "Agent 3", "type": "calling"} + ] + }' +``` + +运行 10 局: + +```bash +curl -X POST http://127.0.0.1:8000/games/demo/hands/run \ + -H 'Content-Type: application/json' \ + -d '{"count": 10, "until_one_left": false}' +``` + +查看游戏状态: + +```bash +curl http://127.0.0.1:8000/games/demo +``` + +## HTTP Agent 协议 + +玩家配置可以使用远程 HTTP Agent: + +```json +{ + "id": "llm_agent", + "name": "LLM Agent", + "agent": { + "type": "http", + "endpoint": "http://127.0.0.1:9001/act", + "timeout_seconds": 10 + } +} +``` + +服务会向 `endpoint` 发送当前行动玩家的观察 JSON。Agent 返回: + +```json +{"action": "call"} +``` + +可用动作包括: + +- `fold` +- `check` +- `call` +- `bet` +- `raise` +- `all_in` + +`bet` 和 `raise` 的 `amount` 表示当前下注轮中该玩家希望达到的总下注额,也就是观察中 `amount_mode: "street_total"` 的含义。 + +## 测试 + +```bash +python -m unittest discover -s tests -v +``` diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..b74acae --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,14 @@ +[project] +name = "texas-hold-x" +version = 
class ScriptedAgent(PokerAgent):
    """Test double that replays a queue of actions, then checks or calls.

    Every observation it receives is logged to ``seen`` as
    ``(street, player_id, [legal action names])`` so tests can inspect
    which options the engine offered.
    """

    def __init__(
        self,
        actions: list[PlayerAction],
        seen: list[tuple[str, str, list[str]]],
    ) -> None:
        self.actions = actions
        self.seen = seen

    def decide(self, observation: Observation) -> PlayerAction:
        """Record the observation and return the next scripted action."""
        offered = [str(entry["action"]) for entry in observation.legal_actions]
        self.seen.append((observation.street, observation.player_id, offered))

        # Scripted actions take priority until the queue runs dry.
        if self.actions:
            return self.actions.pop(0)

        # Afterwards behave passively: check when allowed, otherwise call.
        may_check = any(
            entry["action"] == "check" for entry in observation.legal_actions
        )
        return PlayerAction("check" if may_check else "call")
def test_side_pots_are_awarded_to_eligible_players(self) -> None:
    """Layered all-ins pay each pot layer to the best eligible hand.

    The table state is built by hand: p1 is all-in for 50 with aces, p2 and
    p3 are in for 100 with kings and queens. p1 can only win the 150-chip
    main pot; the 100-chip side pot is contested by p2 and p3 alone.
    """
    players = [
        ("p1", "Short", CallingStationAgent()),
        ("p2", "Middle", CallingStationAgent()),
        ("p3", "Deep", CallingStationAgent()),
    ]
    game = TableGame("g3", players, starting_stack=100, small_blind=5, big_blind=10, rng=Random(1))
    board = [Card.parse(value) for value in "2h 7d 9c Js 3h".split()]
    holes = {
        "p1": "Ah Ac",
        "p2": "Kh Kc",
        "p3": "Qh Qc",
    }
    bets = {"p1": 50, "p2": 100, "p3": 100}
    # Overwrite each player's state directly instead of playing a hand, so
    # the payout logic is exercised in isolation.
    for player in game.players:
        player.stack = 0
        player.in_hand = True
        player.folded = False
        player.hole_cards = [Card.parse(value) for value in holes[player.player_id].split()]
        player.total_bet = bets[player.player_id]
    game.board = board
    game.button_index = 0

    awards = game._award_pots()

    self.assertEqual([award.amount for award in awards], [150, 100])
    self.assertEqual(game.players[0].stack, 150)
    self.assertEqual(game.players[1].stack, 100)
    self.assertEqual(game.players[2].stack, 0)
    # Every committed chip must be paid back out: no chips created or lost.
    self.assertEqual(sum(player.stack for player in game.players), sum(bets.values()))
class PokerAgent(ABC):
    """Interface implemented by every participant that can act at the table."""

    @abstractmethod
    def decide(self, observation: Observation) -> PlayerAction:
        """Return the action to take for the given observation."""
        raise NotImplementedError()
class HttpAgent(PokerAgent):
    """Agent that delegates every decision to a remote HTTP endpoint.

    The observation is POSTed as a JSON document and the response body must
    be a JSON object that ``PlayerAction.from_dict`` can interpret.
    """

    def __init__(self, endpoint: str, timeout_seconds: float = 10.0) -> None:
        self.endpoint = endpoint
        self.timeout_seconds = timeout_seconds

    def decide(self, observation: Observation) -> PlayerAction:
        """Ask the remote endpoint which action to take.

        Raises:
            RuntimeError: if the endpoint URL is malformed, the request
                fails or times out, the body is not valid UTF-8 JSON, or
                the decoded payload is not a JSON object.
        """
        # Serialise outside the try block so a bug in our own observation
        # encoding is not misreported as an endpoint failure.
        body = json.dumps(observation.to_dict()).encode("utf-8")
        try:
            request = Request(
                self.endpoint,
                data=body,
                headers={"Content-Type": "application/json"},
                method="POST",
            )
            with urlopen(request, timeout=self.timeout_seconds) as response:
                payload: Any = json.loads(response.read().decode("utf-8"))
        except (URLError, OSError, ValueError) as exc:
            # ValueError covers json.JSONDecodeError, UnicodeDecodeError (a
            # non-UTF-8 body) and the "unknown url type" error raised for a
            # malformed endpoint; OSError covers socket errors and timeouts.
            raise RuntimeError(f"agent endpoint failed: {self.endpoint}") from exc
        if not isinstance(payload, dict):
            raise RuntimeError("agent endpoint must return a JSON object")
        return PlayerAction.from_dict(payload)
def decide(self, observation: Observation) -> PlayerAction:
    """Show the observation to the operator and return the action they pick.

    The observation is converted to its dict form first so that rendering
    and prompting go through the same helpers as the standalone HTTP human
    client (see texas_holdem.human_io).
    """
    snapshot = observation.to_dict()
    rendered = render_observation(snapshot)
    self._write(rendered)

    # A missing or null "legal_actions" entry is treated as an empty menu.
    menu = list(snapshot.get("legal_actions") or [])
    selection = prompt_action(menu, self._read_line, self._write)
    return PlayerAction.from_dict(selection)
def build_agent(spec: dict[str, Any], rng: Random | None = None) -> PokerAgent:
    """Construct the agent described by ``spec``.

    ``spec["type"]`` selects the implementation (case-insensitive, defaulting
    to ``"calling"``). HTTP agents additionally need a non-empty
    ``"endpoint"`` and accept an optional ``"timeout_seconds"``.

    Raises:
        ValueError: for an unknown type or an HTTP spec without an endpoint.
    """
    kind = str(spec.get("type", "calling")).lower()
    calling_aliases = ("calling", "call", "calling_station")
    human_aliases = ("human", "cli", "interactive")

    if kind == "http":
        url = spec.get("endpoint")
        if not url:
            raise ValueError("http agent requires an endpoint")
        url_text = str(url)
        timeout = float(spec.get("timeout_seconds", 10.0))
        return HttpAgent(url_text, timeout)

    if kind == "random":
        agent = RandomAgent(rng)
    elif kind in calling_aliases:
        agent = CallingStationAgent()
    elif kind in human_aliases:
        agent = HumanAgent()
    else:
        raise ValueError(f"unknown agent type: {kind}")
    return agent
class Deck:
    """A shuffled 52-card deck; cards are dealt from the end of the list."""

    def __init__(self, rng: Random | None = None) -> None:
        self._rng = rng or Random()
        fresh = []
        # Build suit by suit, ranks 2..14, before shuffling in place.
        for suit in SUITS:
            for rank in range(2, 15):
                fresh.append(Card(rank, suit))
        self._cards = fresh
        self._rng.shuffle(self._cards)

    def draw(self, count: int = 1) -> list[Card]:
        """Remove and return the last ``count`` cards, in their stored order.

        Raises:
            ValueError: if ``count`` is not positive or exceeds the cards left.
        """
        if count < 1:
            raise ValueError("count must be positive")
        cut = len(self._cards) - count
        if cut < 0:
            raise ValueError("deck does not have enough cards")
        taken = self._cards[cut:]
        del self._cards[cut:]
        return taken

    def burn(self) -> None:
        """Discard a single card."""
        self.draw()
def run_hand(self) -> HandSummary:
    """Play one complete hand and return its summary.

    Resets per-hand state, moves the button, deals hole cards, posts the
    blinds, runs up to four betting rounds, pays out the pots and appends
    the resulting summary to ``self.hand_summaries``.

    Raises:
        GameComplete: if fewer than two players still have chips.
    """
    if self.is_complete:
        raise GameComplete("game is complete")

    self.hand_number += 1
    started_at = time()
    self.board = []
    self.action_history = []
    # A fresh deck is built and shuffled from the game's RNG for every hand.
    deck = Deck(self.rng)

    for player in self.players:
        player.reset_for_hand()

    self._advance_button()
    # The is_complete guard above means at least two players have chips, so
    # the button search finds a seat; the assert narrows the Optional type.
    assert self.button_index is not None

    self._deal_hole_cards(deck)
    small_blind_index, big_blind_index = self._blind_indexes()
    self._post_blind(small_blind_index, "small_blind", self.small_blind)
    self._post_blind(big_blind_index, "big_blind", self.big_blind)

    # With exactly two active players the small blind opens the preflop
    # action; otherwise the first seat after the big blind that can act does.
    preflop_start = (
        small_blind_index
        if self._active_player_count() == 2
        else self._next_index(big_blind_index + 1, self._can_act)
    )
    self._betting_round("preflop", preflop_start, self.big_blind)

    for street, card_count in (("flop", 3), ("turn", 1), ("river", 1)):
        # Stop dealing as soon as at most one unfolded player remains.
        if self._contender_count() <= 1:
            break
        deck.burn()
        self.board.extend(deck.draw(card_count))
        for player in self.players:
            player.reset_for_street()
        # Betting needs at least two players able to act; otherwise the
        # board is simply run out (e.g. everyone else is all-in).
        if self._betting_player_count() >= 2:
            # Post-flop action starts with the first actionable seat after
            # the button.
            start_index = self._next_index(self.button_index + 1, self._can_act)
            self._betting_round(street, start_index, self.big_blind)

    awards = self._award_pots()
    # Copy board and action history so later hands cannot mutate the summary.
    summary = HandSummary(
        game_id=self.game_id,
        hand_number=self.hand_number,
        button_seat=self.players[self.button_index].seat,
        board=list(self.board),
        actions=list(self.action_history),
        awards=awards,
        started_at=started_at,
        finished_at=time(),
    )
    self.hand_summaries.append(summary)
    return summary
def _betting_round(self, street: str, start_index: int | None, opening_min_raise: int) -> None:
    """Run one betting round until nobody is left to act.

    Args:
        street: Name of the street, recorded with each action.
        start_index: Seat index where the search for the first actor
            begins; ``None`` skips the round entirely.
        opening_min_raise: Minimum raise increment at the start of the round.
    """
    if start_index is None:
        return

    # Highest amount any unfolded player already has in front of them this
    # street (the blinds preflop, nothing on later streets).
    current_bet = max((player.street_bet for player in self.players if self._is_live(player)), default=0)
    min_raise = opening_min_raise
    # Seats that still owe a decision in this round.
    pending = {index for index in range(len(self.players)) if self._can_act(index)}
    # Seats re-queued by a raise that was not a full raise: they are
    # offered their options with can_raise=False.
    call_only: set[int] = set()
    cursor = start_index

    while pending and self._contender_count() > 1:
        # Next pending seat at or after the cursor, wrapping around.
        player_index = self._next_index(cursor, lambda index: index in pending)
        if player_index is None:
            break

        player = self.players[player_index]
        observation = self._observation(
            street,
            player_index,
            current_bet,
            min_raise,
            can_raise=player_index not in call_only,
        )
        action = self._agent_action(player, observation)
        previous_bet = current_bet

        current_bet, min_raise, full_raise = self._apply_action(
            street,
            player,
            action,
            current_bet,
            min_raise,
        )

        pending.discard(player_index)
        call_only.discard(player_index)
        opened_betting = previous_bet == 0 and current_bet > 0
        if full_raise or opened_betting:
            # A full raise or the first bet of the street reopens the
            # action: everyone else who can act must respond, and nobody
            # is restricted to calling any more.
            pending = {
                index
                for index in range(len(self.players))
                if index != player_index and self._can_act(index)
            }
            call_only.clear()
        elif current_bet > previous_bet:
            # The bet went up without a full raise. Players who now owe
            # chips must act again; those who had already acted (no longer
            # pending) are re-queued as call-only.
            owing_players = {
                index
                for index in range(len(self.players))
                if index != player_index
                and self._can_act(index)
                and self.players[index].street_bet < current_bet
            }
            call_only.update(owing_players - pending)
            pending.update(owing_players)

        # Drop seats that can no longer act and keep call_only a subset of
        # pending.
        pending = {index for index in pending if self._can_act(index)}
        call_only = {index for index in call_only if index in pending}
        cursor = player_index + 1
def _legal_actions(
    self,
    player: PlayerState,
    current_bet: int,
    min_raise: int,
    can_raise: bool = True,
) -> list[dict[str, object]]:
    """List the actions ``player`` may take against ``current_bet``.

    Fixed-size actions carry an ``amount``; ``bet`` and ``raise`` carry a
    ``min_amount``/``max_amount`` range expressed as the player's total bet
    for the street. ``can_raise=False`` limits a player who owes chips to
    fold or call.
    """
    owed = max(0, current_bet - player.street_bet)
    # Largest street total the player can reach by committing the whole stack.
    ceiling = player.street_bet + player.stack

    def sized(kind: str, floor: int) -> dict[str, object]:
        return {
            "action": kind,
            "min_amount": floor,
            "max_amount": ceiling,
            "amount_mode": "street_total",
        }

    def shove() -> dict[str, object]:
        return {"action": "all_in", "amount": ceiling}

    def raise_options() -> list[dict[str, object]]:
        # A full raise when affordable; otherwise an all-in, but only if it
        # exceeds the current bet.
        floor = current_bet + min_raise
        if ceiling >= floor:
            return [sized("raise", floor)]
        if ceiling > current_bet:
            return [shove()]
        return []

    if owed > 0:
        options: list[dict[str, object]] = [
            {"action": "fold", "amount": 0},
            {"action": "call", "amount": min(owed, player.stack)},
        ]
        if can_raise:
            options.extend(raise_options())
        return options

    options = [{"action": "check", "amount": 0}]
    if player.stack <= 0:
        return options

    if current_bet != 0:
        # Nothing owed but a bet already stands, so the player may raise.
        options.extend(raise_options())
    elif ceiling >= self.big_blind:
        options.append(sized("bet", self.big_blind))
    else:
        options.append(shove())
    return options
def _award_pots(self) -> list[PotAward]:
    """Pay out the main pot and any side pots, returning one award per layer.

    Winners' stacks are credited in place. Each distinct ``total_bet``
    value defines one pot layer; only unfolded players who contributed at
    least that much can win it.
    """
    total_pot = sum(player.total_bet for player in self.players)
    live_players = [player for player in self.players if self._is_live(player)]
    if not live_players or total_pot <= 0:
        return []

    # Everyone else folded: the last live player takes everything and no
    # hand is evaluated (hand_value is None).
    if len(live_players) == 1:
        live_players[0].stack += total_pot
        return [PotAward(total_pot, [live_players[0].player_id], None)]

    # Distinct contribution levels, ascending; each step between two levels
    # is one pot layer.
    levels = sorted({player.total_bet for player in self.players if player.total_bet > 0})
    previous_level = 0
    awards: list[PotAward] = []
    for level in levels:
        # Folded players still count as contributors: their chips stay in
        # the layer even though they cannot win it.
        contributors = [player for player in self.players if player.total_bet >= level]
        pot_amount = (level - previous_level) * len(contributors)
        previous_level = level
        contenders = [player for player in contributors if self._is_live(player)]
        # NOTE(review): a layer with no live contender is skipped and its
        # chips are not credited to anyone. Confirm this cannot occur in
        # real play.
        if not contenders or pot_amount <= 0:
            continue

        values = {
            player.player_id: evaluate([*player.hole_cards, *self.board])
            for player in contenders
        }
        best_value = max(values.values())
        winners = [
            player
            for player in contenders
            if values[player.player_id] == best_value
        ]
        # Split evenly; leftover odd chips go one each to the first winners
        # in the order returned by _button_order.
        ordered_winners = self._button_order(winners)
        share, remainder = divmod(pot_amount, len(ordered_winners))
        for winner in ordered_winners:
            winner.stack += share
        for winner in ordered_winners[:remainder]:
            winner.stack += 1
        awards.append(
            PotAward(
                amount=pot_amount,
                winners=[winner.player_id for winner in ordered_winners],
                hand_value=best_value,
            )
        )
    return awards
return sorted(players, key=lambda player: seat_rank[player.seat]) diff --git a/texas_holdem/evaluator.py b/texas_holdem/evaluator.py new file mode 100644 index 0000000..cddc19a --- /dev/null +++ b/texas_holdem/evaluator.py @@ -0,0 +1,91 @@ +from __future__ import annotations + +from collections import Counter +from dataclasses import dataclass +from itertools import combinations + +from texas_holdem.cards import Card + +CATEGORY_NAMES = { + 8: "straight_flush", + 7: "four_of_a_kind", + 6: "full_house", + 5: "flush", + 4: "straight", + 3: "three_of_a_kind", + 2: "two_pair", + 1: "pair", + 0: "high_card", +} + + +@dataclass(frozen=True, order=True, slots=True) +class HandValue: + category: int + ranks: tuple[int, ...] + + @property + def name(self) -> str: + return CATEGORY_NAMES[self.category] + + def to_dict(self) -> dict[str, object]: + return {"category": self.category, "name": self.name, "ranks": list(self.ranks)} + + +def evaluate(cards: list[Card]) -> HandValue: + if len(cards) < 5: + raise ValueError("at least five cards are required") + return max(_evaluate_five(list(combo)) for combo in combinations(cards, 5)) + + +def _evaluate_five(cards: list[Card]) -> HandValue: + ranks = sorted((card.rank for card in cards), reverse=True) + counts = Counter(ranks) + groups = sorted(counts.items(), key=lambda item: (item[1], item[0]), reverse=True) + is_flush = len({card.suit for card in cards}) == 1 + straight_high = _straight_high(ranks) + + if is_flush and straight_high is not None: + return HandValue(8, (straight_high,)) + + if groups[0][1] == 4: + quad_rank = groups[0][0] + kicker = max(rank for rank in ranks if rank != quad_rank) + return HandValue(7, (quad_rank, kicker)) + + if groups[0][1] == 3 and groups[1][1] == 2: + return HandValue(6, (groups[0][0], groups[1][0])) + + if is_flush: + return HandValue(5, tuple(ranks)) + + if straight_high is not None: + return HandValue(4, (straight_high,)) + + if groups[0][1] == 3: + trip_rank = groups[0][0] + kickers = 
sorted((rank for rank in ranks if rank != trip_rank), reverse=True) + return HandValue(3, (trip_rank, *kickers)) + + if groups[0][1] == 2 and groups[1][1] == 2: + pair_ranks = sorted((rank for rank, count in counts.items() if count == 2), reverse=True) + kicker = max(rank for rank in ranks if rank not in pair_ranks) + return HandValue(2, (*pair_ranks, kicker)) + + if groups[0][1] == 2: + pair_rank = groups[0][0] + kickers = sorted((rank for rank in ranks if rank != pair_rank), reverse=True) + return HandValue(1, (pair_rank, *kickers)) + + return HandValue(0, tuple(ranks)) + + +def _straight_high(ranks: list[int]) -> int | None: + unique = sorted(set(ranks), reverse=True) + if {14, 5, 4, 3, 2}.issubset(unique): + unique.append(1) + for index in range(0, len(unique) - 4): + window = unique[index : index + 5] + if window[0] - window[4] == 4 and len(set(window)) == 5: + return 5 if window == [5, 4, 3, 2, 1] else window[0] + return None diff --git a/texas_holdem/human_client.py b/texas_holdem/human_client.py new file mode 100644 index 0000000..ef05d77 --- /dev/null +++ b/texas_holdem/human_client.py @@ -0,0 +1,199 @@ +"""Standalone interactive HTTP Human Agent. + +Run this as a process on the operator's machine to expose a single +``POST /act`` endpoint that the Texas Hold'em service can call when it is +that operator's turn to act: + + python -m texas_holdem.human_client --host 127.0.0.1 --port 9001 + +Then create a game on the server with this player spec:: + + { + "id": "alice", + "name": "Alice", + "agent": { + "type": "http", + "endpoint": "http://127.0.0.1:9001/act", + "timeout_seconds": 600 + } + } + +Every time the server posts an observation, this client renders it on the +local terminal and blocks on stdin until the human chooses a legal action, +then returns ``{"action": "...", "amount": N}`` as JSON. 
class HumanClientConsole:
    """Terminal front end that shows an observation and collects one action.

    The input and output streams are injectable (they default to the
    process's stdin and stdout), and one lock wraps each full
    render-and-prompt exchange so that two ``decide`` calls cannot mix their
    output on the same terminal.
    """

    def __init__(
        self,
        input_stream: IO[str] | None = None,
        output_stream: IO[str] | None = None,
    ) -> None:
        if input_stream is None:
            input_stream = sys.stdin
        if output_stream is None:
            output_stream = sys.stdout
        self._input = input_stream
        self._output = output_stream
        # Held for the whole of decide(): the rendered block and the prompt
        # loop belong together.
        self._lock = Lock()

    def decide(self, observation: dict[str, Any]) -> dict[str, Any]:
        """Show *observation* and return the action dict chosen by the operator."""
        with self._lock:
            rendered = render_observation(observation)
            self._write(rendered)
            menu = list(observation.get("legal_actions") or [])
            return prompt_action(menu, self._read_line, self._write)

    def _write(self, text: str) -> None:
        """Write *text* to the output stream and flush immediately."""
        sink = self._output
        sink.write(text)
        sink.flush()

    def _read_line(self, prompt: str) -> str:
        """Emit *prompt*, then return one input line without its newline.

        Raises:
            EOFError: when the input stream yields an empty string, which is
                how ``readline`` reports end of input.
        """
        self._write(prompt)
        raw = self._input.readline()
        if raw != "":
            return raw.rstrip("\n")
        raise EOFError("input stream closed while waiting for human action")
def create_server(
    host: str,
    port: int,
    console: HumanClientConsole | None = None,
) -> ThreadingHTTPServer:
    """Build an HTTP server whose handlers all share one console.

    Exposed as a function so tests (or callers wiring custom IO streams)
    can construct the server without touching ``main``.

    Args:
        host: Interface to bind.
        port: TCP port to bind.
        console: Console used to answer ``POST /act``; a default
            ``HumanClientConsole`` is created when omitted.

    Returns:
        A bound, not-yet-serving ``ThreadingHTTPServer``.
    """
    # Compare against None explicitly so a caller-supplied console is never
    # replaced just because it happens to be falsy.
    shared_console = console if console is not None else HumanClientConsole()
    # Attach the console to a handler subclass owned by this server instead
    # of mutating HumanRequestHandler itself; otherwise creating a second
    # server would silently swap the console used by the first one.
    # Handlers read it as ``self.console``, which resolves on the subclass.
    handler_class = type(
        "BoundHumanRequestHandler",
        (HumanRequestHandler,),
        {"console": shared_console},
    )
    return ThreadingHTTPServer((host, port), handler_class)
# One-letter suit code -> Unicode pip glyph.
SUIT_GLYPHS: dict[str, str] = {
    "s": "\u2660",  # spades
    "h": "\u2665",  # hearts
    "c": "\u2663",  # clubs
    "d": "\u2666",  # diamonds
}


def pretty_card(label: str) -> str:
    """Render a card label such as ``"8h"`` with its suit glyph first.

    The first character is taken as the rank and the second (case
    insensitive) as the suit.  Anything that is not a string of at least two
    characters is returned via ``str()``; a label whose suit is not in
    ``SUIT_GLYPHS`` is returned unchanged.
    """
    if isinstance(label, str) and len(label) >= 2:
        glyph = SUIT_GLYPHS.get(label[1].lower())
        return label if glyph is None else f"{glyph}{label[0]}"
    return str(label)


def _format_cards(cards: list[Any], empty_text: str) -> str:
    """Join the pretty form of each card, or return *empty_text* if there are none."""
    return " ".join(pretty_card(str(card)) for card in cards) if cards else empty_text


def render_observation(observation: dict[str, Any]) -> str:
    """Build the multi-line text block shown to a human for one decision.

    The function only reads *observation* and returns a newline-terminated
    string; the caller decides where to print it.  At most the last eight
    entries of ``action_history`` are listed.
    """
    heavy_rule = "=" * 60
    light_rule = "-" * 60
    field = observation.get

    board_text = _format_cards(field("board") or [], "(empty)")
    hand_text = _format_cards(field("hole_cards") or [], "(none)")
    block: list[str] = [
        heavy_rule,
        (
            f"Game {field('game_id')} | Hand #{field('hand_number')} "
            f"| Street: {field('street')}"
        ),
        (
            f"Blinds {field('small_blind')}/{field('big_blind')} "
            f"| Button seat: {field('button_seat')} "
            f"| Pot: {field('pot')}"
        ),
        f"Board : {board_text}",
        f"Your hand : {hand_text} (seat {field('seat')}, id={field('player_id')})",
        light_rule,
        "Players:",
    ]

    # One row per seat; the marker column flags the acting player, folded
    # seats and all-in seats.
    hero_id = field("player_id")
    for seat_info in field("players", []):
        cells = (
            f" {_player_marker(seat_info, hero_id)} seat {int(seat_info.get('seat', 0)):>2} ",
            f"| {str(seat_info.get('name', '')):<16} ",
            f"| stack {int(seat_info.get('stack', 0)):>6} ",
            f"| street_bet {int(seat_info.get('street_bet', 0)):>6} ",
            f"| total_bet {int(seat_info.get('total_bet', 0)):>6}",
        )
        block.append("".join(cells))

    raise_floor = field("min_raise_to")
    raise_text = "-" if raise_floor is None else raise_floor
    block.append(light_rule)
    block.append(f"To call: {field('to_call')} | Min raise to: {raise_text}")

    block.append("Recent actions:")
    history = field("action_history") or []
    if history:
        block.extend(
            f" [{str(entry.get('street', '')):<7}] "
            f"{str(entry.get('player_id', '')):<12} "
            f"-> {str(entry.get('action', '')):<6} "
            f"amount={entry.get('amount', 0)}"
            for entry in history[-8:]
        )
    else:
        block.append(" (no actions yet)")

    block.append(heavy_rule)
    return "\n".join(block) + "\n"


def _player_marker(player: dict[str, Any], current_player_id: Any) -> str:
    """Return the one-character status marker for a player row.

    ``*`` marks the player whose id equals *current_player_id*; otherwise
    ``F`` marks a folded seat, ``A`` an all-in seat, and a space anything
    else.  Folded takes precedence over all-in.
    """
    if player.get("player_id") == current_player_id:
        return "*"
    return "F" if player.get("folded") else "A" if player.get("all_in") else " "
def prompt_action(
    legal_actions: list[dict[str, Any]],
    reader: Reader,
    writer: Writer,
) -> dict[str, Any]:
    """Drive an interactive menu and return the chosen ``{action, amount}`` dict.

    The menu is redisplayed until a valid choice is made; invalid input is
    reported through *writer* and never raises.  For ``bet`` and ``raise``
    the operator is additionally asked for an amount inside the action's
    ``[min_amount, max_amount]`` range and may cancel back to the menu.  A
    separator is written right before returning so consecutive turns are
    visually delimited.

    Args:
        legal_actions: Non-empty list of legal-action dicts; each needs an
            ``"action"`` key, and bet/raise entries need ``"min_amount"``
            and ``"max_amount"``.
        reader: Called with a prompt string, returns the operator's reply.
        writer: Called with each chunk of text to display.

    Raises:
        RuntimeError: if *legal_actions* is empty.
    """
    if not legal_actions:
        raise RuntimeError("no legal actions available")

    while True:
        writer("Choose an action:\n")
        for index, action in enumerate(legal_actions, start=1):
            writer(f" [{index}] {format_legal_action(action)}\n")

        raw = reader("Enter choice number: ").strip()
        choice = _parse_choice(raw, len(legal_actions))
        if choice is None:
            writer("Invalid choice, please try again.\n")
            continue

        selected = legal_actions[choice - 1]
        action_type = str(selected["action"])

        if action_type in {"bet", "raise"}:
            amount = _prompt_amount(
                int(selected["min_amount"]),
                int(selected["max_amount"]),
                reader,
                writer,
            )
            if amount is None:
                # Operator cancelled the amount entry; redisplay the menu.
                continue
        else:
            amount = int(selected.get("amount") or 0)

        _emit_turn_separator(writer)
        return {"action": action_type, "amount": amount}


def _emit_turn_separator(writer: Writer) -> None:
    """Write a short divider followed by a blank line."""
    writer("=====\n\n")


def format_legal_action(action: dict[str, Any]) -> str:
    """Render one legal-action dict as a one-line description for the menu."""
    action_type = str(action["action"])
    if action_type in {"bet", "raise"}:
        return (
            f"{action_type} (street_total in "
            f"[{action['min_amount']}, {action['max_amount']}])"
        )
    if action_type in {"call", "all_in"}:
        return f"{action_type} {action.get('amount', 0)}"
    return action_type


def _parse_int(raw: str) -> int | None:
    """Return *raw* as an int, or ``None`` if it is not a plain integer.

    Accepts decimal digits with at most one leading ``-``.  ``isdecimal`` is
    used rather than ``isdigit`` because ``isdigit`` also accepts characters
    such as superscript digits, which ``int()`` refuses to parse.
    """
    digits = raw[1:] if raw.startswith("-") else raw
    if not digits.isdecimal():
        return None
    return int(raw)


def _parse_choice(raw: str, upper: int) -> int | None:
    """Parse a 1-based menu index; return ``None`` for anything else.

    Signed input, non-decimal text and values outside ``[1, upper]`` are all
    rejected.
    """
    if not raw.isdecimal():
        return None
    value = int(raw)
    if not 1 <= value <= upper:
        return None
    return value


def _prompt_amount(
    min_amount: int,
    max_amount: int,
    reader: Reader,
    writer: Writer,
) -> int | None:
    """Prompt for a bet/raise street total in ``[min_amount, max_amount]``.

    Keeps asking until the operator enters an in-range integer.  A blank
    reply returns ``None`` so the caller can back out of an accidental
    selection.  Malformed input such as ``--5`` is reported and re-prompted
    instead of raising.
    """
    while True:
        raw = reader(
            f"Enter target street total in [{min_amount}, {max_amount}] "
            f"(blank to cancel): "
        ).strip()
        if raw == "":
            return None
        value = _parse_int(raw)
        if value is None:
            writer("Amount must be an integer.\n")
            continue
        if not min_amount <= value <= max_amount:
            writer(f"Amount {value} out of range [{min_amount}, {max_amount}].\n")
            continue
        return value
"player_id": self.player_id, + "name": self.name, + "seat": self.seat, + "stack": self.stack, + "folded": self.folded, + "all_in": self.all_in, + "in_hand": self.in_hand, + "street_bet": self.street_bet, + "total_bet": self.total_bet, + } + + +@dataclass(slots=True) +class ActionRecord: + hand_number: int + street: str + player_id: str + action: str + amount: int + street_bet: int + stack: int + + def to_dict(self) -> dict[str, object]: + return { + "hand_number": self.hand_number, + "street": self.street, + "player_id": self.player_id, + "action": self.action, + "amount": self.amount, + "street_bet": self.street_bet, + "stack": self.stack, + } + + +@dataclass(slots=True) +class Observation: + game_id: str + hand_number: int + street: str + player_id: str + seat: int + button_seat: int + small_blind: int + big_blind: int + board: list[Card] + hole_cards: list[Card] + players: list[dict[str, object]] + pot: int + to_call: int + min_raise_to: int | None + legal_actions: list[dict[str, object]] + action_history: list[ActionRecord] + + def to_dict(self) -> dict[str, object]: + return { + "game_id": self.game_id, + "hand_number": self.hand_number, + "street": self.street, + "player_id": self.player_id, + "seat": self.seat, + "button_seat": self.button_seat, + "small_blind": self.small_blind, + "big_blind": self.big_blind, + "board": [str(card) for card in self.board], + "hole_cards": [str(card) for card in self.hole_cards], + "players": self.players, + "pot": self.pot, + "to_call": self.to_call, + "min_raise_to": self.min_raise_to, + "legal_actions": self.legal_actions, + "action_history": [record.to_dict() for record in self.action_history], + } + + +@dataclass(slots=True) +class PotAward: + amount: int + winners: list[str] + hand_value: HandValue | None + + def to_dict(self) -> dict[str, object]: + return { + "amount": self.amount, + "winners": self.winners, + "hand_value": self.hand_value.to_dict() if self.hand_value else None, + } + + +@dataclass(slots=True) +class 
class PokerRequestHandler(BaseHTTPRequestHandler):
    """HTTP front end that forwards requests to the module-level ``MANAGER``.

    Routes:
        GET  /health                 liveness probe
        GET  /games                  list all games
        GET  /games/<id>             one game's state
        POST /games                  create a game from the JSON body
        POST /games/<id>/hands       run hands (body: ``count``, ``until_one_left``)
        POST /games/<id>/hands/run   same as above

    ``KeyError`` is reported as 404; ``GameComplete`` and ``ValueError`` as
    400.  Every response body is JSON.
    """

    server_version = "TexasHoldemService/0.1"

    def do_GET(self) -> None:
        """Serve the read-only routes."""
        path = self._path_parts()
        try:
            if path == ["health"]:
                self._json({"ok": True})
                return
            if path == ["games"]:
                self._json({"games": MANAGER.list_games()})
                return
            if len(path) == 2 and path[0] == "games":
                self._json(MANAGER.get_game(path[1]).to_dict())
                return
            self._json({"error": "not found"}, HTTPStatus.NOT_FOUND)
        except KeyError as exc:
            self._json({"error": self._error_text(exc)}, HTTPStatus.NOT_FOUND)

    def do_POST(self) -> None:
        """Serve game creation and the run-hands routes."""
        path = self._path_parts()
        try:
            if path == ["games"]:
                game = MANAGER.create_game(self._read_json())
                self._json(game.to_dict(), HTTPStatus.CREATED)
                return
            if self._is_run_hands_path(path):
                self._run_hands(path[1])
                return
            self._json({"error": "not found"}, HTTPStatus.NOT_FOUND)
        except KeyError as exc:
            self._json({"error": self._error_text(exc)}, HTTPStatus.NOT_FOUND)
        except (GameComplete, ValueError) as exc:
            self._json({"error": str(exc)}, HTTPStatus.BAD_REQUEST)

    def log_message(self, format: str, *args: Any) -> None:
        """Suppress the default per-request access log."""
        return

    def _path_parts(self) -> list[str]:
        """Return the non-empty path segments of the request URL (query ignored)."""
        parsed = urlparse(self.path)
        return [part for part in parsed.path.split("/") if part]

    @staticmethod
    def _is_run_hands_path(path: list[str]) -> bool:
        """Return True for ``games/<id>/hands`` and ``games/<id>/hands/run``."""
        if len(path) not in (3, 4) or path[0] != "games" or path[2] != "hands":
            return False
        return len(path) == 3 or path[3] == "run"

    def _run_hands(self, game_id: str) -> None:
        """Run the requested number of hands and reply with summaries and state.

        Raises:
            ValueError: if ``count`` in the body cannot be read as an integer.
        """
        body = self._read_json()
        try:
            count = int(body.get("count", 1))
        except (TypeError, ValueError) as exc:
            # e.g. null or a list: int() raises TypeError, which the caller
            # does not map to a 400 response on its own.
            raise ValueError("count must be an integer") from exc
        until_one_left = bool(body.get("until_one_left", False))
        summaries = MANAGER.run_hands(game_id, count, until_one_left)
        self._json({"hands": summaries, "game": MANAGER.get_game(game_id).to_dict()})

    @staticmethod
    def _error_text(exc: KeyError) -> str:
        """Return the message carried by *exc* without repr quoting.

        ``str(KeyError("msg"))`` yields ``"'msg'"`` (the repr of the
        argument), which would leak stray quotes into the JSON error.
        """
        return str(exc.args[0]) if exc.args else "not found"

    def _read_json(self) -> dict[str, Any]:
        """Read the request body as a JSON object.

        A missing or zero ``Content-Length`` yields an empty dict.

        Raises:
            ValueError: for a malformed or negative ``Content-Length``, a
                body that is not valid JSON, or JSON that is not an object.
        """
        length = int(self.headers.get("Content-Length", "0"))
        if length == 0:
            return {}
        if length < 0:
            # read() with a negative size would wait for end-of-stream
            # instead of reading a bounded body.
            raise ValueError("Content-Length must not be negative")
        try:
            payload = json.loads(self.rfile.read(length).decode("utf-8"))
        except json.JSONDecodeError as exc:
            raise ValueError("request body must be valid JSON") from exc
        if not isinstance(payload, dict):
            raise ValueError("request body must be a JSON object")
        return payload

    def _json(self, payload: dict[str, Any], status: HTTPStatus = HTTPStatus.OK) -> None:
        """Send *payload* as an ASCII-escaped JSON response with *status*."""
        body = json.dumps(payload, ensure_ascii=True).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)
class GameManager:
    """In-memory registry of ``TableGame`` objects keyed by game id.

    Every access to the registry happens under one re-entrant lock.
    """

    def __init__(self) -> None:
        self._games: dict[str, TableGame] = {}
        self._lock = RLock()

    @staticmethod
    def _int_field(payload: dict[str, Any], key: str, default: int) -> int:
        """Read ``payload[key]`` as an int, falling back to *default* when absent.

        Raises:
            ValueError: if the value cannot be converted.  ``int()`` raises
                ``TypeError`` for values such as ``None`` or a list; that is
                normalised to ``ValueError`` so callers only need to handle
                one "bad request" exception type.
        """
        try:
            return int(payload.get(key, default))
        except (TypeError, ValueError) as exc:
            raise ValueError(f"{key} must be an integer") from exc

    def create_game(self, payload: dict[str, Any]) -> TableGame:
        """Build a game from *payload* and register it.

        Recognised keys: ``players`` (required list of 2-12 objects),
        ``game_id`` (a random UUID when missing or falsy), ``seed``,
        ``starting_stack`` (default 1000), ``small_blind`` (default 5) and
        ``big_blind`` (default 10).  Each player object may carry ``id`` or
        ``player_id``, ``name``, and an ``agent`` spec; without ``agent`` the
        player object itself is passed to ``build_agent``.

        Raises:
            ValueError: for a malformed payload or an already-used game id.
        """
        players = payload.get("players")
        if not isinstance(players, list):
            raise ValueError("players must be a list")
        if not 2 <= len(players) <= 12:
            raise ValueError("players must contain 2-12 entries")

        seed = payload.get("seed")
        if seed is not None and not isinstance(seed, (int, float, str)):
            # Random() raises TypeError for containers such as lists/dicts.
            raise ValueError("seed must be a number or a string")
        rng = Random(seed)
        game_id = str(payload.get("game_id") or uuid4())
        starting_stack = self._int_field(payload, "starting_stack", 1000)
        small_blind = self._int_field(payload, "small_blind", 5)
        big_blind = self._int_field(payload, "big_blind", 10)

        specs = []
        for seat, raw_spec in enumerate(players):
            if not isinstance(raw_spec, dict):
                raise ValueError("each player must be an object")
            player_id = str(raw_spec.get("id") or raw_spec.get("player_id") or f"p{seat + 1}")
            name = str(raw_spec.get("name") or player_id)
            agent = build_agent(raw_spec.get("agent", raw_spec), rng)
            specs.append((player_id, name, agent))

        game = TableGame(
            game_id=game_id,
            player_specs=specs,
            starting_stack=starting_stack,
            small_blind=small_blind,
            big_blind=big_blind,
            rng=rng,
        )
        # Check and insert under the same lock acquisition so two concurrent
        # creations cannot both claim one id.
        with self._lock:
            if game_id in self._games:
                raise ValueError(f"game already exists: {game_id}")
            self._games[game_id] = game
        return game

    def get_game(self, game_id: str) -> TableGame:
        """Return the game registered under *game_id*.

        Raises:
            KeyError: with a ``game not found: <id>`` message if unknown.
        """
        with self._lock:
            try:
                return self._games[game_id]
            except KeyError as exc:
                raise KeyError(f"game not found: {game_id}") from exc

    def list_games(self) -> list[dict[str, object]]:
        """Return ``to_dict()`` of every registered game."""
        with self._lock:
            return [game.to_dict() for game in self._games.values()]

    def run_hands(self, game_id: str, count: int = 1, until_one_left: bool = False) -> list[dict[str, object]]:
        """Run hands on a game and return each hand summary as a dict.

        *count* and *until_one_left* are forwarded to the game's own
        ``run_hands``.

        Raises:
            KeyError: if *game_id* is unknown.
        """
        game = self.get_game(game_id)
        # NOTE: the registry lock is held for the whole run, so every other
        # manager call waits until these hands finish.
        with self._lock:
            return [
                summary.to_dict()
                for summary in game.run_hands(count, until_one_left=until_one_left)
            ]