#!/usr/bin/env python3 from collections import namedtuple from dataclasses import dataclass from functools import partial, partialmethod from pathlib import Path import argparse import json import logging import requests import sqlite3 import sys import typing as t logging.TRACE = 5 logging.addLevelName(logging.TRACE, "TRACE") logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE) logging.trace = partial(logging.log, logging.TRACE) class LogFormatter(logging.Formatter): format = "%(name)s [%(levelname)s] %(message)s" FORMATS = { logging.TRACE: f"\x1b[30;20m{format}\x1b[0m", logging.DEBUG: f"\x1b[38;20m{format}\x1b[0m", logging.INFO: f"\x1b[34;20m{format}\x1b[0m", logging.WARNING: f"\x1b[33;20m{format}\x1b[0m", logging.ERROR: f"\x1b[31;20m{format}\x1b[0m", logging.CRITICAL: f"\x1b[31;1m{format}\x1b[0m", } def format(self, record): fmt = self.FORMATS.get(record.levelno) formatter = logging.Formatter(fmt) return formatter.format(record) APP = "hhirlstats" LOG = logging.getLogger(APP) _ch = logging.StreamHandler() _ch.setFormatter(LogFormatter()) LOG.addHandler(_ch) TEAMS = {} _logged_teams = [] def team(player: str) -> str: """Maps a username to a defined team.""" if player in TEAMS: return TEAMS[player] else: if not player in _logged_teams: LOG.warning(f"missing team mapping for {player}") _logged_teams.append(player) return player class safelist(list): def get(self, index, default=None): try: return self.__getitem__(index) except IndexError: return default def _init_db(conn: sqlite3.Connection): def namedtuple_factory(cursor, row): fields = [column[0] for column in cursor.description] cls = namedtuple("Row", fields) return cls._make(row) conn.row_factory = namedtuple_factory conn.executescript( """ CREATE TABLE IF NOT EXISTS moves( game, turn, player, name, user, target, UNIQUE(game, turn, player, user) ); CREATE TABLE IF NOT EXISTS switches( game, turn, player, name, UNIQUE(game, turn, player, name) ); CREATE TABLE IF NOT EXISTS nicknames( game, player, name, specie, UNIQUE(game, player, specie) ); CREATE TABLE IF NOT EXISTS knockouts( game, turn, player, name, UNIQUE(game, turn, player) ); CREATE TABLE IF NOT EXISTS indirect_knockouts( game, turn, player, name, source, source_user, source_player, UNIQUE(game, turn, player) ); CREATE TABLE IF NOT EXISTS games( id, p1, p2, format, uploadtime, winner, UNIQUE(id) ) """ ) def parse_log(game: str, log: str, into: sqlite3.Connection): conn = into turn = 0 players = {} # ("p2a: Edward", "p1a: Meteo") # memorises the user of the move that causes environment setting or status, # and its target last_move: t.Optional[tuple[str, str]] # ("p1", "Spikes") => "p2a: Frosslas" last_env_set: dict[tuple[str, str], str] = {} # ("p1a: Meteo", "brn") => "p2a: Edward" last_status_set: dict[tuple[str, str], str] = {} def resolve_mon(user: str) -> tuple[str, str]: [player, name] = user.split(": ") return players[player.strip("ab")], name for line in log.split("\n"): chunks = line.split("|")[1:] if not chunks: continue LOG.trace(line) match chunks: case ["player", id, username, *rest]: players[id] = username case ["turn", turn]: turn = int(turn) case ["move", user, move, target]: last_move = (user, target) player, user = resolve_mon(user) _, target = resolve_mon(target) conn.execute( """ INSERT INTO moves(game, turn, player, name, user, target) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING """, (game, turn, team(player), move, user, target), ) case ["switch", name, specie, *rest]: player, name = resolve_mon(name) conn.execute( """ INSERT INTO switches(game, turn, player, name) VALUES (?, ?, ?, ?) ON CONFLICT DO NOTHING """, (game, turn, team(player), name), ) conn.execute( """ INSERT INTO nicknames(game, player, name, specie) VALUES(?, ?, ?, ?) ON CONFLICT DO NOTHING """, (game, team(player), name, specie.split(", ")[0]), ) case ["faint", mon]: player, mon = resolve_mon(mon) conn.execute( """ INSERT INTO knockouts(game, turn, player, name) VALUES(?, ?, ?, ?) ON CONFLICT DO NOTHING """, (game, turn, team(player), mon), ) case ["win", player]: conn.execute( """ UPDATE games SET winner = ? WHERE id = ? """, (team(player), game), ) case ["-sidestart", side, env]: if not last_move: LOG.warning(f"missing previous move for {line}") continue LOG.debug(f"{line} <- {last_move}") last_env_set[(side[0:1], env.replace("move: ", ""))] = last_move[0] case ["-status", mon, cond]: if not last_move or last_move[1] != mon: LOG.warning(f"missing previous move for {line}") continue LOG.debug(f"{line} <- {last_move}") last_status_set[(mon, cond)] = last_move[0] case ["-damage", mon, *rest]: # rest is new_hp and sometimes a source (if not from a move) # in a knockout, new_hp is "0 fnt" if rest[0] == "0 fnt" and len(rest) > 1: LOG.debug(f"tracing source for {line}") source = rest[1].replace("[from] ", "") source_user = None test_hazard = last_env_set.get((mon[0:1], source)) if test_hazard: source_user = test_hazard LOG.debug(f"identified hazard source {source_user}") test_status = last_status_set.get((mon, source)) if test_status: source_user = test_status LOG.debug(f"identified move source {source_user}") if source == "Recoil" or source.startswith("item: "): LOG.debug(f"identified special source {source}") source = source.replace("item: ", "") source_user = "self" if not source_user: LOG.error(f"missing source for {line}") continue player, mon = resolve_mon(mon) if source_user.startswith("p1") or source_user.startswith("p2"): source_player, source_user = resolve_mon(source_user) else: source_player = None conn.execute( """ INSERT INTO indirect_knockouts(game, turn, player, name, source, source_user, source_player) VALUES(?, ?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING """, ( game, turn, team(player), mon, source, source_user, team(source_player), ), ) case _: # LOG.debug(f"unhandled message {chunks[0]}") pass @dataclass(frozen=True) class Replay: id: str p1: str p2: str format: str log: str uploadtime: int views: int p1id: str p2id: str formatid: str rating: int private: int password: t.Optional[str] def fetch(replay: str, cache: bool = True) -> Replay: replay = replay.replace("https://replay.pokemonshowdown.com/", "") replay_file = Path.cwd() / "cache" / f"{replay}.json" if cache and replay_file.exists(): with replay_file.open() as f: return Replay(**json.load(f)) data = requests.get(f"https://replay.pokemonshowdown.com/{replay}.json") if data.status_code != 200: raise Exception(data.text) data = data.json() if cache: replay_file.parent.mkdir(mode=0o755, parents=True, exist_ok=True) with replay_file.open(mode="w") as f: json.dump(data, f) return Replay(**data) def main(args): parser = argparse.ArgumentParser( prog=APP, description="extracts stats from a Showdown replay", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser.add_argument("-v", "--verbose", action="count", help="add debugging info") parser.add_argument( "-C", "--no-cache", action="store_true", help="fetch replays instead of using cache", ) parser.add_argument( "-t", "--teams", action="store", metavar="FILE", default="teams.json", help="JSON file defining players to teams", ) parser.add_argument( "-o", "--output", action="store", metavar="FILE", default="data.db", help="output data file", ) parser.add_argument("replay", nargs="+", help="replay ID or URL") args = parser.parse_args(args) if args.verbose and args.verbose > 1: LOG.setLevel(logging.TRACE) elif args.verbose: LOG.setLevel(logging.DEBUG) if args.teams: with open(args.teams) as f: global TEAMS TEAMS = json.load(f) try: db = sqlite3.connect(args.output) _init_db(db) for r in args.replay: try: replay = fetch(r, cache=not args.no_cache) except Exception as e: LOG.error(f"bad replay {r}") continue LOG.info(f"indexing game {replay.id}") db.execute( """ INSERT INTO games(id, p1, p2, format, uploadtime) VALUES (?, ?, ?, ?, ?) ON CONFLICT DO NOTHING """, ( replay.id, team(replay.p1), team(replay.p2), replay.format, replay.uploadtime, ), ) parse_log(replay.id, replay.log, into=db) db.commit() finally: db.close() if __name__ == "__main__": main(sys.argv[1:])