Files
texas_hold_x/texas_holdem/engine.py
T
2026-05-11 21:09:55 +08:00

552 lines
21 KiB
Python

from __future__ import annotations
from random import Random
from time import time
from texas_holdem.agents import PokerAgent
from texas_holdem.cards import Deck
from texas_holdem.evaluator import evaluate
from texas_holdem.models import (
ActionRecord,
HandSummary,
Observation,
PlayerAction,
PlayerState,
PotAward,
)
# Names of the betting streets, listed in the order run_hand plays them.
STREETS = ("preflop", "flop", "turn", "river")
class GameComplete(RuntimeError):
    """Raised by ``TableGame.run_hand`` when the game is already complete."""
class TableGame:
def __init__(
self,
game_id: str,
player_specs: list[tuple[str, str, PokerAgent]],
starting_stack: int,
small_blind: int,
big_blind: int,
rng: Random | None = None,
) -> None:
if not 2 <= len(player_specs) <= 12:
raise ValueError("a game requires 2-12 players")
if starting_stack <= 0:
raise ValueError("starting_stack must be positive")
if small_blind <= 0 or big_blind <= 0 or small_blind > big_blind:
raise ValueError("blinds must satisfy 0 < small_blind <= big_blind")
player_ids = [player_id for player_id, _, _ in player_specs]
if len(set(player_ids)) != len(player_ids):
raise ValueError("player ids must be unique")
self.game_id = game_id
self.players = [
PlayerState(player_id=player_id, name=name, stack=starting_stack, seat=seat)
for seat, (player_id, name, _) in enumerate(player_specs)
]
self.agents = {player_id: agent for player_id, _, agent in player_specs}
self.starting_stack = starting_stack
self.small_blind = small_blind
self.big_blind = big_blind
self.rng = rng or Random()
self.hand_number = 0
self.button_index: int | None = None
self.board = []
self.action_history: list[ActionRecord] = []
self.hand_summaries: list[HandSummary] = []
@property
def is_complete(self) -> bool:
return len([player for player in self.players if player.stack > 0]) < 2
def run_hand(self) -> HandSummary:
    """Play one hand from button move to pot award and return its summary.

    The summary is also appended to ``self.hand_summaries``.

    Raises:
        GameComplete: If the game is already complete when called.
    """
    if self.is_complete:
        raise GameComplete("game is complete")

    # Fresh per-hand state.
    self.hand_number += 1
    started_at = time()
    self.board = []
    self.action_history = []
    deck = Deck(self.rng)
    for seat in self.players:
        seat.reset_for_hand()

    self._advance_button()
    assert self.button_index is not None
    # Agents are notified at the start of the hand (not after the pots of
    # the previous one are awarded) so HTTP agents can seed a fresh
    # session with the latest table state and per-hand history before
    # being asked for any decision.
    self._broadcast_game_update()

    self._deal_hole_cards(deck)
    sb_index, bb_index = self._blind_indexes()
    self._post_blind(sb_index, "small_blind", self.small_blind)
    self._post_blind(bb_index, "big_blind", self.big_blind)

    # With exactly two active players the small blind opens preflop;
    # otherwise the first player able to act after the big blind does.
    if self._active_player_count() == 2:
        first_to_act = sb_index
    else:
        first_to_act = self._next_index(bb_index + 1, self._can_act)
    self._betting_round("preflop", first_to_act, self.big_blind)

    later_streets = {"flop": 3, "turn": 1, "river": 1}
    for street, cards_to_deal in later_streets.items():
        if self._contender_count() <= 1:
            break
        deck.burn()
        self.board.extend(deck.draw(cards_to_deal))
        for seat in self.players:
            seat.reset_for_street()
        # The board is still dealt when fewer than two players can bet.
        if self._betting_player_count() < 2:
            continue
        opener = self._next_index(self.button_index + 1, self._can_act)
        self._betting_round(street, opener, self.big_blind)

    awards = self._award_pots()
    showdown_hands = self._collect_showdown_hands()
    summary = HandSummary(
        game_id=self.game_id,
        hand_number=self.hand_number,
        button_seat=self.players[self.button_index].seat,
        board=list(self.board),
        actions=list(self.action_history),
        awards=awards,
        showdown_hands=showdown_hands,
        started_at=started_at,
        finished_at=time(),
    )
    self.hand_summaries.append(summary)
    return summary
def run_hands(self, max_hands: int, until_one_left: bool = False) -> list[HandSummary]:
if max_hands <= 0:
raise ValueError("max_hands must be positive")
summaries = []
for _ in range(max_hands):
if self.is_complete:
break
summaries.append(self.run_hand())
if until_one_left and self.is_complete:
break
return summaries
def to_dict(self) -> dict[str, object]:
return {
"game_id": self.game_id,
"status": "complete" if self.is_complete else "running",
"hand_number": self.hand_number,
"button_seat": None
if self.button_index is None
else self.players[self.button_index].seat,
"small_blind": self.small_blind,
"big_blind": self.big_blind,
"starting_stack": self.starting_stack,
"players": [player.public_dict() for player in self.players],
# ``hands`` exposes every finished hand (each entry is the same
# dict that was previously returned as ``last_hand``). Callers
# that only want the most recent one can do ``hands[-1]``.
"hands": [summary.to_dict() for summary in self.hand_summaries],
}
def _advance_button(self) -> None:
if self.button_index is None:
self.button_index = self._next_index(0, lambda index: self.players[index].stack > 0)
return
self.button_index = self._next_index(
self.button_index + 1,
lambda index: self.players[index].stack > 0,
)
def _blind_indexes(self) -> tuple[int, int]:
assert self.button_index is not None
if self._active_player_count() == 2:
small_blind_index = self.button_index
big_blind_index = self._next_index(self.button_index + 1, self._is_in_hand)
else:
small_blind_index = self._next_index(self.button_index + 1, self._is_in_hand)
big_blind_index = self._next_index(small_blind_index + 1, self._is_in_hand)
assert small_blind_index is not None
assert big_blind_index is not None
return small_blind_index, big_blind_index
def _deal_hole_cards(self, deck: Deck) -> None:
assert self.button_index is not None
deal_order = self._ordered_indexes(self.button_index + 1, self._is_in_hand)
for _ in range(2):
for index in deal_order:
self.players[index].hole_cards.extend(deck.draw())
def _post_blind(self, player_index: int, action: str, amount: int) -> None:
    """Commit a blind for the player at ``player_index`` and log it.

    The logged amount is whatever ``commit`` returns, recorded on the
    preflop street under the given ``action`` label.
    """
    poster = self.players[player_index]
    posted = poster.commit(amount)
    self._record_action(poster, "preflop", action, posted)
def _betting_round(self, street: str, start_index: int | None, opening_min_raise: int) -> None:
    """Run one betting round until nobody is left to act.

    Args:
        street: Street label passed to observations and action records.
        start_index: Index where the search for the first actor begins;
            ``None`` skips the round entirely.
        opening_min_raise: Minimum raise size in force at the start.
    """
    if start_index is None:
        return

    seats = range(len(self.players))
    current_bet = max(
        (seat.street_bet for seat in self.players if self._is_live(seat)),
        default=0,
    )
    min_raise = opening_min_raise
    # ``to_act`` holds the indexes that still owe a decision; ``capped``
    # is the subset that is offered no raise (can_raise=False).
    to_act = {index for index in seats if self._can_act(index)}
    capped: set[int] = set()
    cursor = start_index

    while to_act and self._contender_count() > 1:
        actor_index = self._next_index(cursor, to_act.__contains__)
        if actor_index is None:
            break
        actor = self.players[actor_index]
        observation = self._observation(
            street,
            actor_index,
            current_bet,
            min_raise,
            can_raise=actor_index not in capped,
        )
        chosen = self._agent_action(actor, observation)
        bet_before = current_bet
        current_bet, min_raise, full_raise = self._apply_action(
            street,
            actor,
            chosen,
            current_bet,
            min_raise,
        )
        to_act.discard(actor_index)
        capped.discard(actor_index)

        others_able = {
            index for index in seats if index != actor_index and self._can_act(index)
        }
        if full_raise or (bet_before == 0 and current_bet > 0):
            # A full raise or an opening bet reopens the action for
            # everyone else who can still act.
            to_act = others_able
            capped.clear()
        elif current_bet > bet_before:
            # The bet went up by less than a full raise: players who now
            # owe chips must act again, and those who had already acted
            # are not offered a raise.
            owing = {
                index
                for index in others_able
                if self.players[index].street_bet < current_bet
            }
            capped |= owing - to_act
            to_act |= owing

        to_act = {index for index in to_act if self._can_act(index)}
        capped &= to_act
        cursor = actor_index + 1
def _observation(
    self,
    street: str,
    player_index: int,
    current_bet: int,
    min_raise: int,
    can_raise: bool = True,
) -> Observation:
    """Build the observation handed to the agent of ``player_index``.

    ``min_raise_to`` is the ``min_amount`` of the first legal bet or
    raise, or ``None`` when neither is legal.
    """
    hero = self.players[player_index]
    options = self._legal_actions(hero, current_bet, min_raise, can_raise)

    min_raise_to = None
    for option in options:
        if option["action"] in {"bet", "raise"}:
            min_raise_to = int(option["min_amount"])
            break

    assert self.button_index is not None
    button_seat = self.players[self.button_index].seat
    pot_size = sum(seat.total_bet for seat in self.players)
    return Observation(
        game_id=self.game_id,
        hand_number=self.hand_number,
        street=street,
        player_id=hero.player_id,
        seat=hero.seat,
        button_seat=button_seat,
        small_blind=self.small_blind,
        big_blind=self.big_blind,
        board=list(self.board),
        hole_cards=list(hero.hole_cards),
        players=[seat.public_dict() for seat in self.players],
        pot=pot_size,
        to_call=max(0, current_bet - hero.street_bet),
        min_raise_to=min_raise_to,
        legal_actions=options,
        action_history=list(self.action_history),
    )
def _legal_actions(
self,
player: PlayerState,
current_bet: int,
min_raise: int,
can_raise: bool = True,
) -> list[dict[str, object]]:
to_call = max(0, current_bet - player.street_bet)
max_target = player.street_bet + player.stack
actions: list[dict[str, object]] = []
if to_call > 0:
actions.append({"action": "fold", "amount": 0})
actions.append({"action": "call", "amount": min(to_call, player.stack)})
if not can_raise:
return actions
min_raise_to = current_bet + min_raise
if max_target >= min_raise_to:
actions.append(
{
"action": "raise",
"min_amount": min_raise_to,
"max_amount": max_target,
"amount_mode": "street_total",
}
)
elif max_target > current_bet:
actions.append({"action": "all_in", "amount": max_target})
return actions
actions.append({"action": "check", "amount": 0})
if player.stack <= 0:
return actions
if current_bet == 0:
if max_target >= self.big_blind:
actions.append(
{
"action": "bet",
"min_amount": self.big_blind,
"max_amount": max_target,
"amount_mode": "street_total",
}
)
else:
actions.append({"action": "all_in", "amount": max_target})
else:
min_raise_to = current_bet + min_raise
if max_target >= min_raise_to:
actions.append(
{
"action": "raise",
"min_amount": min_raise_to,
"max_amount": max_target,
"amount_mode": "street_total",
}
)
elif max_target > current_bet:
actions.append({"action": "all_in", "amount": max_target})
return actions
def _agent_action(self, player: PlayerState, observation: Observation) -> PlayerAction:
    """Ask the player's agent for a decision and coerce it to a legal action.

    A misbehaving agent must not abort the hand. If ``decide`` raises, or
    returns something ``_coerce_action`` cannot read (``None``, a
    non-string action, a non-numeric amount), the request is treated as a
    fold, which ``_coerce_action`` turns into a legal action.

    Args:
        player: The player whose turn it is.
        observation: The observation passed to the agent; its
            ``legal_actions`` bound the result.

    Returns:
        An action coerced against ``observation.legal_actions``.
    """
    agent = self.agents[player.player_id]
    legal_actions = observation.legal_actions
    try:
        requested = agent.decide(observation)
    except Exception:
        # Deliberate best-effort: any agent failure counts as a fold.
        requested = PlayerAction("fold")
    try:
        return self._coerce_action(requested, legal_actions)
    except (AttributeError, TypeError, ValueError):
        # The agent returned a malformed action object; handle it the
        # same way as an agent that raised.
        return self._coerce_action(PlayerAction("fold"), legal_actions)
def _coerce_action(
    self,
    requested: PlayerAction,
    legal_actions: list[dict[str, object]],
) -> PlayerAction:
    """Map a requested action onto one of ``legal_actions``.

    A legal fixed-amount action takes the amount from the legal entry. A
    legal bet or raise has its amount clamped into the entry's
    ``[min_amount, max_amount]`` range. Anything else falls back to
    check, then call, then fold, and finally to the first legal action.
    """
    offered = {str(option["action"]): option for option in legal_actions}
    wanted = requested.action.lower()

    match = offered.get(wanted)
    if match is not None:
        if wanted in ("fold", "check", "call", "all_in"):
            return PlayerAction(wanted, int(match.get("amount") or 0))
        if wanted in ("bet", "raise"):
            floor = int(match["min_amount"])
            ceiling = int(match["max_amount"])
            clamped = min(max(requested.amount, floor), ceiling)
            return PlayerAction(wanted, clamped)

    for safe in ("check", "call", "fold"):
        option = offered.get(safe)
        if option is not None:
            return PlayerAction(safe, int(option.get("amount") or 0))

    first = legal_actions[0]
    return PlayerAction(str(first["action"]), int(first.get("amount") or 0))
def _apply_action(
    self,
    street: str,
    player: PlayerState,
    action: PlayerAction,
    current_bet: int,
    min_raise: int,
) -> tuple[int, int, bool]:
    """Apply ``action`` to ``player`` and record it.

    Returns:
        ``(current_bet, min_raise, full_raise)`` after the action.
        ``full_raise`` is true when the bet rose by at least the previous
        ``min_raise``; in that case the increase becomes the new
        ``min_raise``.

    Raises:
        ValueError: If the action type is not supported; nothing is
            recorded in that case.
    """
    kind = action.action
    if kind not in {"fold", "check", "call", "bet", "raise", "all_in"}:
        raise ValueError(f"unsupported action: {action.action}")

    bet_before = current_bet
    chips_in = 0
    full_raise = False
    if kind == "fold":
        player.folded = True
    elif kind == "call":
        chips_in = player.commit(current_bet - player.street_bet)
    elif kind != "check":
        # bet / raise / all_in: ``action.amount`` is the street total.
        chips_in = player.commit(action.amount - player.street_bet)
        current_bet = max(current_bet, player.street_bet)
        increase = current_bet - bet_before
        full_raise = increase >= min_raise
        if full_raise:
            min_raise = increase

    self._record_action(player, street, kind, chips_in)
    return current_bet, min_raise, full_raise
def _award_pots(self) -> list[PotAward]:
    """Distribute all committed chips and return one award per pot.

    A single remaining live player takes everything without evaluation.
    Otherwise the chips are layered by distinct ``total_bet`` levels;
    each layer goes to the best evaluated hand among the live players
    who contributed to it. Ties split evenly, and leftover chips go one
    each to the first winners in button order.
    """
    total_pot = sum(seat.total_bet for seat in self.players)
    still_live = [seat for seat in self.players if self._is_live(seat)]
    if total_pot <= 0 or not still_live:
        return []
    if len(still_live) == 1:
        sole = still_live[0]
        sole.stack += total_pot
        return [PotAward(total_pot, [sole.player_id], None)]

    awards: list[PotAward] = []
    floor = 0
    caps = sorted({seat.total_bet for seat in self.players if seat.total_bet > 0})
    for cap in caps:
        layer_players = [seat for seat in self.players if seat.total_bet >= cap]
        layer_amount = (cap - floor) * len(layer_players)
        floor = cap
        eligible = [seat for seat in layer_players if self._is_live(seat)]
        if layer_amount <= 0 or not eligible:
            continue

        strength = {
            seat.player_id: evaluate([*seat.hole_cards, *self.board])
            for seat in eligible
        }
        top = max(strength.values())
        winners = self._button_order(
            [seat for seat in eligible if strength[seat.player_id] == top]
        )
        share, odd_chips = divmod(layer_amount, len(winners))
        for position, winner in enumerate(winners):
            winner.stack += share
            if position < odd_chips:
                winner.stack += 1
        awards.append(
            PotAward(
                amount=layer_amount,
                winners=[winner.player_id for winner in winners],
                hand_value=top,
            )
        )
    return awards
def _collect_showdown_hands(self) -> dict[str, list]:
"""Snapshot hole cards of every player still eligible at showdown.
We treat a hand as having reached showdown iff at least two players
remain ``in_hand`` and unfolded after the river. Returning an empty
dict for the one-player-left case keeps the wire format compact and
avoids leaking hole cards when there was no real comparison.
"""
live_players = [player for player in self.players if self._is_live(player)]
if len(live_players) < 2:
return {}
return {
player.player_id: list(player.hole_cards) for player in live_players
}
def _broadcast_game_update(self) -> None:
"""Push the post-hand game snapshot to every agent's optional hook.
Agents may opt into receiving game updates by overriding
:meth:`PokerAgent.on_game_update`. The default implementation is a
no-op, so this loop is essentially free for non-HTTP agents. We
swallow individual exceptions so a flaky remote endpoint cannot
break the table flow.
"""
snapshot = self.to_dict()
for agent in self.agents.values():
try:
agent.on_game_update(snapshot)
except Exception:
continue
def _record_action(
    self,
    player: PlayerState,
    street: str,
    action: str,
    committed: int,
) -> None:
    """Append an ``ActionRecord`` for this action to the hand's history.

    The record captures the player's ``street_bet`` and ``stack`` as they
    are at the time of the call.
    """
    entry = ActionRecord(
        hand_number=self.hand_number,
        street=street,
        player_id=player.player_id,
        action=action,
        amount=committed,
        street_bet=player.street_bet,
        stack=player.stack,
    )
    self.action_history.append(entry)
def _active_player_count(self) -> int:
return len([player for player in self.players if player.stack > 0 or player.in_hand])
def _contender_count(self) -> int:
return len([player for player in self.players if self._is_live(player)])
def _betting_player_count(self) -> int:
return len([index for index in range(len(self.players)) if self._can_act(index)])
def _is_in_hand(self, index: int) -> bool:
return self.players[index].in_hand
def _is_live(self, player: PlayerState) -> bool:
return player.in_hand and not player.folded
def _can_act(self, index: int) -> bool:
player = self.players[index]
return self._is_live(player) and not player.all_in and player.stack > 0
def _next_index(self, start: int, predicate) -> int | None:
player_count = len(self.players)
for offset in range(player_count):
index = (start + offset) % player_count
if predicate(index):
return index
return None
def _ordered_indexes(self, start: int, predicate) -> list[int]:
player_count = len(self.players)
indexes = []
for offset in range(player_count):
index = (start + offset) % player_count
if predicate(index):
indexes.append(index)
return indexes
def _button_order(self, players: list[PlayerState]) -> list[PlayerState]:
assert self.button_index is not None
order = self._ordered_indexes(self.button_index + 1, lambda _: True)
seat_rank = {index: rank for rank, index in enumerate(order)}
return sorted(players, key=lambda player: seat_rank[player.seat])