Compare commits
7 Commits
trunk
...
fd4ac1d544
| Author | SHA1 | Date | |
|---|---|---|---|
|
fd4ac1d544
|
|||
|
73a31d7b99
|
|||
|
a8730ed579
|
|||
|
dfc1093b39
|
|||
|
29df16ecd3
|
|||
|
6b36720c37
|
|||
|
39f412c0fe
|
63
bot.py
63
bot.py
@@ -1,15 +1,18 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
from discord.utils import setup_logging
|
from discord.utils import setup_logging
|
||||||
|
from typing import Optional
|
||||||
import argparse
|
import argparse
|
||||||
import discord
|
import discord
|
||||||
import logging
|
import logging
|
||||||
|
import os.path
|
||||||
|
import random
|
||||||
import re
|
import re
|
||||||
import shutil
|
import shutil
|
||||||
import subprocess as sp
|
import subprocess as sp
|
||||||
|
|
||||||
discord.utils.setup_logging()
|
discord.utils.setup_logging()
|
||||||
_log = logging.getLogger("statbot")
|
_log = logging.getLogger(__name__)
|
||||||
|
|
||||||
_GAMES = "games.txt"
|
_GAMES = "games.txt"
|
||||||
_DB = "holy-heck2.db"
|
_DB = "holy-heck2.db"
|
||||||
@@ -39,15 +42,49 @@ def _update_db():
|
|||||||
|
|
||||||
|
|
||||||
class BotClient(discord.Client):
|
class BotClient(discord.Client):
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
intents: discord.Intents,
|
||||||
|
leaguefacts: Optional[list[str]] = None,
|
||||||
|
):
|
||||||
|
super().__init__(intents=intents)
|
||||||
|
self._leaguefacts = leaguefacts or []
|
||||||
|
|
||||||
async def on_ready(self):
|
async def on_ready(self):
|
||||||
_log.info(f"ready as {self.user}")
|
_log.info(f"ready as {self.user}")
|
||||||
|
|
||||||
async def on_message(self, message: discord.Message):
|
async def on_message(self, message: discord.Message):
|
||||||
content = message.content
|
content = message.content
|
||||||
if re.match("https://replay.pokemonshowdown.com/dl-.*", content):
|
if self.is_replay(message):
|
||||||
_log.info(f"Recognised {content} as a League game")
|
await self.on_replay(message)
|
||||||
_write_game(content)
|
elif self.is_leaguefact(message):
|
||||||
_update_db()
|
await self.on_leaguefact(message)
|
||||||
|
|
||||||
|
def is_replay(self, message: discord.Message) -> bool:
|
||||||
|
if re.match("https://replay.pokemonshowdown.com/dl-.*", message.content):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def on_replay(self, message: discord.Message):
|
||||||
|
_log.info(f"Recognised {message.content} as a League game")
|
||||||
|
_write_game(message.content)
|
||||||
|
_update_db()
|
||||||
|
|
||||||
|
def is_leaguefact(self, message: discord.Message) -> bool:
|
||||||
|
return message.content.lower() == "notleaguefact"
|
||||||
|
|
||||||
|
async def on_leaguefact(self, message: discord.Message):
|
||||||
|
fact = self._select_leaguefact()
|
||||||
|
if fact:
|
||||||
|
await message.channel.send(f"Did you know? {fact}")
|
||||||
|
else:
|
||||||
|
await message.channel.send("There are no league facts.")
|
||||||
|
|
||||||
|
def _select_leaguefact(self) -> Optional[str]:
|
||||||
|
if not self._leaguefacts:
|
||||||
|
return None
|
||||||
|
random.seed()
|
||||||
|
return random.choice(self._leaguefacts)
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
@@ -59,12 +96,26 @@ def main():
|
|||||||
default="token",
|
default="token",
|
||||||
help="file containing Discord API token",
|
help="file containing Discord API token",
|
||||||
)
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"-f",
|
||||||
|
"--facts",
|
||||||
|
metavar="FILE",
|
||||||
|
default="facts.txt",
|
||||||
|
help="file containing leagefacts",
|
||||||
|
)
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
facts = []
|
||||||
|
if os.path.exists(args.facts):
|
||||||
|
with open(args.facts) as f:
|
||||||
|
for line in f:
|
||||||
|
facts.append(line)
|
||||||
|
|
||||||
intents = discord.Intents.default()
|
intents = discord.Intents.default()
|
||||||
intents.message_content = True
|
intents.message_content = True
|
||||||
client = BotClient(intents=intents)
|
client = BotClient(leaguefacts=facts, intents=intents)
|
||||||
|
|
||||||
with open(args.token_file) as f:
|
with open(args.token_file) as f:
|
||||||
token = f.read().strip()
|
token = f.read().strip()
|
||||||
client.run(token, log_handler=None)
|
client.run(token, log_handler=None)
|
||||||
|
|||||||
21
facts.txt
Normal file
21
facts.txt
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
The Canberra Mamoswines have fielded the most unique Pokémon, with 13.
|
||||||
|
The Raleigh County Revavrooms have drafted the most unique Pokémon, with 18.
|
||||||
|
Barrow's Biggest Birds have made the most free agency exchanges so far, with 2 exchanges totalling 6 members.
|
||||||
|
Despite their name, Barrow's Biggest Birds do not currently possess any of largest birds in the league, and have only had at most three birds on their roster. They briefly had the tallest drafted bird, Quaquaval, but exchanged them in week 3.
|
||||||
|
The Raleigh County Revavrooms conducted the single largest free agency exchange in week 3, replacing eight members of their roster.
|
||||||
|
The Canberra Mamoswines are the slowest team, with an average of 64.5 Speed across their roster. The Raleigh County Revavrooms are the fastest, with an average of 83.3 Speed.
|
||||||
|
The Nimbasa City Ninjasks have the greatest Speed differential between their fastest and slowest members, of 112 points. Emily's Eevees and the Raleigh County Revavrooms are tied for the lowest differential, of 72 points.
|
||||||
|
The Buenos Aires Aggrons have largest roster, with 14 heads across their 10 members. However, the Raleigh County Revavrooms have the most bodies, with up to 13.
|
||||||
|
Emily's Eevees have the fewest limbs in the league, with only 24. Only two of their members are in the Amporphous egg group.
|
||||||
|
The Nimbasa City Ninjasks and the Canberra Mamoswines are tied for the most toxic rosters, with 2 Pokémon each capable of learning the move.
|
||||||
|
Cerluedge has more kills than all other Fire-type Pokémon combined that aren't some kind of Volcarona, as befitting of its ability, Flash Fire.
|
||||||
|
Landorus is the only Force of Nature that has not been drafted in season 3.
|
||||||
|
Emily's Eevees is the only roster to not have a Pokémon with more than 100 Speed.
|
||||||
|
Most teams have an average member cost of 12 points. Emily's Eevees and the Nimbasa City Ninjasks are tied for the most unevenly weighted teams, with a standard distribution of 6.87 points. The Raleigh County Revavrooms are the most evenly weighted, with a standard distribution of 5.29 points.
|
||||||
|
Only one team does not have a regional or alternate variant of a Pokémon on their roster: the Nimbasa City Ninjasks.
|
||||||
|
The East Midland Milotics have the fattest roster, claiming both Dondozo and Bellibolt.
|
||||||
|
Three teams have more than one 2-cost Pokémon in their rosters: Barrow's Biggest Birds, Mew York, and the Nimbasa City Ninjasks, each with 2 members. Five teams have no 2-cost Pokémon at all.
|
||||||
|
The San Francisco 549ers have the smallest Paldean representation in the league, with only one team member.
|
||||||
|
Moltres is the only legendary bird that has not been drafted in season 3.
|
||||||
|
Tyranitar, Metagross, and Kommo-o are the only pseudo-legendary Pokémon to not be drafted (either directly or as a variant) in season 3. Of these, only Metagross and Kommo-o are not available to be drafted.
|
||||||
|
Frosmoth is the biggest underdog in season 3 so far, having 4 knockouts to its name while only being a 2-point Pokémon.
|
||||||
@@ -14,17 +14,10 @@
|
|||||||
let
|
let
|
||||||
python = pkgs.python3.withPackages (ps: [
|
python = pkgs.python3.withPackages (ps: [
|
||||||
ps.discordpy
|
ps.discordpy
|
||||||
ps.mypy
|
|
||||||
ps.requests
|
ps.requests
|
||||||
ps.types-requests
|
|
||||||
]);
|
]);
|
||||||
in
|
in
|
||||||
[
|
[ python pkgs.sqlite ];
|
||||||
python
|
|
||||||
pkgs.sqlite
|
|
||||||
python.pkgs.python-lsp-server
|
|
||||||
python.pkgs.pylsp-mypy
|
|
||||||
];
|
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,3 +18,5 @@ https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-33751
|
|||||||
https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-33998
|
https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-33998
|
||||||
https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-34087
|
https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-34087
|
||||||
https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-34672
|
https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-34672
|
||||||
|
https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-36025
|
||||||
|
https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-36096
|
||||||
|
|||||||
565
index.py
565
index.py
@@ -14,22 +14,22 @@ import sys
|
|||||||
import typing as t
|
import typing as t
|
||||||
|
|
||||||
|
|
||||||
logging.TRACE = 5 # type: ignore
|
logging.TRACE = 5
|
||||||
logging.addLevelName(logging.TRACE, "TRACE") # type: ignore
|
logging.addLevelName(logging.TRACE, "TRACE")
|
||||||
logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE) # type: ignore
|
logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE)
|
||||||
logging.trace = partial(logging.log, logging.TRACE) # type: ignore
|
logging.trace = partial(logging.log, logging.TRACE)
|
||||||
|
|
||||||
|
|
||||||
class LogFormatter(logging.Formatter):
|
class LogFormatter(logging.Formatter):
|
||||||
|
|
||||||
_format = "%(name)s [%(levelname)s] %(message)s"
|
format = "%(name)s [%(levelname)s] %(message)s"
|
||||||
FORMATS = {
|
FORMATS = {
|
||||||
logging.TRACE: f"\x1b[30;20m{_format}\x1b[0m", # type: ignore
|
logging.TRACE: f"\x1b[30;20m{format}\x1b[0m",
|
||||||
logging.DEBUG: f"\x1b[38;20m{_format}\x1b[0m",
|
logging.DEBUG: f"\x1b[38;20m{format}\x1b[0m",
|
||||||
logging.INFO: f"\x1b[34;20m{_format}\x1b[0m",
|
logging.INFO: f"\x1b[34;20m{format}\x1b[0m",
|
||||||
logging.WARNING: f"\x1b[33;20m{_format}\x1b[0m",
|
logging.WARNING: f"\x1b[33;20m{format}\x1b[0m",
|
||||||
logging.ERROR: f"\x1b[31;20m{_format}\x1b[0m",
|
logging.ERROR: f"\x1b[31;20m{format}\x1b[0m",
|
||||||
logging.CRITICAL: f"\x1b[31;1m{_format}\x1b[0m",
|
logging.CRITICAL: f"\x1b[31;1m{format}\x1b[0m",
|
||||||
}
|
}
|
||||||
|
|
||||||
def format(self, record):
|
def format(self, record):
|
||||||
@@ -45,6 +45,29 @@ _ch.setFormatter(LogFormatter())
|
|||||||
LOG.addHandler(_ch)
|
LOG.addHandler(_ch)
|
||||||
|
|
||||||
|
|
||||||
|
TEAMS = {}
|
||||||
|
_logged_teams = []
|
||||||
|
|
||||||
|
|
||||||
|
def team(player: str) -> str:
|
||||||
|
"""Maps a username to a defined team."""
|
||||||
|
if player in TEAMS:
|
||||||
|
return TEAMS[player]
|
||||||
|
else:
|
||||||
|
if not player in _logged_teams and player:
|
||||||
|
LOG.warning(f"missing team mapping for {player}")
|
||||||
|
_logged_teams.append(player)
|
||||||
|
return player
|
||||||
|
|
||||||
|
|
||||||
|
class safelist(list):
|
||||||
|
def get(self, index, default=None):
|
||||||
|
try:
|
||||||
|
return self.__getitem__(index)
|
||||||
|
except IndexError:
|
||||||
|
return default
|
||||||
|
|
||||||
|
|
||||||
def _init_db(conn: sqlite3.Connection):
|
def _init_db(conn: sqlite3.Connection):
|
||||||
def namedtuple_factory(cursor, row):
|
def namedtuple_factory(cursor, row):
|
||||||
fields = [column[0] for column in cursor.description]
|
fields = [column[0] for column in cursor.description]
|
||||||
@@ -56,24 +79,23 @@ def _init_db(conn: sqlite3.Connection):
|
|||||||
conn.executescript(
|
conn.executescript(
|
||||||
"""
|
"""
|
||||||
CREATE TABLE IF NOT EXISTS moves(
|
CREATE TABLE IF NOT EXISTS moves(
|
||||||
game, turn, player, pokemon, move, target,
|
game, turn, player, name, user, target,
|
||||||
UNIQUE(game, turn, player, pokemon)
|
UNIQUE(game, turn, player, user)
|
||||||
);
|
);
|
||||||
CREATE TABLE IF NOT EXISTS switches(
|
CREATE TABLE IF NOT EXISTS switches(
|
||||||
game, turn, player, pokemon,
|
game, turn, player, name,
|
||||||
UNIQUE(game, turn, player, pokemon)
|
UNIQUE(game, turn, player, name)
|
||||||
);
|
);
|
||||||
CREATE TABLE IF NOT EXISTS nicknames(
|
CREATE TABLE IF NOT EXISTS nicknames(
|
||||||
game, player, pokemon, specie,
|
game, player, name, specie,
|
||||||
UNIQUE(game, player, specie)
|
UNIQUE(game, player, specie)
|
||||||
);
|
);
|
||||||
CREATE TABLE IF NOT EXISTS knockouts(
|
CREATE TABLE IF NOT EXISTS knockouts(
|
||||||
game, turn, player, pokemon,
|
game, turn, player, name,
|
||||||
UNIQUE(game, turn, player)
|
UNIQUE(game, turn, player)
|
||||||
);
|
);
|
||||||
CREATE TABLE IF NOT EXISTS indirect_knockouts(
|
CREATE TABLE IF NOT EXISTS indirect_knockouts(
|
||||||
game, turn, player, pokemon,
|
game, turn, player, name, source, source_user, source_player,
|
||||||
reason, source, source_player,
|
|
||||||
UNIQUE(game, turn, player)
|
UNIQUE(game, turn, player)
|
||||||
);
|
);
|
||||||
CREATE TABLE IF NOT EXISTS games(
|
CREATE TABLE IF NOT EXISTS games(
|
||||||
@@ -82,377 +104,220 @@ def _init_db(conn: sqlite3.Connection):
|
|||||||
);
|
);
|
||||||
-- No good way to ensure idempotence for damage; just re-build it.
|
-- No good way to ensure idempotence for damage; just re-build it.
|
||||||
DROP TABLE IF EXISTS damage;
|
DROP TABLE IF EXISTS damage;
|
||||||
CREATE TABLE damage(game, player, pokemon, value);
|
CREATE TABLE damage(game, player, name, value);
|
||||||
DROP TABLE IF EXISTS indirect_damage;
|
DROP TABLE IF EXISTS indirect_damage;
|
||||||
CREATE TABLE indirect_damage(game, player, pokemon, value);
|
CREATE TABLE indirect_damage(game, player, name, value);
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# Either the value "p1" or "p2"
|
def parse_log(game: str, log: str, into: sqlite3.Connection):
|
||||||
PlayerTag = t.NewType("PlayerTag", str)
|
conn = into
|
||||||
|
|
||||||
# A player's name
|
|
||||||
Player = t.NewType("Player", str)
|
|
||||||
|
|
||||||
# A player prefixed with a PlayerTag
|
|
||||||
TaggedPlayer = t.NewType("TaggedPlayer", str)
|
|
||||||
|
|
||||||
# A Pokemon identified by its nickname, if any
|
|
||||||
Pokemon = t.NewType("Pokemon", str)
|
|
||||||
|
|
||||||
# A Pokemon specie
|
|
||||||
PokemonSpecie = t.NewType("PokemonSpecie", str)
|
|
||||||
|
|
||||||
# A Pokemon prefixed with a PlayerTag
|
|
||||||
TaggedPokemon = t.NewType("TaggedPokemon", str)
|
|
||||||
|
|
||||||
|
|
||||||
def tag(tagged: TaggedPlayer | TaggedPokemon) -> PlayerTag:
|
|
||||||
return PlayerTag(tagged[0:1])
|
|
||||||
|
|
||||||
|
|
||||||
TEAMS: dict[Player, Player] = {}
|
|
||||||
_logged_teams: set[Player] = set()
|
|
||||||
|
|
||||||
|
|
||||||
def team(player: Player) -> Player:
|
|
||||||
"""Maps a username to a defined team."""
|
|
||||||
if player in TEAMS:
|
|
||||||
return TEAMS[player]
|
|
||||||
else:
|
|
||||||
if not player in _logged_teams and player:
|
|
||||||
LOG.warning(f"missing team mapping for {player}")
|
|
||||||
_logged_teams.add(player)
|
|
||||||
return player
|
|
||||||
|
|
||||||
|
|
||||||
class LogParser:
|
|
||||||
|
|
||||||
turn = 0
|
turn = 0
|
||||||
players: dict[PlayerTag, Player] = {}
|
players = {}
|
||||||
hp: dict[TaggedPokemon, int] = {}
|
hp = {}
|
||||||
|
|
||||||
# Memorises the user of the move that causes environment setting or status,
|
# ("p2a: Edward", "p1a: Meteo")
|
||||||
# its target, and the move name (for debugging).
|
# memorises the user of the move that causes environment setting or status,
|
||||||
last_move: t.Optional[tuple[TaggedPokemon, TaggedPokemon, str]] = None
|
# and its target
|
||||||
|
last_move: t.Optional[tuple[str, str]]
|
||||||
|
|
||||||
# Memorises the last hazard set against a player and the causing user.
|
# ("p1", "Spikes") => "p2a: Frosslas"
|
||||||
last_env_set: dict[tuple[PlayerTag, str], TaggedPokemon] = {}
|
last_env_set: dict[tuple[str, str], str] = {}
|
||||||
|
|
||||||
# Memorises statuses set on a pokemon and the causing user.
|
# ("p1a: Meteo", "brn") => "p2a: Edward"
|
||||||
last_status_set: dict[tuple[TaggedPokemon, str], TaggedPokemon] = {}
|
last_status_set: dict[tuple[str, str], str] = {}
|
||||||
|
|
||||||
def __init__(self, game: str, into: sqlite3.Connection):
|
def resolve_mon(user: str) -> tuple[str, str]:
|
||||||
self.game = game
|
[player, name] = user.split(": ")
|
||||||
self.conn: sqlite3.Connection = into
|
return players[player.strip("ab")], name
|
||||||
|
|
||||||
def split_pokemon(self, user: TaggedPokemon) -> tuple[Player, Pokemon]:
|
for line in log.split("\n"):
|
||||||
"""Splits a TaggedPokemon into the owning player and the Pokemon."""
|
chunks = line.split("|")[1:]
|
||||||
[player, pokemon] = user.split(": ")
|
if not chunks:
|
||||||
return self.players[PlayerTag(player.strip("ab"))], Pokemon(pokemon)
|
continue
|
||||||
|
|
||||||
@t.overload
|
LOG.trace(line)
|
||||||
def specie(self, pokemon: Pokemon, player: Player) -> PokemonSpecie:
|
|
||||||
"""Resolves the species of a nicknamed Pokemon."""
|
|
||||||
...
|
|
||||||
|
|
||||||
@t.overload
|
match chunks:
|
||||||
def specie(self, pokemon: TaggedPokemon) -> PokemonSpecie:
|
case ["player", id, username, *rest]:
|
||||||
"""Resolves the species of a Pokemon given its Showdown identifier (used
|
players[id] = username
|
||||||
in split_pokemon)."""
|
|
||||||
...
|
|
||||||
|
|
||||||
def specie(
|
case ["turn", turn]:
|
||||||
self, pokemon: Pokemon | TaggedPokemon, player: t.Optional[Player] = None
|
turn = int(turn)
|
||||||
) -> PokemonSpecie:
|
|
||||||
if not player:
|
|
||||||
[player, pokemon] = self.split_pokemon(TaggedPokemon(pokemon))
|
|
||||||
return (
|
|
||||||
self.conn.execute(
|
|
||||||
"""
|
|
||||||
SELECT specie
|
|
||||||
FROM nicknames
|
|
||||||
WHERE (game, player, pokemon) = (?, ?, ?)
|
|
||||||
LIMIT 1
|
|
||||||
""",
|
|
||||||
(self.game, team(player), pokemon),
|
|
||||||
)
|
|
||||||
.fetchall()[0]
|
|
||||||
.specie
|
|
||||||
)
|
|
||||||
|
|
||||||
def _reset(self):
|
case ["move", user, move, target]:
|
||||||
self.turn = 0
|
last_move = (user, target)
|
||||||
self.players.clear()
|
player, user = resolve_mon(user)
|
||||||
self.hp.clear()
|
_, target = resolve_mon(target)
|
||||||
self.last_move = None
|
conn.execute(
|
||||||
self.last_env_set.clear()
|
"""
|
||||||
self.last_status_set.clear()
|
INSERT INTO moves(game, turn, player, name, user, target)
|
||||||
|
VALUES (?, ?, ?, ?, ?, ?)
|
||||||
|
ON CONFLICT DO NOTHING
|
||||||
|
""",
|
||||||
|
(game, turn, team(player), move, user, target),
|
||||||
|
)
|
||||||
|
|
||||||
def _log_appearance(self, name: TaggedPokemon, specie: str):
|
case ["drag", name, specie, status, *rest]:
|
||||||
|
hp[name] = int(status.split("/")[0])
|
||||||
|
|
||||||
# Also includes gender and formes.
|
case ["switch", name, specie, status, *rest]:
|
||||||
trimmed_specie = PokemonSpecie(specie.split(", ")[0])
|
hp[name] = int(status.split("/")[0])
|
||||||
player, nickname = self.split_pokemon(name)
|
|
||||||
|
|
||||||
self.conn.execute(
|
player, name = resolve_mon(name)
|
||||||
"""
|
conn.execute(
|
||||||
INSERT INTO nicknames(game, player, pokemon, specie)
|
"""
|
||||||
VALUES(?, ?, ?, ?)
|
INSERT INTO switches(game, turn, player, name)
|
||||||
ON CONFLICT DO NOTHING
|
VALUES (?, ?, ?, ?)
|
||||||
""",
|
ON CONFLICT DO NOTHING
|
||||||
(self.game, team(player), nickname, trimmed_specie),
|
""",
|
||||||
)
|
(game, turn, team(player), name),
|
||||||
|
)
|
||||||
|
conn.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO nicknames(game, player, name, specie)
|
||||||
|
VALUES(?, ?, ?, ?)
|
||||||
|
ON CONFLICT DO NOTHING
|
||||||
|
""",
|
||||||
|
(game, team(player), name, specie.split(", ")[0]),
|
||||||
|
)
|
||||||
|
|
||||||
def parse(self, log: str):
|
case ["faint", mon]:
|
||||||
self._reset()
|
player, mon = resolve_mon(mon)
|
||||||
|
conn.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO knockouts(game, turn, player, name)
|
||||||
|
VALUES(?, ?, ?, ?)
|
||||||
|
ON CONFLICT DO NOTHING
|
||||||
|
""",
|
||||||
|
(game, turn, team(player), mon),
|
||||||
|
)
|
||||||
|
|
||||||
for line in log.split("\n"):
|
case ["win", player]:
|
||||||
chunks = line.split("|")[1:]
|
conn.execute(
|
||||||
if not chunks:
|
"""
|
||||||
continue
|
UPDATE games
|
||||||
|
SET winner = ?
|
||||||
|
WHERE id = ?
|
||||||
|
""",
|
||||||
|
(team(player), game),
|
||||||
|
)
|
||||||
|
|
||||||
LOG.trace(line) # type: ignore
|
case ["-sidestart", side, env]:
|
||||||
|
if not last_move:
|
||||||
|
LOG.warning(f"missing previous move for {line}")
|
||||||
|
continue
|
||||||
|
LOG.debug(f"{line} <- {last_move}")
|
||||||
|
last_env_set[(side[0:1], env.replace("move: ", ""))] = last_move[0]
|
||||||
|
|
||||||
match chunks:
|
case ["-status", mon, cond]:
|
||||||
|
if not last_move or last_move[1] != mon:
|
||||||
|
LOG.warning(f"missing previous move for {line}")
|
||||||
|
continue
|
||||||
|
LOG.debug(f"{line} <- {last_move}")
|
||||||
|
last_status_set[(mon, cond)] = last_move[0]
|
||||||
|
|
||||||
# t.Literal, TaggedPokemon, str, str
|
case ["-damage", mon, status]:
|
||||||
case ["drag", name_, specie, status, *rest]:
|
# mon takes direct (non-hazard/condition) damage
|
||||||
name = TaggedPokemon(name_)
|
# status can be a percentage 70/100 with or without condition,
|
||||||
|
# or "0 fnt"
|
||||||
|
new_hp = int(re.split("[/ ]", status)[0])
|
||||||
|
LOG.debug(f"{mon} dropped to {new_hp} from {hp[mon]}")
|
||||||
|
LOG.debug(f"source: {last_move}")
|
||||||
|
|
||||||
self.hp[name] = int(status.split("/")[0])
|
# resolve to damage source
|
||||||
self._log_appearance(name, specie)
|
if last_move[1] != mon:
|
||||||
|
LOG.warn(
|
||||||
|
f"{mon} took direct damage but last move was not targeted at them"
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
user = last_move[0]
|
||||||
|
source_player, source_mon = resolve_mon(user)
|
||||||
|
|
||||||
# t.Literal, TaggedPokemon
|
conn.execute(
|
||||||
case ["faint", pokemon_]:
|
"""
|
||||||
pokemon = TaggedPokemon(pokemon_)
|
INSERT INTO damage(game, player, name, value)
|
||||||
|
VALUES(?, ?, ?, ?)
|
||||||
|
ON CONFLICT DO NOTHING
|
||||||
|
""",
|
||||||
|
(game, team(source_player), source_mon, hp[mon] - new_hp),
|
||||||
|
)
|
||||||
|
|
||||||
player, _ = self.split_pokemon(pokemon)
|
hp[mon] = new_hp
|
||||||
self.conn.execute(
|
|
||||||
|
case ["-damage", mon, status, from_]:
|
||||||
|
# mon takes indirect damage
|
||||||
|
# status can be a percentage 70/100 with or without condition,
|
||||||
|
# or "0 fnt"
|
||||||
|
# mon has fainted from an indirect damage source
|
||||||
|
#
|
||||||
|
new_hp = int(re.split("[/ ]", status)[0])
|
||||||
|
LOG.debug(f"{mon} dropped to {new_hp} from {from_}")
|
||||||
|
|
||||||
|
LOG.debug(f"tracing source for {line}")
|
||||||
|
source = from_.replace("[from] ", "")
|
||||||
|
source_user = None
|
||||||
|
|
||||||
|
test_hazard = last_env_set.get((mon[0:1], source))
|
||||||
|
if test_hazard:
|
||||||
|
source_user = test_hazard
|
||||||
|
LOG.debug(f"identified hazard source {source_user}")
|
||||||
|
|
||||||
|
test_status = last_status_set.get((mon, source))
|
||||||
|
if test_status:
|
||||||
|
source_user = test_status
|
||||||
|
LOG.debug(f"identified move source {source_user}")
|
||||||
|
|
||||||
|
if source == "Recoil" or source.startswith("item: "):
|
||||||
|
LOG.debug(f"identified special source {source}")
|
||||||
|
source = source.replace("item: ", "")
|
||||||
|
source_user = "self"
|
||||||
|
|
||||||
|
if not source_user:
|
||||||
|
LOG.error(f"missing source for {line}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
player, pkmn = resolve_mon(mon)
|
||||||
|
if source_user.startswith("p1") or source_user.startswith("p2"):
|
||||||
|
source_player, source_mon = resolve_mon(source_user)
|
||||||
|
else:
|
||||||
|
source_player = None
|
||||||
|
|
||||||
|
if source_player:
|
||||||
|
conn.execute(
|
||||||
"""
|
"""
|
||||||
INSERT INTO knockouts(game, turn, player, pokemon)
|
INSERT INTO indirect_damage(game, player, name, value)
|
||||||
VALUES(?, ?, ?, ?)
|
VALUES(?, ?, ?, ?)
|
||||||
ON CONFLICT DO NOTHING
|
|
||||||
""",
|
""",
|
||||||
(self.game, self.turn, team(player), self.specie(pokemon)),
|
(game, team(source_player), source_mon, hp[mon] - new_hp),
|
||||||
)
|
)
|
||||||
|
|
||||||
# t.Literal, TaggedPokemon, str, TaggedPokemon
|
if status == "0 fnt":
|
||||||
case ["move", user_, move, target_]:
|
conn.execute(
|
||||||
user = TaggedPokemon(user_)
|
|
||||||
target = TaggedPokemon(target_)
|
|
||||||
|
|
||||||
last_move = (user, target, move)
|
|
||||||
player, _ = self.split_pokemon(user)
|
|
||||||
self.conn.execute(
|
|
||||||
"""
|
"""
|
||||||
INSERT INTO moves(game, turn, player, pokemon, move, target)
|
INSERT INTO indirect_knockouts(game, turn, player, name, source, source_user, source_player)
|
||||||
VALUES (?, ?, ?, ?, ?, ?)
|
VALUES(?, ?, ?, ?, ?, ?, ?)
|
||||||
ON CONFLICT DO NOTHING
|
ON CONFLICT DO NOTHING
|
||||||
""",
|
""",
|
||||||
(
|
(
|
||||||
self.game,
|
game,
|
||||||
self.turn,
|
turn,
|
||||||
team(player),
|
team(player),
|
||||||
self.specie(user),
|
pkmn,
|
||||||
move,
|
source,
|
||||||
self.specie(target),
|
source_mon,
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
# t.Literal, PlayerTag, Player
|
|
||||||
case ["player", id, username, *rest]:
|
|
||||||
self.players[PlayerTag(id)] = Player(username)
|
|
||||||
|
|
||||||
# t.Literal, TaggedPokemon, str
|
|
||||||
case ["replace", name, specie]:
|
|
||||||
self._log_appearance(name, specie)
|
|
||||||
|
|
||||||
# t.Literal, TaggedPokemon, str, str, t.Optional[str]
|
|
||||||
case ["switch", name, specie, status, *rest]:
|
|
||||||
self.hp[name] = int(status.split("/")[0])
|
|
||||||
|
|
||||||
# Also includes gender and formes.
|
|
||||||
trimmed_specie = specie.split(", ")[0]
|
|
||||||
player, nickname = self.split_pokemon(name)
|
|
||||||
|
|
||||||
self._log_appearance(name, specie)
|
|
||||||
self.conn.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO switches(game, turn, player, pokemon)
|
|
||||||
VALUES (?, ?, ?, ?)
|
|
||||||
ON CONFLICT DO NOTHING
|
|
||||||
""",
|
|
||||||
(self.game, self.turn, team(player), trimmed_specie),
|
|
||||||
)
|
|
||||||
|
|
||||||
# t.Literal, str
|
|
||||||
case ["turn", turn]:
|
|
||||||
self.turn = int(turn)
|
|
||||||
|
|
||||||
# t.Literal, Player
|
|
||||||
case ["win", player]:
|
|
||||||
self.conn.execute(
|
|
||||||
"""
|
|
||||||
UPDATE games
|
|
||||||
SET winner = ?
|
|
||||||
WHERE id = ?
|
|
||||||
""",
|
|
||||||
(team(player), self.game),
|
|
||||||
)
|
|
||||||
|
|
||||||
case ["-heal", pokemon, status, *rest]:
|
|
||||||
self.hp[pokemon] = int(status.split("/")[0])
|
|
||||||
|
|
||||||
# TODO: track healing done
|
|
||||||
|
|
||||||
# t.Literal, TaggedPokemon, str
|
|
||||||
case ["-damage", pokemon, status]:
|
|
||||||
# Pokemon takes direct (non-hazard/condition) damage; status
|
|
||||||
# can be a percentage "70/100" with or without condition, or
|
|
||||||
# "0 fnt"
|
|
||||||
new_hp = int(re.split("[/ ]", status)[0])
|
|
||||||
LOG.debug(f"{pokemon} dropped to {new_hp} from {self.hp[pokemon]}")
|
|
||||||
LOG.debug(f"source: {last_move}")
|
|
||||||
|
|
||||||
# resolve to damage source
|
|
||||||
if last_move[1] != pokemon:
|
|
||||||
LOG.warning(
|
|
||||||
f"{pokemon} took direct damage but last move"
|
|
||||||
f" {last_move[2]} was not targeted at them"
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
damage_source = last_move[0]
|
|
||||||
source_player, source_nickname = self.split_pokemon(damage_source)
|
|
||||||
|
|
||||||
self.conn.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO damage(game, player, pokemon, value)
|
|
||||||
VALUES(?, ?, ?, ?)
|
|
||||||
ON CONFLICT DO NOTHING
|
|
||||||
""",
|
|
||||||
(
|
|
||||||
self.game,
|
|
||||||
team(source_player),
|
team(source_player),
|
||||||
self.specie(damage_source),
|
|
||||||
self.hp[pokemon] - new_hp,
|
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
self.hp[pokemon] = new_hp
|
case ["-heal", mon, status, *rest]:
|
||||||
|
hp[mon] = int(status.split("/")[0])
|
||||||
|
|
||||||
# t.Literal, TaggedPokemon, str, str
|
case _:
|
||||||
case ["-damage", pokemon_, status, from_]:
|
# LOG.debug(f"unhandled message {chunks[0]}")
|
||||||
pokemon = TaggedPokemon(pokemon_)
|
pass
|
||||||
|
|
||||||
# Pokemon takes indirect damage; status can be a percentage
|
|
||||||
# "70/100" with or without condition, or "0 fnt"
|
|
||||||
new_hp = int(re.split("[/ ]", status)[0])
|
|
||||||
LOG.debug(f"{pokemon} dropped to {new_hp} from {from_}")
|
|
||||||
|
|
||||||
LOG.debug(f"tracing reason for {line}")
|
|
||||||
reason = from_.replace("[from] ", "")
|
|
||||||
|
|
||||||
source: TaggedPokemon | str | None = None
|
|
||||||
source_is_pokemon = True
|
|
||||||
|
|
||||||
test_hazard = self.last_env_set.get((tag(pokemon), reason))
|
|
||||||
if test_hazard:
|
|
||||||
source = test_hazard
|
|
||||||
LOG.debug(f"identified hazard source {source}")
|
|
||||||
|
|
||||||
test_status = self.last_status_set.get((pokemon, reason))
|
|
||||||
if test_status:
|
|
||||||
source = test_status
|
|
||||||
LOG.debug(f"identified move source {source}")
|
|
||||||
|
|
||||||
if reason == "Recoil" or reason.startswith("item: "):
|
|
||||||
LOG.debug(f"identified special source {reason}")
|
|
||||||
reason = reason.replace("item: ", "")
|
|
||||||
source = "self"
|
|
||||||
source_is_pokemon = False
|
|
||||||
|
|
||||||
if not source:
|
|
||||||
LOG.error(f"missing reason for {line}")
|
|
||||||
continue
|
|
||||||
|
|
||||||
player, nickname = self.split_pokemon(pokemon)
|
|
||||||
if source.startswith("p1") or source.startswith("p2"):
|
|
||||||
source_player, _ = self.split_pokemon(TaggedPokemon(source))
|
|
||||||
else:
|
|
||||||
source_player = None # type: ignore
|
|
||||||
source_is_pokemon = False
|
|
||||||
|
|
||||||
if source_player:
|
|
||||||
self.conn.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO indirect_damage(game, player, pokemon, value)
|
|
||||||
VALUES(?, ?, ?, ?)
|
|
||||||
""",
|
|
||||||
(
|
|
||||||
self.game,
|
|
||||||
team(source_player),
|
|
||||||
self.specie(TaggedPokemon(source)),
|
|
||||||
self.hp[pokemon] - new_hp,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
if status == "0 fnt":
|
|
||||||
self.conn.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO indirect_knockouts(
|
|
||||||
game, turn, player, pokemon,
|
|
||||||
reason, source, source_player)
|
|
||||||
VALUES(?, ?, ?, ?, ?, ?, ?)
|
|
||||||
ON CONFLICT DO NOTHING
|
|
||||||
""",
|
|
||||||
(
|
|
||||||
self.game,
|
|
||||||
self.turn,
|
|
||||||
team(player),
|
|
||||||
self.specie(pokemon),
|
|
||||||
reason,
|
|
||||||
self.specie(TaggedPokemon(source))
|
|
||||||
if source_is_pokemon
|
|
||||||
else source,
|
|
||||||
team(source_player),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
# t.Literal, TaggedPlayer, str
|
|
||||||
case ["-sidestart", side_, env]:
|
|
||||||
side = TaggedPlayer(side_)
|
|
||||||
|
|
||||||
if not last_move:
|
|
||||||
LOG.warning(f"missing previous move for {line}")
|
|
||||||
continue
|
|
||||||
|
|
||||||
LOG.debug(f"{line} <- {last_move}")
|
|
||||||
self.last_env_set[
|
|
||||||
(tag(side), env.replace("move: ", ""))
|
|
||||||
] = last_move[0]
|
|
||||||
|
|
||||||
# t.Literal, TaggedPokemon, str
|
|
||||||
case ["-status", pokemon_, cond]:
|
|
||||||
pokemon = TaggedPokemon(pokemon_)
|
|
||||||
|
|
||||||
if not last_move or last_move[1] != pokemon:
|
|
||||||
LOG.warning(f"missing previous move for {line}")
|
|
||||||
continue
|
|
||||||
|
|
||||||
LOG.debug(f"{line} <- {last_move}")
|
|
||||||
self.last_status_set[(pokemon, cond)] = last_move[0]
|
|
||||||
|
|
||||||
# t.Literal, TaggedPokemon, str
|
|
||||||
case ["-terastallize", pokemon_, type]:
|
|
||||||
pokemon = TaggedPokemon(pokemon_)
|
|
||||||
# TODO
|
|
||||||
pass
|
|
||||||
|
|
||||||
case _:
|
|
||||||
# LOG.debug(f"unhandled message {chunks[0]}")
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True)
|
@dataclass(frozen=True)
|
||||||
@@ -490,7 +355,7 @@ def fetch(replay: str, cache: bool = True) -> Replay:
|
|||||||
with replay_file.open(mode="w") as f:
|
with replay_file.open(mode="w") as f:
|
||||||
json.dump(data, f)
|
json.dump(data, f)
|
||||||
|
|
||||||
return Replay(**data) # type: ignore
|
return Replay(**data)
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
@@ -562,7 +427,7 @@ def main():
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
LogParser(replay.id, db).parse(replay.log)
|
parse_log(replay.id, replay.log, into=db)
|
||||||
db.commit()
|
db.commit()
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
|||||||
Reference in New Issue
Block a user