From 37a2df3039de0c0ffa8ee47c9fed8132ca9e0baf Mon Sep 17 00:00:00 2001 From: xeals Date: Tue, 4 Apr 2023 22:21:33 +1000 Subject: [PATCH] refactor parser into class; add newtyping --- flake.nix | 9 +- main.py | 580 +++++++++++++++++++++++++++++------------------------- 2 files changed, 315 insertions(+), 274 deletions(-) diff --git a/flake.nix b/flake.nix index 5e245ee..50df030 100644 --- a/flake.nix +++ b/flake.nix @@ -13,10 +13,17 @@ buildInputs = let python = pkgs.python3.withPackages (ps: [ + ps.mypy ps.requests + ps.types-requests ]); in - [ python pkgs.sqlite ]; + [ + python + pkgs.sqlite + python.pkgs.python-lsp-server + python.pkgs.pylsp-mypy + ]; }; }); } diff --git a/main.py b/main.py index c96319e..437aea6 100755 --- a/main.py +++ b/main.py @@ -14,22 +14,22 @@ import sys import typing as t -logging.TRACE = 5 -logging.addLevelName(logging.TRACE, "TRACE") -logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE) -logging.trace = partial(logging.log, logging.TRACE) +logging.TRACE = 5 # type: ignore +logging.addLevelName(logging.TRACE, "TRACE") # type: ignore +logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE) # type: ignore +logging.trace = partial(logging.log, logging.TRACE) # type: ignore class LogFormatter(logging.Formatter): - format = "%(name)s [%(levelname)s] %(message)s" + _format = "%(name)s [%(levelname)s] %(message)s" FORMATS = { - logging.TRACE: f"\x1b[30;20m{format}\x1b[0m", - logging.DEBUG: f"\x1b[38;20m{format}\x1b[0m", - logging.INFO: f"\x1b[34;20m{format}\x1b[0m", - logging.WARNING: f"\x1b[33;20m{format}\x1b[0m", - logging.ERROR: f"\x1b[31;20m{format}\x1b[0m", - logging.CRITICAL: f"\x1b[31;1m{format}\x1b[0m", + logging.TRACE: f"\x1b[30;20m{_format}\x1b[0m", # type: ignore + logging.DEBUG: f"\x1b[38;20m{_format}\x1b[0m", + logging.INFO: f"\x1b[34;20m{_format}\x1b[0m", + logging.WARNING: f"\x1b[33;20m{_format}\x1b[0m", + logging.ERROR: f"\x1b[31;20m{_format}\x1b[0m", + logging.CRITICAL: f"\x1b[31;1m{_format}\x1b[0m", } def format(self, record): @@ -45,29 +45,6 @@ _ch.setFormatter(LogFormatter()) LOG.addHandler(_ch) -TEAMS = {} -_logged_teams = [] - - -def team(player: str) -> str: - """Maps a username to a defined team.""" - if player in TEAMS: - return TEAMS[player] - else: - if not player in _logged_teams and player: - LOG.warning(f"missing team mapping for {player}") - _logged_teams.append(player) - return player - - -class safelist(list): - def get(self, index, default=None): - try: - return self.__getitem__(index) - except IndexError: - return default - - def _init_db(conn: sqlite3.Connection): def namedtuple_factory(cursor, row): fields = [column[0] for column in cursor.description] @@ -112,12 +89,45 @@ def _init_db(conn: sqlite3.Connection): ) -def parse_log(game: str, log: str, into: sqlite3.Connection): - conn = into +# Either the value "p1" or "p2" +PlayerTag = t.NewType("PlayerTag", str) + +# A player's name +Player = t.NewType("Player", str) + +# A player prefixed with a PlayerTag +TaggedPlayer = t.NewType("TaggedPlayer", str) + +# A Pokemon identified by its nickname, if any +Pokemon = t.NewType("Pokemon", str) + +# A Pokemon specie +PokemonSpecie = t.NewType("PokemonSpecie", str) + +# A Pokemon prefixed with a PlayerTag +TaggedPokemon = t.NewType("TaggedPokemon", str) + + +TEAMS: dict[Player, Player] = {} +_logged_teams: list[Player] = [] + + +def team(player: Player) -> Player: + """Maps a username to a defined team.""" + if player in TEAMS: + return TEAMS[player] + else: + if not player in _logged_teams and player: + LOG.warning(f"missing team mapping for {player}") + _logged_teams.append(player) + return player + + +class LogParser: turn = 0 - players = {} - hp = {} + players: dict[PlayerTag, Player] = {} + hp: dict[TaggedPokemon, int] = {} # ("p2a: Edward", "p1a: Meteo") # memorises the user of the move that causes environment setting or status, @@ -130,273 +140,297 @@ def parse_log(game: str, log: str, into: sqlite3.Connection): # ("p1a: Meteo", "brn") => "p2a: Edward" last_status_set: dict[tuple[str, str], str] = {} - def split_pokemon(user: str) -> tuple[str, str]: - """Splits a Pokemon identifier of the form `pXa: Pokemon` into the - player's name (as marked by the player log) and "Pokemon". + def __init__(self, game: str, into: sqlite3.Connection): + self.game = game + self.conn: sqlite3.Connection = into - Note that all Pokemon are referred to by their nicknames, and will - require resolving to obtain the Pokemon specie.""" - [player, name] = user.split(": ") - return players[player.strip("ab")], name + def split_pokemon(self, user: TaggedPokemon) -> tuple[Player, Pokemon]: + """Splits a TaggedPokemon into the owning player and the Pokemon.""" + [player, pokemon] = user.split(": ") + return self.players[PlayerTag(player.strip("ab"))], Pokemon(pokemon) - def specie_from_parts(player: str, nickname: str) -> str: + @t.overload + def specie(self, pokemon: Pokemon, player: Player) -> PokemonSpecie: """Resolves the species of a nicknamed Pokemon.""" + ... + + @t.overload + def specie(self, pokemon: TaggedPokemon) -> PokemonSpecie: + """Resolves the species of a Pokemon given its Showdown identifier (used + in split_pokemon).""" + ... + + def specie( + self, pokemon: Pokemon | TaggedPokemon, player: t.Optional[Player] = None + ) -> PokemonSpecie: + if not player: + [player, pokemon] = self.split_pokemon(TaggedPokemon(pokemon)) return ( - conn.execute( + self.conn.execute( """ SELECT specie FROM nicknames WHERE (game, player, pokemon) = (?, ?, ?) LIMIT 1 """, - (game, team(player), nickname), + (self.game, team(player), pokemon), ) .fetchall()[0] .specie ) - def specie(pokemon: str) -> str: - """Resolves the species of a Pokemon given its Showdown identifier (used - in split_pokemon).""" - return specie_from_parts(*split_pokemon(pokemon)) + def _reset(self): + self.turn = 0 + self.players.clear() - for line in log.split("\n"): - chunks = line.split("|")[1:] - if not chunks: - continue + def _log_appearance(self, name: TaggedPokemon, specie: str): - LOG.trace(line) + # Also includes gender and formes. + trimmed_specie = PokemonSpecie(specie.split(", ")[0]) + player, nickname = self.split_pokemon(name) - match chunks: - case ["player", id, username, *rest]: - players[id] = username + self.conn.execute( + """ + INSERT INTO nicknames(game, player, pokemon, specie) + VALUES(?, ?, ?, ?) + ON CONFLICT DO NOTHING + """, + (self.game, team(player), nickname, trimmed_specie), + ) - case ["turn", turn]: - turn = int(turn) + def parse(self, log: str): + self._reset() - case ["move", user, move, target]: - last_move = (user, target) - player, _ = split_pokemon(user) - conn.execute( - """ - INSERT INTO moves(game, turn, player, pokemon, move, target) - VALUES (?, ?, ?, ?, ?, ?) - ON CONFLICT DO NOTHING - """, - ( - game, - turn, - team(player), - specie(user), - move, - specie(target), - ), - ) + for line in log.split("\n"): + chunks = line.split("|")[1:] + if not chunks: + continue - case ["drag", name, specie_, status, *rest]: - hp[name] = int(status.split("/")[0]) + LOG.trace(line) # type: ignore - # Also includes gender and formes. - trimmed_specie = specie_.split(", ")[0] + match chunks: + # t.Literal, PlayerTag, Player + case ["player", id, username, *rest]: + self.players[PlayerTag(id)] = Player(username) - player, nickname = split_pokemon(name) - conn.execute( - """ - INSERT INTO nicknames(game, player, pokemon, specie) - VALUES(?, ?, ?, ?) - ON CONFLICT DO NOTHING - """, - (game, team(player), nickname, trimmed_specie), - ) + # t.Literal, str + case ["turn", turn]: + self.turn = int(turn) - case ["replace", name, specie_]: - # Also includes gender and formes. - trimmed_specie = specie_.split(", ")[0] + # t.Literal, TaggedPokemon, str, TaggedPokemon + case ["move", user_, move, target_]: + user = TaggedPokemon(user_) + target = TaggedPokemon(target_) - player, nickname = split_pokemon(name) - conn.execute( - """ - INSERT INTO nicknames(game, player, pokemon, specie) - VALUES(?, ?, ?, ?) - ON CONFLICT DO NOTHING - """, - (game, team(player), nickname, trimmed_specie), - ) - - case ["switch", name, specie_, status, *rest]: - hp[name] = int(status.split("/")[0]) - - # Also includes gender and formes. - trimmed_specie = specie_.split(", ")[0] - - player, nickname = split_pokemon(name) - conn.execute( - """ - INSERT INTO switches(game, turn, player, pokemon) - VALUES (?, ?, ?, ?) - ON CONFLICT DO NOTHING - """, - (game, turn, team(player), trimmed_specie), - ) - conn.execute( - """ - INSERT INTO nicknames(game, player, pokemon, specie) - VALUES(?, ?, ?, ?) - ON CONFLICT DO NOTHING - """, - (game, team(player), nickname, trimmed_specie), - ) - - case ["faint", pokemon]: - conn.execute( - """ - INSERT INTO knockouts(game, turn, player, pokemon) - VALUES(?, ?, ?, ?) - ON CONFLICT DO NOTHING - """, - (game, turn, team(player), specie(pokemon)), - ) - - case ["win", player]: - conn.execute( - """ - UPDATE games - SET winner = ? - WHERE id = ? - """, - (team(player), game), - ) - - case ["-sidestart", side, env]: - if not last_move: - LOG.warning(f"missing previous move for {line}") - continue - - LOG.debug(f"{line} <- {last_move}") - last_env_set[(side[0:1], env.replace("move: ", ""))] = last_move[0] - - case ["-status", mon, cond]: - if not last_move or last_move[1] != mon: - LOG.warning(f"missing previous move for {line}") - continue - - LOG.debug(f"{line} <- {last_move}") - last_status_set[(mon, cond)] = last_move[0] - - case ["-damage", pokemon, status]: - # mon takes direct (non-hazard/condition) damage - # status can be a percentage 70/100 with or without condition, - # or "0 fnt" - new_hp = int(re.split("[/ ]", status)[0]) - LOG.debug(f"{pokemon} dropped to {new_hp} from {hp[pokemon]}") - LOG.debug(f"source: {last_move}") - - # resolve to damage source - if last_move[1] != pokemon: - LOG.warning( - f"{pokemon} took direct damage but last move was not" - " targeted at them" - ) - continue - damage_source = last_move[0] - source_player, source_nickname = split_pokemon(damage_source) - - conn.execute( - """ - INSERT INTO damage(game, player, pokemon, value) - VALUES(?, ?, ?, ?) - ON CONFLICT DO NOTHING - """, - ( - game, - team(source_player), - specie(damage_source), - hp[pokemon] - new_hp, - ), - ) - - hp[pokemon] = new_hp - - case ["-damage", pokemon, status, from_]: - # mon takes indirect damage - # status can be a percentage 70/100 with or without condition, - # or "0 fnt" - new_hp = int(re.split("[/ ]", status)[0]) - LOG.debug(f"{pokemon} dropped to {new_hp} from {from_}") - - LOG.debug(f"tracing reason for {line}") - reason = from_.replace("[from] ", "") - - source = None - source_is_pokemon = True - - test_hazard = last_env_set.get((pokemon[0:1], reason)) - if test_hazard: - source = test_hazard - LOG.debug(f"identified hazard source {source}") - - test_status = last_status_set.get((pokemon, reason)) - if test_status: - source = test_status - LOG.debug(f"identified move source {source}") - - if reason == "Recoil" or reason.startswith("item: "): - LOG.debug(f"identified special source {reason}") - reason = reason.replace("item: ", "") - source = "self" - source_is_pokemon = False - - if not source: - LOG.error(f"missing reason for {line}") - continue - - player, nickname = split_pokemon(pokemon) - if source.startswith("p1") or source.startswith("p2"): - source_player, _ = split_pokemon(source) - else: - source_player = None - source_is_pokemon = False - - if source_player: - conn.execute( + last_move = (user, target) + player, _ = self.split_pokemon(user) + self.conn.execute( """ - INSERT INTO indirect_damage(game, player, pokemon, value) - VALUES(?, ?, ?, ?) - """, - ( - game, - team(source_player), - specie(source), - hp[pokemon] - new_hp, - ), - ) - - if status == "0 fnt": - conn.execute( - """ - INSERT INTO indirect_knockouts( - game, turn, player, pokemon, - reason, source, source_player) - VALUES(?, ?, ?, ?, ?, ?, ?) + INSERT INTO moves(game, turn, player, pokemon, move, target) + VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING """, ( - game, - turn, + self.game, + self.turn, team(player), - specie(pokemon), - reason, - specie(source) if source_is_pokemon else source, - team(source_player), + self.specie(user), + move, + self.specie(target), ), ) - case ["-heal", pokemon, status, *rest]: - hp[pokemon] = int(status.split("/")[0]) + # t.Literal, TaggedPokemon, str, str + case ["drag", name_, specie, status, *rest]: + name = TaggedPokemon(name_) - case _: - # LOG.debug(f"unhandled message {chunks[0]}") - pass + self.hp[name] = int(status.split("/")[0]) + self._log_appearance(name, specie) + + # t.Literal, TaggedPokemon, str + case ["replace", name, specie]: + self._log_appearance(name, specie) + + # t.Literal, TaggedPokemon, str, str, t.Optional[str] + case ["switch", name, specie, status, *rest]: + self.hp[name] = int(status.split("/")[0]) + + # Also includes gender and formes. + trimmed_specie = specie.split(", ")[0] + player, nickname = self.split_pokemon(name) + + self._log_appearance(name, specie) + self.conn.execute( + """ + INSERT INTO switches(game, turn, player, pokemon) + VALUES (?, ?, ?, ?) + ON CONFLICT DO NOTHING + """, + (self.game, self.turn, team(player), trimmed_specie), + ) + + # t.Literal, TaggedPokemon + case ["faint", pokemon_]: + pokemon = TaggedPokemon(pokemon_) + self.conn.execute( + """ + INSERT INTO knockouts(game, turn, player, pokemon) + VALUES(?, ?, ?, ?) + ON CONFLICT DO NOTHING + """, + (self.game, self.turn, team(player), self.specie(pokemon)), + ) + + # t.Literal, Player + case ["win", player]: + self.conn.execute( + """ + UPDATE games + SET winner = ? + WHERE id = ? + """, + (team(player), self.game), + ) + + # t.Literal, TaggedPlayer, str + case ["-sidestart", side, env]: + if not last_move: + LOG.warning(f"missing previous move for {line}") + continue + + LOG.debug(f"{line} <- {last_move}") + self.last_env_set[ + (side[0:1], env.replace("move: ", "")) + ] = last_move[0] + + # t.Literal, TaggedPokemon, str + case ["-status", mon, cond]: + if not last_move or last_move[1] != mon: + LOG.warning(f"missing previous move for {line}") + continue + + LOG.debug(f"{line} <- {last_move}") + self.last_status_set[(mon, cond)] = last_move[0] + + # t.Literal, TaggedPokemon, str + case ["-damage", pokemon, status]: + # Pokemon takes direct (non-hazard/condition) damage; status + # can be a percentage "70/100" with or without condition, or + # "0 fnt" + new_hp = int(re.split("[/ ]", status)[0]) + LOG.debug(f"{pokemon} dropped to {new_hp} from {self.hp[pokemon]}") + LOG.debug(f"source: {last_move}") + + # resolve to damage source + if last_move[1] != pokemon: + LOG.warning( + f"{pokemon} took direct damage but last move was not" + " targeted at them" + ) + continue + damage_source = last_move[0] + source_player, source_nickname = self.split_pokemon(damage_source) + + self.conn.execute( + """ + INSERT INTO damage(game, player, pokemon, value) + VALUES(?, ?, ?, ?) + ON CONFLICT DO NOTHING + """, + ( + self.game, + team(source_player), + self.specie(damage_source), + self.hp[pokemon] - new_hp, + ), + ) + + self.hp[pokemon] = new_hp + + # t.Literal, TaggedPokemon, str, str + case ["-damage", pokemon_, status, from_]: + pokemon = TaggedPokemon(pokemon_) + + # Pokemon takes indirect damage; status can be a percentage + # "70/100" with or without condition, or "0 fnt" + new_hp = int(re.split("[/ ]", status)[0]) + LOG.debug(f"{pokemon} dropped to {new_hp} from {from_}") + + LOG.debug(f"tracing reason for {line}") + reason = from_.replace("[from] ", "") + + source: TaggedPokemon | str | None = None + source_is_pokemon = True + + test_hazard = self.last_env_set.get((pokemon[0:1], reason)) + if test_hazard: + source = test_hazard + LOG.debug(f"identified hazard source {source}") + + test_status = self.last_status_set.get((pokemon, reason)) + if test_status: + source = test_status + LOG.debug(f"identified move source {source}") + + if reason == "Recoil" or reason.startswith("item: "): + LOG.debug(f"identified special source {reason}") + reason = reason.replace("item: ", "") + source = "self" + source_is_pokemon = False + + if not source: + LOG.error(f"missing reason for {line}") + continue + + player, nickname = self.split_pokemon(pokemon) + if source.startswith("p1") or source.startswith("p2"): + source_player, _ = self.split_pokemon(TaggedPokemon(source)) + else: + source_player = None # type: ignore + source_is_pokemon = False + + if source_player: + self.conn.execute( + """ + INSERT INTO indirect_damage(game, player, pokemon, value) + VALUES(?, ?, ?, ?) + """, + ( + self.game, + team(source_player), + self.specie(TaggedPokemon(source)), + self.hp[pokemon] - new_hp, + ), + ) + + if status == "0 fnt": + self.conn.execute( + """ + INSERT INTO indirect_knockouts( + game, turn, player, pokemon, + reason, source, source_player) + VALUES(?, ?, ?, ?, ?, ?, ?) + ON CONFLICT DO NOTHING + """, + ( + self.game, + self.turn, + team(player), + self.specie(pokemon), + reason, + self.specie(TaggedPokemon(source)) + if source_is_pokemon + else source, + team(source_player), + ), + ) + + case ["-heal", pokemon, status, *rest]: + self.hp[pokemon] = int(status.split("/")[0]) + + case _: + # LOG.debug(f"unhandled message {chunks[0]}") + pass @dataclass(frozen=True) @@ -434,7 +468,7 @@ def fetch(replay: str, cache: bool = True) -> Replay: with replay_file.open(mode="w") as f: json.dump(data, f) - return Replay(**data) + return Replay(**data) # type: ignore def main(args): @@ -506,7 +540,7 @@ def main(args): ), ) - parse_log(replay.id, replay.log, into=db) + LogParser(replay.id, db).parse(replay.log) db.commit() finally: