holy-heck-i-really-like-stats/main.py

518 lines
16 KiB
Python
Raw Normal View History

2023-03-26 18:47:11 +11:00
#!/usr/bin/env python3
from collections import namedtuple
from dataclasses import dataclass
2023-03-28 00:00:15 +11:00
from functools import partial, partialmethod
2023-03-26 18:47:11 +11:00
from pathlib import Path
import argparse
import json
import logging
2023-04-03 21:55:15 +10:00
import re
2023-03-26 18:47:11 +11:00
import requests
import sqlite3
import sys
import typing as t
2023-03-28 00:00:15 +11:00
# Register a custom TRACE level, numerically below DEBUG, for very chatty
# output (parse_log emits every raw log line at this level).
logging.TRACE = 5
logging.addLevelName(logging.TRACE, "TRACE")
# Provide Logger.trace(...) and logging.trace(...) shortcuts by pre-binding
# the level argument of the corresponding log() callables.
logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE)
logging.trace = partial(logging.log, logging.TRACE)
2023-03-27 23:07:10 +11:00
class LogFormatter(logging.Formatter):
    """Formatter that colours every record according to its level.

    Each known level maps to the common ``name [LEVEL] message`` layout
    wrapped in an ANSI colour escape sequence. Records with a level that has
    no entry in ``FORMATS`` are rendered with the same layout, uncoloured.
    """

    # Layout shared by all levels. Kept under a private name so that it does
    # not collide with the ``format()`` method defined below.
    _LAYOUT = "%(name)s [%(levelname)s] %(message)s"

    # Level => layout wrapped in an ANSI escape sequence, reset at the end.
    FORMATS = {
        logging.TRACE: f"\x1b[30;20m{_LAYOUT}\x1b[0m",
        logging.DEBUG: f"\x1b[38;20m{_LAYOUT}\x1b[0m",
        logging.INFO: f"\x1b[34;20m{_LAYOUT}\x1b[0m",
        logging.WARNING: f"\x1b[33;20m{_LAYOUT}\x1b[0m",
        logging.ERROR: f"\x1b[31;20m{_LAYOUT}\x1b[0m",
        logging.CRITICAL: f"\x1b[31;1m{_LAYOUT}\x1b[0m",
    }

    def format(self, record):
        """Render ``record`` with the layout registered for its level.

        Falls back to the plain layout for unregistered levels; passing
        ``None`` to ``logging.Formatter`` would silently drop the logger name
        and level from the output.
        """
        fmt = self.FORMATS.get(record.levelno, self._LAYOUT)
        return logging.Formatter(fmt).format(record)
2023-03-26 18:47:11 +11:00
2023-03-27 23:07:10 +11:00
# Application name; used as the logger name here and as the argparse program
# name in main().
APP = "hhirlstats"
# Application-wide logger. A stream handler using the colourising LogFormatter
# is attached directly to it.
LOG = logging.getLogger(APP)
_ch = logging.StreamHandler()
_ch.setFormatter(LogFormatter())
LOG.addHandler(_ch)
2023-03-26 18:47:11 +11:00
2023-04-02 17:26:01 +10:00
# Username => team name. Replaced wholesale by main() when a teams file is
# loaded, so always look it up through the module global.
TEAMS = {}
# Usernames that have already triggered a "missing team mapping" warning; a
# set, since it is only ever used for membership checks.
_logged_teams = set()


def team(player: str) -> str:
    """Maps a username to a defined team.

    Returns the team configured in ``TEAMS`` for ``player``. When there is no
    mapping, ``player`` itself is returned unchanged and a warning is logged
    once per distinct non-empty username.
    """
    try:
        return TEAMS[player]
    except KeyError:
        pass

    # Empty/None players are passed through silently; only warn the first
    # time a real username is seen without a mapping.
    if player and player not in _logged_teams:
        LOG.warning(f"missing team mapping for {player}")
        _logged_teams.add(player)
    return player
2023-03-26 18:47:11 +11:00
class safelist(list):
    """A list offering a dict-like ``get`` that tolerates bad indices."""

    def get(self, index, default=None):
        """Return ``self[index]``, or ``default`` if the index is out of range."""
        try:
            item = self[index]
        except IndexError:
            item = default
        return item
