Compare commits

...

11 Commits

Author SHA1 Message Date
e421bd1de9
log complete update 2023-04-07 10:13:55 +10:00
132096b427
fix move command 2023-04-07 10:13:54 +10:00
c3df316aa4
add bot 2023-04-07 10:01:57 +10:00
008ddf6298
move main file 2023-04-07 09:16:15 +10:00
145f220a07
mark todo sections 2023-04-04 22:50:13 +10:00
a839460e9a
optimise missing team logging to use a set 2023-04-04 22:44:46 +10:00
8f8b96937d
add missing clears 2023-04-04 22:44:05 +10:00
87114b1d1d
add types to move tracking 2023-04-04 22:44:05 +10:00
e8149e7c3b
reorder match blocks by alphabetical 2023-04-04 22:32:47 +10:00
37a2df3039
refactor parser into class; add newtyping 2023-04-04 22:21:33 +10:00
8a348eca84
mass refactor
- resolve nicknames as we go
- update column names
- better variables
2023-04-04 21:03:41 +10:00
6 changed files with 662 additions and 442 deletions

3
.gitignore vendored
View File

@ -8,5 +8,6 @@ tmp/
build build
htmlcov htmlcov
data.db *.db
cache/ cache/
token

View File

@ -11,8 +11,8 @@ fuck.
## Usage ## Usage
```sh ```sh
$ ./main.py -h $ ./index.py -h
usage: hhirlstats [-h] [-v] [-C] [-o FILE] replay [replay ...] usage: hhirlstats [-h] [-v] [-C] [-t FILE] [-o FILE] replay [replay ...]
extracts stats from a Showdown replay extracts stats from a Showdown replay
@ -23,6 +23,8 @@ options:
-h, --help show this help message and exit -h, --help show this help message and exit
-v, --verbose add debugging info (default: None) -v, --verbose add debugging info (default: None)
-C, --no-cache fetch replays instead of using cache (default: False) -C, --no-cache fetch replays instead of using cache (default: False)
-t FILE, --teams FILE
JSON file defining players to teams (default: teams.json)
-o FILE, --output FILE -o FILE, --output FILE
output data file (default: data.db) output data file (default: data.db)
``` ```

74
bot.py Executable file
View File

@ -0,0 +1,74 @@
#!/usr/bin/env python3
from discord.utils import setup_logging
import argparse
import discord
import logging
import re
import shutil
import subprocess as sp
discord.utils.setup_logging()
_log = logging.getLogger("statbot")
_GAMES = "games.txt"
_DB = "holy-heck2.db"
_DB_DEST = f"/var/lib/grafana/{_DB}"
def _write_game(content: str):
try:
with open(_GAMES, "a") as f:
f.write(content)
f.write("\n")
except:
_log.exception(f"failed writing game {content}")
def _update_db():
try:
games = []
with open(_GAMES) as f:
for line in f:
games.append(line.strip())
sp.run(["./index.py", "-o", _DB] + games)
shutil.move(_DB, _DB_DEST)
_log.info("updated db")
except:
_log.exception(f"failed updating db")
class BotClient(discord.Client):
async def on_ready(self):
_log.info(f"ready as {self.user}")
async def on_message(self, message: discord.Message):
content = message.content
if re.match("https://replay.pokemonshowdown.com/dl-.*", content):
_log.info(f"Recognised {content} as a League game")
_write_game(content)
_update_db()
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"-t",
"--token-file",
metavar="FILE",
default="token",
help="file containing Discord API token",
)
args = parser.parse_args()
intents = discord.Intents.default()
intents.message_content = True
client = BotClient(intents=intents)
with open(args.token_file) as f:
token = f.read().strip()
client.run(token, log_handler=None)
if __name__ == "__main__":
main()

View File

@ -13,10 +13,18 @@
buildInputs = buildInputs =
let let
python = pkgs.python3.withPackages (ps: [ python = pkgs.python3.withPackages (ps: [
ps.discordpy
ps.mypy
ps.requests ps.requests
ps.types-requests
]); ]);
in in
[ python pkgs.sqlite ]; [
python
pkgs.sqlite
python.pkgs.python-lsp-server
python.pkgs.pylsp-mypy
];
}; };
}); });
} }

573
index.py Executable file
View File

@ -0,0 +1,573 @@
#!/usr/bin/env python3
from collections import namedtuple
from dataclasses import dataclass
from functools import partial, partialmethod
from pathlib import Path
import argparse
import json
import logging
import re
import requests
import sqlite3
import sys
import typing as t
logging.TRACE = 5 # type: ignore
logging.addLevelName(logging.TRACE, "TRACE") # type: ignore
logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE) # type: ignore
logging.trace = partial(logging.log, logging.TRACE) # type: ignore
class LogFormatter(logging.Formatter):
_format = "%(name)s [%(levelname)s] %(message)s"
FORMATS = {
logging.TRACE: f"\x1b[30;20m{_format}\x1b[0m", # type: ignore
logging.DEBUG: f"\x1b[38;20m{_format}\x1b[0m",
logging.INFO: f"\x1b[34;20m{_format}\x1b[0m",
logging.WARNING: f"\x1b[33;20m{_format}\x1b[0m",
logging.ERROR: f"\x1b[31;20m{_format}\x1b[0m",
logging.CRITICAL: f"\x1b[31;1m{_format}\x1b[0m",
}
def format(self, record):
fmt = self.FORMATS.get(record.levelno)
formatter = logging.Formatter(fmt)
return formatter.format(record)
APP = "hhirlstats"
LOG = logging.getLogger(APP)
_ch = logging.StreamHandler()
_ch.setFormatter(LogFormatter())
LOG.addHandler(_ch)
def _init_db(conn: sqlite3.Connection):
def namedtuple_factory(cursor, row):
fields = [column[0] for column in cursor.description]
cls = namedtuple("Row", fields)
return cls._make(row)
conn.row_factory = namedtuple_factory
conn.executescript(
"""
CREATE TABLE IF NOT EXISTS moves(
game, turn, player, pokemon, move, target,
UNIQUE(game, turn, player, pokemon)
);
CREATE TABLE IF NOT EXISTS switches(
game, turn, player, pokemon,
UNIQUE(game, turn, player, pokemon)
);
CREATE TABLE IF NOT EXISTS nicknames(
game, player, pokemon, specie,
UNIQUE(game, player, specie)
);
CREATE TABLE IF NOT EXISTS knockouts(
game, turn, player, pokemon,
UNIQUE(game, turn, player)
);
CREATE TABLE IF NOT EXISTS indirect_knockouts(
game, turn, player, pokemon,
reason, source, source_player,
UNIQUE(game, turn, player)
);
CREATE TABLE IF NOT EXISTS games(
id, p1, p2, format, uploadtime, winner,
UNIQUE(id)
);
-- No good way to ensure idempotence for damage; just re-build it.
DROP TABLE IF EXISTS damage;
CREATE TABLE damage(game, player, pokemon, value);
DROP TABLE IF EXISTS indirect_damage;
CREATE TABLE indirect_damage(game, player, pokemon, value);
"""
)
# Either the value "p1" or "p2"
PlayerTag = t.NewType("PlayerTag", str)
# A player's name
Player = t.NewType("Player", str)
# A player prefixed with a PlayerTag
TaggedPlayer = t.NewType("TaggedPlayer", str)
# A Pokemon identified by its nickname, if any
Pokemon = t.NewType("Pokemon", str)
# A Pokemon specie
PokemonSpecie = t.NewType("PokemonSpecie", str)
# A Pokemon prefixed with a PlayerTag
TaggedPokemon = t.NewType("TaggedPokemon", str)
def tag(tagged: TaggedPlayer | TaggedPokemon) -> PlayerTag:
return PlayerTag(tagged[0:1])
TEAMS: dict[Player, Player] = {}
_logged_teams: set[Player] = set()
def team(player: Player) -> Player:
"""Maps a username to a defined team."""
if player in TEAMS:
return TEAMS[player]
else:
if not player in _logged_teams and player:
LOG.warning(f"missing team mapping for {player}")
_logged_teams.add(player)
return player
class LogParser:
turn = 0
players: dict[PlayerTag, Player] = {}
hp: dict[TaggedPokemon, int] = {}
# Memorises the user of the move that causes environment setting or status,
# its target, and the move name (for debugging).
last_move: t.Optional[tuple[TaggedPokemon, TaggedPokemon, str]] = None
# Memorises the last hazard set against a player and the causing user.
last_env_set: dict[tuple[PlayerTag, str], TaggedPokemon] = {}
# Memorises statuses set on a pokemon and the causing user.
last_status_set: dict[tuple[TaggedPokemon, str], TaggedPokemon] = {}
def __init__(self, game: str, into: sqlite3.Connection):
self.game = game
self.conn: sqlite3.Connection = into
def split_pokemon(self, user: TaggedPokemon) -> tuple[Player, Pokemon]:
"""Splits a TaggedPokemon into the owning player and the Pokemon."""
[player, pokemon] = user.split(": ")
return self.players[PlayerTag(player.strip("ab"))], Pokemon(pokemon)
@t.overload
def specie(self, pokemon: Pokemon, player: Player) -> PokemonSpecie:
"""Resolves the species of a nicknamed Pokemon."""
...
@t.overload
def specie(self, pokemon: TaggedPokemon) -> PokemonSpecie:
"""Resolves the species of a Pokemon given its Showdown identifier (used
in split_pokemon)."""
...
def specie(
self, pokemon: Pokemon | TaggedPokemon, player: t.Optional[Player] = None
) -> PokemonSpecie:
if not player:
[player, pokemon] = self.split_pokemon(TaggedPokemon(pokemon))
return (
self.conn.execute(
"""
SELECT specie
FROM nicknames
WHERE (game, player, pokemon) = (?, ?, ?)
LIMIT 1
""",
(self.game, team(player), pokemon),
)
.fetchall()[0]
.specie
)
def _reset(self):
self.turn = 0
self.players.clear()
self.hp.clear()
self.last_move = None
self.last_env_set.clear()
self.last_status_set.clear()
def _log_appearance(self, name: TaggedPokemon, specie: str):
# Also includes gender and formes.
trimmed_specie = PokemonSpecie(specie.split(", ")[0])
player, nickname = self.split_pokemon(name)
self.conn.execute(
"""
INSERT INTO nicknames(game, player, pokemon, specie)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(self.game, team(player), nickname, trimmed_specie),
)
def parse(self, log: str):
self._reset()
for line in log.split("\n"):
chunks = line.split("|")[1:]
if not chunks:
continue
LOG.trace(line) # type: ignore
match chunks:
# t.Literal, TaggedPokemon, str, str
case ["drag", name_, specie, status, *rest]:
name = TaggedPokemon(name_)
self.hp[name] = int(status.split("/")[0])
self._log_appearance(name, specie)
# t.Literal, TaggedPokemon
case ["faint", pokemon_]:
pokemon = TaggedPokemon(pokemon_)
player, _ = self.split_pokemon(pokemon)
self.conn.execute(
"""
INSERT INTO knockouts(game, turn, player, pokemon)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(self.game, self.turn, team(player), self.specie(pokemon)),
)
# t.Literal, TaggedPokemon, str, TaggedPokemon
case ["move", user_, move, target_]:
user = TaggedPokemon(user_)
target = TaggedPokemon(target_)
last_move = (user, target, move)
player, _ = self.split_pokemon(user)
self.conn.execute(
"""
INSERT INTO moves(game, turn, player, pokemon, move, target)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(
self.game,
self.turn,
team(player),
self.specie(user),
move,
self.specie(target),
),
)
# t.Literal, PlayerTag, Player
case ["player", id, username, *rest]:
self.players[PlayerTag(id)] = Player(username)
# t.Literal, TaggedPokemon, str
case ["replace", name, specie]:
self._log_appearance(name, specie)
# t.Literal, TaggedPokemon, str, str, t.Optional[str]
case ["switch", name, specie, status, *rest]:
self.hp[name] = int(status.split("/")[0])
# Also includes gender and formes.
trimmed_specie = specie.split(", ")[0]
player, nickname = self.split_pokemon(name)
self._log_appearance(name, specie)
self.conn.execute(
"""
INSERT INTO switches(game, turn, player, pokemon)
VALUES (?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(self.game, self.turn, team(player), trimmed_specie),
)
# t.Literal, str
case ["turn", turn]:
self.turn = int(turn)
# t.Literal, Player
case ["win", player]:
self.conn.execute(
"""
UPDATE games
SET winner = ?
WHERE id = ?
""",
(team(player), self.game),
)
case ["-heal", pokemon, status, *rest]:
self.hp[pokemon] = int(status.split("/")[0])
# TODO: track healing done
# t.Literal, TaggedPokemon, str
case ["-damage", pokemon, status]:
# Pokemon takes direct (non-hazard/condition) damage; status
# can be a percentage "70/100" with or without condition, or
# "0 fnt"
new_hp = int(re.split("[/ ]", status)[0])
LOG.debug(f"{pokemon} dropped to {new_hp} from {self.hp[pokemon]}")
LOG.debug(f"source: {last_move}")
# resolve to damage source
if last_move[1] != pokemon:
LOG.warning(
f"{pokemon} took direct damage but last move"
f" {last_move[2]} was not targeted at them"
)
continue
damage_source = last_move[0]
source_player, source_nickname = self.split_pokemon(damage_source)
self.conn.execute(
"""
INSERT INTO damage(game, player, pokemon, value)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(
self.game,
team(source_player),
self.specie(damage_source),
self.hp[pokemon] - new_hp,
),
)
self.hp[pokemon] = new_hp
# t.Literal, TaggedPokemon, str, str
case ["-damage", pokemon_, status, from_]:
pokemon = TaggedPokemon(pokemon_)
# Pokemon takes indirect damage; status can be a percentage
# "70/100" with or without condition, or "0 fnt"
new_hp = int(re.split("[/ ]", status)[0])
LOG.debug(f"{pokemon} dropped to {new_hp} from {from_}")
LOG.debug(f"tracing reason for {line}")
reason = from_.replace("[from] ", "")
source: TaggedPokemon | str | None = None
source_is_pokemon = True
test_hazard = self.last_env_set.get((tag(pokemon), reason))
if test_hazard:
source = test_hazard
LOG.debug(f"identified hazard source {source}")
test_status = self.last_status_set.get((pokemon, reason))
if test_status:
source = test_status
LOG.debug(f"identified move source {source}")
if reason == "Recoil" or reason.startswith("item: "):
LOG.debug(f"identified special source {reason}")
reason = reason.replace("item: ", "")
source = "self"
source_is_pokemon = False
if not source:
LOG.error(f"missing reason for {line}")
continue
player, nickname = self.split_pokemon(pokemon)
if source.startswith("p1") or source.startswith("p2"):
source_player, _ = self.split_pokemon(TaggedPokemon(source))
else:
source_player = None # type: ignore
source_is_pokemon = False
if source_player:
self.conn.execute(
"""
INSERT INTO indirect_damage(game, player, pokemon, value)
VALUES(?, ?, ?, ?)
""",
(
self.game,
team(source_player),
self.specie(TaggedPokemon(source)),
self.hp[pokemon] - new_hp,
),
)
if status == "0 fnt":
self.conn.execute(
"""
INSERT INTO indirect_knockouts(
game, turn, player, pokemon,
reason, source, source_player)
VALUES(?, ?, ?, ?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(
self.game,
self.turn,
team(player),
self.specie(pokemon),
reason,
self.specie(TaggedPokemon(source))
if source_is_pokemon
else source,
team(source_player),
),
)
# t.Literal, TaggedPlayer, str
case ["-sidestart", side_, env]:
side = TaggedPlayer(side_)
if not last_move:
LOG.warning(f"missing previous move for {line}")
continue
LOG.debug(f"{line} <- {last_move}")
self.last_env_set[
(tag(side), env.replace("move: ", ""))
] = last_move[0]
# t.Literal, TaggedPokemon, str
case ["-status", pokemon_, cond]:
pokemon = TaggedPokemon(pokemon_)
if not last_move or last_move[1] != pokemon:
LOG.warning(f"missing previous move for {line}")
continue
LOG.debug(f"{line} <- {last_move}")
self.last_status_set[(pokemon, cond)] = last_move[0]
# t.Literal, TaggedPokemon, str
case ["-terastallize", pokemon_, type]:
pokemon = TaggedPokemon(pokemon_)
# TODO
pass
case _:
# LOG.debug(f"unhandled message {chunks[0]}")
pass
@dataclass(frozen=True)
class Replay:
id: str
p1: str
p2: str
format: str
log: str
uploadtime: int
views: int
p1id: str
p2id: str
formatid: str
rating: int
private: int
password: t.Optional[str]
def fetch(replay: str, cache: bool = True) -> Replay:
replay = replay.replace("https://replay.pokemonshowdown.com/", "")
replay_file = Path.cwd() / "cache" / f"{replay}.json"
if cache and replay_file.exists():
with replay_file.open() as f:
return Replay(**json.load(f))
data = requests.get(f"https://replay.pokemonshowdown.com/{replay}.json")
if data.status_code != 200:
raise Exception(data.text)
data = data.json()
if cache:
replay_file.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
with replay_file.open(mode="w") as f:
json.dump(data, f)
return Replay(**data) # type: ignore
def main():
parser = argparse.ArgumentParser(
prog=APP,
description="extracts stats from a Showdown replay",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument("-v", "--verbose", action="count", help="add debugging info")
parser.add_argument(
"-C",
"--no-cache",
action="store_true",
help="fetch replays instead of using cache",
)
parser.add_argument(
"-t",
"--teams",
action="store",
metavar="FILE",
default="teams.json",
help="JSON file defining players to teams",
)
parser.add_argument(
"-o",
"--output",
action="store",
metavar="FILE",
default="data.db",
help="output data file",
)
parser.add_argument("replay", nargs="+", help="replay ID or URL")
args = parser.parse_args()
if args.verbose and args.verbose > 1:
LOG.setLevel(logging.TRACE)
elif args.verbose:
LOG.setLevel(logging.DEBUG)
if args.teams:
with open(args.teams) as f:
global TEAMS
TEAMS = json.load(f)
try:
db = sqlite3.connect(args.output)
_init_db(db)
for r in args.replay:
try:
replay = fetch(r, cache=not args.no_cache)
except Exception as e:
LOG.error(f"bad replay {r}")
continue
LOG.info(f"indexing game {replay.id}")
db.execute(
"""
INSERT INTO games(id, p1, p2, format, uploadtime)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(
replay.id,
team(replay.p1),
team(replay.p2),
replay.format,
replay.uploadtime,
),
)
LogParser(replay.id, db).parse(replay.log)
db.commit()
finally:
db.close()
if __name__ == "__main__":
main()

438
main.py
View File

@ -1,438 +0,0 @@
#!/usr/bin/env python3
from collections import namedtuple
from dataclasses import dataclass
from functools import partial, partialmethod
from pathlib import Path
import argparse
import json
import logging
import re
import requests
import sqlite3
import sys
import typing as t
logging.TRACE = 5
logging.addLevelName(logging.TRACE, "TRACE")
logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE)
logging.trace = partial(logging.log, logging.TRACE)
class LogFormatter(logging.Formatter):
format = "%(name)s [%(levelname)s] %(message)s"
FORMATS = {
logging.TRACE: f"\x1b[30;20m{format}\x1b[0m",
logging.DEBUG: f"\x1b[38;20m{format}\x1b[0m",
logging.INFO: f"\x1b[34;20m{format}\x1b[0m",
logging.WARNING: f"\x1b[33;20m{format}\x1b[0m",
logging.ERROR: f"\x1b[31;20m{format}\x1b[0m",
logging.CRITICAL: f"\x1b[31;1m{format}\x1b[0m",
}
def format(self, record):
fmt = self.FORMATS.get(record.levelno)
formatter = logging.Formatter(fmt)
return formatter.format(record)
APP = "hhirlstats"
LOG = logging.getLogger(APP)
_ch = logging.StreamHandler()
_ch.setFormatter(LogFormatter())
LOG.addHandler(_ch)
TEAMS = {}
_logged_teams = []
def team(player: str) -> str:
"""Maps a username to a defined team."""
if player in TEAMS:
return TEAMS[player]
else:
if not player in _logged_teams and player:
LOG.warning(f"missing team mapping for {player}")
_logged_teams.append(player)
return player
class safelist(list):
def get(self, index, default=None):
try:
return self.__getitem__(index)
except IndexError:
return default
def _init_db(conn: sqlite3.Connection):
def namedtuple_factory(cursor, row):
fields = [column[0] for column in cursor.description]
cls = namedtuple("Row", fields)
return cls._make(row)
conn.row_factory = namedtuple_factory
conn.executescript(
"""
CREATE TABLE IF NOT EXISTS moves(
game, turn, player, name, user, target,
UNIQUE(game, turn, player, user)
);
CREATE TABLE IF NOT EXISTS switches(
game, turn, player, name,
UNIQUE(game, turn, player, name)
);
CREATE TABLE IF NOT EXISTS nicknames(
game, player, name, specie,
UNIQUE(game, player, specie)
);
CREATE TABLE IF NOT EXISTS knockouts(
game, turn, player, name,
UNIQUE(game, turn, player)
);
CREATE TABLE IF NOT EXISTS indirect_knockouts(
game, turn, player, name, source, source_user, source_player,
UNIQUE(game, turn, player)
);
CREATE TABLE IF NOT EXISTS games(
id, p1, p2, format, uploadtime, winner,
UNIQUE(id)
);
-- No good way to ensure idempotence for damage; just re-build it.
DROP TABLE IF EXISTS damage;
CREATE TABLE damage(game, player, name, value);
DROP TABLE IF EXISTS indirect_damage;
CREATE TABLE indirect_damage(game, player, name, value);
"""
)
def parse_log(game: str, log: str, into: sqlite3.Connection):
conn = into
turn = 0
players = {}
hp = {}
# ("p2a: Edward", "p1a: Meteo")
# memorises the user of the move that causes environment setting or status,
# and its target
last_move: t.Optional[tuple[str, str]]
# ("p1", "Spikes") => "p2a: Frosslas"
last_env_set: dict[tuple[str, str], str] = {}
# ("p1a: Meteo", "brn") => "p2a: Edward"
last_status_set: dict[tuple[str, str], str] = {}
def resolve_mon(user: str) -> tuple[str, str]:
[player, name] = user.split(": ")
return players[player.strip("ab")], name
for line in log.split("\n"):
chunks = line.split("|")[1:]
if not chunks:
continue
LOG.trace(line)
match chunks:
case ["player", id, username, *rest]:
players[id] = username
case ["turn", turn]:
turn = int(turn)
case ["move", user, move, target]:
last_move = (user, target)
player, user = resolve_mon(user)
_, target = resolve_mon(target)
conn.execute(
"""
INSERT INTO moves(game, turn, player, name, user, target)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, turn, team(player), move, user, target),
)
case ["drag", name, specie, status, *rest]:
hp[name] = int(status.split("/")[0])
case ["switch", name, specie, status, *rest]:
hp[name] = int(status.split("/")[0])
player, name = resolve_mon(name)
conn.execute(
"""
INSERT INTO switches(game, turn, player, name)
VALUES (?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, turn, team(player), name),
)
conn.execute(
"""
INSERT INTO nicknames(game, player, name, specie)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, team(player), name, specie.split(", ")[0]),
)
case ["faint", mon]:
player, mon = resolve_mon(mon)
conn.execute(
"""
INSERT INTO knockouts(game, turn, player, name)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, turn, team(player), mon),
)
case ["win", player]:
conn.execute(
"""
UPDATE games
SET winner = ?
WHERE id = ?
""",
(team(player), game),
)
case ["-sidestart", side, env]:
if not last_move:
LOG.warning(f"missing previous move for {line}")
continue
LOG.debug(f"{line} <- {last_move}")
last_env_set[(side[0:1], env.replace("move: ", ""))] = last_move[0]
case ["-status", mon, cond]:
if not last_move or last_move[1] != mon:
LOG.warning(f"missing previous move for {line}")
continue
LOG.debug(f"{line} <- {last_move}")
last_status_set[(mon, cond)] = last_move[0]
case ["-damage", mon, status]:
# mon takes direct (non-hazard/condition) damage
# status can be a percentage 70/100 with or without condition,
# or "0 fnt"
new_hp = int(re.split("[/ ]", status)[0])
LOG.debug(f"{mon} dropped to {new_hp} from {hp[mon]}")
LOG.debug(f"source: {last_move}")
# resolve to damage source
if last_move[1] != mon:
LOG.warn(
f"{mon} took direct damage but last move was not targeted at them"
)
continue
user = last_move[0]
source_player, source_mon = resolve_mon(user)
conn.execute(
"""
INSERT INTO damage(game, player, name, value)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, team(source_player), source_mon, hp[mon] - new_hp),
)
hp[mon] = new_hp
case ["-damage", mon, status, from_]:
# mon takes indirect damage
# status can be a percentage 70/100 with or without condition,
# or "0 fnt"
# mon has fainted from an indirect damage source
#
new_hp = int(re.split("[/ ]", status)[0])
LOG.debug(f"{mon} dropped to {new_hp} from {from_}")
LOG.debug(f"tracing source for {line}")
source = from_.replace("[from] ", "")
source_user = None
test_hazard = last_env_set.get((mon[0:1], source))
if test_hazard:
source_user = test_hazard
LOG.debug(f"identified hazard source {source_user}")
test_status = last_status_set.get((mon, source))
if test_status:
source_user = test_status
LOG.debug(f"identified move source {source_user}")
if source == "Recoil" or source.startswith("item: "):
LOG.debug(f"identified special source {source}")
source = source.replace("item: ", "")
source_user = "self"
if not source_user:
LOG.error(f"missing source for {line}")
continue
player, pkmn = resolve_mon(mon)
if source_user.startswith("p1") or source_user.startswith("p2"):
source_player, source_mon = resolve_mon(source_user)
else:
source_player = None
if source_player:
conn.execute(
"""
INSERT INTO indirect_damage(game, player, name, value)
VALUES(?, ?, ?, ?)
""",
(game, team(source_player), source_mon, hp[mon] - new_hp),
)
if status == "0 fnt":
conn.execute(
"""
INSERT INTO indirect_knockouts(game, turn, player, name, source, source_user, source_player)
VALUES(?, ?, ?, ?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(
game,
turn,
team(player),
pkmn,
source,
source_mon,
team(source_player),
),
)
case ["-heal", mon, status, *rest]:
hp[mon] = int(status.split("/")[0])
case _:
# LOG.debug(f"unhandled message {chunks[0]}")
pass
@dataclass(frozen=True)
class Replay:
id: str
p1: str
p2: str
format: str
log: str
uploadtime: int
views: int
p1id: str
p2id: str
formatid: str
rating: int
private: int
password: t.Optional[str]
def fetch(replay: str, cache: bool = True) -> Replay:
replay = replay.replace("https://replay.pokemonshowdown.com/", "")
replay_file = Path.cwd() / "cache" / f"{replay}.json"
if cache and replay_file.exists():
with replay_file.open() as f:
return Replay(**json.load(f))
data = requests.get(f"https://replay.pokemonshowdown.com/{replay}.json")
if data.status_code != 200:
raise Exception(data.text)
data = data.json()
if cache:
replay_file.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
with replay_file.open(mode="w") as f:
json.dump(data, f)
return Replay(**data)
def main(args):
parser = argparse.ArgumentParser(
prog=APP,
description="extracts stats from a Showdown replay",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument("-v", "--verbose", action="count", help="add debugging info")
parser.add_argument(
"-C",
"--no-cache",
action="store_true",
help="fetch replays instead of using cache",
)
parser.add_argument(
"-t",
"--teams",
action="store",
metavar="FILE",
default="teams.json",
help="JSON file defining players to teams",
)
parser.add_argument(
"-o",
"--output",
action="store",
metavar="FILE",
default="data.db",
help="output data file",
)
parser.add_argument("replay", nargs="+", help="replay ID or URL")
args = parser.parse_args(args)
if args.verbose and args.verbose > 1:
LOG.setLevel(logging.TRACE)
elif args.verbose:
LOG.setLevel(logging.DEBUG)
if args.teams:
with open(args.teams) as f:
global TEAMS
TEAMS = json.load(f)
try:
db = sqlite3.connect(args.output)
_init_db(db)
for r in args.replay:
try:
replay = fetch(r, cache=not args.no_cache)
except Exception as e:
LOG.error(f"bad replay {r}")
continue
LOG.info(f"indexing game {replay.id}")
db.execute(
"""
INSERT INTO games(id, p1, p2, format, uploadtime)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(
replay.id,
team(replay.p1),
team(replay.p2),
replay.format,
replay.uploadtime,
),
)
parse_log(replay.id, replay.log, into=db)
db.commit()
finally:
db.close()
if __name__ == "__main__":
main(sys.argv[1:])