Compare commits

..

No commits in common. "145f220a071266b8cd56e76e2b4c94c18e9524ad" and "262a0513569e251937751e962375460a9dd68136" have entirely different histories.

2 changed files with 216 additions and 358 deletions

View File

@ -13,17 +13,10 @@
buildInputs = buildInputs =
let let
python = pkgs.python3.withPackages (ps: [ python = pkgs.python3.withPackages (ps: [
ps.mypy
ps.requests ps.requests
ps.types-requests
]); ]);
in in
[ [ python pkgs.sqlite ];
python
pkgs.sqlite
python.pkgs.python-lsp-server
python.pkgs.pylsp-mypy
];
}; };
}); });
} }

565
main.py
View File

@ -14,22 +14,22 @@ import sys
import typing as t import typing as t
logging.TRACE = 5 # type: ignore logging.TRACE = 5
logging.addLevelName(logging.TRACE, "TRACE") # type: ignore logging.addLevelName(logging.TRACE, "TRACE")
logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE) # type: ignore logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE)
logging.trace = partial(logging.log, logging.TRACE) # type: ignore logging.trace = partial(logging.log, logging.TRACE)
class LogFormatter(logging.Formatter): class LogFormatter(logging.Formatter):
_format = "%(name)s [%(levelname)s] %(message)s" format = "%(name)s [%(levelname)s] %(message)s"
FORMATS = { FORMATS = {
logging.TRACE: f"\x1b[30;20m{_format}\x1b[0m", # type: ignore logging.TRACE: f"\x1b[30;20m{format}\x1b[0m",
logging.DEBUG: f"\x1b[38;20m{_format}\x1b[0m", logging.DEBUG: f"\x1b[38;20m{format}\x1b[0m",
logging.INFO: f"\x1b[34;20m{_format}\x1b[0m", logging.INFO: f"\x1b[34;20m{format}\x1b[0m",
logging.WARNING: f"\x1b[33;20m{_format}\x1b[0m", logging.WARNING: f"\x1b[33;20m{format}\x1b[0m",
logging.ERROR: f"\x1b[31;20m{_format}\x1b[0m", logging.ERROR: f"\x1b[31;20m{format}\x1b[0m",
logging.CRITICAL: f"\x1b[31;1m{_format}\x1b[0m", logging.CRITICAL: f"\x1b[31;1m{format}\x1b[0m",
} }
def format(self, record): def format(self, record):
@ -45,6 +45,29 @@ _ch.setFormatter(LogFormatter())
LOG.addHandler(_ch) LOG.addHandler(_ch)
TEAMS = {}
_logged_teams = []
def team(player: str) -> str:
"""Maps a username to a defined team."""
if player in TEAMS:
return TEAMS[player]
else:
if not player in _logged_teams and player:
LOG.warning(f"missing team mapping for {player}")
_logged_teams.append(player)
return player
class safelist(list):
def get(self, index, default=None):
try:
return self.__getitem__(index)
except IndexError:
return default
def _init_db(conn: sqlite3.Connection): def _init_db(conn: sqlite3.Connection):
def namedtuple_factory(cursor, row): def namedtuple_factory(cursor, row):
fields = [column[0] for column in cursor.description] fields = [column[0] for column in cursor.description]
@ -56,24 +79,23 @@ def _init_db(conn: sqlite3.Connection):
conn.executescript( conn.executescript(
""" """
CREATE TABLE IF NOT EXISTS moves( CREATE TABLE IF NOT EXISTS moves(
game, turn, player, pokemon, move, target, game, turn, player, name, user, target,
UNIQUE(game, turn, player, pokemon) UNIQUE(game, turn, player, user)
); );
CREATE TABLE IF NOT EXISTS switches( CREATE TABLE IF NOT EXISTS switches(
game, turn, player, pokemon, game, turn, player, name,
UNIQUE(game, turn, player, pokemon) UNIQUE(game, turn, player, name)
); );
CREATE TABLE IF NOT EXISTS nicknames( CREATE TABLE IF NOT EXISTS nicknames(
game, player, pokemon, specie, game, player, name, specie,
UNIQUE(game, player, specie) UNIQUE(game, player, specie)
); );
CREATE TABLE IF NOT EXISTS knockouts( CREATE TABLE IF NOT EXISTS knockouts(
game, turn, player, pokemon, game, turn, player, name,
UNIQUE(game, turn, player) UNIQUE(game, turn, player)
); );
CREATE TABLE IF NOT EXISTS indirect_knockouts( CREATE TABLE IF NOT EXISTS indirect_knockouts(
game, turn, player, pokemon, game, turn, player, name, source, source_user, source_player,
reason, source, source_player,
UNIQUE(game, turn, player) UNIQUE(game, turn, player)
); );
CREATE TABLE IF NOT EXISTS games( CREATE TABLE IF NOT EXISTS games(
@ -82,377 +104,220 @@ def _init_db(conn: sqlite3.Connection):
); );
-- No good way to ensure idempotence for damage; just re-build it. -- No good way to ensure idempotence for damage; just re-build it.
DROP TABLE IF EXISTS damage; DROP TABLE IF EXISTS damage;
CREATE TABLE damage(game, player, pokemon, value); CREATE TABLE damage(game, player, name, value);
DROP TABLE IF EXISTS indirect_damage; DROP TABLE IF EXISTS indirect_damage;
CREATE TABLE indirect_damage(game, player, pokemon, value); CREATE TABLE indirect_damage(game, player, name, value);
""" """
) )
# Either the value "p1" or "p2" def parse_log(game: str, log: str, into: sqlite3.Connection):
PlayerTag = t.NewType("PlayerTag", str) conn = into
# A player's name
Player = t.NewType("Player", str)
# A player prefixed with a PlayerTag
TaggedPlayer = t.NewType("TaggedPlayer", str)
# A Pokemon identified by its nickname, if any
Pokemon = t.NewType("Pokemon", str)
# A Pokemon specie
PokemonSpecie = t.NewType("PokemonSpecie", str)
# A Pokemon prefixed with a PlayerTag
TaggedPokemon = t.NewType("TaggedPokemon", str)
def tag(tagged: TaggedPlayer | TaggedPokemon) -> PlayerTag:
return PlayerTag(tagged[0:1])
TEAMS: dict[Player, Player] = {}
_logged_teams: set[Player] = set()
def team(player: Player) -> Player:
"""Maps a username to a defined team."""
if player in TEAMS:
return TEAMS[player]
else:
if not player in _logged_teams and player:
LOG.warning(f"missing team mapping for {player}")
_logged_teams.add(player)
return player
class LogParser:
turn = 0 turn = 0
players: dict[PlayerTag, Player] = {} players = {}
hp: dict[TaggedPokemon, int] = {} hp = {}
# Memorises the user of the move that causes environment setting or status, # ("p2a: Edward", "p1a: Meteo")
# its target, and the move name (for debugging). # memorises the user of the move that causes environment setting or status,
last_move: t.Optional[tuple[TaggedPokemon, TaggedPokemon, str]] = None # and its target
last_move: t.Optional[tuple[str, str]]
# Memorises the last hazard set against a player and the causing user. # ("p1", "Spikes") => "p2a: Frosslas"
last_env_set: dict[tuple[PlayerTag, str], TaggedPokemon] = {} last_env_set: dict[tuple[str, str], str] = {}
# Memorises statuses set on a pokemon and the causing user. # ("p1a: Meteo", "brn") => "p2a: Edward"
last_status_set: dict[tuple[TaggedPokemon, str], TaggedPokemon] = {} last_status_set: dict[tuple[str, str], str] = {}
def __init__(self, game: str, into: sqlite3.Connection): def resolve_mon(user: str) -> tuple[str, str]:
self.game = game [player, name] = user.split(": ")
self.conn: sqlite3.Connection = into return players[player.strip("ab")], name
def split_pokemon(self, user: TaggedPokemon) -> tuple[Player, Pokemon]: for line in log.split("\n"):
"""Splits a TaggedPokemon into the owning player and the Pokemon.""" chunks = line.split("|")[1:]
[player, pokemon] = user.split(": ") if not chunks:
return self.players[PlayerTag(player.strip("ab"))], Pokemon(pokemon) continue
@t.overload LOG.trace(line)
def specie(self, pokemon: Pokemon, player: Player) -> PokemonSpecie:
"""Resolves the species of a nicknamed Pokemon."""
...
@t.overload match chunks:
def specie(self, pokemon: TaggedPokemon) -> PokemonSpecie: case ["player", id, username, *rest]:
"""Resolves the species of a Pokemon given its Showdown identifier (used players[id] = username
in split_pokemon)."""
...
def specie( case ["turn", turn]:
self, pokemon: Pokemon | TaggedPokemon, player: t.Optional[Player] = None turn = int(turn)
) -> PokemonSpecie:
if not player:
[player, pokemon] = self.split_pokemon(TaggedPokemon(pokemon))
return (
self.conn.execute(
"""
SELECT specie
FROM nicknames
WHERE (game, player, pokemon) = (?, ?, ?)
LIMIT 1
""",
(self.game, team(player), pokemon),
)
.fetchall()[0]
.specie
)
def _reset(self): case ["move", user, move, target]:
self.turn = 0 last_move = (user, target)
self.players.clear() player, user = resolve_mon(user)
self.hp.clear() _, target = resolve_mon(target)
self.last_move = None conn.execute(
self.last_env_set.clear() """
self.last_status_set.clear() INSERT INTO moves(game, turn, player, name, user, target)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, turn, team(player), move, user, target),
)
def _log_appearance(self, name: TaggedPokemon, specie: str): case ["drag", name, specie, status, *rest]:
hp[name] = int(status.split("/")[0])
# Also includes gender and formes. case ["switch", name, specie, status, *rest]:
trimmed_specie = PokemonSpecie(specie.split(", ")[0]) hp[name] = int(status.split("/")[0])
player, nickname = self.split_pokemon(name)
self.conn.execute( player, name = resolve_mon(name)
""" conn.execute(
INSERT INTO nicknames(game, player, pokemon, specie) """
VALUES(?, ?, ?, ?) INSERT INTO switches(game, turn, player, name)
ON CONFLICT DO NOTHING VALUES (?, ?, ?, ?)
""", ON CONFLICT DO NOTHING
(self.game, team(player), nickname, trimmed_specie), """,
) (game, turn, team(player), name),
)
conn.execute(
"""
INSERT INTO nicknames(game, player, name, specie)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, team(player), name, specie.split(", ")[0]),
)
def parse(self, log: str): case ["faint", mon]:
self._reset() player, mon = resolve_mon(mon)
conn.execute(
"""
INSERT INTO knockouts(game, turn, player, name)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, turn, team(player), mon),
)
for line in log.split("\n"): case ["win", player]:
chunks = line.split("|")[1:] conn.execute(
if not chunks: """
continue UPDATE games
SET winner = ?
WHERE id = ?
""",
(team(player), game),
)
LOG.trace(line) # type: ignore case ["-sidestart", side, env]:
if not last_move:
LOG.warning(f"missing previous move for {line}")
continue
LOG.debug(f"{line} <- {last_move}")
last_env_set[(side[0:1], env.replace("move: ", ""))] = last_move[0]
match chunks: case ["-status", mon, cond]:
if not last_move or last_move[1] != mon:
LOG.warning(f"missing previous move for {line}")
continue
LOG.debug(f"{line} <- {last_move}")
last_status_set[(mon, cond)] = last_move[0]
# t.Literal, TaggedPokemon, str, str case ["-damage", mon, status]:
case ["drag", name_, specie, status, *rest]: # mon takes direct (non-hazard/condition) damage
name = TaggedPokemon(name_) # status can be a percentage 70/100 with or without condition,
# or "0 fnt"
new_hp = int(re.split("[/ ]", status)[0])
LOG.debug(f"{mon} dropped to {new_hp} from {hp[mon]}")
LOG.debug(f"source: {last_move}")
self.hp[name] = int(status.split("/")[0]) # resolve to damage source
self._log_appearance(name, specie) if last_move[1] != mon:
LOG.warn(
f"{mon} took direct damage but last move was not targeted at them"
)
continue
user = last_move[0]
source_player, source_mon = resolve_mon(user)
# t.Literal, TaggedPokemon conn.execute(
case ["faint", pokemon_]: """
pokemon = TaggedPokemon(pokemon_) INSERT INTO damage(game, player, name, value)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, team(source_player), source_mon, hp[mon] - new_hp),
)
player, _ = self.split_pokemon(pokemon) hp[mon] = new_hp
self.conn.execute(
case ["-damage", mon, status, from_]:
# mon takes indirect damage
# status can be a percentage 70/100 with or without condition,
# or "0 fnt"
# mon has fainted from an indirect damage source
#
new_hp = int(re.split("[/ ]", status)[0])
LOG.debug(f"{mon} dropped to {new_hp} from {from_}")
LOG.debug(f"tracing source for {line}")
source = from_.replace("[from] ", "")
source_user = None
test_hazard = last_env_set.get((mon[0:1], source))
if test_hazard:
source_user = test_hazard
LOG.debug(f"identified hazard source {source_user}")
test_status = last_status_set.get((mon, source))
if test_status:
source_user = test_status
LOG.debug(f"identified move source {source_user}")
if source == "Recoil" or source.startswith("item: "):
LOG.debug(f"identified special source {source}")
source = source.replace("item: ", "")
source_user = "self"
if not source_user:
LOG.error(f"missing source for {line}")
continue
player, pkmn = resolve_mon(mon)
if source_user.startswith("p1") or source_user.startswith("p2"):
source_player, source_mon = resolve_mon(source_user)
else:
source_player = None
if source_player:
conn.execute(
""" """
INSERT INTO knockouts(game, turn, player, pokemon) INSERT INTO indirect_damage(game, player, name, value)
VALUES(?, ?, ?, ?) VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""", """,
(self.game, self.turn, team(player), self.specie(pokemon)), (game, team(source_player), source_mon, hp[mon] - new_hp),
) )
# t.Literal, TaggedPokemon, str, TaggedPokemon if status == "0 fnt":
case ["move", user_, move, target_]: conn.execute(
user = TaggedPokemon(user_)
target = TaggedPokemon(target_)
last_move = (user, target, move)
player, _ = self.split_pokemon(user)
self.conn.execute(
""" """
INSERT INTO moves(game, turn, player, pokemon, move, target) INSERT INTO indirect_knockouts(game, turn, player, name, source, source_user, source_player)
VALUES (?, ?, ?, ?, ?, ?) VALUES(?, ?, ?, ?, ?, ?, ?)
ON CONFLICT DO NOTHING ON CONFLICT DO NOTHING
""", """,
( (
self.game, game,
self.turn, turn,
team(player), team(player),
self.specie(user), pkmn,
move, source,
self.specie(target), source_mon,
),
)
# t.Literal, PlayerTag, Player
case ["player", id, username, *rest]:
self.players[PlayerTag(id)] = Player(username)
# t.Literal, TaggedPokemon, str
case ["replace", name, specie]:
self._log_appearance(name, specie)
# t.Literal, TaggedPokemon, str, str, t.Optional[str]
case ["switch", name, specie, status, *rest]:
self.hp[name] = int(status.split("/")[0])
# Also includes gender and formes.
trimmed_specie = specie.split(", ")[0]
player, nickname = self.split_pokemon(name)
self._log_appearance(name, specie)
self.conn.execute(
"""
INSERT INTO switches(game, turn, player, pokemon)
VALUES (?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(self.game, self.turn, team(player), trimmed_specie),
)
# t.Literal, str
case ["turn", turn]:
self.turn = int(turn)
# t.Literal, Player
case ["win", player]:
self.conn.execute(
"""
UPDATE games
SET winner = ?
WHERE id = ?
""",
(team(player), self.game),
)
case ["-heal", pokemon, status, *rest]:
self.hp[pokemon] = int(status.split("/")[0])
# TODO: track healing done
# t.Literal, TaggedPokemon, str
case ["-damage", pokemon, status]:
# Pokemon takes direct (non-hazard/condition) damage; status
# can be a percentage "70/100" with or without condition, or
# "0 fnt"
new_hp = int(re.split("[/ ]", status)[0])
LOG.debug(f"{pokemon} dropped to {new_hp} from {self.hp[pokemon]}")
LOG.debug(f"source: {last_move}")
# resolve to damage source
if last_move[1] != pokemon:
LOG.warning(
f"{pokemon} took direct damage but last move"
f" {last_move[2]} was not targeted at them"
)
continue
damage_source = last_move[0]
source_player, source_nickname = self.split_pokemon(damage_source)
self.conn.execute(
"""
INSERT INTO damage(game, player, pokemon, value)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(
self.game,
team(source_player), team(source_player),
self.specie(damage_source),
self.hp[pokemon] - new_hp,
), ),
) )
self.hp[pokemon] = new_hp case ["-heal", mon, status, *rest]:
hp[mon] = int(status.split("/")[0])
# t.Literal, TaggedPokemon, str, str case _:
case ["-damage", pokemon_, status, from_]: # LOG.debug(f"unhandled message {chunks[0]}")
pokemon = TaggedPokemon(pokemon_) pass
# Pokemon takes indirect damage; status can be a percentage
# "70/100" with or without condition, or "0 fnt"
new_hp = int(re.split("[/ ]", status)[0])
LOG.debug(f"{pokemon} dropped to {new_hp} from {from_}")
LOG.debug(f"tracing reason for {line}")
reason = from_.replace("[from] ", "")
source: TaggedPokemon | str | None = None
source_is_pokemon = True
test_hazard = self.last_env_set.get((tag(pokemon), reason))
if test_hazard:
source = test_hazard
LOG.debug(f"identified hazard source {source}")
test_status = self.last_status_set.get((pokemon, reason))
if test_status:
source = test_status
LOG.debug(f"identified move source {source}")
if reason == "Recoil" or reason.startswith("item: "):
LOG.debug(f"identified special source {reason}")
reason = reason.replace("item: ", "")
source = "self"
source_is_pokemon = False
if not source:
LOG.error(f"missing reason for {line}")
continue
player, nickname = self.split_pokemon(pokemon)
if source.startswith("p1") or source.startswith("p2"):
source_player, _ = self.split_pokemon(TaggedPokemon(source))
else:
source_player = None # type: ignore
source_is_pokemon = False
if source_player:
self.conn.execute(
"""
INSERT INTO indirect_damage(game, player, pokemon, value)
VALUES(?, ?, ?, ?)
""",
(
self.game,
team(source_player),
self.specie(TaggedPokemon(source)),
self.hp[pokemon] - new_hp,
),
)
if status == "0 fnt":
self.conn.execute(
"""
INSERT INTO indirect_knockouts(
game, turn, player, pokemon,
reason, source, source_player)
VALUES(?, ?, ?, ?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(
self.game,
self.turn,
team(player),
self.specie(pokemon),
reason,
self.specie(TaggedPokemon(source))
if source_is_pokemon
else source,
team(source_player),
),
)
# t.Literal, TaggedPlayer, str
case ["-sidestart", side_, env]:
side = TaggedPlayer(side_)
if not last_move:
LOG.warning(f"missing previous move for {line}")
continue
LOG.debug(f"{line} <- {last_move}")
self.last_env_set[
(tag(side), env.replace("move: ", ""))
] = last_move[0]
# t.Literal, TaggedPokemon, str
case ["-status", pokemon_, cond]:
pokemon = TaggedPokemon(pokemon_)
if not last_move or last_move[1] != pokemon:
LOG.warning(f"missing previous move for {line}")
continue
LOG.debug(f"{line} <- {last_move}")
self.last_status_set[(pokemon, cond)] = last_move[0]
# t.Literal, TaggedPokemon, str
case ["-terastallize", pokemon_, type]:
pokemon = TaggedPokemon(pokemon_)
# TODO
pass
case _:
# LOG.debug(f"unhandled message {chunks[0]}")
pass
@dataclass(frozen=True) @dataclass(frozen=True)
@ -490,7 +355,7 @@ def fetch(replay: str, cache: bool = True) -> Replay:
with replay_file.open(mode="w") as f: with replay_file.open(mode="w") as f:
json.dump(data, f) json.dump(data, f)
return Replay(**data) # type: ignore return Replay(**data)
def main(args): def main(args):
@ -562,7 +427,7 @@ def main(args):
), ),
) )
LogParser(replay.id, db).parse(replay.log) parse_log(replay.id, replay.log, into=db)
db.commit() db.commit()
finally: finally: