#!/usr/bin/env python3 from collections import namedtuple from dataclasses import dataclass from functools import partial, partialmethod from pathlib import Path import argparse import json import logging import re import requests import sqlite3 import sys import typing as t logging.TRACE = 5 # type: ignore logging.addLevelName(logging.TRACE, "TRACE") # type: ignore logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE) # type: ignore logging.trace = partial(logging.log, logging.TRACE) # type: ignore class LogFormatter(logging.Formatter): _format = "%(name)s [%(levelname)s] %(message)s" FORMATS = { logging.TRACE: f"\x1b[30;20m{_format}\x1b[0m", # type: ignore logging.DEBUG: f"\x1b[38;20m{_format}\x1b[0m", logging.INFO: f"\x1b[34;20m{_format}\x1b[0m", logging.WARNING: f"\x1b[33;20m{_format}\x1b[0m", logging.ERROR: f"\x1b[31;20m{_format}\x1b[0m", logging.CRITICAL: f"\x1b[31;1m{_format}\x1b[0m", } def format(self, record): fmt = self.FORMATS.get(record.levelno) formatter = logging.Formatter(fmt) return formatter.format(record) APP = "hhirlstats" LOG = logging.getLogger(APP) _ch = logging.StreamHandler() _ch.setFormatter(LogFormatter()) LOG.addHandler(_ch) def _init_db(conn: sqlite3.Connection): def namedtuple_factory(cursor, row): fields = [column[0] for column in cursor.description] cls = namedtuple("Row", fields) return cls._make(row) conn.row_factory = namedtuple_factory conn.executescript( """ CREATE TABLE IF NOT EXISTS moves( game, turn, player, pokemon, move, target, UNIQUE(game, turn, player, pokemon) ); CREATE TABLE IF NOT EXISTS switches( game, turn, player, pokemon, UNIQUE(game, turn, player, pokemon) ); CREATE TABLE IF NOT EXISTS nicknames( game, player, pokemon, specie, UNIQUE(game, player, specie) ); CREATE TABLE IF NOT EXISTS knockouts( game, turn, player, pokemon, UNIQUE(game, turn, player) ); CREATE TABLE IF NOT EXISTS indirect_knockouts( game, turn, player, pokemon, reason, source, source_player, UNIQUE(game, turn, player) ); CREATE TABLE IF NOT EXISTS games( id, p1, p2, format, uploadtime, winner, UNIQUE(id) ); -- No good way to ensure idempotence for damage; just re-build it. DROP TABLE IF EXISTS damage; CREATE TABLE damage(game, player, pokemon, value); DROP TABLE IF EXISTS indirect_damage; CREATE TABLE indirect_damage(game, player, pokemon, value); """ ) # Either the value "p1" or "p2" PlayerTag = t.NewType("PlayerTag", str) # A player's name Player = t.NewType("Player", str) # A player prefixed with a PlayerTag TaggedPlayer = t.NewType("TaggedPlayer", str) # A Pokemon identified by its nickname, if any Pokemon = t.NewType("Pokemon", str) # A Pokemon specie PokemonSpecie = t.NewType("PokemonSpecie", str) # A Pokemon prefixed with a PlayerTag TaggedPokemon = t.NewType("TaggedPokemon", str) TEAMS: dict[Player, Player] = {} _logged_teams: list[Player] = [] def team(player: Player) -> Player: """Maps a username to a defined team.""" if player in TEAMS: return TEAMS[player] else: if not player in _logged_teams and player: LOG.warning(f"missing team mapping for {player}") _logged_teams.append(player) return player class LogParser: turn = 0 players: dict[PlayerTag, Player] = {} hp: dict[TaggedPokemon, int] = {} # ("p2a: Edward", "p1a: Meteo") # memorises the user of the move that causes environment setting or status, # and its target last_move: t.Optional[tuple[str, str]] # ("p1", "Spikes") => "p2a: Frosslas" last_env_set: dict[tuple[str, str], str] = {} # ("p1a: Meteo", "brn") => "p2a: Edward" last_status_set: dict[tuple[str, str], str] = {} def __init__(self, game: str, into: sqlite3.Connection): self.game = game self.conn: sqlite3.Connection = into def split_pokemon(self, user: TaggedPokemon) -> tuple[Player, Pokemon]: """Splits a TaggedPokemon into the owning player and the Pokemon.""" [player, pokemon] = user.split(": ") return self.players[PlayerTag(player.strip("ab"))], Pokemon(pokemon) @t.overload def specie(self, pokemon: Pokemon, player: Player) -> PokemonSpecie: """Resolves the species of a nicknamed Pokemon.""" ... @t.overload def specie(self, pokemon: TaggedPokemon) -> PokemonSpecie: """Resolves the species of a Pokemon given its Showdown identifier (used in split_pokemon).""" ... def specie( self, pokemon: Pokemon | TaggedPokemon, player: t.Optional[Player] = None ) -> PokemonSpecie: if not player: [player, pokemon] = self.split_pokemon(TaggedPokemon(pokemon)) return ( self.conn.execute( """ SELECT specie FROM nicknames WHERE (game, player, pokemon) = (?, ?, ?) LIMIT 1 """, (self.game, team(player), pokemon), ) .fetchall()[0] .specie ) def _reset(self): self.turn = 0 self.players.clear() def _log_appearance(self, name: TaggedPokemon, specie: str): # Also includes gender and formes. trimmed_specie = PokemonSpecie(specie.split(", ")[0]) player, nickname = self.split_pokemon(name) self.conn.execute( """ INSERT INTO nicknames(game, player, pokemon, specie) VALUES(?, ?, ?, ?) ON CONFLICT DO NOTHING """, (self.game, team(player), nickname, trimmed_specie), ) def parse(self, log: str): self._reset() for line in log.split("\n"): chunks = line.split("|")[1:] if not chunks: continue LOG.trace(line) # type: ignore match chunks: # t.Literal, TaggedPokemon, str, str case ["drag", name_, specie, status, *rest]: name = TaggedPokemon(name_) self.hp[name] = int(status.split("/")[0]) self._log_appearance(name, specie) # t.Literal, TaggedPokemon case ["faint", pokemon_]: pokemon = TaggedPokemon(pokemon_) player, _ = self.split_pokemon(pokemon) self.conn.execute( """ INSERT INTO knockouts(game, turn, player, pokemon) VALUES(?, ?, ?, ?) ON CONFLICT DO NOTHING """, (self.game, self.turn, team(player), self.specie(pokemon)), ) # t.Literal, TaggedPokemon, str, TaggedPokemon case ["move", user_, move, target_]: user = TaggedPokemon(user_) target = TaggedPokemon(target_) last_move = (user, target) player, _ = self.split_pokemon(user) self.conn.execute( """ INSERT INTO moves(game, turn, player, pokemon, move, target) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING """, ( self.game, self.turn, team(player), self.specie(user), move, self.specie(target), ), ) # t.Literal, PlayerTag, Player case ["player", id, username, *rest]: self.players[PlayerTag(id)] = Player(username) # t.Literal, TaggedPokemon, str case ["replace", name, specie]: self._log_appearance(name, specie) # t.Literal, TaggedPokemon, str, str, t.Optional[str] case ["switch", name, specie, status, *rest]: self.hp[name] = int(status.split("/")[0]) # Also includes gender and formes. trimmed_specie = specie.split(", ")[0] player, nickname = self.split_pokemon(name) self._log_appearance(name, specie) self.conn.execute( """ INSERT INTO switches(game, turn, player, pokemon) VALUES (?, ?, ?, ?) ON CONFLICT DO NOTHING """, (self.game, self.turn, team(player), trimmed_specie), ) # t.Literal, str case ["turn", turn]: self.turn = int(turn) # t.Literal, Player case ["win", player]: self.conn.execute( """ UPDATE games SET winner = ? WHERE id = ? """, (team(player), self.game), ) case ["-heal", pokemon, status, *rest]: self.hp[pokemon] = int(status.split("/")[0]) # t.Literal, TaggedPokemon, str case ["-damage", pokemon, status]: # Pokemon takes direct (non-hazard/condition) damage; status # can be a percentage "70/100" with or without condition, or # "0 fnt" new_hp = int(re.split("[/ ]", status)[0]) LOG.debug(f"{pokemon} dropped to {new_hp} from {self.hp[pokemon]}") LOG.debug(f"source: {last_move}") # resolve to damage source if last_move[1] != pokemon: LOG.warning( f"{pokemon} took direct damage but last move was not" " targeted at them" ) continue damage_source = last_move[0] source_player, source_nickname = self.split_pokemon(damage_source) self.conn.execute( """ INSERT INTO damage(game, player, pokemon, value) VALUES(?, ?, ?, ?) ON CONFLICT DO NOTHING """, ( self.game, team(source_player), self.specie(damage_source), self.hp[pokemon] - new_hp, ), ) self.hp[pokemon] = new_hp # t.Literal, TaggedPokemon, str, str case ["-damage", pokemon_, status, from_]: pokemon = TaggedPokemon(pokemon_) # Pokemon takes indirect damage; status can be a percentage # "70/100" with or without condition, or "0 fnt" new_hp = int(re.split("[/ ]", status)[0]) LOG.debug(f"{pokemon} dropped to {new_hp} from {from_}") LOG.debug(f"tracing reason for {line}") reason = from_.replace("[from] ", "") source: TaggedPokemon | str | None = None source_is_pokemon = True test_hazard = self.last_env_set.get((pokemon[0:1], reason)) if test_hazard: source = test_hazard LOG.debug(f"identified hazard source {source}") test_status = self.last_status_set.get((pokemon, reason)) if test_status: source = test_status LOG.debug(f"identified move source {source}") if reason == "Recoil" or reason.startswith("item: "): LOG.debug(f"identified special source {reason}") reason = reason.replace("item: ", "") source = "self" source_is_pokemon = False if not source: LOG.error(f"missing reason for {line}") continue player, nickname = self.split_pokemon(pokemon) if source.startswith("p1") or source.startswith("p2"): source_player, _ = self.split_pokemon(TaggedPokemon(source)) else: source_player = None # type: ignore source_is_pokemon = False if source_player: self.conn.execute( """ INSERT INTO indirect_damage(game, player, pokemon, value) VALUES(?, ?, ?, ?) """, ( self.game, team(source_player), self.specie(TaggedPokemon(source)), self.hp[pokemon] - new_hp, ), ) if status == "0 fnt": self.conn.execute( """ INSERT INTO indirect_knockouts( game, turn, player, pokemon, reason, source, source_player) VALUES(?, ?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING """, ( self.game, self.turn, team(player), self.specie(pokemon), reason, self.specie(TaggedPokemon(source)) if source_is_pokemon else source, team(source_player), ), ) # t.Literal, TaggedPlayer, str case ["-sidestart", side, env]: if not last_move: LOG.warning(f"missing previous move for {line}") continue LOG.debug(f"{line} <- {last_move}") self.last_env_set[ (side[0:1], env.replace("move: ", "")) ] = last_move[0] # t.Literal, TaggedPokemon, str case ["-status", mon, cond]: if not last_move or last_move[1] != mon: LOG.warning(f"missing previous move for {line}") continue LOG.debug(f"{line} <- {last_move}") self.last_status_set[(mon, cond)] = last_move[0] case _: # LOG.debug(f"unhandled message {chunks[0]}") pass @dataclass(frozen=True) class Replay: id: str p1: str p2: str format: str log: str uploadtime: int views: int p1id: str p2id: str formatid: str rating: int private: int password: t.Optional[str] def fetch(replay: str, cache: bool = True) -> Replay: replay = replay.replace("https://replay.pokemonshowdown.com/", "") replay_file = Path.cwd() / "cache" / f"{replay}.json" if cache and replay_file.exists(): with replay_file.open() as f: return Replay(**json.load(f)) data = requests.get(f"https://replay.pokemonshowdown.com/{replay}.json") if data.status_code != 200: raise Exception(data.text) data = data.json() if cache: replay_file.parent.mkdir(mode=0o755, parents=True, exist_ok=True) with replay_file.open(mode="w") as f: json.dump(data, f) return Replay(**data) # type: ignore def main(args): parser = argparse.ArgumentParser( prog=APP, description="extracts stats from a Showdown replay", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser.add_argument("-v", "--verbose", action="count", help="add debugging info") parser.add_argument( "-C", "--no-cache", action="store_true", help="fetch replays instead of using cache", ) parser.add_argument( "-t", "--teams", action="store", metavar="FILE", default="teams.json", help="JSON file defining players to teams", ) parser.add_argument( "-o", "--output", action="store", metavar="FILE", default="data.db", help="output data file", ) parser.add_argument("replay", nargs="+", help="replay ID or URL") args = parser.parse_args(args) if args.verbose and args.verbose > 1: LOG.setLevel(logging.TRACE) elif args.verbose: LOG.setLevel(logging.DEBUG) if args.teams: with open(args.teams) as f: global TEAMS TEAMS = json.load(f) try: db = sqlite3.connect(args.output) _init_db(db) for r in args.replay: try: replay = fetch(r, cache=not args.no_cache) except Exception as e: LOG.error(f"bad replay {r}") continue LOG.info(f"indexing game {replay.id}") db.execute( """ INSERT INTO games(id, p1, p2, format, uploadtime) VALUES (?, ?, ?, ?, ?) ON CONFLICT DO NOTHING """, ( replay.id, team(replay.p1), team(replay.p2), replay.format, replay.uploadtime, ), ) LogParser(replay.id, db).parse(replay.log) db.commit() finally: db.close() if __name__ == "__main__": main(sys.argv[1:])