feat: basic function
This commit is contained in:
@@ -0,0 +1,510 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from random import Random
|
||||
from time import time
|
||||
|
||||
from texas_holdem.agents import PokerAgent
|
||||
from texas_holdem.cards import Deck
|
||||
from texas_holdem.evaluator import evaluate
|
||||
from texas_holdem.models import (
|
||||
ActionRecord,
|
||||
HandSummary,
|
||||
Observation,
|
||||
PlayerAction,
|
||||
PlayerState,
|
||||
PotAward,
|
||||
)
|
||||
|
||||
STREETS = ("preflop", "flop", "turn", "river")
|
||||
|
||||
|
||||
class GameComplete(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
class TableGame:
|
||||
def __init__(
|
||||
self,
|
||||
game_id: str,
|
||||
player_specs: list[tuple[str, str, PokerAgent]],
|
||||
starting_stack: int,
|
||||
small_blind: int,
|
||||
big_blind: int,
|
||||
rng: Random | None = None,
|
||||
) -> None:
|
||||
if not 2 <= len(player_specs) <= 12:
|
||||
raise ValueError("a game requires 2-12 players")
|
||||
if starting_stack <= 0:
|
||||
raise ValueError("starting_stack must be positive")
|
||||
if small_blind <= 0 or big_blind <= 0 or small_blind > big_blind:
|
||||
raise ValueError("blinds must satisfy 0 < small_blind <= big_blind")
|
||||
|
||||
player_ids = [player_id for player_id, _, _ in player_specs]
|
||||
if len(set(player_ids)) != len(player_ids):
|
||||
raise ValueError("player ids must be unique")
|
||||
|
||||
self.game_id = game_id
|
||||
self.players = [
|
||||
PlayerState(player_id=player_id, name=name, stack=starting_stack, seat=seat)
|
||||
for seat, (player_id, name, _) in enumerate(player_specs)
|
||||
]
|
||||
self.agents = {player_id: agent for player_id, _, agent in player_specs}
|
||||
self.starting_stack = starting_stack
|
||||
self.small_blind = small_blind
|
||||
self.big_blind = big_blind
|
||||
self.rng = rng or Random()
|
||||
self.hand_number = 0
|
||||
self.button_index: int | None = None
|
||||
self.board = []
|
||||
self.action_history: list[ActionRecord] = []
|
||||
self.hand_summaries: list[HandSummary] = []
|
||||
|
||||
@property
|
||||
def is_complete(self) -> bool:
|
||||
return len([player for player in self.players if player.stack > 0]) < 2
|
||||
|
||||
def run_hand(self) -> HandSummary:
|
||||
if self.is_complete:
|
||||
raise GameComplete("game is complete")
|
||||
|
||||
self.hand_number += 1
|
||||
started_at = time()
|
||||
self.board = []
|
||||
self.action_history = []
|
||||
deck = Deck(self.rng)
|
||||
|
||||
for player in self.players:
|
||||
player.reset_for_hand()
|
||||
|
||||
self._advance_button()
|
||||
assert self.button_index is not None
|
||||
|
||||
self._deal_hole_cards(deck)
|
||||
small_blind_index, big_blind_index = self._blind_indexes()
|
||||
self._post_blind(small_blind_index, "small_blind", self.small_blind)
|
||||
self._post_blind(big_blind_index, "big_blind", self.big_blind)
|
||||
|
||||
preflop_start = (
|
||||
small_blind_index
|
||||
if self._active_player_count() == 2
|
||||
else self._next_index(big_blind_index + 1, self._can_act)
|
||||
)
|
||||
self._betting_round("preflop", preflop_start, self.big_blind)
|
||||
|
||||
for street, card_count in (("flop", 3), ("turn", 1), ("river", 1)):
|
||||
if self._contender_count() <= 1:
|
||||
break
|
||||
deck.burn()
|
||||
self.board.extend(deck.draw(card_count))
|
||||
for player in self.players:
|
||||
player.reset_for_street()
|
||||
if self._betting_player_count() >= 2:
|
||||
start_index = self._next_index(self.button_index + 1, self._can_act)
|
||||
self._betting_round(street, start_index, self.big_blind)
|
||||
|
||||
awards = self._award_pots()
|
||||
summary = HandSummary(
|
||||
game_id=self.game_id,
|
||||
hand_number=self.hand_number,
|
||||
button_seat=self.players[self.button_index].seat,
|
||||
board=list(self.board),
|
||||
actions=list(self.action_history),
|
||||
awards=awards,
|
||||
started_at=started_at,
|
||||
finished_at=time(),
|
||||
)
|
||||
self.hand_summaries.append(summary)
|
||||
return summary
|
||||
|
||||
def run_hands(self, max_hands: int, until_one_left: bool = False) -> list[HandSummary]:
|
||||
if max_hands <= 0:
|
||||
raise ValueError("max_hands must be positive")
|
||||
summaries = []
|
||||
for _ in range(max_hands):
|
||||
if self.is_complete:
|
||||
break
|
||||
summaries.append(self.run_hand())
|
||||
if until_one_left and self.is_complete:
|
||||
break
|
||||
return summaries
|
||||
|
||||
def to_dict(self) -> dict[str, object]:
|
||||
return {
|
||||
"game_id": self.game_id,
|
||||
"status": "complete" if self.is_complete else "running",
|
||||
"hand_number": self.hand_number,
|
||||
"button_seat": None
|
||||
if self.button_index is None
|
||||
else self.players[self.button_index].seat,
|
||||
"small_blind": self.small_blind,
|
||||
"big_blind": self.big_blind,
|
||||
"starting_stack": self.starting_stack,
|
||||
"players": [player.public_dict() for player in self.players],
|
||||
"last_hand": self.hand_summaries[-1].to_dict() if self.hand_summaries else None,
|
||||
}
|
||||
|
||||
def _advance_button(self) -> None:
|
||||
if self.button_index is None:
|
||||
self.button_index = self._next_index(0, lambda index: self.players[index].stack > 0)
|
||||
return
|
||||
self.button_index = self._next_index(
|
||||
self.button_index + 1,
|
||||
lambda index: self.players[index].stack > 0,
|
||||
)
|
||||
|
||||
def _blind_indexes(self) -> tuple[int, int]:
|
||||
assert self.button_index is not None
|
||||
if self._active_player_count() == 2:
|
||||
small_blind_index = self.button_index
|
||||
big_blind_index = self._next_index(self.button_index + 1, self._is_in_hand)
|
||||
else:
|
||||
small_blind_index = self._next_index(self.button_index + 1, self._is_in_hand)
|
||||
big_blind_index = self._next_index(small_blind_index + 1, self._is_in_hand)
|
||||
assert small_blind_index is not None
|
||||
assert big_blind_index is not None
|
||||
return small_blind_index, big_blind_index
|
||||
|
||||
def _deal_hole_cards(self, deck: Deck) -> None:
|
||||
assert self.button_index is not None
|
||||
deal_order = self._ordered_indexes(self.button_index + 1, self._is_in_hand)
|
||||
for _ in range(2):
|
||||
for index in deal_order:
|
||||
self.players[index].hole_cards.extend(deck.draw())
|
||||
|
||||
def _post_blind(self, player_index: int, action: str, amount: int) -> None:
|
||||
player = self.players[player_index]
|
||||
committed = player.commit(amount)
|
||||
self._record_action(player, "preflop", action, committed)
|
||||
|
||||
def _betting_round(self, street: str, start_index: int | None, opening_min_raise: int) -> None:
|
||||
if start_index is None:
|
||||
return
|
||||
|
||||
current_bet = max((player.street_bet for player in self.players if self._is_live(player)), default=0)
|
||||
min_raise = opening_min_raise
|
||||
pending = {index for index in range(len(self.players)) if self._can_act(index)}
|
||||
call_only: set[int] = set()
|
||||
cursor = start_index
|
||||
|
||||
while pending and self._contender_count() > 1:
|
||||
player_index = self._next_index(cursor, lambda index: index in pending)
|
||||
if player_index is None:
|
||||
break
|
||||
|
||||
player = self.players[player_index]
|
||||
observation = self._observation(
|
||||
street,
|
||||
player_index,
|
||||
current_bet,
|
||||
min_raise,
|
||||
can_raise=player_index not in call_only,
|
||||
)
|
||||
action = self._agent_action(player, observation)
|
||||
previous_bet = current_bet
|
||||
|
||||
current_bet, min_raise, full_raise = self._apply_action(
|
||||
street,
|
||||
player,
|
||||
action,
|
||||
current_bet,
|
||||
min_raise,
|
||||
)
|
||||
|
||||
pending.discard(player_index)
|
||||
call_only.discard(player_index)
|
||||
opened_betting = previous_bet == 0 and current_bet > 0
|
||||
if full_raise or opened_betting:
|
||||
pending = {
|
||||
index
|
||||
for index in range(len(self.players))
|
||||
if index != player_index and self._can_act(index)
|
||||
}
|
||||
call_only.clear()
|
||||
elif current_bet > previous_bet:
|
||||
owing_players = {
|
||||
index
|
||||
for index in range(len(self.players))
|
||||
if index != player_index
|
||||
and self._can_act(index)
|
||||
and self.players[index].street_bet < current_bet
|
||||
}
|
||||
call_only.update(owing_players - pending)
|
||||
pending.update(owing_players)
|
||||
|
||||
pending = {index for index in pending if self._can_act(index)}
|
||||
call_only = {index for index in call_only if index in pending}
|
||||
cursor = player_index + 1
|
||||
|
||||
def _observation(
|
||||
self,
|
||||
street: str,
|
||||
player_index: int,
|
||||
current_bet: int,
|
||||
min_raise: int,
|
||||
can_raise: bool = True,
|
||||
) -> Observation:
|
||||
player = self.players[player_index]
|
||||
legal_actions = self._legal_actions(player, current_bet, min_raise, can_raise)
|
||||
min_raise_to = next(
|
||||
(
|
||||
int(action["min_amount"])
|
||||
for action in legal_actions
|
||||
if action["action"] in {"bet", "raise"}
|
||||
),
|
||||
None,
|
||||
)
|
||||
assert self.button_index is not None
|
||||
return Observation(
|
||||
game_id=self.game_id,
|
||||
hand_number=self.hand_number,
|
||||
street=street,
|
||||
player_id=player.player_id,
|
||||
seat=player.seat,
|
||||
button_seat=self.players[self.button_index].seat,
|
||||
small_blind=self.small_blind,
|
||||
big_blind=self.big_blind,
|
||||
board=list(self.board),
|
||||
hole_cards=list(player.hole_cards),
|
||||
players=[other.public_dict() for other in self.players],
|
||||
pot=sum(other.total_bet for other in self.players),
|
||||
to_call=max(0, current_bet - player.street_bet),
|
||||
min_raise_to=min_raise_to,
|
||||
legal_actions=legal_actions,
|
||||
action_history=list(self.action_history),
|
||||
)
|
||||
|
||||
def _legal_actions(
|
||||
self,
|
||||
player: PlayerState,
|
||||
current_bet: int,
|
||||
min_raise: int,
|
||||
can_raise: bool = True,
|
||||
) -> list[dict[str, object]]:
|
||||
to_call = max(0, current_bet - player.street_bet)
|
||||
max_target = player.street_bet + player.stack
|
||||
actions: list[dict[str, object]] = []
|
||||
|
||||
if to_call > 0:
|
||||
actions.append({"action": "fold", "amount": 0})
|
||||
actions.append({"action": "call", "amount": min(to_call, player.stack)})
|
||||
if not can_raise:
|
||||
return actions
|
||||
min_raise_to = current_bet + min_raise
|
||||
if max_target >= min_raise_to:
|
||||
actions.append(
|
||||
{
|
||||
"action": "raise",
|
||||
"min_amount": min_raise_to,
|
||||
"max_amount": max_target,
|
||||
"amount_mode": "street_total",
|
||||
}
|
||||
)
|
||||
elif max_target > current_bet:
|
||||
actions.append({"action": "all_in", "amount": max_target})
|
||||
return actions
|
||||
|
||||
actions.append({"action": "check", "amount": 0})
|
||||
if player.stack <= 0:
|
||||
return actions
|
||||
|
||||
if current_bet == 0:
|
||||
if max_target >= self.big_blind:
|
||||
actions.append(
|
||||
{
|
||||
"action": "bet",
|
||||
"min_amount": self.big_blind,
|
||||
"max_amount": max_target,
|
||||
"amount_mode": "street_total",
|
||||
}
|
||||
)
|
||||
else:
|
||||
actions.append({"action": "all_in", "amount": max_target})
|
||||
else:
|
||||
min_raise_to = current_bet + min_raise
|
||||
if max_target >= min_raise_to:
|
||||
actions.append(
|
||||
{
|
||||
"action": "raise",
|
||||
"min_amount": min_raise_to,
|
||||
"max_amount": max_target,
|
||||
"amount_mode": "street_total",
|
||||
}
|
||||
)
|
||||
elif max_target > current_bet:
|
||||
actions.append({"action": "all_in", "amount": max_target})
|
||||
|
||||
return actions
|
||||
|
||||
def _agent_action(self, player: PlayerState, observation: Observation) -> PlayerAction:
|
||||
agent = self.agents[player.player_id]
|
||||
try:
|
||||
requested = agent.decide(observation)
|
||||
except Exception:
|
||||
requested = PlayerAction("fold")
|
||||
return self._coerce_action(requested, observation.legal_actions)
|
||||
|
||||
def _coerce_action(
|
||||
self,
|
||||
requested: PlayerAction,
|
||||
legal_actions: list[dict[str, object]],
|
||||
) -> PlayerAction:
|
||||
by_action = {str(action["action"]): action for action in legal_actions}
|
||||
requested_type = requested.action.lower()
|
||||
|
||||
if requested_type in {"fold", "check", "call", "all_in"} and requested_type in by_action:
|
||||
legal = by_action[requested_type]
|
||||
return PlayerAction(requested_type, int(legal.get("amount") or 0))
|
||||
|
||||
if requested_type in {"bet", "raise"} and requested_type in by_action:
|
||||
legal = by_action[requested_type]
|
||||
min_amount = int(legal["min_amount"])
|
||||
max_amount = int(legal["max_amount"])
|
||||
amount = min(max(requested.amount, min_amount), max_amount)
|
||||
return PlayerAction(requested_type, amount)
|
||||
|
||||
for fallback in ("check", "call", "fold"):
|
||||
if fallback in by_action:
|
||||
legal = by_action[fallback]
|
||||
return PlayerAction(fallback, int(legal.get("amount") or 0))
|
||||
|
||||
legal = legal_actions[0]
|
||||
return PlayerAction(str(legal["action"]), int(legal.get("amount") or 0))
|
||||
|
||||
def _apply_action(
|
||||
self,
|
||||
street: str,
|
||||
player: PlayerState,
|
||||
action: PlayerAction,
|
||||
current_bet: int,
|
||||
min_raise: int,
|
||||
) -> tuple[int, int, bool]:
|
||||
previous_bet = current_bet
|
||||
committed = 0
|
||||
full_raise = False
|
||||
|
||||
if action.action == "fold":
|
||||
player.folded = True
|
||||
elif action.action == "check":
|
||||
pass
|
||||
elif action.action == "call":
|
||||
committed = player.commit(current_bet - player.street_bet)
|
||||
elif action.action in {"bet", "raise", "all_in"}:
|
||||
target = action.amount
|
||||
committed = player.commit(target - player.street_bet)
|
||||
current_bet = max(current_bet, player.street_bet)
|
||||
raise_size = current_bet - previous_bet
|
||||
if raise_size >= min_raise:
|
||||
full_raise = True
|
||||
min_raise = raise_size
|
||||
else:
|
||||
raise ValueError(f"unsupported action: {action.action}")
|
||||
|
||||
self._record_action(player, street, action.action, committed)
|
||||
return current_bet, min_raise, full_raise
|
||||
|
||||
def _award_pots(self) -> list[PotAward]:
|
||||
total_pot = sum(player.total_bet for player in self.players)
|
||||
live_players = [player for player in self.players if self._is_live(player)]
|
||||
if not live_players or total_pot <= 0:
|
||||
return []
|
||||
|
||||
if len(live_players) == 1:
|
||||
live_players[0].stack += total_pot
|
||||
return [PotAward(total_pot, [live_players[0].player_id], None)]
|
||||
|
||||
levels = sorted({player.total_bet for player in self.players if player.total_bet > 0})
|
||||
previous_level = 0
|
||||
awards: list[PotAward] = []
|
||||
for level in levels:
|
||||
contributors = [player for player in self.players if player.total_bet >= level]
|
||||
pot_amount = (level - previous_level) * len(contributors)
|
||||
previous_level = level
|
||||
contenders = [player for player in contributors if self._is_live(player)]
|
||||
if not contenders or pot_amount <= 0:
|
||||
continue
|
||||
|
||||
values = {
|
||||
player.player_id: evaluate([*player.hole_cards, *self.board])
|
||||
for player in contenders
|
||||
}
|
||||
best_value = max(values.values())
|
||||
winners = [
|
||||
player
|
||||
for player in contenders
|
||||
if values[player.player_id] == best_value
|
||||
]
|
||||
ordered_winners = self._button_order(winners)
|
||||
share, remainder = divmod(pot_amount, len(ordered_winners))
|
||||
for winner in ordered_winners:
|
||||
winner.stack += share
|
||||
for winner in ordered_winners[:remainder]:
|
||||
winner.stack += 1
|
||||
awards.append(
|
||||
PotAward(
|
||||
amount=pot_amount,
|
||||
winners=[winner.player_id for winner in ordered_winners],
|
||||
hand_value=best_value,
|
||||
)
|
||||
)
|
||||
return awards
|
||||
|
||||
def _record_action(
|
||||
self,
|
||||
player: PlayerState,
|
||||
street: str,
|
||||
action: str,
|
||||
committed: int,
|
||||
) -> None:
|
||||
self.action_history.append(
|
||||
ActionRecord(
|
||||
hand_number=self.hand_number,
|
||||
street=street,
|
||||
player_id=player.player_id,
|
||||
action=action,
|
||||
amount=committed,
|
||||
street_bet=player.street_bet,
|
||||
stack=player.stack,
|
||||
)
|
||||
)
|
||||
|
||||
def _active_player_count(self) -> int:
|
||||
return len([player for player in self.players if player.stack > 0 or player.in_hand])
|
||||
|
||||
def _contender_count(self) -> int:
|
||||
return len([player for player in self.players if self._is_live(player)])
|
||||
|
||||
def _betting_player_count(self) -> int:
|
||||
return len([index for index in range(len(self.players)) if self._can_act(index)])
|
||||
|
||||
def _is_in_hand(self, index: int) -> bool:
|
||||
return self.players[index].in_hand
|
||||
|
||||
def _is_live(self, player: PlayerState) -> bool:
|
||||
return player.in_hand and not player.folded
|
||||
|
||||
def _can_act(self, index: int) -> bool:
|
||||
player = self.players[index]
|
||||
return self._is_live(player) and not player.all_in and player.stack > 0
|
||||
|
||||
def _next_index(self, start: int, predicate) -> int | None:
|
||||
player_count = len(self.players)
|
||||
for offset in range(player_count):
|
||||
index = (start + offset) % player_count
|
||||
if predicate(index):
|
||||
return index
|
||||
return None
|
||||
|
||||
def _ordered_indexes(self, start: int, predicate) -> list[int]:
|
||||
player_count = len(self.players)
|
||||
indexes = []
|
||||
for offset in range(player_count):
|
||||
index = (start + offset) % player_count
|
||||
if predicate(index):
|
||||
indexes.append(index)
|
||||
return indexes
|
||||
|
||||
def _button_order(self, players: list[PlayerState]) -> list[PlayerState]:
|
||||
assert self.button_index is not None
|
||||
order = self._ordered_indexes(self.button_index + 1, lambda _: True)
|
||||
seat_rank = {index: rank for rank, index in enumerate(order)}
|
||||
return sorted(players, key=lambda player: seat_rank[player.seat])
|
||||
Reference in New Issue
Block a user