def _init_db(conn: sqlite3.Connection):
    """Prepare ``conn`` for indexing: install a row factory and the schema.

    Rows returned by the connection become namedtuples, so columns can be
    read as attributes (``row.specie``). The event tables are created if they
    do not exist yet; ``damage`` and ``indirect_damage`` are dropped and
    recreated on every call because they carry no uniqueness constraint that
    would make re-indexing a game idempotent.

    Args:
        conn: Connection to configure. The schema script is executed on it
            immediately.
    """
    # Column layout => namedtuple class. Building a namedtuple class is far
    # more expensive than instantiating one, so create it once per distinct
    # result shape instead of once per row.
    row_classes: dict[tuple[str, ...], type] = {}

    def namedtuple_factory(cursor, row):
        fields = tuple(column[0] for column in cursor.description)
        cls = row_classes.get(fields)
        if cls is None:
            cls = row_classes[fields] = namedtuple("Row", fields)
        return cls._make(row)

    conn.row_factory = namedtuple_factory

    conn.executescript(
        """
        CREATE TABLE IF NOT EXISTS moves(
            game, turn, player, pokemon, move, target,
            UNIQUE(game, turn, player, pokemon)
        );
        CREATE TABLE IF NOT EXISTS switches(
            game, turn, player, pokemon,
            UNIQUE(game, turn, player, pokemon)
        );
        CREATE TABLE IF NOT EXISTS nicknames(
            game, player, pokemon, specie,
            UNIQUE(game, player, specie)
        );
        CREATE TABLE IF NOT EXISTS knockouts(
            game, turn, player, pokemon,
            UNIQUE(game, turn, player)
        );
        CREATE TABLE IF NOT EXISTS indirect_knockouts(
            game, turn, player, pokemon,
            reason, source, source_player,
            UNIQUE(game, turn, player)
        );
        CREATE TABLE IF NOT EXISTS games(
            id, p1, p2, format, uploadtime, winner,
            UNIQUE(id)
        );
        -- No good way to ensure idempotence for damage; just re-build it.
        DROP TABLE IF EXISTS damage;
        CREATE TABLE damage(game, player, pokemon, value);
        DROP TABLE IF EXISTS indirect_damage;
        CREATE TABLE indirect_damage(game, player, pokemon, value);
        """
    )
