Compare commits

..

11 Commits

Author SHA1 Message Date
e421bd1de9 log complete update 2023-04-07 10:13:55 +10:00
132096b427 fix move command 2023-04-07 10:13:54 +10:00
c3df316aa4 add bot 2023-04-07 10:01:57 +10:00
008ddf6298 move main file 2023-04-07 09:16:15 +10:00
145f220a07 mark todo sections 2023-04-04 22:50:13 +10:00
a839460e9a optimise missing team logging to use a set 2023-04-04 22:44:46 +10:00
8f8b96937d add missing clears 2023-04-04 22:44:05 +10:00
87114b1d1d add types to move tracking 2023-04-04 22:44:05 +10:00
e8149e7c3b reorder match blocks by alphabetical 2023-04-04 22:32:47 +10:00
37a2df3039 refactor parser into class; add newtyping 2023-04-04 22:21:33 +10:00
8a348eca84 mass refactor
- resolve nicknames as we go
- update column names
- better variables
2023-04-04 21:03:41 +10:00
5 changed files with 366 additions and 300 deletions

64
bot.py
View File

@@ -1,21 +1,18 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from discord.utils import setup_logging from discord.utils import setup_logging
from typing import Optional
import argparse import argparse
import discord import discord
import logging import logging
import os.path
import random
import re import re
import shutil import shutil
import subprocess as sp import subprocess as sp
discord.utils.setup_logging() discord.utils.setup_logging()
_log = logging.getLogger(__name__) _log = logging.getLogger("statbot")
_GAMES = "games.txt" _GAMES = "games.txt"
_DB = "holy-heck.db" _DB = "holy-heck2.db"
_DB_DEST = f"/var/lib/grafana/{_DB}" _DB_DEST = f"/var/lib/grafana/{_DB}"
@@ -42,51 +39,16 @@ def _update_db():
class BotClient(discord.Client): class BotClient(discord.Client):
def __init__(
self,
intents: discord.Intents,
leaguefacts: Optional[list[str]] = None,
):
super().__init__(intents=intents)
self._leaguefacts = leaguefacts or []
async def on_ready(self): async def on_ready(self):
_log.info(f"ready as {self.user}") _log.info(f"ready as {self.user}")
async def on_message(self, message: discord.Message): async def on_message(self, message: discord.Message):
content = message.content content = message.content
if self.is_replay(message): if re.match("https://replay.pokemonshowdown.com/dl-.*", content):
await self.on_replay(message) _log.info(f"Recognised {content} as a League game")
elif self.is_leaguefact(message): _write_game(content)
await self.on_leaguefact(message)
def is_replay(self, message: discord.Message) -> bool:
if re.match("https://replay.pokemonshowdown.com/dl-.*", message.content):
return True
return False
async def on_replay(self, message: discord.Message):
_log.info(f"Recognised {message.content} as a League game")
_write_game(message.content)
_update_db() _update_db()
def is_leaguefact(self, message: discord.Message) -> bool:
return message.content.lower() == "leaguefacts"
async def on_leaguefact(self, message: discord.Message):
_log.info("leaguefact requested")
fact = self._select_leaguefact()
if fact:
await message.channel.send(f"Did you know? {fact}")
else:
await message.channel.send("There are no league facts.")
def _select_leaguefact(self) -> Optional[str]:
if not self._leaguefacts:
return None
random.seed()
return random.choice(self._leaguefacts)
def main(): def main():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
@@ -97,26 +59,12 @@ def main():
default="token", default="token",
help="file containing Discord API token", help="file containing Discord API token",
) )
parser.add_argument(
"-f",
"--facts",
metavar="FILE",
default="facts.txt",
help="file containing leagefacts",
)
args = parser.parse_args() args = parser.parse_args()
facts = []
if os.path.exists(args.facts):
with open(args.facts) as f:
for line in f:
facts.append(line)
intents = discord.Intents.default() intents = discord.Intents.default()
intents.message_content = True intents.message_content = True
client = BotClient(leaguefacts=facts, intents=intents) client = BotClient(intents=intents)
with open(args.token_file) as f: with open(args.token_file) as f:
token = f.read().strip() token = f.read().strip()
client.run(token, log_handler=None) client.run(token, log_handler=None)

