refactor parser into class; add newtyping

This commit is contained in:
xeals 2023-04-04 22:21:33 +10:00
parent 8a348eca84
commit 37a2df3039
Signed by: xeals
GPG Key ID: A498C7AF27EC6B5C
2 changed files with 315 additions and 274 deletions

View File

@ -13,10 +13,17 @@
buildInputs = buildInputs =
let let
python = pkgs.python3.withPackages (ps: [ python = pkgs.python3.withPackages (ps: [
ps.mypy
ps.requests ps.requests
ps.types-requests
]); ]);
in in
[ python pkgs.sqlite ]; [
python
pkgs.sqlite
python.pkgs.python-lsp-server
python.pkgs.pylsp-mypy
];
}; };
}); });
} }

580
main.py
View File

@ -14,22 +14,22 @@ import sys
import typing as t import typing as t
logging.TRACE = 5 logging.TRACE = 5 # type: ignore
logging.addLevelName(logging.TRACE, "TRACE") logging.addLevelName(logging.TRACE, "TRACE") # type: ignore
logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE) logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE) # type: ignore
logging.trace = partial(logging.log, logging.TRACE) logging.trace = partial(logging.log, logging.TRACE) # type: ignore
class LogFormatter(logging.Formatter): class LogFormatter(logging.Formatter):
format = "%(name)s [%(levelname)s] %(message)s" _format = "%(name)s [%(levelname)s] %(message)s"
FORMATS = { FORMATS = {
logging.TRACE: f"\x1b[30;20m{format}\x1b[0m", logging.TRACE: f"\x1b[30;20m{_format}\x1b[0m", # type: ignore
logging.DEBUG: f"\x1b[38;20m{format}\x1b[0m", logging.DEBUG: f"\x1b[38;20m{_format}\x1b[0m",
logging.INFO: f"\x1b[34;20m{format}\x1b[0m", logging.INFO: f"\x1b[34;20m{_format}\x1b[0m",
logging.WARNING: f"\x1b[33;20m{format}\x1b[0m", logging.WARNING: f"\x1b[33;20m{_format}\x1b[0m",
logging.ERROR: f"\x1b[31;20m{format}\x1b[0m", logging.ERROR: f"\x1b[31;20m{_format}\x1b[0m",
logging.CRITICAL: f"\x1b[31;1m{format}\x1b[0m", logging.CRITICAL: f"\x1b[31;1m{_format}\x1b[0m",
} }
def format(self, record): def format(self, record):
@ -45,29 +45,6 @@ _ch.setFormatter(LogFormatter())
LOG.addHandler(_ch) LOG.addHandler(_ch)
TEAMS = {}
_logged_teams = []
def team(player: str) -> str:
"""Maps a username to a defined team."""
if player in TEAMS:
return TEAMS[player]
else:
if not player in _logged_teams and player:
LOG.warning(f"missing team mapping for {player}")
_logged_teams.append(player)
return player
class safelist(list):
    """A ``list`` with a ``dict``-style ``get`` that tolerates bad indices."""

    def get(self, index, default=None):
        """Return ``self[index]``, or ``default`` if the index is out of range.

        Only ``IndexError`` is handled; any other error raised by the lookup
        (for example a ``TypeError`` for a non-integer index) propagates.
        """
        try:
            return self[index]
        except IndexError:
            return default
