holy-heck-i-really-like-stats/main.py
2023-03-28 00:00:15 +11:00

300 lines
9.4 KiB
Python
Executable File

#!/usr/bin/env python3
from collections import namedtuple
from dataclasses import dataclass
from functools import partial, partialmethod
from pathlib import Path
import argparse
import json
import logging
import requests
import sqlite3
import sys
import typing as t
# Register a custom TRACE level on the stdlib logging module. Its numeric
# value (5) is lower than logging.DEBUG (10), so it is the most verbose level.
logging.TRACE = 5
logging.addLevelName(logging.TRACE, "TRACE")
# Convenience wrappers: Logger.trace(msg, ...) on every logger instance and a
# module-level logging.trace(msg, ...), both equivalent to log(TRACE, msg, ...).
logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE)
logging.trace = partial(logging.log, logging.TRACE)
class LogFormatter(logging.Formatter):
    """Formatter that wraps each record in an ANSI colour chosen by its level.

    Every level listed in ``FORMATS`` shares the same layout
    (``name [LEVEL] message``) and differs only in the escape sequence around
    it. Records whose level is not listed are rendered with the plain,
    uncoloured layout, so the logger name and level are still shown.
    """

    # Layout shared by all levels; kept under a private name so it is not
    # shadowed by the ``format`` method below.
    _LAYOUT = "%(name)s [%(levelname)s] %(message)s"

    # level number -> format string with the colour escape codes applied
    FORMATS = {
        logging.TRACE: f"\x1b[30;20m{_LAYOUT}\x1b[0m",
        logging.DEBUG: f"\x1b[38;20m{_LAYOUT}\x1b[0m",
        logging.INFO: f"\x1b[34;20m{_LAYOUT}\x1b[0m",
        logging.WARNING: f"\x1b[33;20m{_LAYOUT}\x1b[0m",
        logging.ERROR: f"\x1b[31;20m{_LAYOUT}\x1b[0m",
        logging.CRITICAL: f"\x1b[31;1m{_LAYOUT}\x1b[0m",
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # format string -> Formatter, filled lazily so that a Formatter is
        # built once per distinct format instead of once per log record.
        self._delegates = {}

    def format(self, record):
        """Return *record* rendered with the format registered for its level."""
        fmt = self.FORMATS.get(record.levelno, self._LAYOUT)
        delegate = self._delegates.get(fmt)
        if delegate is None:
            delegate = self._delegates[fmt] = logging.Formatter(fmt)
        return delegate.format(record)
# Application name; used as the logger name here and as the argparse `prog`
# in main().
APP = "hhirlstats"
# Application-wide logger. Its level is set in main() from the -v count.
LOG = logging.getLogger(APP)
# Send the logger's records to a stream handler that renders them through
# LogFormatter.
_ch = logging.StreamHandler()
_ch.setFormatter(LogFormatter())
LOG.addHandler(_ch)
class safelist(list):
    """A ``list`` with a ``dict.get``-style accessor for out-of-range indexes."""

    def get(self, index, default=None):
        """Return ``self[index]``, or *default* if that lookup raises IndexError."""
        try:
            found = self[index]
        except IndexError:
            found = default
        return found
def _init_db(conn: sqlite3.Connection):
    """Prepare *conn* for use: install a row factory and create the schema.

    After this call every row fetched through *conn* is a namedtuple called
    ``Row`` whose fields are the result columns, so values can be read by
    attribute as well as by position. All tables are created with
    ``IF NOT EXISTS``, so calling this on an already initialised database is
    harmless.

    Args:
        conn: an open SQLite connection; it is modified in place.
    """
    # One namedtuple class per distinct column list, so the class is built
    # once per query shape rather than once per fetched row.
    row_types = {}

    def namedtuple_factory(cursor, row):
        fields = tuple(column[0] for column in cursor.description)
        cls = row_types.get(fields)
        if cls is None:
            # rename=True replaces column names that are not valid
            # identifiers (e.g. "COUNT(*)") with positional names such as
            # "_0" instead of raising ValueError.
            cls = row_types[fields] = namedtuple("Row", fields, rename=True)
        return cls._make(row)

    conn.row_factory = namedtuple_factory
    conn.executescript(
        """
        CREATE TABLE IF NOT EXISTS moves(
            game, turn, player, name, user, target,
            UNIQUE(game, turn, player, user)
        );
        CREATE TABLE IF NOT EXISTS switches(
            game, turn, player, name,
            UNIQUE(game, turn, player, name)
        );
        CREATE TABLE IF NOT EXISTS nicknames(
            game, player, name, specie,
            UNIQUE(game, player, specie)
        );
        CREATE TABLE IF NOT EXISTS knockouts(
            game, turn, player, name,
            UNIQUE(game, turn, player)
        );
        CREATE TABLE IF NOT EXISTS indirect_knockouts(
            game, turn, player, name, source, source_user,
            UNIQUE(game, turn, player)
        );
        CREATE TABLE IF NOT EXISTS games(
            id, p1, p2, format, uploadtime,
            UNIQUE(id)
        )
        """
    )
def parse_log(game: str, log: str, into: sqlite3.Connection):
conn = into
turn = 0
players = {}
# ("p2a: Edward", "p1a: Meteo")
# memorises the user of the move that causes environment setting or status,
# and its target
last_move: t.Optional[tuple[str, str]]
# ("p1", "Spikes") => "p2a: Frosslas"
last_env_set: dict[tuple[str, str], str] = {}
# ("p1a: Meteo", "brn") => "p2a: Edward"
last_status_set: dict[tuple[str, str], str] = {}
def resolve_mon(user: str) -> tuple[str, str]:
[player, name] = user.split(": ")
return players[player.strip("ab")], name
for line in log.split("\n"):
chunks = line.split("|")[1:]
if not chunks:
continue
LOG.trace(line)
match chunks:
case ["player", id, username, *rest]:
players[id] = username
case ["turn", turn]:
turn = int(turn)
case ["move", user, move, target]:
last_move = (user, target)
player, user = resolve_mon(user)
_, target = resolve_mon(target)
conn.execute(
"""
INSERT INTO moves(game, turn, player, name, user, target)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, turn, player, move, user, target),
)
case ["switch", name, specie, *rest]:
player, name = resolve_mon(name)
conn.execute(
"""
INSERT INTO switches(game, turn, player, name)
VALUES (?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, turn, player, name),
)
conn.execute(
"""
INSERT INTO nicknames(game, player, name, specie)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, player, name, specie.split(", ")[0]),
)
case ["faint", mon]:
player, mon = resolve_mon(mon)
conn.execute(
"""
INSERT INTO knockouts(game, turn, player, name)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, turn, player, mon),
)
case ["-sidestart", side, env]:
if not last_move:
LOG.warning(f"missing previous move for {line}")
continue
LOG.debug(f"{line} <- {last_move}")
last_env_set[(side[0:1], env.replace("move: ", ""))] = last_move[0]
case ["-status", mon, cond]:
if not last_move or last_move[1] != mon:
LOG.warning(f"missing previous move for {line}")
continue
LOG.debug(f"{line} <- {last_move}")
last_status_set[(mon, cond)] = last_move[0]
case ["-damage", mon, *rest]:
# rest is new_hp and sometimes a source (if not from a move)
# in a knockout, new_hp is "0 fnt"
if rest[0] == "0 fnt" and len(rest) > 1:
LOG.debug(f"tracing source for {line}")
source = rest[1].replace("[from] ", "")
source_user = None
if source == "Recoil" or source.startswith("item: "):
source_user = source.replace("item: ", "")
if source_user:
LOG.debug(f"identified special source {source_user}")
else:
source_user = last_env_set.get((mon[0:1], source))
if source_user:
LOG.debug(f"identified hazard source {source_user}")
else:
source_user = last_status_set.get((mon, source))
if source_user:
LOG.debug(f"identified move source {source_user}")
else:
LOG.error(f"missing source for {line}")
continue
conn.execute(
"""
INSERT INTO indirect_knockouts(game, turn, player, name, source, source_user)
VALUES(?, ?, ?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, turn, player, mon, source, source_user),
)
case _:
# LOG.debug(f"unhandled message {chunks[0]}")
pass
@dataclass(frozen=True)
class Replay:
    """Immutable record of one replay, as loaded from its JSON document.

    fetch() builds instances with ``Replay(**data)``, so the field names have
    to match the keys of that JSON document exactly: an unexpected or missing
    key makes construction fail with TypeError.
    """

    # Replay identifier; main() stores it in games.id and uses it as the
    # `game` value of every row written by parse_log().
    id: str
    # Stored in games.p1 / games.p2 by main(); presumably the two players'
    # display names - confirm against the replay JSON.
    p1: str
    p2: str
    # Stored in games.format by main().
    format: str
    # Raw battle log; this is the text parse_log() splits into lines.
    log: str
    # Stored in games.uploadtime by main(); presumably a Unix timestamp - confirm.
    uploadtime: int
    # The remaining fields are carried over from the JSON document but are not
    # read anywhere in this file.
    views: int
    p1id: str
    p2id: str
    formatid: str
    rating: int
    private: int
    password: t.Optional[str]
def fetch(replay: str, cache: bool = False, timeout: float = 30.0) -> Replay:
    """Download one replay, optionally going through an on-disk cache.

    Args:
        replay: a replay ID, or a full ``https://replay.pokemonshowdown.com/``
            URL (the site prefix is stripped to obtain the ID).
        cache: when true, read the replay from ``./cache/<id>.json`` if that
            file exists, and write it there after a successful download.
        timeout: seconds to wait for the server before giving up, so a
            stalled connection cannot hang the program indefinitely.

    Returns:
        The replay built from the JSON document's top-level keys.

    Raises:
        RuntimeError: if the server answers with a status other than 200.
        requests.RequestException: on network failures, including timeouts.
        TypeError: if the JSON keys do not match the fields of ``Replay``.
    """
    replay = replay.replace("https://replay.pokemonshowdown.com/", "")
    replay_file = Path.cwd() / "cache" / f"{replay}.json"

    if cache and replay_file.exists():
        with replay_file.open(encoding="utf-8") as f:
            return Replay(**json.load(f))

    url = f"https://replay.pokemonshowdown.com/{replay}.json"
    response = requests.get(url, timeout=timeout)
    if response.status_code != 200:
        raise RuntimeError(f"HTTP {response.status_code} for {url}: {response.text}")
    data = response.json()

    if cache:
        replay_file.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
        with replay_file.open(mode="w", encoding="utf-8") as f:
            json.dump(data, f)

    return Replay(**data)
def main():
    """Command-line entry point: index the given replays into ``data.db``.

    Each positional argument is a replay ID or URL. A replay that cannot be
    fetched is logged and skipped. Every other replay is recorded in the
    ``games`` table and its log is parsed into the stats tables. Each replay
    is committed on its own, so an error while indexing one replay rolls back
    only that replay and keeps the ones already processed.

    ``-v`` enables debug output, ``-vv`` trace output; ``-c`` turns on the
    on-disk replay cache.
    """
    parser = argparse.ArgumentParser(
        prog=APP, description="extracts stats from a Showdown replay"
    )
    # default=0: without it argparse leaves the count as None when -v is not
    # given, and the comparisons below raise TypeError.
    parser.add_argument(
        "-v", "--verbose", action="count", default=0, help="add debugging info"
    )
    parser.add_argument("-c", "--cache", action="store_true", help="cache replays")
    parser.add_argument("replay", nargs="+", help="replay ID or URL")
    args = parser.parse_args(sys.argv[1:])

    if args.verbose > 1:
        LOG.setLevel(logging.TRACE)
    elif args.verbose > 0:
        LOG.setLevel(logging.DEBUG)

    # Connect outside the try block: if connecting fails there is nothing to
    # close, and the finally clause must not touch an unbound name.
    db = sqlite3.connect("data.db")
    try:
        _init_db(db)
        for r in args.replay:
            try:
                replay = fetch(r, cache=args.cache)
            except Exception as e:
                # Best effort: report why this replay failed and carry on
                # with the remaining ones.
                LOG.error("bad replay %s (%s)", r, e)
                continue

            LOG.info("indexing game %s", replay.id)
            # `with db` commits on success and rolls back if parsing raises.
            with db:
                db.execute(
                    """
                    INSERT INTO games(id, p1, p2, format, uploadtime)
                    VALUES (?, ?, ?, ?, ?)
                    ON CONFLICT DO NOTHING
                    """,
                    (replay.id, replay.p1, replay.p2, replay.format, replay.uploadtime),
                )
                parse_log(replay.id, replay.log, into=db)
    finally:
        db.close()
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()