View File

@@ -1,21 +0,0 @@
The Canberra Mamoswines have fielded the most unique Pokémon, with 13.
The Raleigh County Revavrooms have drafted the most unique Pokémon, with 18.
Barrow's Biggest Birds have made the most free agency exchanges so far, with 2 exchanges totalling 6 members.
Despite their name, Barrow's Biggest Birds do not currently possess any of largest birds in season 3, and have only had at most three birds on their roster. They briefly had the tallest drafted bird, Quaquaval, but exchanged them in week 3.
The Raleigh County Revavrooms conducted the single largest free agency exchange in week 3, replacing eight members of their roster.
The Canberra Mamoswines are the slowest team, with an average of 64.5 Speed across their roster. The Buenos Aires Aggrons are the fastest, with an average of 84.6 Speed.
The Nimbasa City Ninjasks have the greatest Speed differential between their fastest and slowest members, of 112 points. Emily's Eevees and the Raleigh County Revavrooms are tied for the lowest differential, of 72 points.
The Buenos Aires Aggrons have largest roster, with 14 heads across their 10 members. However, the Raleigh County Revavrooms have the most bodies, with up to 13.
Emily's Eevees have the fewest limbs in season 3, with only 24. Only two of their members are in the Amporphous egg group.
The Nimbasa City Ninjasks and the Canberra Mamoswines are tied for the most toxic rosters, with 2 Pokémon each capable of learning the move.
Cerluedge has more kills than all other Fire-type Pokémon combined that aren't some kind of Volcarona, as befitting of its ability, Flash Fire.
Landorus is the only Force of Nature that has not been drafted in season 3.
Emily's Eevees is the only roster to not have a Pokémon with more than 100 Speed.
Most teams have an average member cost of 12 points. The San Francisco 549ers are most unevenly weighted team, with a standard distribution of 6.99 points. The Raleigh County Revavrooms are the most evenly weighted, with a standard distribution of 5.29 points.
Only one team does not have a regional or alternate variant of a Pokémon on their roster: the Nimbasa City Ninjasks.
The East Midland Milotics have the fattest roster, claiming both Dondozo and Bellibolt.
Three teams have more than one 2-cost Pokémon in their rosters: Barrow's Biggest Birds, Mew York, and the Nimbasa City Ninjasks, each with 2 members. Five teams have no 2-cost Pokémon at all.
Prior to week 6, the San Francisco 549ers had the smallest Paldean representation in season 3, with only one team member.
Moltres is the only legendary bird that has not been drafted in season 3.
Tyranitar, Metagross, and Kommo-o are the only pseudo-legendary Pokémon to not be drafted (either directly or as a variant) in season 3. Of these, only Metagross and Kommo-o are not available to be drafted.
Frosmoth is the biggest underdog in season 3 so far, having 4 knockouts to its name while only being a 2-point Pokémon.

View File

@@ -14,10 +14,17 @@
let let
python = pkgs.python3.withPackages (ps: [ python = pkgs.python3.withPackages (ps: [
ps.discordpy ps.discordpy
ps.mypy
ps.requests ps.requests
ps.types-requests
]); ]);
in in
[ python pkgs.sqlite ]; [
python
pkgs.sqlite
python.pkgs.python-lsp-server
python.pkgs.pylsp-mypy
];
}; };
}); });
} }

View File

@@ -18,6 +18,3 @@ https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-33751
https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-33998 https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-33998
https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-34087 https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-34087
https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-34672 https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-34672
https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-36025
https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-36096
https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-37058

461
index.py
View File

@@ -14,22 +14,22 @@ import sys
import typing as t import typing as t
logging.TRACE = 5 logging.TRACE = 5 # type: ignore
logging.addLevelName(logging.TRACE, "TRACE") logging.addLevelName(logging.TRACE, "TRACE") # type: ignore
logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE) logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE) # type: ignore
logging.trace = partial(logging.log, logging.TRACE) logging.trace = partial(logging.log, logging.TRACE) # type: ignore
class LogFormatter(logging.Formatter): class LogFormatter(logging.Formatter):
format = "%(name)s [%(levelname)s] %(message)s" _format = "%(name)s [%(levelname)s] %(message)s"
FORMATS = { FORMATS = {
logging.TRACE: f"\x1b[30;20m{format}\x1b[0m", logging.TRACE: f"\x1b[30;20m{_format}\x1b[0m", # type: ignore
logging.DEBUG: f"\x1b[38;20m{format}\x1b[0m", logging.DEBUG: f"\x1b[38;20m{_format}\x1b[0m",
logging.INFO: f"\x1b[34;20m{format}\x1b[0m", logging.INFO: f"\x1b[34;20m{_format}\x1b[0m",
logging.WARNING: f"\x1b[33;20m{format}\x1b[0m", logging.WARNING: f"\x1b[33;20m{_format}\x1b[0m",
logging.ERROR: f"\x1b[31;20m{format}\x1b[0m", logging.ERROR: f"\x1b[31;20m{_format}\x1b[0m",
logging.CRITICAL: f"\x1b[31;1m{format}\x1b[0m", logging.CRITICAL: f"\x1b[31;1m{_format}\x1b[0m",
} }
def format(self, record): def format(self, record):
@@ -45,29 +45,6 @@ _ch.setFormatter(LogFormatter())
LOG.addHandler(_ch) LOG.addHandler(_ch)
TEAMS = {}
_logged_teams = []
def team(player: str) -> str:
"""Maps a username to a defined team."""
if player in TEAMS:
return TEAMS[player]
else:
if not player in _logged_teams and player:
LOG.warning(f"missing team mapping for {player}")
_logged_teams.append(player)
return player
class safelist(list):
def get(self, index, default=None):
try:
return self.__getitem__(index)
except IndexError:
return default
def _init_db(conn: sqlite3.Connection): def _init_db(conn: sqlite3.Connection):
def namedtuple_factory(cursor, row): def namedtuple_factory(cursor, row):
fields = [column[0] for column in cursor.description] fields = [column[0] for column in cursor.description]
@@ -79,23 +56,24 @@ def _init_db(conn: sqlite3.Connection):
conn.executescript( conn.executescript(
""" """
CREATE TABLE IF NOT EXISTS moves( CREATE TABLE IF NOT EXISTS moves(
game, turn, player, name, user, target, game, turn, player, pokemon, move, target,
UNIQUE(game, turn, player, user) UNIQUE(game, turn, player, pokemon)
); );
CREATE TABLE IF NOT EXISTS switches( CREATE TABLE IF NOT EXISTS switches(
game, turn, player, name, game, turn, player, pokemon,
UNIQUE(game, turn, player, name) UNIQUE(game, turn, player, pokemon)
); );
CREATE TABLE IF NOT EXISTS nicknames( CREATE TABLE IF NOT EXISTS nicknames(
game, player, name, specie, game, player, pokemon, specie,
UNIQUE(game, player, specie) UNIQUE(game, player, specie)
); );
CREATE TABLE IF NOT EXISTS knockouts( CREATE TABLE IF NOT EXISTS knockouts(
game, turn, player, name, game, turn, player, pokemon,
UNIQUE(game, turn, player) UNIQUE(game, turn, player)
); );
CREATE TABLE IF NOT EXISTS indirect_knockouts( CREATE TABLE IF NOT EXISTS indirect_knockouts(
game, turn, player, name, source, source_user, source_player, game, turn, player, pokemon,
reason, source, source_player,
UNIQUE(game, turn, player) UNIQUE(game, turn, player)
); );
CREATE TABLE IF NOT EXISTS games( CREATE TABLE IF NOT EXISTS games(
@@ -104,216 +82,373 @@ def _init_db(conn: sqlite3.Connection):
); );
-- No good way to ensure idempotence for damage; just re-build it. -- No good way to ensure idempotence for damage; just re-build it.
DROP TABLE IF EXISTS damage; DROP TABLE IF EXISTS damage;
CREATE TABLE damage(game, player, name, value); CREATE TABLE damage(game, player, pokemon, value);
DROP TABLE IF EXISTS indirect_damage; DROP TABLE IF EXISTS indirect_damage;
CREATE TABLE indirect_damage(game, player, name, value); CREATE TABLE indirect_damage(game, player, pokemon, value);
""" """
) )
def parse_log(game: str, log: str, into: sqlite3.Connection): # Either the value "p1" or "p2"
conn = into PlayerTag = t.NewType("PlayerTag", str)
# A player's name
Player = t.NewType("Player", str)
# A player prefixed with a PlayerTag
TaggedPlayer = t.NewType("TaggedPlayer", str)
# A Pokemon identified by its nickname, if any
Pokemon = t.NewType("Pokemon", str)
# A Pokemon specie
PokemonSpecie = t.NewType("PokemonSpecie", str)
# A Pokemon prefixed with a PlayerTag
TaggedPokemon = t.NewType("TaggedPokemon", str)
def tag(tagged: TaggedPlayer | TaggedPokemon) -> PlayerTag:
return PlayerTag(tagged[0:1])
TEAMS: dict[Player, Player] = {}
_logged_teams: set[Player] = set()
def team(player: Player) -> Player:
"""Maps a username to a defined team."""
if player in TEAMS:
return TEAMS[player]
else:
if not player in _logged_teams and player:
LOG.warning(f"missing team mapping for {player}")
_logged_teams.add(player)
return player
class LogParser:
turn = 0 turn = 0
players = {} players: dict[PlayerTag, Player] = {}
hp = {} hp: dict[TaggedPokemon, int] = {}
# ("p2a: Edward", "p1a: Meteo") # Memorises the user of the move that causes environment setting or status,
# memorises the user of the move that causes environment setting or status, # its target, and the move name (for debugging).
# and its target last_move: t.Optional[tuple[TaggedPokemon, TaggedPokemon, str]] = None
last_move: t.Optional[tuple[str, str]]
# ("p1", "Spikes") => "p2a: Frosslas" # Memorises the last hazard set against a player and the causing user.
last_env_set: dict[tuple[str, str], str] = {} last_env_set: dict[tuple[PlayerTag, str], TaggedPokemon] = {}
# ("p1a: Meteo", "brn") => "p2a: Edward" # Memorises statuses set on a pokemon and the causing user.
last_status_set: dict[tuple[str, str], str] = {} last_status_set: dict[tuple[TaggedPokemon, str], TaggedPokemon] = {}
def resolve_mon(user: str) -> tuple[str, str]: def __init__(self, game: str, into: sqlite3.Connection):
[player, name] = user.split(": ") self.game = game
return players[player.strip("ab")], name self.conn: sqlite3.Connection = into
def split_pokemon(self, user: TaggedPokemon) -> tuple[Player, Pokemon]:
"""Splits a TaggedPokemon into the owning player and the Pokemon."""
[player, pokemon] = user.split(": ")
return self.players[PlayerTag(player.strip("ab"))], Pokemon(pokemon)
@t.overload
def specie(self, pokemon: Pokemon, player: Player) -> PokemonSpecie:
"""Resolves the species of a nicknamed Pokemon."""
...
@t.overload
def specie(self, pokemon: TaggedPokemon) -> PokemonSpecie:
"""Resolves the species of a Pokemon given its Showdown identifier (used
in split_pokemon)."""
...
def specie(
self, pokemon: Pokemon | TaggedPokemon, player: t.Optional[Player] = None
) -> PokemonSpecie:
if not player:
[player, pokemon] = self.split_pokemon(TaggedPokemon(pokemon))
return (
self.conn.execute(
"""
SELECT specie
FROM nicknames
WHERE (game, player, pokemon) = (?, ?, ?)
LIMIT 1
""",
(self.game, team(player), pokemon),
)
.fetchall()[0]
.specie
)
def _reset(self):
self.turn = 0
self.players.clear()
self.hp.clear()
self.last_move = None
self.last_env_set.clear()
self.last_status_set.clear()
def _log_appearance(self, name: TaggedPokemon, specie: str):
# Also includes gender and formes.
trimmed_specie = PokemonSpecie(specie.split(", ")[0])
player, nickname = self.split_pokemon(name)
self.conn.execute(
"""
INSERT INTO nicknames(game, player, pokemon, specie)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(self.game, team(player), nickname, trimmed_specie),
)
def parse(self, log: str):
self._reset()
for line in log.split("\n"): for line in log.split("\n"):
chunks = line.split("|")[1:] chunks = line.split("|")[1:]
if not chunks: if not chunks:
continue continue
LOG.trace(line) LOG.trace(line) # type: ignore
match chunks: match chunks:
case ["player", id, username, *rest]:
players[id] = username
case ["turn", turn]: # t.Literal, TaggedPokemon, str, str
turn = int(turn) case ["drag", name_, specie, status, *rest]:
name = TaggedPokemon(name_)
case ["move", user, move, target]: self.hp[name] = int(status.split("/")[0])
last_move = (user, target) self._log_appearance(name, specie)
player, user = resolve_mon(user)
_, target = resolve_mon(target) # t.Literal, TaggedPokemon
conn.execute( case ["faint", pokemon_]:
pokemon = TaggedPokemon(pokemon_)
player, _ = self.split_pokemon(pokemon)
self.conn.execute(
""" """
INSERT INTO moves(game, turn, player, name, user, target) INSERT INTO knockouts(game, turn, player, pokemon)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(self.game, self.turn, team(player), self.specie(pokemon)),
)
# t.Literal, TaggedPokemon, str, TaggedPokemon
case ["move", user_, move, target_]:
user = TaggedPokemon(user_)
target = TaggedPokemon(target_)
last_move = (user, target, move)
player, _ = self.split_pokemon(user)
self.conn.execute(
"""
INSERT INTO moves(game, turn, player, pokemon, move, target)
VALUES (?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT DO NOTHING ON CONFLICT DO NOTHING
""", """,
(game, turn, team(player), move, user, target), (
self.game,
self.turn,
team(player),
self.specie(user),
move,
self.specie(target),
),
) )
case ["drag", name, specie, status, *rest]: # t.Literal, PlayerTag, Player
hp[name] = int(status.split("/")[0]) case ["player", id, username, *rest]:
self.players[PlayerTag(id)] = Player(username)
# t.Literal, TaggedPokemon, str
case ["replace", name, specie]:
self._log_appearance(name, specie)
# t.Literal, TaggedPokemon, str, str, t.Optional[str]
case ["switch", name, specie, status, *rest]: case ["switch", name, specie, status, *rest]:
hp[name] = int(status.split("/")[0]) self.hp[name] = int(status.split("/")[0])
player, name = resolve_mon(name) # Also includes gender and formes.
conn.execute( trimmed_specie = specie.split(", ")[0]
player, nickname = self.split_pokemon(name)
self._log_appearance(name, specie)
self.conn.execute(
""" """
INSERT INTO switches(game, turn, player, name) INSERT INTO switches(game, turn, player, pokemon)
VALUES (?, ?, ?, ?) VALUES (?, ?, ?, ?)
ON CONFLICT DO NOTHING ON CONFLICT DO NOTHING
""", """,
(game, turn, team(player), name), (self.game, self.turn, team(player), trimmed_specie),
)
conn.execute(
"""
INSERT INTO nicknames(game, player, name, specie)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, team(player), name, specie.split(", ")[0]),
) )
case ["faint", mon]: # t.Literal, str
player, mon = resolve_mon(mon) case ["turn", turn]:
conn.execute( self.turn = int(turn)
"""
INSERT INTO knockouts(game, turn, player, name)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, turn, team(player), mon),
)
# t.Literal, Player
case ["win", player]: case ["win", player]:
conn.execute( self.conn.execute(
""" """
UPDATE games UPDATE games
SET winner = ? SET winner = ?
WHERE id = ? WHERE id = ?
""", """,
(team(player), game), (team(player), self.game),
) )
case ["-sidestart", side, env]: case ["-heal", pokemon, status, *rest]:
if not last_move: self.hp[pokemon] = int(status.split("/")[0])
LOG.warning(f"missing previous move for {line}")
continue
LOG.debug(f"{line} <- {last_move}")
last_env_set[(side[0:1], env.replace("move: ", ""))] = last_move[0]
case ["-status", mon, cond]: # TODO: track healing done
if not last_move or last_move[1] != mon:
LOG.warning(f"missing previous move for {line}")
continue
LOG.debug(f"{line} <- {last_move}")
last_status_set[(mon, cond)] = last_move[0]
case ["-damage", mon, status]: # t.Literal, TaggedPokemon, str
# mon takes direct (non-hazard/condition) damage case ["-damage", pokemon, status]:
# status can be a percentage 70/100 with or without condition, # Pokemon takes direct (non-hazard/condition) damage; status
# or "0 fnt" # can be a percentage "70/100" with or without condition, or
# "0 fnt"
new_hp = int(re.split("[/ ]", status)[0]) new_hp = int(re.split("[/ ]", status)[0])
LOG.debug(f"{mon} dropped to {new_hp} from {hp[mon]}") LOG.debug(f"{pokemon} dropped to {new_hp} from {self.hp[pokemon]}")
LOG.debug(f"source: {last_move}") LOG.debug(f"source: {last_move}")
# resolve to damage source # resolve to damage source
if last_move[1] != mon: if last_move[1] != pokemon:
LOG.warn( LOG.warning(
f"{mon} took direct damage but last move was not targeted at them" f"{pokemon} took direct damage but last move"
f" {last_move[2]} was not targeted at them"
) )
continue continue
user = last_move[0] damage_source = last_move[0]
source_player, source_mon = resolve_mon(user) source_player, source_nickname = self.split_pokemon(damage_source)
conn.execute( self.conn.execute(
""" """
INSERT INTO damage(game, player, name, value) INSERT INTO damage(game, player, pokemon, value)
VALUES(?, ?, ?, ?) VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING ON CONFLICT DO NOTHING
""", """,
(game, team(source_player), source_mon, hp[mon] - new_hp), (
self.game,
team(source_player),
self.specie(damage_source),
self.hp[pokemon] - new_hp,
),
) )
hp[mon] = new_hp self.hp[pokemon] = new_hp
case ["-damage", mon, status, from_]: # t.Literal, TaggedPokemon, str, str
# mon takes indirect damage case ["-damage", pokemon_, status, from_]:
# status can be a percentage 70/100 with or without condition, pokemon = TaggedPokemon(pokemon_)
# or "0 fnt"
# mon has fainted from an indirect damage source # Pokemon takes indirect damage; status can be a percentage
# # "70/100" with or without condition, or "0 fnt"
new_hp = int(re.split("[/ ]", status)[0]) new_hp = int(re.split("[/ ]", status)[0])
LOG.debug(f"{mon} dropped to {new_hp} from {from_}") LOG.debug(f"{pokemon} dropped to {new_hp} from {from_}")
LOG.debug(f"tracing source for {line}") LOG.debug(f"tracing reason for {line}")
source = from_.replace("[from] ", "") reason = from_.replace("[from] ", "")
source_user = None
test_hazard = last_env_set.get((mon[0:1], source)) source: TaggedPokemon | str | None = None
source_is_pokemon = True
test_hazard = self.last_env_set.get((tag(pokemon), reason))
if test_hazard: if test_hazard:
source_user = test_hazard source = test_hazard
LOG.debug(f"identified hazard source {source_user}") LOG.debug(f"identified hazard source {source}")
test_status = last_status_set.get((mon, source)) test_status = self.last_status_set.get((pokemon, reason))
if test_status: if test_status:
source_user = test_status source = test_status
LOG.debug(f"identified move source {source_user}") LOG.debug(f"identified move source {source}")
if source == "Recoil" or source.startswith("item: "): if reason == "Recoil" or reason.startswith("item: "):
LOG.debug(f"identified special source {source}") LOG.debug(f"identified special source {reason}")
source = source.replace("item: ", "") reason = reason.replace("item: ", "")
source_user = "self" source = "self"
source_is_pokemon = False
if not source_user: if not source:
LOG.error(f"missing source for {line}") LOG.error(f"missing reason for {line}")
continue continue
player, pkmn = resolve_mon(mon) player, nickname = self.split_pokemon(pokemon)
if source_user.startswith("p1") or source_user.startswith("p2"): if source.startswith("p1") or source.startswith("p2"):
source_player, source_mon = resolve_mon(source_user) source_player, _ = self.split_pokemon(TaggedPokemon(source))
else: else:
source_player = None source_player = None # type: ignore
source_is_pokemon = False
if source_player: if source_player:
conn.execute( self.conn.execute(
""" """
INSERT INTO indirect_damage(game, player, name, value) INSERT INTO indirect_damage(game, player, pokemon, value)
VALUES(?, ?, ?, ?) VALUES(?, ?, ?, ?)
""", """,
(game, team(source_player), source_mon, hp[mon] - new_hp), (
self.game,
team(source_player),
self.specie(TaggedPokemon(source)),
self.hp[pokemon] - new_hp,
),
) )
if status == "0 fnt": if status == "0 fnt":
conn.execute( self.conn.execute(
""" """
INSERT INTO indirect_knockouts(game, turn, player, name, source, source_user, source_player) INSERT INTO indirect_knockouts(
game, turn, player, pokemon,
reason, source, source_player)
VALUES(?, ?, ?, ?, ?, ?, ?) VALUES(?, ?, ?, ?, ?, ?, ?)
ON CONFLICT DO NOTHING ON CONFLICT DO NOTHING
""", """,
( (
game, self.game,
turn, self.turn,
team(player), team(player),
pkmn, self.specie(pokemon),
source, reason,
source_mon, self.specie(TaggedPokemon(source))
if source_is_pokemon
else source,
team(source_player), team(source_player),
), ),
) )
case ["-heal", mon, status, *rest]: # t.Literal, TaggedPlayer, str
hp[mon] = int(status.split("/")[0]) case ["-sidestart", side_, env]:
side = TaggedPlayer(side_)
if not last_move:
LOG.warning(f"missing previous move for {line}")
continue
LOG.debug(f"{line} <- {last_move}")
self.last_env_set[
(tag(side), env.replace("move: ", ""))
] = last_move[0]
# t.Literal, TaggedPokemon, str
case ["-status", pokemon_, cond]:
pokemon = TaggedPokemon(pokemon_)
if not last_move or last_move[1] != pokemon:
LOG.warning(f"missing previous move for {line}")
continue
LOG.debug(f"{line} <- {last_move}")
self.last_status_set[(pokemon, cond)] = last_move[0]
# t.Literal, TaggedPokemon, str
case ["-terastallize", pokemon_, type]:
pokemon = TaggedPokemon(pokemon_)
# TODO
pass
case _: case _:
# LOG.debug(f"unhandled message {chunks[0]}") # LOG.debug(f"unhandled message {chunks[0]}")
@@ -355,7 +490,7 @@ def fetch(replay: str, cache: bool = True) -> Replay:
with replay_file.open(mode="w") as f: with replay_file.open(mode="w") as f:
json.dump(data, f) json.dump(data, f)
return Replay(**data) return Replay(**data) # type: ignore
def main(): def main():
@@ -427,7 +562,7 @@ def main():
), ),
) )
parse_log(replay.id, replay.log, into=db) LogParser(replay.id, db).parse(replay.log)
db.commit() db.commit()
finally: finally: