from __future__ import annotations

import logging
from collections.abc import Callable
from random import Random
from time import time

from texas_holdem.agents import PokerAgent
from texas_holdem.cards import Deck
from texas_holdem.evaluator import evaluate
from texas_holdem.models import (
    ActionRecord,
    HandSummary,
    Observation,
    PlayerAction,
    PlayerState,
    PotAward,
)

_logger = logging.getLogger(__name__)

STREETS = ("preflop", "flop", "turn", "river")


class GameComplete(RuntimeError):
    """Raised by :meth:`TableGame.run_hand` when fewer than two players have chips."""


class TableGame:
    """Drive hands of Texas Hold'em between a fixed set of seated agents.

    The table owns the player states, the button position, the board and the
    per-hand action history. Decisions are delegated to the ``PokerAgent``
    registered for each player id; every reply is coerced onto the list of
    legal actions before it is applied, so an agent cannot put the table into
    an illegal state.
    """

    def __init__(
        self,
        game_id: str,
        player_specs: list[tuple[str, str, PokerAgent]],
        starting_stack: int,
        small_blind: int,
        big_blind: int,
        rng: Random | None = None,
    ) -> None:
        """Seat the players and validate the table configuration.

        Args:
            game_id: Identifier copied into observations and hand summaries.
            player_specs: ``(player_id, name, agent)`` triples; the position
                in this list becomes the player's seat number.
            starting_stack: Chips every player starts with.
            small_blind: Small blind amount.
            big_blind: Big blind amount; also the minimum opening bet.
            rng: Random source handed to each new ``Deck``; a fresh
                ``Random()`` is used when omitted.

        Raises:
            ValueError: If the player count is outside 2-12, the stack or
                blinds are not positive, the small blind exceeds the big
                blind, or player ids are not unique.
        """
        if not 2 <= len(player_specs) <= 12:
            raise ValueError("a game requires 2-12 players")
        if starting_stack <= 0:
            raise ValueError("starting_stack must be positive")
        if small_blind <= 0 or big_blind <= 0 or small_blind > big_blind:
            raise ValueError("blinds must satisfy 0 < small_blind <= big_blind")
        player_ids = [player_id for player_id, _, _ in player_specs]
        if len(set(player_ids)) != len(player_ids):
            raise ValueError("player ids must be unique")

        self.game_id = game_id
        self.players = [
            PlayerState(player_id=player_id, name=name, stack=starting_stack, seat=seat)
            for seat, (player_id, name, _) in enumerate(player_specs)
        ]
        self.agents = {player_id: agent for player_id, _, agent in player_specs}
        self.starting_stack = starting_stack
        self.small_blind = small_blind
        self.big_blind = big_blind
        self.rng = rng or Random()
        self.hand_number = 0
        # ``None`` until the first hand places the button.
        self.button_index: int | None = None
        self.board: list = []
        self.action_history: list[ActionRecord] = []
        self.hand_summaries: list[HandSummary] = []

    @property
    def is_complete(self) -> bool:
        """``True`` once fewer than two players still have chips."""
        return sum(1 for player in self.players if player.stack > 0) < 2

    def run_hand(self) -> HandSummary:
        """Play one complete hand and return its summary.

        The hand resets per-hand state, advances the button, deals hole
        cards, posts blinds, runs the preflop round and then each later
        street until at most one contender remains. Pots are awarded, the
        summary is appended to ``hand_summaries`` and every agent is notified.

        Raises:
            GameComplete: If the game is already complete.
        """
        if self.is_complete:
            raise GameComplete("game is complete")

        self.hand_number += 1
        started_at = time()
        self.board = []
        self.action_history = []
        deck = Deck(self.rng)

        for player in self.players:
            player.reset_for_hand()

        self._advance_button()
        assert self.button_index is not None
        self._deal_hole_cards(deck)

        small_blind_index, big_blind_index = self._blind_indexes()
        self._post_blind(small_blind_index, "small_blind", self.small_blind)
        self._post_blind(big_blind_index, "big_blind", self.big_blind)

        # Heads-up the small blind (the button) opens the preflop action;
        # otherwise it is the first player able to act after the big blind.
        preflop_start = (
            small_blind_index
            if self._active_player_count() == 2
            else self._next_index(big_blind_index + 1, self._can_act)
        )
        self._betting_round("preflop", preflop_start, self.big_blind)

        for street, card_count in (("flop", 3), ("turn", 1), ("river", 1)):
            if self._contender_count() <= 1:
                break
            deck.burn()
            self.board.extend(deck.draw(card_count))
            for player in self.players:
                player.reset_for_street()
            # With fewer than two players able to bet the board is simply
            # run out without asking anyone to act.
            if self._betting_player_count() >= 2:
                start_index = self._next_index(self.button_index + 1, self._can_act)
                self._betting_round(street, start_index, self.big_blind)

        awards = self._award_pots()
        summary = HandSummary(
            game_id=self.game_id,
            hand_number=self.hand_number,
            button_seat=self.players[self.button_index].seat,
            board=list(self.board),
            actions=list(self.action_history),
            awards=awards,
            showdown_hands=self._collect_showdown_hands(),
            started_at=started_at,
            finished_at=time(),
        )
        self.hand_summaries.append(summary)
        # Notify every agent so HTTP-backed clients can render the just
        # finished hand. Failures here must never abort the table.
        self._broadcast_game_update()
        return summary

    def run_hands(self, max_hands: int, until_one_left: bool = False) -> list[HandSummary]:
        """Play up to ``max_hands`` hands and return their summaries.

        The loop stops early as soon as the game is complete; this check runs
        before every hand regardless of ``until_one_left``.

        Raises:
            ValueError: If ``max_hands`` is not positive.
        """
        if max_hands <= 0:
            raise ValueError("max_hands must be positive")
        summaries = []
        for _ in range(max_hands):
            if self.is_complete:
                break
            summaries.append(self.run_hand())
            if until_one_left and self.is_complete:
                break
        return summaries

    def to_dict(self) -> dict[str, object]:
        """Return a public snapshot of the table, including all finished hands."""
        return {
            "game_id": self.game_id,
            "status": "complete" if self.is_complete else "running",
            "hand_number": self.hand_number,
            "button_seat": None
            if self.button_index is None
            else self.players[self.button_index].seat,
            "small_blind": self.small_blind,
            "big_blind": self.big_blind,
            "starting_stack": self.starting_stack,
            "players": [player.public_dict() for player in self.players],
            # ``hands`` exposes every finished hand (each entry is the same
            # dict that was previously returned as ``last_hand``). Callers
            # that only want the most recent one can do ``hands[-1]``.
            "hands": [summary.to_dict() for summary in self.hand_summaries],
        }

    def _advance_button(self) -> None:
        """Move the button to the next player with chips (first hand: from seat 0)."""

        def has_chips(index: int) -> bool:
            return self.players[index].stack > 0

        start = 0 if self.button_index is None else self.button_index + 1
        self.button_index = self._next_index(start, has_chips)

    def _blind_indexes(self) -> tuple[int, int]:
        """Return ``(small_blind_index, big_blind_index)`` for the current hand.

        Heads-up the button posts the small blind; otherwise the blinds are
        the next two in-hand players after the button.
        """
        assert self.button_index is not None
        if self._active_player_count() == 2:
            small_blind_index = self.button_index
            big_blind_index = self._next_index(self.button_index + 1, self._is_in_hand)
        else:
            small_blind_index = self._next_index(self.button_index + 1, self._is_in_hand)
            big_blind_index = self._next_index(small_blind_index + 1, self._is_in_hand)
        assert small_blind_index is not None
        assert big_blind_index is not None
        return small_blind_index, big_blind_index

    def _deal_hole_cards(self, deck: Deck) -> None:
        """Deal two rounds of one draw each, starting after the button."""
        assert self.button_index is not None
        deal_order = self._ordered_indexes(self.button_index + 1, self._is_in_hand)
        for _ in range(2):
            for index in deal_order:
                self.players[index].hole_cards.extend(deck.draw())

    def _post_blind(self, player_index: int, action: str, amount: int) -> None:
        """Commit a blind for the player and record what was actually committed."""
        player = self.players[player_index]
        # Record the value returned by ``commit`` rather than ``amount``
        # (presumably it is capped at the player's stack - confirm in models).
        committed = player.commit(amount)
        self._record_action(player, "preflop", action, committed)

    def _betting_round(
        self,
        street: str,
        start_index: int | None,
        opening_min_raise: int,
    ) -> None:
        """Run one betting round until nobody is left to act.

        ``pending`` holds the indexes that still owe a decision. ``call_only``
        is the subset that may respond to an incomplete (short all-in) raise
        but may not raise again because they had already acted.

        Args:
            street: Street name stored on observations and action records.
            start_index: Index where the search for the first actor begins;
                ``None`` skips the round entirely.
            opening_min_raise: Minimum raise increment at the start of the round.
        """
        if start_index is None:
            return

        current_bet = max(
            (player.street_bet for player in self.players if self._is_live(player)),
            default=0,
        )
        min_raise = opening_min_raise
        pending = {index for index in range(len(self.players)) if self._can_act(index)}
        call_only: set[int] = set()
        cursor = start_index

        while pending and self._contender_count() > 1:
            player_index = self._next_index(cursor, lambda index: index in pending)
            if player_index is None:
                break
            player = self.players[player_index]
            observation = self._observation(
                street,
                player_index,
                current_bet,
                min_raise,
                can_raise=player_index not in call_only,
            )
            action = self._agent_action(player, observation)
            previous_bet = current_bet
            current_bet, min_raise, full_raise = self._apply_action(
                street,
                player,
                action,
                current_bet,
                min_raise,
            )
            pending.discard(player_index)
            call_only.discard(player_index)

            opened_betting = previous_bet == 0 and current_bet > 0
            if full_raise or opened_betting:
                # A full raise (or the first bet of the street) re-opens the
                # action for everyone else who can still act.
                pending = {
                    index
                    for index in range(len(self.players))
                    if index != player_index and self._can_act(index)
                }
                call_only.clear()
            elif current_bet > previous_bet:
                # Incomplete raise: players who now owe chips must respond,
                # but those who had already acted may only call or fold.
                owing_players = {
                    index
                    for index in range(len(self.players))
                    if index != player_index
                    and self._can_act(index)
                    and self.players[index].street_bet < current_bet
                }
                call_only.update(owing_players - pending)
                pending.update(owing_players)

            pending = {index for index in pending if self._can_act(index)}
            call_only = {index for index in call_only if index in pending}
            cursor = player_index + 1

    def _observation(
        self,
        street: str,
        player_index: int,
        current_bet: int,
        min_raise: int,
        can_raise: bool = True,
    ) -> Observation:
        """Build the observation handed to the acting player's agent."""
        player = self.players[player_index]
        legal_actions = self._legal_actions(player, current_bet, min_raise, can_raise)
        # Minimum street total for a bet/raise, or ``None`` when neither is legal.
        min_raise_to = next(
            (
                int(action["min_amount"])
                for action in legal_actions
                if action["action"] in {"bet", "raise"}
            ),
            None,
        )
        assert self.button_index is not None
        return Observation(
            game_id=self.game_id,
            hand_number=self.hand_number,
            street=street,
            player_id=player.player_id,
            seat=player.seat,
            button_seat=self.players[self.button_index].seat,
            small_blind=self.small_blind,
            big_blind=self.big_blind,
            board=list(self.board),
            hole_cards=list(player.hole_cards),
            players=[other.public_dict() for other in self.players],
            pot=sum(other.total_bet for other in self.players),
            to_call=max(0, current_bet - player.street_bet),
            min_raise_to=min_raise_to,
            legal_actions=legal_actions,
            action_history=list(self.action_history),
        )

    def _legal_actions(
        self,
        player: PlayerState,
        current_bet: int,
        min_raise: int,
        can_raise: bool = True,
    ) -> list[dict[str, object]]:
        """List the actions ``player`` may take, with their amount bounds.

        Amounts for ``bet``/``raise``/``all_in`` are street totals (the
        player's ``street_bet`` after acting), not increments. ``all_in`` is
        only listed when the stack cannot cover the minimum bet or raise.
        ``can_raise`` is only consulted when the player owes chips.
        """
        to_call = max(0, current_bet - player.street_bet)
        max_target = player.street_bet + player.stack

        if to_call > 0:
            actions: list[dict[str, object]] = [
                {"action": "fold", "amount": 0},
                {"action": "call", "amount": min(to_call, player.stack)},
            ]
            if can_raise:
                actions.extend(self._raise_options(current_bet, min_raise, max_target))
            return actions

        actions = [{"action": "check", "amount": 0}]
        if player.stack <= 0:
            return actions
        if current_bet == 0:
            if max_target >= self.big_blind:
                actions.append(
                    {
                        "action": "bet",
                        "min_amount": self.big_blind,
                        "max_amount": max_target,
                        "amount_mode": "street_total",
                    }
                )
            else:
                actions.append({"action": "all_in", "amount": max_target})
        else:
            actions.extend(self._raise_options(current_bet, min_raise, max_target))
        return actions

    @staticmethod
    def _raise_options(
        current_bet: int,
        min_raise: int,
        max_target: int,
    ) -> list[dict[str, object]]:
        """Return the raise (or short all-in) option over ``current_bet``, if any."""
        min_raise_to = current_bet + min_raise
        if max_target >= min_raise_to:
            return [
                {
                    "action": "raise",
                    "min_amount": min_raise_to,
                    "max_amount": max_target,
                    "amount_mode": "street_total",
                }
            ]
        if max_target > current_bet:
            return [{"action": "all_in", "amount": max_target}]
        return []

    def _agent_action(self, player: PlayerState, observation: Observation) -> PlayerAction:
        """Ask the player's agent for a decision and coerce it to a legal action.

        An agent that raises is treated as having requested a fold; coercion
        turns that into a check when folding is not a listed action.
        """
        agent = self.agents[player.player_id]
        try:
            requested = agent.decide(observation)
        except Exception:
            # Deliberate boundary: a broken agent must not take the table down.
            _logger.warning(
                "agent for player %s failed in decide(); treating as fold",
                player.player_id,
                exc_info=True,
            )
            requested = PlayerAction("fold")
        return self._coerce_action(requested, observation.legal_actions)

    def _coerce_action(
        self,
        requested: PlayerAction,
        legal_actions: list[dict[str, object]],
    ) -> PlayerAction:
        """Map an agent's request onto one of ``legal_actions``.

        Rules, in order:

        * ``fold``/``check``/``call``/``all_in`` are used as listed when legal.
        * ``bet`` and ``raise`` are aliases for whichever of the two is legal;
          the amount is clamped to the legal range.
        * ``all_in`` with a stack deep enough for a full bet/raise becomes
          that bet/raise at its maximum amount.
        * Anything else, including a malformed reply such as ``None`` or a
          non-string action, falls back to check, then call, then fold.
        """
        by_action = {str(action["action"]): action for action in legal_actions}
        raw_type = getattr(requested, "action", None)
        requested_type = raw_type.lower() if isinstance(raw_type, str) else ""

        if requested_type in {"fold", "check", "call", "all_in"} and requested_type in by_action:
            legal = by_action[requested_type]
            return PlayerAction(requested_type, int(legal.get("amount") or 0))

        # At most one of bet/raise is ever offered for a given decision.
        sized = next((name for name in ("bet", "raise") if name in by_action), None)
        if sized is not None and requested_type in {"bet", "raise", "all_in"}:
            legal = by_action[sized]
            min_amount = int(legal["min_amount"])
            max_amount = int(legal["max_amount"])
            if requested_type == "all_in":
                amount = max_amount
            else:
                wanted = self._requested_amount(requested, min_amount)
                amount = min(max(wanted, min_amount), max_amount)
            return PlayerAction(sized, amount)

        for fallback in ("check", "call", "fold"):
            if fallback in by_action:
                legal = by_action[fallback]
                return PlayerAction(fallback, int(legal.get("amount") or 0))
        if not legal_actions:
            return PlayerAction("fold", 0)
        legal = legal_actions[0]
        return PlayerAction(str(legal["action"]), int(legal.get("amount") or 0))

    @staticmethod
    def _requested_amount(requested: object, default: int) -> int:
        """Return ``requested.amount`` as an int, or ``default`` if unusable."""
        try:
            return int(getattr(requested, "amount", default))
        except (TypeError, ValueError, OverflowError):
            return default

    def _apply_action(
        self,
        street: str,
        player: PlayerState,
        action: PlayerAction,
        current_bet: int,
        min_raise: int,
    ) -> tuple[int, int, bool]:
        """Apply an already-coerced action and record it.

        Returns:
            ``(current_bet, min_raise, full_raise)`` after the action.
            ``full_raise`` is ``True`` when the bet level rose by at least the
            previous ``min_raise``; that increment becomes the new ``min_raise``.

        Raises:
            ValueError: If the action type is not supported.
        """
        previous_bet = current_bet
        committed = 0
        full_raise = False

        if action.action == "fold":
            player.folded = True
        elif action.action == "check":
            pass
        elif action.action == "call":
            committed = player.commit(current_bet - player.street_bet)
        elif action.action in {"bet", "raise", "all_in"}:
            # ``action.amount`` is a street total; commit only the difference.
            target = action.amount
            committed = player.commit(target - player.street_bet)
            current_bet = max(current_bet, player.street_bet)
            raise_size = current_bet - previous_bet
            if raise_size >= min_raise:
                full_raise = True
                min_raise = raise_size
        else:
            raise ValueError(f"unsupported action: {action.action}")

        self._record_action(player, street, action.action, committed)
        return current_bet, min_raise, full_raise

    def _award_pots(self) -> list[PotAward]:
        """Distribute the pot (and any side pots) and return the awards.

        With a single live player the whole pot goes to them without a hand
        evaluation. Otherwise one pot is built per distinct ``total_bet``
        level and awarded to the best hand among the live players who
        contributed at least that level. Odd chips go one each to the first
        winners in order starting after the button.
        """
        total_pot = sum(player.total_bet for player in self.players)
        live_players = [player for player in self.players if self._is_live(player)]
        if not live_players or total_pot <= 0:
            return []
        if len(live_players) == 1:
            live_players[0].stack += total_pot
            return [PotAward(total_pot, [live_players[0].player_id], None)]

        levels = sorted({player.total_bet for player in self.players if player.total_bet > 0})
        previous_level = 0
        awards: list[PotAward] = []
        for level in levels:
            contributors = [player for player in self.players if player.total_bet >= level]
            pot_amount = (level - previous_level) * len(contributors)
            previous_level = level
            contenders = [player for player in contributors if self._is_live(player)]
            # NOTE(review): a level with no live contender is skipped and its
            # chips are not paid out; confirm this cannot occur in practice.
            if not contenders or pot_amount <= 0:
                continue
            values = {
                player.player_id: evaluate([*player.hole_cards, *self.board])
                for player in contenders
            }
            best_value = max(values.values())
            winners = [
                player for player in contenders if values[player.player_id] == best_value
            ]
            ordered_winners = self._button_order(winners)
            share, remainder = divmod(pot_amount, len(ordered_winners))
            for winner in ordered_winners:
                winner.stack += share
            for winner in ordered_winners[:remainder]:
                winner.stack += 1
            awards.append(
                PotAward(
                    amount=pot_amount,
                    winners=[winner.player_id for winner in ordered_winners],
                    hand_value=best_value,
                )
            )
        return awards

    def _collect_showdown_hands(self) -> dict[str, list]:
        """Snapshot hole cards of every player still eligible at showdown.

        We treat a hand as having reached showdown iff at least two players
        remain ``in_hand`` and unfolded when the hand ends. Returning an empty
        dict for the one-player-left case keeps the wire format compact and
        avoids leaking hole cards when there was no real comparison.
        """
        live_players = [player for player in self.players if self._is_live(player)]
        if len(live_players) < 2:
            return {}
        return {player.player_id: list(player.hole_cards) for player in live_players}

    def _broadcast_game_update(self) -> None:
        """Push the post-hand game snapshot to every agent's ``on_game_update`` hook.

        Each call is isolated: an exception from one agent is logged at debug
        level and ignored so a flaky remote endpoint cannot break the table
        flow or prevent the remaining agents from being notified.
        """
        snapshot = self.to_dict()
        for player_id, agent in self.agents.items():
            try:
                agent.on_game_update(snapshot)
            except Exception:
                _logger.debug(
                    "on_game_update failed for player %s; ignoring",
                    player_id,
                    exc_info=True,
                )

    def _record_action(
        self,
        player: PlayerState,
        street: str,
        action: str,
        committed: int,
    ) -> None:
        """Append an ``ActionRecord`` capturing the player's state after acting."""
        self.action_history.append(
            ActionRecord(
                hand_number=self.hand_number,
                street=street,
                player_id=player.player_id,
                action=action,
                amount=committed,
                street_bet=player.street_bet,
                stack=player.stack,
            )
        )

    def _active_player_count(self) -> int:
        """Count players who have chips or are marked ``in_hand``."""
        return sum(1 for player in self.players if player.stack > 0 or player.in_hand)

    def _contender_count(self) -> int:
        """Count players who are in the hand and have not folded."""
        return sum(1 for player in self.players if self._is_live(player))

    def _betting_player_count(self) -> int:
        """Count players who can still make a betting decision."""
        return sum(1 for index in range(len(self.players)) if self._can_act(index))

    def _is_in_hand(self, index: int) -> bool:
        """Return whether the player at ``index`` is marked ``in_hand``."""
        return self.players[index].in_hand

    def _is_live(self, player: PlayerState) -> bool:
        """Return whether ``player`` is in the hand and has not folded."""
        return player.in_hand and not player.folded

    def _can_act(self, index: int) -> bool:
        """Return whether the player at ``index`` is live, not all-in and has chips."""
        player = self.players[index]
        return self._is_live(player) and not player.all_in and player.stack > 0

    def _next_index(self, start: int, predicate: Callable[[int], bool]) -> int | None:
        """Return the first index at or after ``start`` (wrapping) matching ``predicate``.

        Returns ``None`` when no index matches.
        """
        player_count = len(self.players)
        for offset in range(player_count):
            index = (start + offset) % player_count
            if predicate(index):
                return index
        return None

    def _ordered_indexes(self, start: int, predicate: Callable[[int], bool]) -> list[int]:
        """Return all indexes matching ``predicate`` in table order from ``start``."""
        player_count = len(self.players)
        return [
            index
            for index in ((start + offset) % player_count for offset in range(player_count))
            if predicate(index)
        ]

    def _button_order(self, players: list[PlayerState]) -> list[PlayerState]:
        """Sort ``players`` in table order starting with the seat after the button."""
        assert self.button_index is not None
        order = self._ordered_indexes(self.button_index + 1, lambda _: True)
        # Seats are assigned from list positions in ``__init__``, so a
        # player's ``seat`` doubles as their index into ``self.players``.
        seat_rank = {index: rank for rank, index in enumerate(order)}
        return sorted(players, key=lambda player: seat_rank[player.seat])