Files
texas_hold_x/texas_holdem/engine.py
mamamiyear c0bc5384f4 feat: add hand detail API and enrich hand summary fields
- HandSummary: add hole_cards, starting_stacks, ending_stacks, pot_contributions
- Engine: capture all players' hole cards (not just showdown), pre/post hand stacks, per-level pot contributions
- Server: new GET /game/<game_id>/hands/<hand_number> route
- Service: add get_hand_state() method
- Tests: add ServerTests for new endpoint, update existing tests
- Existing GET /game/<game_id> auto-inherits new fields via shared to_dict()
2026-05-23 22:11:45 +08:00

747 lines
29 KiB
Python

from __future__ import annotations
from copy import deepcopy
from random import Random
from threading import RLock
from time import time
from texas_holdem.agents import PokerAgent
from texas_holdem.cards import Deck
from texas_holdem.evaluator import evaluate
from texas_holdem.models import (
ActionRecord,
BlindLevel,
HandSummary,
Observation,
PlayerAction,
PlayerState,
PotAward,
)
# Names of the betting streets of a hand, listed in the order they are played.
STREETS = ("preflop", "flop", "turn", "river")
class GameComplete(RuntimeError):
    """Raised when a hand is requested after the game has already finished."""
class TableGame:
    """A single poker table: seated players, their agents and the hand log."""

    def __init__(
        self,
        game_id: str,
        player_specs: list[tuple[str, str, PokerAgent]],
        starting_stack: int,
        small_blind: int,
        big_blind: int,
        rng: Random | None = None,
    ) -> None:
        """Validate the configuration and seat every player.

        Args:
            game_id: Identifier echoed in snapshots, observations and summaries.
            player_specs: ``(player_id, name, agent)`` tuples in seat order.
            starting_stack: Chips every player starts with; must be positive.
            small_blind: Initial small blind; must be positive.
            big_blind: Initial big blind; must be at least ``small_blind``.
            rng: Random source handed to each deck; a fresh ``Random`` when
                omitted.

        Raises:
            ValueError: On a bad player count, a non-positive stack, invalid
                blinds, or duplicate player ids.
        """
        seat_count = len(player_specs)
        if seat_count < 2 or seat_count > 12:
            raise ValueError("a game requires 2-12 players")
        if starting_stack <= 0:
            raise ValueError("starting_stack must be positive")
        if small_blind <= 0 or big_blind <= 0 or small_blind > big_blind:
            raise ValueError("blinds must satisfy 0 < small_blind <= big_blind")
        seen_ids = [spec_id for spec_id, _, _ in player_specs]
        if len(seen_ids) != len(set(seen_ids)):
            raise ValueError("player ids must be unique")

        self.game_id = game_id
        # The seat number is simply the position inside ``player_specs``.
        seated = []
        for seat, (player_id, name, _) in enumerate(player_specs):
            seated.append(
                PlayerState(
                    player_id=player_id,
                    name=name,
                    stack=starting_stack,
                    seat=seat,
                )
            )
        self.players = seated
        self.agents = {player_id: agent for player_id, _, agent in player_specs}
        self.starting_stack = starting_stack
        self.small_blind = small_blind
        self.big_blind = big_blind
        self.rng = rng or Random()
        self.lock = RLock()
        self.hand_number = 0
        self.button_index: int | None = None
        self.board = []
        self.action_history: list[ActionRecord] = []
        self.hand_summaries: list[HandSummary] = []
        self._last_pot_contributions: list[dict[str, object]] = []
        # Append-only log of blind level changes (the first hand seeds the
        # initial entry). Each entry's ``hand_number`` is the first hand
        # played at those stakes, so the schedule can be rebuilt externally.
        self.blind_history: list[BlindLevel] = []
        # Snapshot handed out by ``snapshot_completed`` when the lock is busy.
        self._completed_snapshot: dict[str, object] = self._to_dict_unlocked()
@property
def is_complete(self) -> bool:
return len([player for player in self.players if player.stack > 0]) < 2
def run_hand(
self,
small_blind: int | None = None,
big_blind: int | None = None,
) -> HandSummary:
"""Play a single hand and return its summary.
``small_blind`` / ``big_blind`` allow callers to bump the stakes
between hands without rebuilding the table. Either both must be
provided or both omitted (in which case the previously configured
blinds carry over). The resolved blind level is appended to
:attr:`blind_history` whenever it changes (including the very first
hand) so external observers can replay the schedule.
Raises :class:`GameComplete` when fewer than two players hold chips and
``ValueError`` when the blind overrides are rejected.
"""
# Hold the table lock for the full duration of the hand.
with self.lock:
return self._run_hand_locked(small_blind=small_blind, big_blind=big_blind)
def _run_hand_locked(
self,
small_blind: int | None = None,
big_blind: int | None = None,
) -> HandSummary:
"""Play one complete hand; the caller is expected to hold ``self.lock``.
Applies optional blind overrides, resets per-hand state, moves the
button, deals, runs the betting streets, awards the pots and appends a
:class:`HandSummary` to ``self.hand_summaries``.
Raises :class:`GameComplete` when fewer than two players hold chips.
"""
if self.is_complete:
raise GameComplete("game is complete")
# Overrides are validated and applied before the hand counter advances.
self._apply_blinds_for_hand(small_blind, big_blind)
self.hand_number += 1
# Stamp the active blind level onto the upcoming summary so a hand
# remains self-describing even after the blinds change later on.
active_blinds = BlindLevel(
hand_number=self.hand_number,
small_blind=self.small_blind,
big_blind=self.big_blind,
)
self._record_blind_level_if_new(active_blinds)
started_at = time()
self.board = []
self.action_history = []
deck = Deck(self.rng)
for player in self.players:
player.reset_for_hand()
# Captured after the per-hand reset and before any blind is posted; only
# players flagged ``in_hand`` are included.
starting_stacks = {
player.player_id: player.stack
for player in self.players
if player.in_hand
}
self._advance_button()
assert self.button_index is not None
# Notify every agent that a new hand is starting. Pushing here (as
# opposed to after ``_award_pots``) lets HTTP agents seed a fresh
# session with the latest table state and per-hand history before
# any decision is asked of them.
self._broadcast_game_update()
self._deal_hole_cards(deck)
# Copy every dealt hand now, so the summary keeps the cards of players
# who fold later as well.
hole_cards = {
player.player_id: list(player.hole_cards)
for player in self.players
if player.in_hand
}
small_blind_index, big_blind_index = self._blind_indexes()
self._post_blind(small_blind_index, "small_blind", self.small_blind)
self._post_blind(big_blind_index, "big_blind", self.big_blind)
# With exactly two active players the small blind opens the preflop
# action; otherwise it starts with the first player able to act after the
# big blind.
preflop_start = (
small_blind_index
if self._active_player_count() == 2
else self._next_index(big_blind_index + 1, self._can_act)
)
self._betting_round("preflop", preflop_start, self.big_blind)
for street, card_count in (("flop", 3), ("turn", 1), ("river", 1)):
# Stop dealing as soon as at most one contender remains.
if self._contender_count() <= 1:
break
deck.burn()
self.board.extend(deck.draw(card_count))
for player in self.players:
player.reset_for_street()
# A betting round only runs when at least two players can still act;
# otherwise the remaining board cards are simply dealt out.
if self._betting_player_count() >= 2:
start_index = self._next_index(self.button_index + 1, self._can_act)
self._betting_round(street, start_index, self.big_blind)
# ``_award_pots`` also rebuilds ``self._last_pot_contributions``, which is
# copied into the summary below.
awards = self._award_pots()
ending_stacks = {
player.player_id: player.stack
for player in self.players
if player.player_id in starting_stacks
}
summary = HandSummary(
game_id=self.game_id,
hand_number=self.hand_number,
button_seat=self.players[self.button_index].seat,
board=list(self.board),
actions=list(self.action_history),
awards=awards,
blinds=active_blinds,
hole_cards=hole_cards,
starting_stacks=starting_stacks,
ending_stacks=ending_stacks,
pot_contributions=deepcopy(self._last_pot_contributions),
showdown_hands=self._collect_showdown_hands(),
started_at=started_at,
finished_at=time(),
)
self.hand_summaries.append(summary)
# Refresh the cached snapshot that ``snapshot_completed`` returns when it
# cannot take the lock.
self._completed_snapshot = deepcopy(self._to_dict_unlocked())
return summary
def run_hands(
self,
max_hands: int,
until_one_left: bool = False,
small_blind: int | None = None,
big_blind: int | None = None,
) -> list[HandSummary]:
"""Play up to ``max_hands`` hands using a single blind configuration.
Passing ``small_blind`` / ``big_blind`` bumps the stakes starting
with the first hand of this call; subsequent calls can raise them
again. Leaving them ``None`` keeps the current level unchanged.
"""
with self.lock:
if max_hands <= 0:
raise ValueError("max_hands must be positive")
summaries = []
for _ in range(max_hands):
if self.is_complete:
break
# Only the first hand of the batch needs to apply the blind
# override; after that the engine reuses the stored values.
summaries.append(
self._run_hand_locked(
small_blind=small_blind,
big_blind=big_blind,
)
)
small_blind = None
big_blind = None
if until_one_left and self.is_complete:
break
return summaries
def to_dict(self) -> dict[str, object]:
"""Return the current table state as a dict.
Acquires ``self.lock`` first, so the call waits while another thread
holds the lock (for example while a hand is being played).
"""
with self.lock:
return self._to_dict_unlocked()
def snapshot_completed(self) -> dict[str, object]:
"""Return a stable snapshot from the latest completed hand boundary.
If a hand is currently running under ``self.lock``, this method does
not block. It returns the most recent completed hand summary and
stacks captured in memory, which is exactly what status endpoints
need while a long-running HTTP-agent decision is in progress.
"""
if self.lock.acquire(blocking=False):
try:
return deepcopy(self._to_dict_unlocked())
finally:
self.lock.release()
return deepcopy(self._completed_snapshot)
def _to_dict_unlocked(self) -> dict[str, object]:
return {
"game_id": self.game_id,
"status": "complete" if self.is_complete else "running",
"hand_number": self.hand_number,
"button_seat": None
if self.button_index is None
else self.players[self.button_index].seat,
# ``small_blind`` / ``big_blind`` mirror the *current* level so
# legacy callers keep working. New consumers should prefer the
# structured ``blinds`` block which carries the full schedule.
"small_blind": self.small_blind,
"big_blind": self.big_blind,
"blinds": {
"current": {
"small_blind": self.small_blind,
"big_blind": self.big_blind,
},
"history": [level.to_dict() for level in self.blind_history],
},
"starting_stack": self.starting_stack,
"players": [player.public_dict() for player in self.players],
# ``hands`` exposes every finished hand (each entry is the same
# dict that was previously returned as ``last_hand``). Callers
# that only want the most recent one can do ``hands[-1]``.
"hands": [summary.to_dict() for summary in self.hand_summaries],
}
def _apply_blinds_for_hand(
self,
small_blind: int | None,
big_blind: int | None,
) -> None:
"""Validate and apply optional per-hand blind overrides.
Splitting this out keeps :meth:`run_hand` focused on the table flow
while letting us reuse the validation rules originally enforced by
``__init__``. We require both values to be supplied together so the
configuration cannot drift into an inconsistent half-update.
"""
if small_blind is None and big_blind is None:
return
if small_blind is None or big_blind is None:
raise ValueError(
"small_blind and big_blind must be provided together"
)
if small_blind <= 0 or big_blind <= 0 or small_blind > big_blind:
raise ValueError("blinds must satisfy 0 < small_blind <= big_blind")
self.small_blind = int(small_blind)
self.big_blind = int(big_blind)
def _record_blind_level_if_new(self, level: BlindLevel) -> None:
"""Append ``level`` to :attr:`blind_history` when it differs.
Comparing against the latest entry (rather than blindly appending)
keeps the log compact: stretches of unchanged stakes only contribute
a single record. The very first hand always seeds an entry because
the history starts empty.
"""
if not self.blind_history:
self.blind_history.append(level)
return
latest = self.blind_history[-1]
if (
latest.small_blind != level.small_blind
or latest.big_blind != level.big_blind
):
self.blind_history.append(level)
def _advance_button(self) -> None:
if self.button_index is None:
self.button_index = self._next_index(0, lambda index: self.players[index].stack > 0)
return
self.button_index = self._next_index(
self.button_index + 1,
lambda index: self.players[index].stack > 0,
)
def _blind_indexes(self) -> tuple[int, int]:
assert self.button_index is not None
if self._active_player_count() == 2:
small_blind_index = self.button_index
big_blind_index = self._next_index(self.button_index + 1, self._is_in_hand)
else:
small_blind_index = self._next_index(self.button_index + 1, self._is_in_hand)
big_blind_index = self._next_index(small_blind_index + 1, self._is_in_hand)
assert small_blind_index is not None
assert big_blind_index is not None
return small_blind_index, big_blind_index
def _deal_hole_cards(self, deck: Deck) -> None:
assert self.button_index is not None
deal_order = self._ordered_indexes(self.button_index + 1, self._is_in_hand)
for _ in range(2):
for index in deal_order:
self.players[index].hole_cards.extend(deck.draw())
def _post_blind(self, player_index: int, action: str, amount: int) -> None:
    """Have the player at ``player_index`` post a blind and log it.

    The amount recorded is whatever ``player.commit(amount)`` reports, and
    the action is always logged on the ``"preflop"`` street.
    """
    poster = self.players[player_index]
    posted = poster.commit(amount)
    self._record_action(poster, "preflop", action, posted)
def _betting_round(self, street: str, start_index: int | None, opening_min_raise: int) -> None:
"""Run one betting round on ``street``.
Agents are asked for actions, starting at ``start_index`` and moving
through the seats in index order, until nobody is left to act or at most
one contender remains. ``opening_min_raise`` is the minimum raise
increment in force at the start of the round. A ``start_index`` of
``None`` makes the call a no-op.
"""
if start_index is None:
return
# The bet to match is the largest street bet already posted by a live player
# (the blinds on the preflop street).
# NOTE(review): if ``commit`` caps a blind at the player's stack, a big blind
# posted short makes this value smaller than ``self.big_blind`` -- confirm
# that this is the intended rule.
current_bet = max((player.street_bet for player in self.players if self._is_live(player)), default=0)
min_raise = opening_min_raise
# ``pending``: seat indexes that still owe a decision this round.
pending = {index for index in range(len(self.players)) if self._can_act(index)}
# ``call_only``: pending seats that are asked with ``can_raise=False``.
call_only: set[int] = set()
cursor = start_index
while pending and self._contender_count() > 1:
player_index = self._next_index(cursor, lambda index: index in pending)
if player_index is None:
break
player = self.players[player_index]
observation = self._observation(
street,
player_index,
current_bet,
min_raise,
can_raise=player_index not in call_only,
)
action = self._agent_action(player, observation)
previous_bet = current_bet
current_bet, min_raise, full_raise = self._apply_action(
street,
player,
action,
current_bet,
min_raise,
)
pending.discard(player_index)
call_only.discard(player_index)
opened_betting = previous_bet == 0 and current_bet > 0
if full_raise or opened_betting:
# A full raise or an opening bet reopens the action: every other player
# who can act must respond, and nobody is limited to calling.
pending = {
index
for index in range(len(self.players))
if index != player_index and self._can_act(index)
}
call_only.clear()
elif current_bet > previous_bet:
# The bet grew by less than a full raise: players who now owe chips must
# respond, but those who were not already pending are marked call-only.
owing_players = {
index
for index in range(len(self.players))
if index != player_index
and self._can_act(index)
and self.players[index].street_bet < current_bet
}
call_only.update(owing_players - pending)
pending.update(owing_players)
# Drop anyone who can no longer act (folded, all-in or out of chips).
pending = {index for index in pending if self._can_act(index)}
call_only = {index for index in call_only if index in pending}
# Continue the search from the seat after the player who just acted.
cursor = player_index + 1
def _observation(
    self,
    street: str,
    player_index: int,
    current_bet: int,
    min_raise: int,
    can_raise: bool = True,
) -> Observation:
    """Build the :class:`Observation` shown to the player about to act.

    ``min_raise_to`` is the ``min_amount`` of the first legal ``bet`` or
    ``raise`` option, or ``None`` when neither is available.
    """
    actor = self.players[player_index]
    legal_actions = self._legal_actions(actor, current_bet, min_raise, can_raise)
    min_raise_to = None
    for option in legal_actions:
        if option["action"] in {"bet", "raise"}:
            min_raise_to = int(option["min_amount"])
            break
    assert self.button_index is not None
    button_seat = self.players[self.button_index].seat
    table_view = [other.public_dict() for other in self.players]
    pot_size = sum(other.total_bet for other in self.players)
    owed = max(0, current_bet - actor.street_bet)
    return Observation(
        game_id=self.game_id,
        hand_number=self.hand_number,
        street=street,
        player_id=actor.player_id,
        seat=actor.seat,
        button_seat=button_seat,
        small_blind=self.small_blind,
        big_blind=self.big_blind,
        board=list(self.board),
        hole_cards=list(actor.hole_cards),
        players=table_view,
        pot=pot_size,
        to_call=owed,
        min_raise_to=min_raise_to,
        legal_actions=legal_actions,
        action_history=list(self.action_history),
    )
def _legal_actions(
self,
player: PlayerState,
current_bet: int,
min_raise: int,
can_raise: bool = True,
) -> list[dict[str, object]]:
to_call = max(0, current_bet - player.street_bet)
max_target = player.street_bet + player.stack
actions: list[dict[str, object]] = []
if to_call > 0:
actions.append({"action": "fold", "amount": 0})
actions.append({"action": "call", "amount": min(to_call, player.stack)})
if not can_raise:
return actions
min_raise_to = current_bet + min_raise
if max_target >= min_raise_to:
actions.append(
{
"action": "raise",
"min_amount": min_raise_to,
"max_amount": max_target,
"amount_mode": "street_total",
}
)
elif max_target > current_bet:
actions.append({"action": "all_in", "amount": max_target})
return actions
actions.append({"action": "check", "amount": 0})
if player.stack <= 0:
return actions
if current_bet == 0:
if max_target >= self.big_blind:
actions.append(
{
"action": "bet",
"min_amount": self.big_blind,
"max_amount": max_target,
"amount_mode": "street_total",
}
)
else:
actions.append({"action": "all_in", "amount": max_target})
else:
min_raise_to = current_bet + min_raise
if max_target >= min_raise_to:
actions.append(
{
"action": "raise",
"min_amount": min_raise_to,
"max_amount": max_target,
"amount_mode": "street_total",
}
)
elif max_target > current_bet:
actions.append({"action": "all_in", "amount": max_target})
return actions
def _agent_action(self, player: PlayerState, observation: Observation) -> PlayerAction:
    """Ask the player's agent for a decision and coerce it to a legal action.

    Agents are treated as untrusted. If ``decide`` raises, or if its return
    value cannot be coerced (for example ``None``, a missing ``action``
    attribute or a non-numeric amount), the engine falls back to the default
    action instead of aborting the hand half-way through.

    Args:
        player: The acting player; used to look up the agent.
        observation: The view handed to the agent, including the legal
            actions used for coercion.

    Returns:
        A :class:`PlayerAction` drawn from ``observation.legal_actions``.
    """
    agent = self.agents[player.player_id]
    legal_actions = observation.legal_actions
    try:
        requested = agent.decide(observation)
        return self._coerce_action(requested, legal_actions)
    except Exception:
        # Deliberate best effort: a failing or misbehaving agent must not be
        # able to break the table flow.
        return self._default_action(legal_actions)
def _default_action(self, legal_actions: list[dict[str, object]]) -> PlayerAction:
    """Pick the most passive legal action.

    Preference order is ``check``, ``call``, ``fold``; if none of them is
    legal the first listed action is used. A missing or falsy ``amount``
    becomes 0.
    """
    options = {str(entry["action"]): entry for entry in legal_actions}
    chosen = next(
        (options[kind] for kind in ("check", "call", "fold") if kind in options),
        legal_actions[0],
    )
    return PlayerAction(str(chosen["action"]), int(chosen.get("amount") or 0))
def _coerce_action(
    self,
    requested: PlayerAction,
    legal_actions: list[dict[str, object]],
) -> PlayerAction:
    """Map the agent's requested action onto one of ``legal_actions``.

    Fixed-amount actions (``fold``, ``check``, ``call``, ``all_in``) take
    the amount from the legal entry. ``bet`` / ``raise`` keep the requested
    amount clamped into ``[min_amount, max_amount]``. Anything else falls
    back to the most passive legal action.
    """
    options = {str(entry["action"]): entry for entry in legal_actions}
    wanted = requested.action.lower()
    if wanted in options:
        chosen = options[wanted]
        if wanted in {"fold", "check", "call", "all_in"}:
            return PlayerAction(wanted, int(chosen.get("amount") or 0))
        if wanted in {"bet", "raise"}:
            floor = int(chosen["min_amount"])
            ceiling = int(chosen["max_amount"])
            return PlayerAction(wanted, min(max(requested.amount, floor), ceiling))
    # Unknown or currently illegal request: use the passive fallback.
    return self._default_action(legal_actions)
def _apply_action(
    self,
    street: str,
    player: PlayerState,
    action: PlayerAction,
    current_bet: int,
    min_raise: int,
) -> tuple[int, int, bool]:
    """Apply ``action`` to ``player`` and log it.

    Returns:
        ``(current_bet, min_raise, full_raise)``. ``full_raise`` is ``True``
        when the bet to match grew by at least the previous ``min_raise``;
        in that case ``min_raise`` becomes the size of that increase.

    Raises:
        ValueError: If the action type is not supported.
    """
    kind = action.action
    committed = 0
    full_raise = False
    if kind in {"bet", "raise", "all_in"}:
        bet_before = current_bet
        # ``action.amount`` is a street total, so commit only the difference.
        committed = player.commit(action.amount - player.street_bet)
        current_bet = max(current_bet, player.street_bet)
        increase = current_bet - bet_before
        if increase >= min_raise:
            full_raise = True
            min_raise = increase
    elif kind == "call":
        committed = player.commit(current_bet - player.street_bet)
    elif kind == "fold":
        player.folded = True
    elif kind != "check":
        raise ValueError(f"unsupported action: {action.action}")
    self._record_action(player, street, kind, committed)
    return current_bet, min_raise, full_raise
def _award_pots(self) -> list[PotAward]:
"""Distribute the chips bet this hand and return the resulting awards.
Also rebuilds ``self._last_pot_contributions`` with one entry per pot
layer (amount, per-player contribution, winners, winning hand value).
Returns an empty list when nobody is live or nothing was bet.
"""
self._last_pot_contributions = []
total_pot = sum(player.total_bet for player in self.players)
live_players = [player for player in self.players if self._is_live(player)]
if not live_players or total_pot <= 0:
return []
# Distinct positive contribution totals in ascending order; each consecutive
# pair of levels bounds one pot layer.
levels = sorted({player.total_bet for player in self.players if player.total_bet > 0})
if len(live_players) == 1:
# Only one live player: they collect the whole pot without evaluating any
# hand. The layers are still recorded, with ``hand_value`` set to None.
winner = live_players[0]
winner.stack += total_pot
previous_level = 0
for level in levels:
contributors = [player for player in self.players if player.total_bet >= level]
pot_amount = (level - previous_level) * len(contributors)
self._last_pot_contributions.append(
{
"amount": pot_amount,
"contributors": {
player.player_id: level - previous_level
for player in contributors
},
"winners": [winner.player_id],
"hand_value": None,
}
)
previous_level = level
return [PotAward(total_pot, [winner.player_id], None)]
previous_level = 0
awards: list[PotAward] = []
for level in levels:
# Everyone who put in at least ``level`` funds this layer, folded players
# included; only live contributors can win it.
contributors = [player for player in self.players if player.total_bet >= level]
pot_amount = (level - previous_level) * len(contributors)
level_contributions = {
player.player_id: level - previous_level
for player in contributors
}
# Advance before the ``continue`` below so a skipped layer does not shift
# the bounds of the next one.
previous_level = level
contenders = [player for player in contributors if self._is_live(player)]
# NOTE(review): a layer without any live contender is skipped here and its
# chips are not credited to anyone -- confirm this cannot occur in play.
if not contenders or pot_amount <= 0:
continue
values = {
player.player_id: evaluate([*player.hole_cards, *self.board])
for player in contenders
}
best_value = max(values.values())
winners = [
player
for player in contenders
if values[player.player_id] == best_value
]
# Split evenly; leftover chips go one each to the winners that come first in
# ``_button_order`` (seat order starting after the button).
ordered_winners = self._button_order(winners)
share, remainder = divmod(pot_amount, len(ordered_winners))
for winner in ordered_winners:
winner.stack += share
for winner in ordered_winners[:remainder]:
winner.stack += 1
winner_ids = [winner.player_id for winner in ordered_winners]
awards.append(
PotAward(
amount=pot_amount,
winners=winner_ids,
hand_value=best_value,
)
)
self._last_pot_contributions.append(
{
"amount": pot_amount,
"contributors": level_contributions,
"winners": winner_ids,
"hand_value": best_value,
}
)
return awards
def _collect_showdown_hands(self) -> dict[str, list]:
"""Snapshot hole cards of every player still eligible at showdown.
We treat a hand as having reached showdown iff at least two players
remain ``in_hand`` and unfolded after the river. Returning an empty
dict for the one-player-left case keeps the wire format compact and
avoids leaking hole cards when there was no real comparison.
"""
live_players = [player for player in self.players if self._is_live(player)]
if len(live_players) < 2:
return {}
return {
player.player_id: list(player.hole_cards) for player in live_players
}
def _broadcast_game_update(self) -> None:
"""Push the post-hand game snapshot to every agent's optional hook.
Agents may opt into receiving game updates by overriding
:meth:`PokerAgent.on_game_update`. The default implementation is a
no-op, so this loop is essentially free for non-HTTP agents. We
swallow individual exceptions so a flaky remote endpoint cannot
break the table flow.
"""
snapshot = self._to_dict_unlocked()
for agent in self.agents.values():
try:
agent.on_game_update(snapshot)
except Exception:
continue
def _record_action(
    self,
    player: PlayerState,
    street: str,
    action: str,
    committed: int,
) -> None:
    """Append an :class:`ActionRecord` for ``player`` to the hand's history.

    The record stores the player's ``street_bet`` and ``stack`` as they are
    at the time of the call, alongside the chips ``committed``.
    """
    entry = ActionRecord(
        hand_number=self.hand_number,
        street=street,
        player_id=player.player_id,
        action=action,
        amount=committed,
        street_bet=player.street_bet,
        stack=player.stack,
    )
    self.action_history.append(entry)
def _active_player_count(self) -> int:
return len([player for player in self.players if player.stack > 0 or player.in_hand])
def _contender_count(self) -> int:
return len([player for player in self.players if self._is_live(player)])
def _betting_player_count(self) -> int:
return len([index for index in range(len(self.players)) if self._can_act(index)])
def _is_in_hand(self, index: int) -> bool:
return self.players[index].in_hand
def _is_live(self, player: PlayerState) -> bool:
return player.in_hand and not player.folded
def _can_act(self, index: int) -> bool:
player = self.players[index]
return self._is_live(player) and not player.all_in and player.stack > 0
def _next_index(self, start: int, predicate) -> int | None:
player_count = len(self.players)
for offset in range(player_count):
index = (start + offset) % player_count
if predicate(index):
return index
return None
def _ordered_indexes(self, start: int, predicate) -> list[int]:
player_count = len(self.players)
indexes = []
for offset in range(player_count):
index = (start + offset) % player_count
if predicate(index):
indexes.append(index)
return indexes
def _button_order(self, players: list[PlayerState]) -> list[PlayerState]:
assert self.button_index is not None
order = self._ordered_indexes(self.button_index + 1, lambda _: True)
seat_rank = {index: rank for rank, index in enumerate(order)}
return sorted(players, key=lambda player: seat_rank[player.seat])