def parse_log(game: str, log: str, into: sqlite3.Connection):
conn = into
turn = 0
players = {}
2023-04-03 21:55:15 +10:00
hp = {}
2023-03-26 18:47:11 +11:00
2023-03-27 23:07:38 +11:00
# ("p2a: Edward", "p1a: Meteo")
# memorises the user of the move that causes environment setting or status,
# and its target
last_move: t.Optional[tuple[str, str]]
# ("p1", "Spikes") => "p2a: Frosslas"
last_env_set: dict[tuple[str, str], str] = {}
# ("p1a: Meteo", "brn") => "p2a: Edward"
last_status_set: dict[tuple[str, str], str] = {}
def split_pokemon(user: str) -> tuple[str, str]:
"""Splits a Pokemon identifier of the form `pXa: Pokemon` into the
player's name (as marked by the player log) and "Pokemon".
Note that all Pokemon are referred to by their nicknames, and will
require resolving to obtain the Pokemon specie."""
2023-03-26 18:47:11 +11:00
[player, name] = user.split(": ")
return players[player.strip("ab")], name
def specie_from_parts(player: str, nickname: str) -> str:
"""Resolves the species of a nicknamed Pokemon."""
return (
conn.execute(
"""
SELECT specie
FROM nicknames
WHERE (game, player, pokemon) = (?, ?, ?)
LIMIT 1
""",
(game, team(player), nickname),
)
.fetchall()[0]
.specie
)
def specie(pokemon: str) -> str:
"""Resolves the species of a Pokemon given its Showdown identifier (used
in split_pokemon)."""
return specie_from_parts(*split_pokemon(pokemon))
2023-03-26 18:47:11 +11:00
for line in log.split("\n"):
chunks = line.split("|")[1:]
if not chunks:
continue
2023-03-28 00:00:15 +11:00
LOG.trace(line)
2023-03-26 18:47:11 +11:00
match chunks:
case ["player", id, username, *rest]:
players[id] = username
2023-04-03 20:23:17 +10:00
2023-03-26 18:47:11 +11:00
case ["turn", turn]:
turn = int(turn)
2023-04-03 20:23:17 +10:00
2023-03-26 18:47:11 +11:00
case ["move", user, move, target]:
2023-03-27 23:07:38 +11:00
last_move = (user, target)
player, _ = split_pokemon(user)
2023-03-26 18:47:11 +11:00
conn.execute(
"""
INSERT INTO moves(game, turn, player, pokemon, move, target)
2023-03-26 18:47:11 +11:00
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(
game,
turn,
team(player),
specie(user),
move,
specie(target),
),
2023-03-26 18:47:11 +11:00
)
2023-04-03 20:23:17 +10:00
case ["drag", name, specie_, status, *rest]:
2023-04-03 21:55:15 +10:00
hp[name] = int(status.split("/")[0])
# Also includes gender and formes.
trimmed_specie = specie_.split(", ")[0]
player, nickname = split_pokemon(name)
conn.execute(
"""
INSERT INTO nicknames(game, player, pokemon, specie)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, team(player), nickname, trimmed_specie),
)
case ["replace", name, specie_]:
# Also includes gender and formes.
trimmed_specie = specie_.split(", ")[0]
player, nickname = split_pokemon(name)
conn.execute(
"""
INSERT INTO nicknames(game, player, pokemon, specie)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, team(player), nickname, trimmed_specie),
)
case ["switch", name, specie_, status, *rest]:
2023-04-03 21:55:15 +10:00
hp[name] = int(status.split("/")[0])
# Also includes gender and formes.
trimmed_specie = specie_.split(", ")[0]
player, nickname = split_pokemon(name)
2023-03-26 18:47:11 +11:00
conn.execute(
"""
INSERT INTO switches(game, turn, player, pokemon)
2023-03-27 20:19:58 +11:00
VALUES (?, ?, ?, ?)
2023-03-26 18:47:11 +11:00
ON CONFLICT DO NOTHING
""",
(game, turn, team(player), trimmed_specie),
2023-03-26 18:47:11 +11:00
)
conn.execute(
"""
INSERT INTO nicknames(game, player, pokemon, specie)
2023-03-26 18:47:11 +11:00
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, team(player), nickname, trimmed_specie),
2023-03-26 18:47:11 +11:00
)
2023-04-03 20:23:17 +10:00
case ["faint", pokemon]:
2023-03-27 21:51:50 +11:00
conn.execute(
"""
INSERT INTO knockouts(game, turn, player, pokemon)
2023-03-27 21:51:50 +11:00
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, turn, team(player), specie(pokemon)),
2023-03-27 21:51:50 +11:00
)
2023-04-03 20:23:17 +10:00
2023-03-28 23:06:46 +11:00
case ["win", player]:
conn.execute(
"""
UPDATE games
SET winner = ?
WHERE id = ?
""",
2023-04-02 17:26:01 +10:00
(team(player), game),
2023-03-28 23:06:46 +11:00
)
2023-04-03 20:23:17 +10:00
2023-03-27 23:07:38 +11:00
case ["-sidestart", side, env]:
if not last_move:
LOG.warning(f"missing previous move for {line}")
continue
2023-03-27 23:07:38 +11:00
LOG.debug(f"{line} <- {last_move}")
last_env_set[(side[0:1], env.replace("move: ", ""))] = last_move[0]
2023-04-03 20:23:17 +10:00
2023-03-27 23:07:38 +11:00
case ["-status", mon, cond]:
if not last_move or last_move[1] != mon:
LOG.warning(f"missing previous move for {line}")
continue
2023-03-27 23:07:38 +11:00
LOG.debug(f"{line} <- {last_move}")
last_status_set[(mon, cond)] = last_move[0]
2023-04-03 20:23:17 +10:00
case ["-damage", pokemon, status]:
2023-04-03 21:55:15 +10:00
# mon takes direct (non-hazard/condition) damage
# status can be a percentage 70/100 with or without condition,
# or "0 fnt"
new_hp = int(re.split("[/ ]", status)[0])
LOG.debug(f"{pokemon} dropped to {new_hp} from {hp[pokemon]}")
2023-04-03 21:55:15 +10:00
LOG.debug(f"source: {last_move}")
# resolve to damage source
if last_move[1] != pokemon:
LOG.warning(
f"{pokemon} took direct damage but last move was not"
" targeted at them"
2023-04-03 21:55:15 +10:00
)
continue
damage_source = last_move[0]
source_player, source_nickname = split_pokemon(damage_source)
2023-04-03 21:55:15 +10:00
conn.execute(
"""
INSERT INTO damage(game, player, pokemon, value)
2023-04-03 21:55:15 +10:00
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(
game,
team(source_player),
specie(damage_source),
hp[pokemon] - new_hp,
),
2023-04-03 21:55:15 +10:00
)
hp[pokemon] = new_hp
2023-04-03 21:55:15 +10:00
case ["-damage", pokemon, status, from_]:
2023-04-03 21:55:15 +10:00
# mon takes indirect damage
# status can be a percentage 70/100 with or without condition,
# or "0 fnt"
new_hp = int(re.split("[/ ]", status)[0])
LOG.debug(f"{pokemon} dropped to {new_hp} from {from_}")
LOG.debug(f"tracing reason for {line}")
reason = from_.replace("[from] ", "")
2023-04-03 21:55:15 +10:00
source = None
source_is_pokemon = True
test_hazard = last_env_set.get((pokemon[0:1], reason))
if test_hazard:
source = test_hazard
LOG.debug(f"identified hazard source {source}")
test_status = last_status_set.get((pokemon, reason))
if test_status:
source = test_status
LOG.debug(f"identified move source {source}")
if reason == "Recoil" or reason.startswith("item: "):
LOG.debug(f"identified special source {reason}")
reason = reason.replace("item: ", "")
source = "self"
source_is_pokemon = False
if not source:
LOG.error(f"missing reason for {line}")
continue
player, nickname = split_pokemon(pokemon)
if source.startswith("p1") or source.startswith("p2"):
source_player, _ = split_pokemon(source)
else:
source_player = None
source_is_pokemon = False
2023-04-03 21:55:15 +10:00
if source_player:
conn.execute(
"""
INSERT INTO indirect_damage(game, player, pokemon, value)
2023-04-03 21:55:15 +10:00
VALUES(?, ?, ?, ?)
""",
(
game,
team(source_player),
specie(source),
hp[pokemon] - new_hp,
),
2023-04-03 21:55:15 +10:00
)
if status == "0 fnt":
conn.execute(
"""
INSERT INTO indirect_knockouts(
game, turn, player, pokemon,
reason, source, source_player)
2023-04-03 21:55:15 +10:00
VALUES(?, ?, ?, ?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(
game,
turn,
team(player),
specie(pokemon),
reason,
specie(source) if source_is_pokemon else source,
2023-04-03 21:55:15 +10:00
team(source_player),
),
)
case ["-heal", pokemon, status, *rest]:
hp[pokemon] = int(status.split("/")[0])
2023-04-03 20:23:17 +10:00
2023-03-27 21:51:50 +11:00
case _:
2023-03-27 23:07:10 +11:00
# LOG.debug(f"unhandled message {chunks[0]}")
pass
2023-03-26 18:47:11 +11:00
@dataclass(frozen=True)
class Replay:
    """Immutable record of one replay, built by fetch() from the replay JSON.

    The field names must match the JSON keys exactly, because instances are
    created with ``Replay(**data)``.
    """

    # Replay identifier; stored as games.id and passed to parse_log as the game.
    id: str
    # Usernames of the two players; mapped through team() before being stored.
    p1: str
    p2: str
    # Stored in the games.format column.
    format: str
    # Raw battle log text handed to parse_log.
    log: str
    # Stored in the games.uploadtime column.
    uploadtime: int
    # The remaining fields are carried over from the JSON but are not read by
    # this script.
    views: int
    p1id: str
    p2id: str
    formatid: str
    rating: int
    private: int
    password: t.Optional[str]
2023-03-28 00:19:54 +11:00
def fetch(replay: str, cache: bool = True, timeout: float = 30.0) -> Replay:
    """Load a replay, from the local cache if possible, else over HTTP.

    Args:
        replay: Replay ID, or a full ``https://replay.pokemonshowdown.com/``
            URL from which the ID is derived.
        cache: When true, read ``cache/<id>.json`` under the current working
            directory if it exists, and write the downloaded JSON there
            otherwise. When false, the cache is neither read nor written.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the run indefinitely.

    Returns:
        The replay built from the JSON document.

    Raises:
        Exception: If the server answers with a status other than 200; the
            message is the response body.
        requests.RequestException: On network failures, including timeouts.
        TypeError: If the JSON keys do not match the ``Replay`` fields.
    """
    replay = replay.replace("https://replay.pokemonshowdown.com/", "")
    replay_file = Path.cwd() / "cache" / f"{replay}.json"

    if cache and replay_file.exists():
        with replay_file.open() as f:
            return Replay(**json.load(f))

    response = requests.get(
        f"https://replay.pokemonshowdown.com/{replay}.json",
        timeout=timeout,
    )
    if response.status_code != 200:
        raise Exception(response.text)
    data = response.json()

    if cache:
        replay_file.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
        with replay_file.open(mode="w") as f:
            json.dump(data, f)

    return Replay(**data)
2023-03-28 00:26:51 +11:00
def main(args):
    """Command-line entry point: index one or more replays into a database.

    Parses ``args``, configures log verbosity, loads the player => team
    mapping, then fetches every requested replay and feeds it to parse_log.
    Replays that cannot be fetched are reported and skipped. Each replay is
    committed on its own.

    Args:
        args: Command-line arguments, without the program name.
    """
    global TEAMS

    parser = argparse.ArgumentParser(
        prog=APP,
        description="extracts stats from a Showdown replay",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument("-v", "--verbose", action="count", help="add debugging info")
    parser.add_argument(
        "-C",
        "--no-cache",
        action="store_true",
        help="fetch replays instead of using cache",
    )
    parser.add_argument(
        "-t",
        "--teams",
        action="store",
        metavar="FILE",
        default="teams.json",
        help="JSON file defining players to teams",
    )
    parser.add_argument(
        "-o",
        "--output",
        action="store",
        metavar="FILE",
        default="data.db",
        help="output data file",
    )
    parser.add_argument("replay", nargs="+", help="replay ID or URL")

    args = parser.parse_args(args)

    # -v enables DEBUG, -vv (or more) enables TRACE; args.verbose is None
    # when the flag is absent.
    if args.verbose and args.verbose > 1:
        LOG.setLevel(logging.TRACE)
    elif args.verbose:
        LOG.setLevel(logging.DEBUG)

    if args.teams:
        with open(args.teams) as f:
            TEAMS = json.load(f)

    # Connect before entering the try block: if connecting fails there is
    # nothing to close, and the original error must not be masked by the
    # cleanup code.
    db = sqlite3.connect(args.output)
    try:
        _init_db(db)

        for r in args.replay:
            try:
                replay = fetch(r, cache=not args.no_cache)
            except Exception:
                LOG.error(f"bad replay {r}")
                # Keep the cause available for diagnosis without cluttering
                # normal output.
                LOG.debug(f"failed to fetch {r}", exc_info=True)
                continue

            LOG.info(f"indexing game {replay.id}")
            db.execute(
                """
                INSERT INTO games(id, p1, p2, format, uploadtime)
                VALUES (?, ?, ?, ?, ?)
                ON CONFLICT DO NOTHING
                """,
                (
                    replay.id,
                    team(replay.p1),
                    team(replay.p2),
                    replay.format,
                    replay.uploadtime,
                ),
            )

            parse_log(replay.id, replay.log, into=db)

            db.commit()
    finally:
        db.close()
if __name__ == "__main__":
    # Hand main() the command-line arguments without the script name.
    main(sys.argv[1:])