from __future__ import annotations

from copy import deepcopy
from random import Random
from threading import RLock
from time import time

from texas_holdem.agents import PokerAgent
from texas_holdem.cards import Deck
from texas_holdem.evaluator import evaluate
from texas_holdem.models import (
    ActionRecord,
    BlindLevel,
    HandSummary,
    Observation,
    PlayerAction,
    PlayerState,
    PotAward,
)

STREETS = ("preflop", "flop", "turn", "river")


class GameComplete(RuntimeError):
    """Raised when a hand is requested but fewer than two players have chips."""


class TableGame:
    """A single table that plays hands between 2-12 agents.

    All public entry points that mutate state (:meth:`run_hand`,
    :meth:`run_hands`) take ``self.lock``; :meth:`snapshot_completed` is the
    only reader designed not to block on it.
    """

    def __init__(
        self,
        game_id: str,
        player_specs: list[tuple[str, str, PokerAgent]],
        starting_stack: int,
        small_blind: int,
        big_blind: int,
        rng: Random | None = None,
    ) -> None:
        """Create the table.

        ``player_specs`` is a list of ``(player_id, name, agent)`` tuples;
        the position in the list becomes the player's seat.

        Raises:
            ValueError: on a player count outside 2-12, a non-positive
                starting stack, invalid blinds, or duplicate player ids.
        """
        if not 2 <= len(player_specs) <= 12:
            raise ValueError("a game requires 2-12 players")
        if starting_stack <= 0:
            raise ValueError("starting_stack must be positive")
        self._validate_blinds(small_blind, big_blind)
        player_ids = [player_id for player_id, _, _ in player_specs]
        if len(set(player_ids)) != len(player_ids):
            raise ValueError("player ids must be unique")

        self.game_id = game_id
        self.players = [
            PlayerState(player_id=player_id, name=name, stack=starting_stack, seat=seat)
            for seat, (player_id, name, _) in enumerate(player_specs)
        ]
        self.agents = {player_id: agent for player_id, _, agent in player_specs}
        self.starting_stack = starting_stack
        self.small_blind = small_blind
        self.big_blind = big_blind
        self.rng = rng or Random()
        self.lock = RLock()
        self.hand_number = 0
        self.button_index: int | None = None
        self.board: list = []
        self.action_history: list[ActionRecord] = []
        self.hand_summaries: list[HandSummary] = []
        # ``blind_history`` is an append-only log of every blind level change
        # (including the initial one). Each entry's ``hand_number`` is the
        # first hand that played under those stakes, which makes it trivial
        # to reconstruct the schedule from the outside.
        self.blind_history: list[BlindLevel] = []
        # Must be built last: ``_to_dict_unlocked`` reads the attributes above.
        self._completed_snapshot: dict[str, object] = self._to_dict_unlocked()

    @staticmethod
    def _validate_blinds(small_blind: int, big_blind: int) -> None:
        """Raise ``ValueError`` unless ``0 < small_blind <= big_blind``."""
        if small_blind <= 0 or big_blind <= 0 or small_blind > big_blind:
            raise ValueError("blinds must satisfy 0 < small_blind <= big_blind")

    @property
    def is_complete(self) -> bool:
        """``True`` once fewer than two players still have chips."""
        return sum(1 for player in self.players if player.stack > 0) < 2

    def run_hand(
        self,
        small_blind: int | None = None,
        big_blind: int | None = None,
    ) -> HandSummary:
        """Play a single hand.

        ``small_blind`` / ``big_blind`` allow callers to bump the stakes
        between hands without rebuilding the table. Either both must be
        provided or both omitted (in which case the previously configured
        blinds carry over). The resolved blind level is appended to
        :attr:`blind_history` whenever it changes (including the very first
        hand) so external observers can replay the schedule.

        Raises:
            GameComplete: if fewer than two players have chips.
            ValueError: if the blind override is partial or invalid.
        """
        with self.lock:
            return self._run_hand_locked(small_blind=small_blind, big_blind=big_blind)

    def _run_hand_locked(
        self,
        small_blind: int | None = None,
        big_blind: int | None = None,
    ) -> HandSummary:
        """Play one hand; the caller is expected to hold ``self.lock``."""
        if self.is_complete:
            raise GameComplete("game is complete")
        self._apply_blinds_for_hand(small_blind, big_blind)
        self.hand_number += 1
        # Stamp the active blind level onto the upcoming summary so a hand
        # remains self-describing even after the blinds change later on.
        active_blinds = BlindLevel(
            hand_number=self.hand_number,
            small_blind=self.small_blind,
            big_blind=self.big_blind,
        )
        self._record_blind_level_if_new(active_blinds)
        started_at = time()
        self.board = []
        self.action_history = []
        deck = Deck(self.rng)
        for player in self.players:
            player.reset_for_hand()
        self._advance_button()
        assert self.button_index is not None
        # Notify every agent that a new hand is starting. Pushing here (as
        # opposed to after ``_award_pots``) lets HTTP agents seed a fresh
        # session with the latest table state and per-hand history before
        # any decision is asked of them.
        self._broadcast_game_update()
        self._deal_hole_cards(deck)
        small_blind_index, big_blind_index = self._blind_indexes()
        self._post_blind(small_blind_index, "small_blind", self.small_blind)
        self._post_blind(big_blind_index, "big_blind", self.big_blind)
        # Heads-up the small blind acts first preflop; otherwise action
        # starts with the first player after the big blind who can act.
        preflop_start = (
            small_blind_index
            if self._active_player_count() == 2
            else self._next_index(big_blind_index + 1, self._can_act)
        )
        self._betting_round("preflop", preflop_start, self.big_blind)

        for street, card_count in (("flop", 3), ("turn", 1), ("river", 1)):
            if self._contender_count() <= 1:
                break
            deck.burn()
            self.board.extend(deck.draw(card_count))
            for player in self.players:
                player.reset_for_street()
            # With fewer than two players able to act the board is simply
            # run out without a betting round.
            if self._betting_player_count() >= 2:
                start_index = self._next_index(self.button_index + 1, self._can_act)
                self._betting_round(street, start_index, self.big_blind)

        awards = self._award_pots()
        summary = HandSummary(
            game_id=self.game_id,
            hand_number=self.hand_number,
            button_seat=self.players[self.button_index].seat,
            board=list(self.board),
            actions=list(self.action_history),
            awards=awards,
            blinds=active_blinds,
            showdown_hands=self._collect_showdown_hands(),
            started_at=started_at,
            finished_at=time(),
        )
        self.hand_summaries.append(summary)
        self._completed_snapshot = deepcopy(self._to_dict_unlocked())
        return summary

    def run_hands(
        self,
        max_hands: int,
        until_one_left: bool = False,
        small_blind: int | None = None,
        big_blind: int | None = None,
    ) -> list[HandSummary]:
        """Play up to ``max_hands`` hands using a single blind configuration.

        Passing ``small_blind`` / ``big_blind`` bumps the stakes starting
        with the first hand of this call; subsequent calls can raise them
        again. Leaving them ``None`` keeps the current level unchanged.

        Raises:
            ValueError: if ``max_hands`` is not positive, or the blind
                override is partial or invalid.
        """
        with self.lock:
            if max_hands <= 0:
                raise ValueError("max_hands must be positive")
            summaries = []
            for _ in range(max_hands):
                if self.is_complete:
                    break
                # Only the first hand of the batch needs to apply the blind
                # override; after that the engine reuses the stored values.
                summaries.append(
                    self._run_hand_locked(
                        small_blind=small_blind,
                        big_blind=big_blind,
                    )
                )
                small_blind = None
                big_blind = None
                if until_one_left and self.is_complete:
                    break
            return summaries

    def to_dict(self) -> dict[str, object]:
        """Return the current table state, waiting for ``self.lock``."""
        with self.lock:
            return self._to_dict_unlocked()

    def snapshot_completed(self) -> dict[str, object]:
        """Return a stable snapshot from the latest completed hand boundary.

        If a hand is currently running under ``self.lock``, this method does
        not block. It returns the most recent completed hand summary and
        stacks captured in memory, which is exactly what status endpoints
        need while a long-running HTTP-agent decision is in progress.
        """
        if self.lock.acquire(blocking=False):
            try:
                return deepcopy(self._to_dict_unlocked())
            finally:
                self.lock.release()
        return deepcopy(self._completed_snapshot)

    def _to_dict_unlocked(self) -> dict[str, object]:
        """Serialize the table state without taking ``self.lock``."""
        return {
            "game_id": self.game_id,
            "status": "complete" if self.is_complete else "running",
            "hand_number": self.hand_number,
            "button_seat": None
            if self.button_index is None
            else self.players[self.button_index].seat,
            # ``small_blind`` / ``big_blind`` mirror the *current* level so
            # legacy callers keep working. New consumers should prefer the
            # structured ``blinds`` block which carries the full schedule.
            "small_blind": self.small_blind,
            "big_blind": self.big_blind,
            "blinds": {
                "current": {
                    "small_blind": self.small_blind,
                    "big_blind": self.big_blind,
                },
                "history": [level.to_dict() for level in self.blind_history],
            },
            "starting_stack": self.starting_stack,
            "players": [player.public_dict() for player in self.players],
            # ``hands`` exposes every finished hand (each entry is the same
            # dict that was previously returned as ``last_hand``). Callers
            # that only want the most recent one can do ``hands[-1]``.
            "hands": [summary.to_dict() for summary in self.hand_summaries],
        }

    def _apply_blinds_for_hand(
        self,
        small_blind: int | None,
        big_blind: int | None,
    ) -> None:
        """Validate and apply optional per-hand blind overrides.

        Splitting this out keeps :meth:`run_hand` focused on the table flow
        while letting us reuse the validation rules originally enforced by
        ``__init__``. We require both values to be supplied together so the
        configuration cannot drift into an inconsistent half-update.

        Raises:
            ValueError: if only one value is supplied or the pair is invalid.
        """
        if small_blind is None and big_blind is None:
            return
        if small_blind is None or big_blind is None:
            raise ValueError(
                "small_blind and big_blind must be provided together"
            )
        self._validate_blinds(small_blind, big_blind)
        self.small_blind = int(small_blind)
        self.big_blind = int(big_blind)

    def _record_blind_level_if_new(self, level: BlindLevel) -> None:
        """Append ``level`` to :attr:`blind_history` when it differs.

        Comparing against the latest entry (rather than blindly appending)
        keeps the log compact: stretches of unchanged stakes only contribute
        a single record. The very first hand always seeds an entry because
        the history starts empty.
        """
        if not self.blind_history:
            self.blind_history.append(level)
            return
        latest = self.blind_history[-1]
        if (
            latest.small_blind != level.small_blind
            or latest.big_blind != level.big_blind
        ):
            self.blind_history.append(level)

    def _advance_button(self) -> None:
        """Move the button to the next player (in list order) with chips.

        On the first hand the search starts at index 0; afterwards it starts
        one past the current button.
        """
        if self.button_index is None:
            self.button_index = self._next_index(
                0, lambda index: self.players[index].stack > 0
            )
            return
        self.button_index = self._next_index(
            self.button_index + 1,
            lambda index: self.players[index].stack > 0,
        )

    def _blind_indexes(self) -> tuple[int, int]:
        """Return ``(small_blind_index, big_blind_index)`` for this hand.

        Heads-up the button posts the small blind; otherwise the blinds are
        the next two in-hand players after the button.
        """
        assert self.button_index is not None
        if self._active_player_count() == 2:
            small_blind_index = self.button_index
            big_blind_index = self._next_index(self.button_index + 1, self._is_in_hand)
        else:
            small_blind_index = self._next_index(self.button_index + 1, self._is_in_hand)
            big_blind_index = self._next_index(small_blind_index + 1, self._is_in_hand)
        assert small_blind_index is not None
        assert big_blind_index is not None
        return small_blind_index, big_blind_index

    def _deal_hole_cards(self, deck: Deck) -> None:
        """Deal two cards, one at a time, starting left of the button."""
        assert self.button_index is not None
        deal_order = self._ordered_indexes(self.button_index + 1, self._is_in_hand)
        for _ in range(2):
            for index in deal_order:
                self.players[index].hole_cards.extend(deck.draw())

    def _post_blind(self, player_index: int, action: str, amount: int) -> None:
        """Commit a blind for the player and record it as a preflop action.

        The recorded amount is whatever ``PlayerState.commit`` returns
        (presumably capped by the player's stack; confirm in ``models``).
        """
        player = self.players[player_index]
        committed = player.commit(amount)
        self._record_action(player, "preflop", action, committed)

    def _betting_round(
        self,
        street: str,
        start_index: int | None,
        opening_min_raise: int,
    ) -> None:
        """Run one betting round until nobody is left to act.

        ``pending`` holds the indexes that still owe a decision. A full raise
        (or the first bet of the street) reopens the action for everyone
        else; a short all-in only obliges players who now owe chips, and
        those who had already acted are added to ``call_only`` so they may
        call or fold but not re-raise.
        """
        if start_index is None:
            return
        current_bet = max(
            (player.street_bet for player in self.players if self._is_live(player)),
            default=0,
        )
        min_raise = opening_min_raise
        pending = {index for index in range(len(self.players)) if self._can_act(index)}
        call_only: set[int] = set()
        cursor = start_index

        while pending and self._contender_count() > 1:
            player_index = self._next_index(cursor, lambda index: index in pending)
            if player_index is None:
                break
            player = self.players[player_index]
            observation = self._observation(
                street,
                player_index,
                current_bet,
                min_raise,
                can_raise=player_index not in call_only,
            )
            action = self._agent_action(player, observation)
            previous_bet = current_bet
            current_bet, min_raise, full_raise = self._apply_action(
                street,
                player,
                action,
                current_bet,
                min_raise,
            )
            pending.discard(player_index)
            call_only.discard(player_index)

            opened_betting = previous_bet == 0 and current_bet > 0
            if full_raise or opened_betting:
                # Action is reopened for every other player who can act.
                pending = {
                    index
                    for index in range(len(self.players))
                    if index != player_index and self._can_act(index)
                }
                call_only.clear()
            elif current_bet > previous_bet:
                # Short all-in: only players who now owe chips must respond.
                owing_players = {
                    index
                    for index in range(len(self.players))
                    if index != player_index
                    and self._can_act(index)
                    and self.players[index].street_bet < current_bet
                }
                call_only.update(owing_players - pending)
                pending.update(owing_players)

            pending = {index for index in pending if self._can_act(index)}
            call_only = {index for index in call_only if index in pending}
            cursor = player_index + 1

    def _observation(
        self,
        street: str,
        player_index: int,
        current_bet: int,
        min_raise: int,
        can_raise: bool = True,
    ) -> Observation:
        """Build the view of the table handed to the acting player's agent."""
        player = self.players[player_index]
        legal_actions = self._legal_actions(player, current_bet, min_raise, can_raise)
        # Smallest legal bet/raise target, or ``None`` when neither is legal.
        min_raise_to = next(
            (
                int(action["min_amount"])
                for action in legal_actions
                if action["action"] in {"bet", "raise"}
            ),
            None,
        )
        assert self.button_index is not None
        return Observation(
            game_id=self.game_id,
            hand_number=self.hand_number,
            street=street,
            player_id=player.player_id,
            seat=player.seat,
            button_seat=self.players[self.button_index].seat,
            small_blind=self.small_blind,
            big_blind=self.big_blind,
            board=list(self.board),
            hole_cards=list(player.hole_cards),
            players=[other.public_dict() for other in self.players],
            pot=sum(other.total_bet for other in self.players),
            to_call=max(0, current_bet - player.street_bet),
            min_raise_to=min_raise_to,
            legal_actions=legal_actions,
            action_history=list(self.action_history),
        )

    def _legal_actions(
        self,
        player: PlayerState,
        current_bet: int,
        min_raise: int,
        can_raise: bool = True,
    ) -> list[dict[str, object]]:
        """List the actions ``player`` may take, as plain dicts.

        Bet/raise entries carry ``min_amount`` / ``max_amount`` expressed as
        street totals; every other entry carries a fixed ``amount``.
        ``can_raise`` is only consulted when the player is facing a bet.
        """
        to_call = max(0, current_bet - player.street_bet)
        max_target = player.street_bet + player.stack

        if to_call > 0:
            actions: list[dict[str, object]] = [
                {"action": "fold", "amount": 0},
                {"action": "call", "amount": min(to_call, player.stack)},
            ]
            if can_raise:
                actions.extend(self._raise_options(current_bet, min_raise, max_target))
            return actions

        actions = [{"action": "check", "amount": 0}]
        if player.stack <= 0:
            return actions
        if current_bet == 0:
            if max_target >= self.big_blind:
                actions.append(
                    {
                        "action": "bet",
                        "min_amount": self.big_blind,
                        "max_amount": max_target,
                        "amount_mode": "street_total",
                    }
                )
            else:
                # Too short to make a minimum bet: shoving is the only bet.
                actions.append({"action": "all_in", "amount": max_target})
        else:
            actions.extend(self._raise_options(current_bet, min_raise, max_target))
        return actions

    @staticmethod
    def _raise_options(
        current_bet: int,
        min_raise: int,
        max_target: int,
    ) -> list[dict[str, object]]:
        """Return the raise entry, a short all-in entry, or nothing."""
        min_raise_to = current_bet + min_raise
        if max_target >= min_raise_to:
            return [
                {
                    "action": "raise",
                    "min_amount": min_raise_to,
                    "max_amount": max_target,
                    "amount_mode": "street_total",
                }
            ]
        if max_target > current_bet:
            return [{"action": "all_in", "amount": max_target}]
        return []

    def _agent_action(self, player: PlayerState, observation: Observation) -> PlayerAction:
        """Ask the player's agent for a decision and make it legal.

        This is a deliberate best-effort boundary: an agent that raises, or
        that returns something unusable (``None``, a wrong type, a
        non-numeric amount), gets the default action instead of aborting
        the hand.
        """
        agent = self.agents[player.player_id]
        legal_actions = observation.legal_actions
        try:
            requested = agent.decide(observation)
            return self._coerce_action(requested, legal_actions)
        except Exception:
            fallback = self._default_action(legal_actions)
        return self._coerce_action(fallback, legal_actions)

    def _default_action(self, legal_actions: list[dict[str, object]]) -> PlayerAction:
        """Pick the most passive legal action: check, else call, else fold."""
        by_action = {str(action["action"]): action for action in legal_actions}
        for action_type in ("check", "call", "fold"):
            if action_type in by_action:
                legal = by_action[action_type]
                return PlayerAction(action_type, int(legal.get("amount") or 0))
        legal = legal_actions[0]
        return PlayerAction(str(legal["action"]), int(legal.get("amount") or 0))

    def _coerce_action(
        self,
        requested: PlayerAction,
        legal_actions: list[dict[str, object]],
    ) -> PlayerAction:
        """Map a requested action onto a legal one.

        Fixed-amount actions take the amount from the legal entry; bet/raise
        amounts are converted to ``int`` and clamped into the legal range.
        Anything that is not currently legal falls back to
        :meth:`_default_action`.
        """
        by_action = {str(action["action"]): action for action in legal_actions}
        requested_type = requested.action.lower()
        legal = by_action.get(requested_type)
        if legal is not None:
            if requested_type in {"fold", "check", "call", "all_in"}:
                return PlayerAction(requested_type, int(legal.get("amount") or 0))
            if requested_type in {"bet", "raise"}:
                min_amount = int(legal["min_amount"])
                max_amount = int(legal["max_amount"])
                # ``int`` keeps chip counts integral even if an agent sends a
                # float; unusable values raise and are handled by the caller.
                amount = min(max(int(requested.amount), min_amount), max_amount)
                return PlayerAction(requested_type, amount)
        return self._default_action(legal_actions)

    def _apply_action(
        self,
        street: str,
        player: PlayerState,
        action: PlayerAction,
        current_bet: int,
        min_raise: int,
    ) -> tuple[int, int, bool]:
        """Apply ``action`` to ``player`` and record it.

        Returns:
            ``(current_bet, min_raise, full_raise)`` after the action.
            ``full_raise`` is ``True`` when the bet grew by at least the
            previous ``min_raise``, in which case ``min_raise`` becomes the
            size of that increase.

        Raises:
            ValueError: for an action name that is not supported.
        """
        previous_bet = current_bet
        committed = 0
        full_raise = False
        if action.action == "fold":
            player.folded = True
        elif action.action == "check":
            pass
        elif action.action == "call":
            committed = player.commit(current_bet - player.street_bet)
        elif action.action in {"bet", "raise", "all_in"}:
            # ``action.amount`` is a street total, so commit only the delta.
            target = action.amount
            committed = player.commit(target - player.street_bet)
            current_bet = max(current_bet, player.street_bet)
            raise_size = current_bet - previous_bet
            if raise_size >= min_raise:
                full_raise = True
                min_raise = raise_size
        else:
            raise ValueError(f"unsupported action: {action.action}")
        self._record_action(player, street, action.action, committed)
        return current_bet, min_raise, full_raise

    def _award_pots(self) -> list[PotAward]:
        """Distribute the main pot and side pots to the winners.

        The pot is sliced at each distinct ``total_bet`` level; each slice
        is contested only by live players who contributed at least that
        much. Odd chips go to the first winners in button order.
        """
        total_pot = sum(player.total_bet for player in self.players)
        live_players = [player for player in self.players if self._is_live(player)]
        if not live_players or total_pot <= 0:
            return []
        if len(live_players) == 1:
            # Everyone else folded: no hand evaluation is needed.
            live_players[0].stack += total_pot
            return [PotAward(total_pot, [live_players[0].player_id], None)]

        levels = sorted({player.total_bet for player in self.players if player.total_bet > 0})
        previous_level = 0
        awards: list[PotAward] = []
        for level in levels:
            contributors = [player for player in self.players if player.total_bet >= level]
            pot_amount = (level - previous_level) * len(contributors)
            previous_level = level
            contenders = [player for player in contributors if self._is_live(player)]
            if not contenders or pot_amount <= 0:
                continue
            values = {
                player.player_id: evaluate([*player.hole_cards, *self.board])
                for player in contenders
            }
            best_value = max(values.values())
            winners = [
                player for player in contenders if values[player.player_id] == best_value
            ]
            ordered_winners = self._button_order(winners)
            share, remainder = divmod(pot_amount, len(ordered_winners))
            for winner in ordered_winners:
                winner.stack += share
            for winner in ordered_winners[:remainder]:
                winner.stack += 1
            awards.append(
                PotAward(
                    amount=pot_amount,
                    winners=[winner.player_id for winner in ordered_winners],
                    hand_value=best_value,
                )
            )
        return awards

    def _collect_showdown_hands(self) -> dict[str, list]:
        """Snapshot hole cards of every player still eligible at showdown.

        We treat a hand as having reached showdown iff at least two players
        remain ``in_hand`` and unfolded when the hand ends. Returning an
        empty dict for the one-player-left case keeps the wire format
        compact and avoids leaking hole cards when there was no real
        comparison.
        """
        live_players = [player for player in self.players if self._is_live(player)]
        if len(live_players) < 2:
            return {}
        return {
            player.player_id: list(player.hole_cards)
            for player in live_players
        }

    def _broadcast_game_update(self) -> None:
        """Push the current game snapshot to every agent's optional hook.

        The engine calls this at the start of each hand, after the button
        has moved and before hole cards are dealt. Agents may opt into
        receiving game updates by overriding
        :meth:`PokerAgent.on_game_update`. We swallow individual exceptions
        so a flaky remote endpoint cannot break the table flow. Every agent
        receives the same snapshot dict.
        """
        snapshot = self._to_dict_unlocked()
        for agent in self.agents.values():
            try:
                agent.on_game_update(snapshot)
            except Exception:
                continue

    def _record_action(
        self,
        player: PlayerState,
        street: str,
        action: str,
        committed: int,
    ) -> None:
        """Append an :class:`ActionRecord` for this hand's history."""
        self.action_history.append(
            ActionRecord(
                hand_number=self.hand_number,
                street=street,
                player_id=player.player_id,
                action=action,
                amount=committed,
                street_bet=player.street_bet,
                stack=player.stack,
            )
        )

    def _active_player_count(self) -> int:
        """Count players who have chips or are in the current hand."""
        return len([player for player in self.players if player.stack > 0 or player.in_hand])

    def _contender_count(self) -> int:
        """Count players who are in the hand and have not folded."""
        return len([player for player in self.players if self._is_live(player)])

    def _betting_player_count(self) -> int:
        """Count players who can still make a betting decision."""
        return len([index for index in range(len(self.players)) if self._can_act(index)])

    def _is_in_hand(self, index: int) -> bool:
        """Whether the player at ``index`` was dealt into the current hand."""
        return self.players[index].in_hand

    def _is_live(self, player: PlayerState) -> bool:
        """Whether ``player`` is in the hand and has not folded."""
        return player.in_hand and not player.folded

    def _can_act(self, index: int) -> bool:
        """Whether the player at ``index`` is live, not all-in, and has chips."""
        player = self.players[index]
        return self._is_live(player) and not player.all_in and player.stack > 0

    def _next_index(self, start: int, predicate) -> int | None:
        """Return the first index at or after ``start`` (wrapping) that
        satisfies ``predicate``, or ``None`` if no index does."""
        player_count = len(self.players)
        for offset in range(player_count):
            index = (start + offset) % player_count
            if predicate(index):
                return index
        return None

    def _ordered_indexes(self, start: int, predicate) -> list[int]:
        """Return every index satisfying ``predicate``, walking once around
        the table from ``start``."""
        player_count = len(self.players)
        indexes = []
        for offset in range(player_count):
            index = (start + offset) % player_count
            if predicate(index):
                indexes.append(index)
        return indexes

    def _button_order(self, players: list[PlayerState]) -> list[PlayerState]:
        """Sort ``players`` by position starting left of the button."""
        assert self.button_index is not None
        order = self._ordered_indexes(self.button_index + 1, lambda _: True)
        # ``seat`` is assigned from ``enumerate`` in ``__init__``, so it
        # doubles as the index into ``self.players``.
        seat_rank = {index: rank for rank, index in enumerate(order)}
        return sorted(players, key=lambda player: seat_rank[player.seat])