def _init_db(conn: sqlite3.Connection): def _init_db(conn: sqlite3.Connection):
def namedtuple_factory(cursor, row): def namedtuple_factory(cursor, row):
fields = [column[0] for column in cursor.description] fields = [column[0] for column in cursor.description]
@ -112,12 +89,45 @@ def _init_db(conn: sqlite3.Connection):
) )
def parse_log(game: str, log: str, into: sqlite3.Connection): # Either the value "p1" or "p2"
conn = into PlayerTag = t.NewType("PlayerTag", str)
# A player's name
Player = t.NewType("Player", str)
# A player prefixed with a PlayerTag
TaggedPlayer = t.NewType("TaggedPlayer", str)
# A Pokemon identified by its nickname, if any
Pokemon = t.NewType("Pokemon", str)
# A Pokemon specie
PokemonSpecie = t.NewType("PokemonSpecie", str)
# A Pokemon prefixed with a PlayerTag
TaggedPokemon = t.NewType("TaggedPokemon", str)
TEAMS: dict[Player, Player] = {}
_logged_teams: list[Player] = []
def team(player: Player) -> Player:
    """Map a username to its defined team.

    Returns the ``TEAMS`` entry for ``player`` when one exists. Otherwise the
    username itself is returned unchanged, and a warning is logged the first
    time a given non-empty, unmapped username is seen.
    """
    if player in TEAMS:
        return TEAMS[player]

    # Warn only once per unknown player; empty names are never reported.
    if player and player not in _logged_teams:
        LOG.warning(f"missing team mapping for {player}")
        _logged_teams.append(player)
    return player
class LogParser:
turn = 0 turn = 0
players = {} players: dict[PlayerTag, Player] = {}
hp = {} hp: dict[TaggedPokemon, int] = {}
# ("p2a: Edward", "p1a: Meteo") # ("p2a: Edward", "p1a: Meteo")
# memorises the user of the move that causes environment setting or status, # memorises the user of the move that causes environment setting or status,
@ -130,273 +140,297 @@ def parse_log(game: str, log: str, into: sqlite3.Connection):
# ("p1a: Meteo", "brn") => "p2a: Edward" # ("p1a: Meteo", "brn") => "p2a: Edward"
last_status_set: dict[tuple[str, str], str] = {} last_status_set: dict[tuple[str, str], str] = {}
def split_pokemon(user: str) -> tuple[str, str]: def __init__(self, game: str, into: sqlite3.Connection):
"""Splits a Pokemon identifier of the form `pXa: Pokemon` into the self.game = game
player's name (as marked by the player log) and "Pokemon". self.conn: sqlite3.Connection = into
Note that all Pokemon are referred to by their nicknames, and will def split_pokemon(self, user: TaggedPokemon) -> tuple[Player, Pokemon]:
require resolving to obtain the Pokemon specie.""" """Splits a TaggedPokemon into the owning player and the Pokemon."""
[player, name] = user.split(": ") [player, pokemon] = user.split(": ")
return players[player.strip("ab")], name return self.players[PlayerTag(player.strip("ab"))], Pokemon(pokemon)
def specie_from_parts(player: str, nickname: str) -> str: @t.overload
def specie(self, pokemon: Pokemon, player: Player) -> PokemonSpecie:
"""Resolves the species of a nicknamed Pokemon.""" """Resolves the species of a nicknamed Pokemon."""
...
@t.overload
def specie(self, pokemon: TaggedPokemon) -> PokemonSpecie:
"""Resolves the species of a Pokemon given its Showdown identifier (used
in split_pokemon)."""
...
def specie(
self, pokemon: Pokemon | TaggedPokemon, player: t.Optional[Player] = None
) -> PokemonSpecie:
if not player:
[player, pokemon] = self.split_pokemon(TaggedPokemon(pokemon))
return ( return (
conn.execute( self.conn.execute(
""" """
SELECT specie SELECT specie
FROM nicknames FROM nicknames
WHERE (game, player, pokemon) = (?, ?, ?) WHERE (game, player, pokemon) = (?, ?, ?)
LIMIT 1 LIMIT 1
""", """,
(game, team(player), nickname), (self.game, team(player), pokemon),
) )
.fetchall()[0] .fetchall()[0]
.specie .specie
) )
def specie(pokemon: str) -> str: def _reset(self):
"""Resolves the species of a Pokemon given its Showdown identifier (used self.turn = 0
in split_pokemon).""" self.players.clear()
return specie_from_parts(*split_pokemon(pokemon))
for line in log.split("\n"): def _log_appearance(self, name: TaggedPokemon, specie: str):
chunks = line.split("|")[1:]
if not chunks:
continue
LOG.trace(line) # Also includes gender and formes.
trimmed_specie = PokemonSpecie(specie.split(", ")[0])
player, nickname = self.split_pokemon(name)
match chunks: self.conn.execute(
case ["player", id, username, *rest]: """
players[id] = username INSERT INTO nicknames(game, player, pokemon, specie)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(self.game, team(player), nickname, trimmed_specie),
)
case ["turn", turn]: def parse(self, log: str):
turn = int(turn) self._reset()
case ["move", user, move, target]: for line in log.split("\n"):
last_move = (user, target) chunks = line.split("|")[1:]
player, _ = split_pokemon(user) if not chunks:
conn.execute( continue
"""
INSERT INTO moves(game, turn, player, pokemon, move, target)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(
game,
turn,
team(player),
specie(user),
move,
specie(target),
),
)
case ["drag", name, specie_, status, *rest]: LOG.trace(line) # type: ignore
hp[name] = int(status.split("/")[0])
# Also includes gender and formes. match chunks:
trimmed_specie = specie_.split(", ")[0] # t.Literal, PlayerTag, Player
case ["player", id, username, *rest]:
self.players[PlayerTag(id)] = Player(username)
player, nickname = split_pokemon(name) # t.Literal, str
conn.execute( case ["turn", turn]:
""" self.turn = int(turn)
INSERT INTO nicknames(game, player, pokemon, specie)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, team(player), nickname, trimmed_specie),
)
case ["replace", name, specie_]: # t.Literal, TaggedPokemon, str, TaggedPokemon
# Also includes gender and formes. case ["move", user_, move, target_]:
trimmed_specie = specie_.split(", ")[0] user = TaggedPokemon(user_)
target = TaggedPokemon(target_)
player, nickname = split_pokemon(name) last_move = (user, target)
conn.execute( player, _ = self.split_pokemon(user)
""" self.conn.execute(
INSERT INTO nicknames(game, player, pokemon, specie)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, team(player), nickname, trimmed_specie),
)
case ["switch", name, specie_, status, *rest]:
hp[name] = int(status.split("/")[0])
# Also includes gender and formes.
trimmed_specie = specie_.split(", ")[0]
player, nickname = split_pokemon(name)
conn.execute(
"""
INSERT INTO switches(game, turn, player, pokemon)
VALUES (?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, turn, team(player), trimmed_specie),
)
conn.execute(
"""
INSERT INTO nicknames(game, player, pokemon, specie)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, team(player), nickname, trimmed_specie),
)
case ["faint", pokemon]:
conn.execute(
"""
INSERT INTO knockouts(game, turn, player, pokemon)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, turn, team(player), specie(pokemon)),
)
case ["win", player]:
conn.execute(
"""
UPDATE games
SET winner = ?
WHERE id = ?
""",
(team(player), game),
)
case ["-sidestart", side, env]:
if not last_move:
LOG.warning(f"missing previous move for {line}")
continue
LOG.debug(f"{line} <- {last_move}")
last_env_set[(side[0:1], env.replace("move: ", ""))] = last_move[0]
case ["-status", mon, cond]:
if not last_move or last_move[1] != mon:
LOG.warning(f"missing previous move for {line}")
continue
LOG.debug(f"{line} <- {last_move}")
last_status_set[(mon, cond)] = last_move[0]
case ["-damage", pokemon, status]:
# mon takes direct (non-hazard/condition) damage
# status can be a percentage 70/100 with or without condition,
# or "0 fnt"
new_hp = int(re.split("[/ ]", status)[0])
LOG.debug(f"{pokemon} dropped to {new_hp} from {hp[pokemon]}")
LOG.debug(f"source: {last_move}")
# resolve to damage source
if last_move[1] != pokemon:
LOG.warning(
f"{pokemon} took direct damage but last move was not"
" targeted at them"
)
continue
damage_source = last_move[0]
source_player, source_nickname = split_pokemon(damage_source)
conn.execute(
"""
INSERT INTO damage(game, player, pokemon, value)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(
game,
team(source_player),
specie(damage_source),
hp[pokemon] - new_hp,
),
)
hp[pokemon] = new_hp
case ["-damage", pokemon, status, from_]:
# mon takes indirect damage
# status can be a percentage 70/100 with or without condition,
# or "0 fnt"
new_hp = int(re.split("[/ ]", status)[0])
LOG.debug(f"{pokemon} dropped to {new_hp} from {from_}")
LOG.debug(f"tracing reason for {line}")
reason = from_.replace("[from] ", "")
source = None
source_is_pokemon = True
test_hazard = last_env_set.get((pokemon[0:1], reason))
if test_hazard:
source = test_hazard
LOG.debug(f"identified hazard source {source}")
test_status = last_status_set.get((pokemon, reason))
if test_status:
source = test_status
LOG.debug(f"identified move source {source}")
if reason == "Recoil" or reason.startswith("item: "):
LOG.debug(f"identified special source {reason}")
reason = reason.replace("item: ", "")
source = "self"
source_is_pokemon = False
if not source:
LOG.error(f"missing reason for {line}")
continue
player, nickname = split_pokemon(pokemon)
if source.startswith("p1") or source.startswith("p2"):
source_player, _ = split_pokemon(source)
else:
source_player = None
source_is_pokemon = False
if source_player:
conn.execute(
""" """
INSERT INTO indirect_damage(game, player, pokemon, value) INSERT INTO moves(game, turn, player, pokemon, move, target)
VALUES(?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?)
""",
(
game,
team(source_player),
specie(source),
hp[pokemon] - new_hp,
),
)
if status == "0 fnt":
conn.execute(
"""
INSERT INTO indirect_knockouts(
game, turn, player, pokemon,
reason, source, source_player)
VALUES(?, ?, ?, ?, ?, ?, ?)
ON CONFLICT DO NOTHING ON CONFLICT DO NOTHING
""", """,
( (
game, self.game,
turn, self.turn,
team(player), team(player),
specie(pokemon), self.specie(user),
reason, move,
specie(source) if source_is_pokemon else source, self.specie(target),
team(source_player),
), ),
) )
case ["-heal", pokemon, status, *rest]: # t.Literal, TaggedPokemon, str, str
hp[pokemon] = int(status.split("/")[0]) case ["drag", name_, specie, status, *rest]:
name = TaggedPokemon(name_)
case _: self.hp[name] = int(status.split("/")[0])
# LOG.debug(f"unhandled message {chunks[0]}") self._log_appearance(name, specie)
pass
# t.Literal, TaggedPokemon, str
case ["replace", name, specie]:
self._log_appearance(name, specie)
# t.Literal, TaggedPokemon, str, str, t.Optional[str]
case ["switch", name, specie, status, *rest]:
self.hp[name] = int(status.split("/")[0])
# Also includes gender and formes.
trimmed_specie = specie.split(", ")[0]
player, nickname = self.split_pokemon(name)
self._log_appearance(name, specie)
self.conn.execute(
"""
INSERT INTO switches(game, turn, player, pokemon)
VALUES (?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(self.game, self.turn, team(player), trimmed_specie),
)
# t.Literal, TaggedPokemon
case ["faint", pokemon_]:
pokemon = TaggedPokemon(pokemon_)
self.conn.execute(
"""
INSERT INTO knockouts(game, turn, player, pokemon)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(self.game, self.turn, team(player), self.specie(pokemon)),
)
# t.Literal, Player
case ["win", player]:
self.conn.execute(
"""
UPDATE games
SET winner = ?
WHERE id = ?
""",
(team(player), self.game),
)
# t.Literal, TaggedPlayer, str
case ["-sidestart", side, env]:
if not last_move:
LOG.warning(f"missing previous move for {line}")
continue
LOG.debug(f"{line} <- {last_move}")
self.last_env_set[
(side[0:1], env.replace("move: ", ""))
] = last_move[0]
# t.Literal, TaggedPokemon, str
case ["-status", mon, cond]:
if not last_move or last_move[1] != mon:
LOG.warning(f"missing previous move for {line}")
continue
LOG.debug(f"{line} <- {last_move}")
self.last_status_set[(mon, cond)] = last_move[0]
# t.Literal, TaggedPokemon, str
case ["-damage", pokemon, status]:
# Pokemon takes direct (non-hazard/condition) damage; status
# can be a percentage "70/100" with or without condition, or
# "0 fnt"
new_hp = int(re.split("[/ ]", status)[0])
LOG.debug(f"{pokemon} dropped to {new_hp} from {self.hp[pokemon]}")
LOG.debug(f"source: {last_move}")
# resolve to damage source
if last_move[1] != pokemon:
LOG.warning(
f"{pokemon} took direct damage but last move was not"
" targeted at them"
)
continue
damage_source = last_move[0]
source_player, source_nickname = self.split_pokemon(damage_source)
self.conn.execute(
"""
INSERT INTO damage(game, player, pokemon, value)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(
self.game,
team(source_player),
self.specie(damage_source),
self.hp[pokemon] - new_hp,
),
)
self.hp[pokemon] = new_hp
# t.Literal, TaggedPokemon, str, str
case ["-damage", pokemon_, status, from_]:
pokemon = TaggedPokemon(pokemon_)
# Pokemon takes indirect damage; status can be a percentage
# "70/100" with or without condition, or "0 fnt"
new_hp = int(re.split("[/ ]", status)[0])
LOG.debug(f"{pokemon} dropped to {new_hp} from {from_}")
LOG.debug(f"tracing reason for {line}")
reason = from_.replace("[from] ", "")
source: TaggedPokemon | str | None = None
source_is_pokemon = True
test_hazard = self.last_env_set.get((pokemon[0:1], reason))
if test_hazard:
source = test_hazard
LOG.debug(f"identified hazard source {source}")
test_status = self.last_status_set.get((pokemon, reason))
if test_status:
source = test_status
LOG.debug(f"identified move source {source}")
if reason == "Recoil" or reason.startswith("item: "):
LOG.debug(f"identified special source {reason}")
reason = reason.replace("item: ", "")
source = "self"
source_is_pokemon = False
if not source:
LOG.error(f"missing reason for {line}")
continue
player, nickname = self.split_pokemon(pokemon)
if source.startswith("p1") or source.startswith("p2"):
source_player, _ = self.split_pokemon(TaggedPokemon(source))
else:
source_player = None # type: ignore
source_is_pokemon = False
if source_player:
self.conn.execute(
"""
INSERT INTO indirect_damage(game, player, pokemon, value)
VALUES(?, ?, ?, ?)
""",
(
self.game,
team(source_player),
self.specie(TaggedPokemon(source)),
self.hp[pokemon] - new_hp,
),
)
if status == "0 fnt":
self.conn.execute(
"""
INSERT INTO indirect_knockouts(
game, turn, player, pokemon,
reason, source, source_player)
VALUES(?, ?, ?, ?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(
self.game,
self.turn,
team(player),
self.specie(pokemon),
reason,
self.specie(TaggedPokemon(source))
if source_is_pokemon
else source,
team(source_player),
),
)
case ["-heal", pokemon, status, *rest]:
self.hp[pokemon] = int(status.split("/")[0])
case _:
# LOG.debug(f"unhandled message {chunks[0]}")
pass
@dataclass(frozen=True) @dataclass(frozen=True)
@ -434,7 +468,7 @@ def fetch(replay: str, cache: bool = True) -> Replay:
with replay_file.open(mode="w") as f: with replay_file.open(mode="w") as f:
json.dump(data, f) json.dump(data, f)
return Replay(**data) return Replay(**data) # type: ignore
def main(args): def main(args):
@ -506,7 +540,7 @@ def main(args):
), ),
) )
parse_log(replay.id, replay.log, into=db) LogParser(replay.id, db).parse(replay.log)
db.commit() db.commit()
finally: finally: