holy-heck-i-really-like-stats/main.py

566 lines
19 KiB
Python
Raw Normal View History

2023-03-26 18:47:11 +11:00
#!/usr/bin/env python3
from collections import namedtuple
from dataclasses import dataclass
2023-03-28 00:00:15 +11:00
from functools import partial, partialmethod
2023-03-26 18:47:11 +11:00
from pathlib import Path
import argparse
import json
import logging
2023-04-03 21:55:15 +10:00
import re
2023-03-26 18:47:11 +11:00
import requests
import sqlite3
import sys
import typing as t
# Register a custom TRACE level, numerically below DEBUG (10), and expose it
# the same way the built-in levels are exposed: as a constant on the logging
# module, as a Logger method, and as a module-level convenience function.
_TRACE_LEVEL = 5
setattr(logging, "TRACE", _TRACE_LEVEL)
logging.addLevelName(_TRACE_LEVEL, "TRACE")
setattr(logging.Logger, "trace", partialmethod(logging.Logger.log, _TRACE_LEVEL))
setattr(logging, "trace", partial(logging.log, _TRACE_LEVEL))
2023-03-28 00:00:15 +11:00
2023-03-27 23:07:10 +11:00
class LogFormatter(logging.Formatter):
    """Formatter that wraps each record in an ANSI colour chosen by its level.

    Records whose level has no entry in ``FORMATS`` are rendered with the
    plain, uncoloured layout instead of losing the logger name and level.
    """

    _format = "%(name)s [%(levelname)s] %(message)s"

    # TRACE is a custom level attached to the logging module elsewhere in this
    # file; fall back to its numeric value so the class body does not depend
    # on that registration having already run.
    _trace = getattr(logging, "TRACE", 5)

    FORMATS = {
        _trace: f"\x1b[30;20m{_format}\x1b[0m",
        logging.DEBUG: f"\x1b[38;20m{_format}\x1b[0m",
        logging.INFO: f"\x1b[34;20m{_format}\x1b[0m",
        logging.WARNING: f"\x1b[33;20m{_format}\x1b[0m",
        logging.ERROR: f"\x1b[31;20m{_format}\x1b[0m",
        logging.CRITICAL: f"\x1b[31;1m{_format}\x1b[0m",
    }

    # One delegate formatter per level, built once rather than per record.
    _FORMATTERS = {level: logging.Formatter(fmt) for level, fmt in FORMATS.items()}
    # Used for any level without a colour mapping (e.g. other custom levels).
    _FALLBACK = logging.Formatter(_format)

    def format(self, record):
        """Return ``record`` rendered with the layout for its level."""
        formatter = self._FORMATTERS.get(record.levelno, self._FALLBACK)
        return formatter.format(record)
2023-03-26 18:47:11 +11:00
2023-03-27 23:07:10 +11:00
# Application name; used as the logger name here and as the argparse program
# name in main().
APP = "hhirlstats"
# Module-wide logger used by every function and class in this file.
LOG = logging.getLogger(APP)
# Attach a stream handler that renders records through LogFormatter (coloured
# by level). The logger's level is left untouched here; main() lowers it when
# -v is given.
_ch = logging.StreamHandler()
_ch.setFormatter(LogFormatter())
LOG.addHandler(_ch)
2023-03-26 18:47:11 +11:00
def _init_db(conn: sqlite3.Connection):
def namedtuple_factory(cursor, row):
fields = [column[0] for column in cursor.description]
cls = namedtuple("Row", fields)
return cls._make(row)
conn.row_factory = namedtuple_factory
conn.executescript(
"""
CREATE TABLE IF NOT EXISTS moves(
game, turn, player, pokemon, move, target,
UNIQUE(game, turn, player, pokemon)
2023-03-26 18:47:11 +11:00
);
CREATE TABLE IF NOT EXISTS switches(
game, turn, player, pokemon,
UNIQUE(game, turn, player, pokemon)
2023-03-26 18:47:11 +11:00
);
CREATE TABLE IF NOT EXISTS nicknames(
game, player, pokemon, specie,
2023-03-27 20:17:18 +11:00
UNIQUE(game, player, specie)
2023-03-26 18:47:11 +11:00
);
2023-03-27 20:17:46 +11:00
CREATE TABLE IF NOT EXISTS knockouts(
game, turn, player, pokemon,
2023-03-27 20:17:46 +11:00
UNIQUE(game, turn, player)
);
2023-03-27 23:07:38 +11:00
CREATE TABLE IF NOT EXISTS indirect_knockouts(
game, turn, player, pokemon,
reason, source, source_player,
2023-03-27 23:07:38 +11:00
UNIQUE(game, turn, player)
);
2023-03-27 20:17:46 +11:00
CREATE TABLE IF NOT EXISTS games(
2023-03-28 23:06:46 +11:00
id, p1, p2, format, uploadtime, winner,
2023-03-27 20:17:46 +11:00
UNIQUE(id)
2023-04-03 21:55:15 +10:00
);
-- No good way to ensure idempotence for damage; just re-build it.
DROP TABLE IF EXISTS damage;
CREATE TABLE damage(game, player, pokemon, value);
2023-04-03 21:55:15 +10:00
DROP TABLE IF EXISTS indirect_damage;
CREATE TABLE indirect_damage(game, player, pokemon, value);
2023-03-26 18:47:11 +11:00
"""
)
# Either the value "p1" or "p2"
PlayerTag = t.NewType("PlayerTag", str)
# A player's name
Player = t.NewType("Player", str)
# A player prefixed with a PlayerTag
TaggedPlayer = t.NewType("TaggedPlayer", str)
# A Pokemon identified by its nickname, if any
Pokemon = t.NewType("Pokemon", str)
# A Pokemon specie
PokemonSpecie = t.NewType("PokemonSpecie", str)
# A Pokemon prefixed with a PlayerTag
TaggedPokemon = t.NewType("TaggedPokemon", str)
2023-04-04 22:39:02 +10:00
def tag(tagged: TaggedPlayer | TaggedPokemon) -> PlayerTag:
return PlayerTag(tagged[0:1])
# Username -> team name; empty until main() loads the teams file.
TEAMS: dict[Player, Player] = {}
# Usernames already reported as unmapped, so each is warned about only once.
_logged_teams: list[Player] = []


def team(player: Player) -> Player:
    """Maps a username to a defined team.

    Returns the mapped team when one exists. Otherwise the username itself
    is returned, and the first time a given non-empty username is seen
    without a mapping a warning is logged.
    """
    try:
        return TEAMS[player]
    except KeyError:
        pass

    already_reported = player in _logged_teams
    if player and not already_reported:
        LOG.warning(f"missing team mapping for {player}")
        _logged_teams.append(player)
    return player
class LogParser:
2023-03-26 18:47:11 +11:00
turn = 0
players: dict[PlayerTag, Player] = {}
hp: dict[TaggedPokemon, int] = {}
2023-03-26 18:47:11 +11:00
2023-04-04 22:39:02 +10:00
# Memorises the user of the move that causes environment setting or status,
# its target, and the move name (for debugging).
last_move: t.Optional[tuple[TaggedPokemon, TaggedPokemon, str]] = None
2023-03-27 23:07:38 +11:00
2023-04-04 22:39:02 +10:00
# Memorises the last hazard set against a player and the causing user.
last_env_set: dict[tuple[PlayerTag, str], TaggedPokemon] = {}
2023-03-27 23:07:38 +11:00
2023-04-04 22:39:02 +10:00
# Memorises statuses set on a pokemon and the causing user.
last_status_set: dict[tuple[TaggedPokemon, str], TaggedPokemon] = {}
2023-03-27 23:07:38 +11:00
def __init__(self, game: str, into: sqlite3.Connection):
self.game = game
self.conn: sqlite3.Connection = into
def split_pokemon(self, user: TaggedPokemon) -> tuple[Player, Pokemon]:
"""Splits a TaggedPokemon into the owning player and the Pokemon."""
[player, pokemon] = user.split(": ")
return self.players[PlayerTag(player.strip("ab"))], Pokemon(pokemon)
2023-03-26 18:47:11 +11:00
@t.overload
def specie(self, pokemon: Pokemon, player: Player) -> PokemonSpecie:
"""Resolves the species of a nicknamed Pokemon."""
...
@t.overload
def specie(self, pokemon: TaggedPokemon) -> PokemonSpecie:
"""Resolves the species of a Pokemon given its Showdown identifier (used
in split_pokemon)."""
...
def specie(
self, pokemon: Pokemon | TaggedPokemon, player: t.Optional[Player] = None
) -> PokemonSpecie:
if not player:
[player, pokemon] = self.split_pokemon(TaggedPokemon(pokemon))
return (
self.conn.execute(
"""
SELECT specie
FROM nicknames
WHERE (game, player, pokemon) = (?, ?, ?)
LIMIT 1
""",
(self.game, team(player), pokemon),
)
.fetchall()[0]
.specie
)
def _reset(self):
self.turn = 0
self.players.clear()
2023-04-04 22:40:45 +10:00
self.hp.clear()
self.last_move = None
self.last_env_set.clear()
self.last_status_set.clear()
def _log_appearance(self, name: TaggedPokemon, specie: str):
# Also includes gender and formes.
trimmed_specie = PokemonSpecie(specie.split(", ")[0])
player, nickname = self.split_pokemon(name)
self.conn.execute(
"""
INSERT INTO nicknames(game, player, pokemon, specie)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(self.game, team(player), nickname, trimmed_specie),
)
def parse(self, log: str):
self._reset()
for line in log.split("\n"):
chunks = line.split("|")[1:]
if not chunks:
continue
LOG.trace(line) # type: ignore
match chunks:
2023-04-04 22:32:47 +10:00
# t.Literal, TaggedPokemon, str, str
case ["drag", name_, specie, status, *rest]:
name = TaggedPokemon(name_)
self.hp[name] = int(status.split("/")[0])
self._log_appearance(name, specie)
# t.Literal, TaggedPokemon
case ["faint", pokemon_]:
pokemon = TaggedPokemon(pokemon_)
player, _ = self.split_pokemon(pokemon)
self.conn.execute(
"""
INSERT INTO knockouts(game, turn, player, pokemon)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(self.game, self.turn, team(player), self.specie(pokemon)),
)
# t.Literal, TaggedPokemon, str, TaggedPokemon
case ["move", user_, move, target_]:
user = TaggedPokemon(user_)
target = TaggedPokemon(target_)
2023-04-04 22:39:02 +10:00
last_move = (user, target, move)
player, _ = self.split_pokemon(user)
self.conn.execute(
2023-04-03 21:55:15 +10:00
"""
INSERT INTO moves(game, turn, player, pokemon, move, target)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT DO NOTHING
2023-04-03 21:55:15 +10:00
""",
(
self.game,
self.turn,
team(player),
self.specie(user),
move,
self.specie(target),
),
2023-04-03 21:55:15 +10:00
)
2023-04-04 22:32:47 +10:00
# t.Literal, PlayerTag, Player
case ["player", id, username, *rest]:
self.players[PlayerTag(id)] = Player(username)
# t.Literal, TaggedPokemon, str
case ["replace", name, specie]:
self._log_appearance(name, specie)
# t.Literal, TaggedPokemon, str, str, t.Optional[str]
case ["switch", name, specie, status, *rest]:
self.hp[name] = int(status.split("/")[0])
# Also includes gender and formes.
trimmed_specie = specie.split(", ")[0]
player, nickname = self.split_pokemon(name)
self._log_appearance(name, specie)
self.conn.execute(
"""
INSERT INTO switches(game, turn, player, pokemon)
VALUES (?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(self.game, self.turn, team(player), trimmed_specie),
)
2023-04-04 22:32:47 +10:00
# t.Literal, str
case ["turn", turn]:
self.turn = int(turn)
# t.Literal, Player
case ["win", player]:
self.conn.execute(
"""
UPDATE games
SET winner = ?
WHERE id = ?
""",
(team(player), self.game),
)
2023-04-04 22:32:47 +10:00
case ["-heal", pokemon, status, *rest]:
self.hp[pokemon] = int(status.split("/")[0])
# t.Literal, TaggedPokemon, str
case ["-damage", pokemon, status]:
# Pokemon takes direct (non-hazard/condition) damage; status
# can be a percentage "70/100" with or without condition, or
# "0 fnt"
new_hp = int(re.split("[/ ]", status)[0])
LOG.debug(f"{pokemon} dropped to {new_hp} from {self.hp[pokemon]}")
LOG.debug(f"source: {last_move}")
# resolve to damage source
if last_move[1] != pokemon:
LOG.warning(
2023-04-04 22:39:02 +10:00
f"{pokemon} took direct damage but last move"
f" {last_move[2]} was not targeted at them"
)
continue
damage_source = last_move[0]
source_player, source_nickname = self.split_pokemon(damage_source)
self.conn.execute(
"""
INSERT INTO damage(game, player, pokemon, value)
VALUES(?, ?, ?, ?)
2023-04-03 21:55:15 +10:00
ON CONFLICT DO NOTHING
""",
(
self.game,
2023-04-03 21:55:15 +10:00
team(source_player),
self.specie(damage_source),
self.hp[pokemon] - new_hp,
2023-04-03 21:55:15 +10:00
),
)
self.hp[pokemon] = new_hp
# t.Literal, TaggedPokemon, str, str
case ["-damage", pokemon_, status, from_]:
pokemon = TaggedPokemon(pokemon_)
# Pokemon takes indirect damage; status can be a percentage
# "70/100" with or without condition, or "0 fnt"
new_hp = int(re.split("[/ ]", status)[0])
LOG.debug(f"{pokemon} dropped to {new_hp} from {from_}")
LOG.debug(f"tracing reason for {line}")
reason = from_.replace("[from] ", "")
source: TaggedPokemon | str | None = None
source_is_pokemon = True
2023-04-04 22:39:02 +10:00
test_hazard = self.last_env_set.get((tag(pokemon), reason))
if test_hazard:
source = test_hazard
LOG.debug(f"identified hazard source {source}")
test_status = self.last_status_set.get((pokemon, reason))
if test_status:
source = test_status
LOG.debug(f"identified move source {source}")
if reason == "Recoil" or reason.startswith("item: "):
LOG.debug(f"identified special source {reason}")
reason = reason.replace("item: ", "")
source = "self"
source_is_pokemon = False
if not source:
LOG.error(f"missing reason for {line}")
continue
player, nickname = self.split_pokemon(pokemon)
if source.startswith("p1") or source.startswith("p2"):
source_player, _ = self.split_pokemon(TaggedPokemon(source))
else:
source_player = None # type: ignore
source_is_pokemon = False
if source_player:
self.conn.execute(
"""
INSERT INTO indirect_damage(game, player, pokemon, value)
VALUES(?, ?, ?, ?)
""",
(
self.game,
team(source_player),
self.specie(TaggedPokemon(source)),
self.hp[pokemon] - new_hp,
),
)
if status == "0 fnt":
self.conn.execute(
"""
INSERT INTO indirect_knockouts(
game, turn, player, pokemon,
reason, source, source_player)
VALUES(?, ?, ?, ?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(
self.game,
self.turn,
team(player),
self.specie(pokemon),
reason,
self.specie(TaggedPokemon(source))
if source_is_pokemon
else source,
team(source_player),
),
)
2023-04-04 22:32:47 +10:00
# t.Literal, TaggedPlayer, str
2023-04-04 22:39:02 +10:00
case ["-sidestart", side_, env]:
side = TaggedPlayer(side_)
2023-04-04 22:32:47 +10:00
if not last_move:
LOG.warning(f"missing previous move for {line}")
continue
LOG.debug(f"{line} <- {last_move}")
self.last_env_set[
2023-04-04 22:39:02 +10:00
(tag(side), env.replace("move: ", ""))
2023-04-04 22:32:47 +10:00
] = last_move[0]
# t.Literal, TaggedPokemon, str
2023-04-04 22:39:02 +10:00
case ["-status", pokemon_, cond]:
pokemon = TaggedPokemon(pokemon_)
if not last_move or last_move[1] != pokemon:
2023-04-04 22:32:47 +10:00
LOG.warning(f"missing previous move for {line}")
continue
LOG.debug(f"{line} <- {last_move}")
2023-04-04 22:39:02 +10:00
self.last_status_set[(pokemon, cond)] = last_move[0]
case _:
# LOG.debug(f"unhandled message {chunks[0]}")
pass
2023-03-26 18:47:11 +11:00
@dataclass(frozen=True)
class Replay:
    """Immutable record of one replay.

    fetch() builds it by unpacking a replay JSON document, so the field names
    must match that document's keys.
    """

    # Replay identifier; stored as games.id and used as the game key.
    id: str
    # Names of the two players; main() maps them to teams before storing.
    p1: str
    p2: str
    # Format name, stored as games.format.
    format: str
    # Battle log text, parsed line by line by LogParser.parse.
    log: str
    # Stored as games.uploadtime; presumably a Unix timestamp — TODO confirm.
    uploadtime: int
    # The remaining fields are carried over from the JSON document but are not
    # read anywhere in this file.
    views: int
    p1id: str
    p2id: str
    formatid: str
    rating: int
    private: int
    password: t.Optional[str]
2023-03-28 00:19:54 +11:00
def fetch(replay: str, cache: bool = True) -> Replay:
    """Load a replay, from the local cache when possible, otherwise over HTTP.

    Args:
        replay: a replay ID, or a full replay URL (the site prefix is
            stripped).
        cache: when true, read ``cache/<id>.json`` under the current working
            directory if it exists, and write the downloaded document there
            otherwise. When false the replay is always downloaded and
            nothing is written.

    Returns:
        The replay as a ``Replay`` record.

    Raises:
        RuntimeError: if the server answers with a status other than 200.
        TypeError: if the document lacks a field ``Replay`` requires.
    """
    # Local import: only `dataclass` is imported at the top of the file.
    from dataclasses import fields

    replay = replay.replace("https://replay.pokemonshowdown.com/", "")
    replay_file = Path.cwd() / "cache" / f"{replay}.json"
    known_fields = {field.name for field in fields(Replay)}

    def build(document: dict) -> Replay:
        # Ignore keys Replay does not declare, so that fields added to the
        # document do not make every replay fail to load.
        return Replay(**{k: v for k, v in document.items() if k in known_fields})

    if cache and replay_file.exists():
        with replay_file.open(encoding="utf-8") as f:
            return build(json.load(f))

    # Without a timeout a stalled connection would block forever.
    response = requests.get(
        f"https://replay.pokemonshowdown.com/{replay}.json", timeout=30
    )
    if response.status_code != 200:
        raise RuntimeError(
            f"fetching {replay} failed with HTTP {response.status_code}: {response.text}"
        )
    document = response.json()

    if cache:
        replay_file.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
        # The full document is cached, including keys Replay does not use.
        with replay_file.open(mode="w", encoding="utf-8") as f:
            json.dump(document, f)
    return build(document)
2023-03-26 18:47:11 +11:00
2023-03-28 00:26:51 +11:00
def main(args):
    """Command-line entry point: index the given replays into a SQLite file.

    Args:
        args: the command-line arguments, without the program name.

    Each replay is fetched, registered in ``games``, parsed, and committed
    on its own. A replay that cannot be fetched is reported and skipped.
    """
    parser = argparse.ArgumentParser(
        prog=APP,
        description="extracts stats from a Showdown replay",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument("-v", "--verbose", action="count", help="add debugging info")
    parser.add_argument(
        "-C",
        "--no-cache",
        action="store_true",
        help="fetch replays instead of using cache",
    )
    parser.add_argument(
        "-t",
        "--teams",
        action="store",
        metavar="FILE",
        default="teams.json",
        help="JSON file defining players to teams",
    )
    parser.add_argument(
        "-o",
        "--output",
        action="store",
        metavar="FILE",
        default="data.db",
        help="output data file",
    )
    parser.add_argument("replay", nargs="+", help="replay ID or URL")

    opts = parser.parse_args(args)

    # -v enables DEBUG, -vv (or more) enables TRACE.
    if opts.verbose and opts.verbose > 1:
        LOG.setLevel(logging.TRACE)  # type: ignore
    elif opts.verbose:
        LOG.setLevel(logging.DEBUG)

    if opts.teams:
        with open(opts.teams, encoding="utf-8") as f:
            mapping = json.load(f)
        # Update in place so every reference to TEAMS sees the mapping.
        TEAMS.clear()
        TEAMS.update(mapping)

    # Connect outside the try block: if connecting fails there is nothing to
    # close, and the original error must not be masked by the cleanup.
    db = sqlite3.connect(opts.output)
    try:
        _init_db(db)

        for r in opts.replay:
            try:
                replay = fetch(r, cache=not opts.no_cache)
            except Exception as e:
                # Top-level boundary: report the cause and move on.
                LOG.error(f"bad replay {r}: {e}")
                continue

            LOG.info(f"indexing game {replay.id}")
            db.execute(
                """
                INSERT INTO games(id, p1, p2, format, uploadtime)
                VALUES (?, ?, ?, ?, ?)
                ON CONFLICT DO NOTHING
                """,
                (
                    replay.id,
                    team(replay.p1),
                    team(replay.p2),
                    replay.format,
                    replay.uploadtime,
                ),
            )
            LogParser(replay.id, db).parse(replay.log)
            db.commit()
    finally:
        db.close()


if __name__ == "__main__":
    main(sys.argv[1:])