#!/usr/bin/env python3 from collections import namedtuple from dataclasses import dataclass from pathlib import Path import argparse import json import logging import requests import sqlite3 import sys import typing as t class LogFormatter(logging.Formatter): format = "%(name)s [%(levelname)s] %(message)s" FORMATS = { logging.DEBUG: f"\x1b[38;20m{format}\x1b[0m", logging.INFO: f"\x1b[34;20m{format}\x1b[0m", logging.WARNING: f"\x1b[33;20m{format}\x1b[0m", logging.ERROR: f"\x1b[31;20m{format}\x1b[0m", logging.CRITICAL: f"\x1b[31;1m{format}\x1b[0m", } def format(self, record): fmt = self.FORMATS.get(record.levelno) formatter = logging.Formatter(fmt) return formatter.format(record) APP = "hhirlstats" LOG = logging.getLogger(APP) _ch = logging.StreamHandler() _ch.setFormatter(LogFormatter()) LOG.addHandler(_ch) class safelist(list): def get(self, index, default=None): try: return self.__getitem__(index) except IndexError: return default def _init_db(conn: sqlite3.Connection): def namedtuple_factory(cursor, row): fields = [column[0] for column in cursor.description] cls = namedtuple("Row", fields) return cls._make(row) conn.row_factory = namedtuple_factory conn.executescript( """ CREATE TABLE IF NOT EXISTS moves( game, turn, player, name, user, target, UNIQUE(game, turn, player, user) ); CREATE TABLE IF NOT EXISTS switches( game, turn, player, name, UNIQUE(game, turn, player, name) ); CREATE TABLE IF NOT EXISTS nicknames( game, player, name, specie, UNIQUE(game, player, specie) ); CREATE TABLE IF NOT EXISTS knockouts( game, turn, player, name, UNIQUE(game, turn, player) ); CREATE TABLE IF NOT EXISTS indirect_knockouts( game, turn, player, name, source, source_user, UNIQUE(game, turn, player) ); CREATE TABLE IF NOT EXISTS games( id, p1, p2, format, uploadtime, UNIQUE(id) ) """ ) def parse_log(game: str, log: str, into: sqlite3.Connection): conn = into turn = 0 players = {} # ("p2a: Edward", "p1a: Meteo") # memorises the user of the move that causes environment setting or status, # and its target last_move: t.Optional[tuple[str, str]] # ("p1", "Spikes") => "p2a: Frosslas" last_env_set: dict[tuple[str, str], str] = {} # ("p1a: Meteo", "brn") => "p2a: Edward" last_status_set: dict[tuple[str, str], str] = {} def resolve_mon(user: str) -> tuple[str, str]: [player, name] = user.split(": ") return players[player.strip("ab")], name for line in log.split("\n"): chunks = line.split("|")[1:] if not chunks: continue match chunks: case ["player", id, username, *rest]: players[id] = username case ["turn", turn]: turn = int(turn) case ["move", user, move, target]: last_move = (user, target) player, user = resolve_mon(user) _, target = resolve_mon(target) conn.execute( """ INSERT INTO moves(game, turn, player, name, user, target) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING """, (game, turn, player, move, user, target), ) case ["switch", name, specie, *rest]: player, name = resolve_mon(name) conn.execute( """ INSERT INTO switches(game, turn, player, name) VALUES (?, ?, ?, ?) ON CONFLICT DO NOTHING """, (game, turn, player, name), ) conn.execute( """ INSERT INTO nicknames(game, player, name, specie) VALUES(?, ?, ?, ?) ON CONFLICT DO NOTHING """, (game, player, name, specie.split(", ")[0]), ) case ["faint", mon]: player, mon = resolve_mon(mon) conn.execute( """ INSERT INTO knockouts(game, turn, player, name) VALUES(?, ?, ?, ?) ON CONFLICT DO NOTHING """, (game, turn, player, mon), ) case ["-sidestart", side, env]: if not last_move: LOG.warning(f"missing previous move for {line}") continue LOG.debug(f"{line} <- {last_move}") last_env_set[(side[0:1], env.replace("move: ", ""))] = last_move[0] case ["-status", mon, cond]: if not last_move or last_move[1] != mon: LOG.warning(f"missing previous move for {line}") continue LOG.debug(f"{line} <- {last_move}") last_status_set[(mon, cond)] = last_move[0] case ["-damage", mon, *rest]: # rest is new_hp and sometimes a source (if not from a move) # in a knockout, new_hp is "0 fnt" if rest[0] == "0 fnt" and len(rest) > 1: LOG.debug(f"tracing source for {line}") source = rest[1].replace("[from] ", "") source_user = None if source == "Recoil" or source.startswith("item: "): source_user = source.replace("item: ", "") if source_user: LOG.debug(f"identified special source {source_user}") else: source_user = last_env_set.get((mon[0:1], source)) if source_user: LOG.debug(f"identified hazard source {source_user}") else: source_user = last_status_set.get((mon, source)) if source_user: LOG.debug(f"identified move source {source_user}") else: LOG.error(f"missing source for {line}") continue conn.execute( """ INSERT INTO indirect_knockouts(game, turn, player, name, source, source_user) VALUES(?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING """, (game, turn, player, mon, source, source_user), ) case _: # LOG.debug(f"unhandled message {chunks[0]}") pass @dataclass(frozen=True) class Replay: id: str p1: str p2: str format: str log: str uploadtime: int views: int p1id: str p2id: str formatid: str rating: int private: int password: t.Optional[str] def fetch(replay: str, cache: bool = False) -> Replay: replay = replay.replace("https://replay.pokemonshowdown.com/", "") replay_file = Path.cwd() / "cache" / f"{replay}.json" if cache and replay_file.exists(): with replay_file.open() as f: return Replay(**json.load(f)) data = requests.get(f"https://replay.pokemonshowdown.com/{replay}.json") if data.status_code != 200: raise Exception(data.text) data = data.json() if cache: replay_file.parent.mkdir(mode=0o755, parents=True, exist_ok=True) with replay_file.open(mode="w") as f: json.dump(data, f) return Replay(**data) def main(): parser = argparse.ArgumentParser( prog=APP, description="extracts stats from a Showdown replay" ) parser.add_argument( "-v", "--verbose", action="store_true", help="add debugging info" ) parser.add_argument("-c", "--cache", action="store_true", help="cache replays") parser.add_argument("replay", nargs="+", help="replay ID or URL") args = parser.parse_args(sys.argv[1:]) LOG.setLevel(logging.DEBUG if args.verbose else logging.INFO) try: db = sqlite3.connect("data.db") _init_db(db) for r in args.replay: try: replay = fetch(r, cache=args.cache) except Exception as e: LOG.error(f"bad replay {r}") continue LOG.info(f"indexing game {replay.id}") db.execute( """ INSERT INTO games(id, p1, p2, format, uploadtime) VALUES (?, ?, ?, ?, ?) ON CONFLICT DO NOTHING """, (replay.id, replay.p1, replay.p2, replay.format, replay.uploadtime), ) parse_log(replay.id, replay.log, into=db) db.commit() finally: db.close() if __name__ == "__main__": main()