Compare commits

..

No commits in common. "1e04b57a508473d212adfebb58ca7569a7204a42" and "5f9ae3e145a22235df8f71be169b134266a33cb0" have entirely different histories.

2 changed files with 20 additions and 144 deletions

View File

@ -12,26 +12,12 @@ fuck.
```sh ```sh
$ ./main.py -h $ ./main.py -h
usage: hhirlstats [-h] [-v] [-C] [-o FILE] replay [replay ...] usage: hhirlstats [-h] [-v] [-c] [-Q {gametime,moves,nicknames,playtime,usage}] [replay ...]
extracts stats from a Showdown replay
positional arguments:
replay replay ID or URL
options:
-h, --help show this help message and exit
-v, --verbose add debugging info (default: None)
-C, --no-cache fetch replays instead of using cache (default: False)
-o FILE, --output FILE
output data file (default: data.db)
``` ```
Replay files are cached after they're downloaded for the first time save issuing Recommended to cache replays (`-c`) to save issuing requests to Showdown on every query.
requests to Showdown on every new run.
Run the program once with all your replays (or once for each replay), then run Run the program once with all your replays (or once for each replay), then start querying.
whatever SQL queries against the data file (default `data.db`) you want.
## Future work ## Future work

144
main.py
View File

@ -2,7 +2,6 @@
from collections import namedtuple from collections import namedtuple
from dataclasses import dataclass from dataclasses import dataclass
from functools import partial, partialmethod
from pathlib import Path from pathlib import Path
import argparse import argparse
import json import json
@ -13,35 +12,15 @@ import sys
import typing as t import typing as t
logging.TRACE = 5
logging.addLevelName(logging.TRACE, "TRACE")
logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE)
logging.trace = partial(logging.log, logging.TRACE)
class LogFormatter(logging.Formatter):
format = "%(name)s [%(levelname)s] %(message)s"
FORMATS = {
logging.TRACE: f"\x1b[30;20m{format}\x1b[0m",
logging.DEBUG: f"\x1b[38;20m{format}\x1b[0m",
logging.INFO: f"\x1b[34;20m{format}\x1b[0m",
logging.WARNING: f"\x1b[33;20m{format}\x1b[0m",
logging.ERROR: f"\x1b[31;20m{format}\x1b[0m",
logging.CRITICAL: f"\x1b[31;1m{format}\x1b[0m",
}
def format(self, record):
fmt = self.FORMATS.get(record.levelno)
formatter = logging.Formatter(fmt)
return formatter.format(record)
APP = "hhirlstats" APP = "hhirlstats"
LOG = logging.getLogger(APP)
_ch = logging.StreamHandler()
_ch.setFormatter(LogFormatter()) def error(*args, **kwargs):
LOG.addHandler(_ch) logging.getLogger(APP).error(*args, **kwargs)
def debug(*args, **kwargs):
logging.getLogger(APP).debug(*args, **kwargs)
class safelist(list): class safelist(list):
@ -78,10 +57,6 @@ def _init_db(conn: sqlite3.Connection):
game, turn, player, name, game, turn, player, name,
UNIQUE(game, turn, player) UNIQUE(game, turn, player)
); );
CREATE TABLE IF NOT EXISTS indirect_knockouts(
game, turn, player, name, source, source_user, source_player,
UNIQUE(game, turn, player)
);
CREATE TABLE IF NOT EXISTS games( CREATE TABLE IF NOT EXISTS games(
id, p1, p2, format, uploadtime, id, p1, p2, format, uploadtime,
UNIQUE(id) UNIQUE(id)
@ -96,17 +71,6 @@ def parse_log(game: str, log: str, into: sqlite3.Connection):
turn = 0 turn = 0
players = {} players = {}
# ("p2a: Edward", "p1a: Meteo")
# memorises the user of the move that causes environment setting or status,
# and its target
last_move: t.Optional[tuple[str, str]]
# ("p1", "Spikes") => "p2a: Frosslas"
last_env_set: dict[tuple[str, str], str] = {}
# ("p1a: Meteo", "brn") => "p2a: Edward"
last_status_set: dict[tuple[str, str], str] = {}
def resolve_mon(user: str) -> tuple[str, str]: def resolve_mon(user: str) -> tuple[str, str]:
[player, name] = user.split(": ") [player, name] = user.split(": ")
return players[player.strip("ab")], name return players[player.strip("ab")], name
@ -116,15 +80,12 @@ def parse_log(game: str, log: str, into: sqlite3.Connection):
if not chunks: if not chunks:
continue continue
LOG.trace(line)
match chunks: match chunks:
case ["player", id, username, *rest]: case ["player", id, username, *rest]:
players[id] = username players[id] = username
case ["turn", turn]: case ["turn", turn]:
turn = int(turn) turn = int(turn)
case ["move", user, move, target]: case ["move", user, move, target]:
last_move = (user, target)
player, user = resolve_mon(user) player, user = resolve_mon(user)
_, target = resolve_mon(target) _, target = resolve_mon(target)
conn.execute( conn.execute(
@ -163,62 +124,8 @@ def parse_log(game: str, log: str, into: sqlite3.Connection):
""", """,
(game, turn, player, mon), (game, turn, player, mon),
) )
case ["-sidestart", side, env]:
if not last_move:
LOG.warning(f"missing previous move for {line}")
continue
LOG.debug(f"{line} <- {last_move}")
last_env_set[(side[0:1], env.replace("move: ", ""))] = last_move[0]
case ["-status", mon, cond]:
if not last_move or last_move[1] != mon:
LOG.warning(f"missing previous move for {line}")
continue
LOG.debug(f"{line} <- {last_move}")
last_status_set[(mon, cond)] = last_move[0]
case ["-damage", mon, *rest]:
# rest is new_hp and sometimes a source (if not from a move)
# in a knockout, new_hp is "0 fnt"
if rest[0] == "0 fnt" and len(rest) > 1:
LOG.debug(f"tracing source for {line}")
source = rest[1].replace("[from] ", "")
source_user = None
test_hazard = last_env_set.get((mon[0:1], source))
if test_hazard:
source_user = test_hazard
LOG.debug(f"identified hazard source {source_user}")
test_status = last_status_set.get((mon, source))
if test_status:
source_user = test_status
LOG.debug(f"identified move source {source_user}")
if source == "Recoil" or source.startswith("item: "):
LOG.debug(f"identified special source {source}")
source = source.replace("item: ", "")
source_user = "self"
if not source_user:
LOG.error(f"missing source for {line}")
continue
player, mon = resolve_mon(mon)
if source_user.startswith("p1") or source_user.startswith("p2"):
source_player, source_user = resolve_mon(source_user)
else:
source_player = None
conn.execute(
"""
INSERT INTO indirect_knockouts(game, turn, player, name, source, source_user, source_player)
VALUES(?, ?, ?, ?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, turn, player, mon, source, source_user, source_player),
)
case _: case _:
# LOG.debug(f"unhandled message {chunks[0]}") debug(f"unhandled message {chunks[0]}")
pass
@dataclass(frozen=True) @dataclass(frozen=True)
@ -238,7 +145,7 @@ class Replay:
password: t.Optional[str] password: t.Optional[str]
def fetch(replay: str, cache: bool = True) -> Replay: def fetch(replay: str, cache: bool = False) -> Replay:
replay = replay.replace("https://replay.pokemonshowdown.com/", "") replay = replay.replace("https://replay.pokemonshowdown.com/", "")
replay_file = Path.cwd() / "cache" / f"{replay}.json" replay_file = Path.cwd() / "cache" / f"{replay}.json"
@ -261,45 +168,28 @@ def fetch(replay: str, cache: bool = True) -> Replay:
def main(): def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog=APP, prog=APP, description="extracts stats from a Showdown replay"
description="extracts stats from a Showdown replay",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument("-v", "--verbose", action="count", help="add debugging info")
parser.add_argument(
"-C",
"--no-cache",
action="store_true",
help="fetch replays instead of using cache",
) )
parser.add_argument( parser.add_argument(
"-o", "-v", "--verbose", action="store_true", help="add debugging info"
"--output",
action="store",
metavar="FILE",
default="data.db",
help="output data file",
) )
parser.add_argument("-c", "--cache", action="store_true", help="cache replays")
parser.add_argument("replay", nargs="+", help="replay ID or URL") parser.add_argument("replay", nargs="+", help="replay ID or URL")
args = parser.parse_args(sys.argv[1:]) args = parser.parse_args(sys.argv[1:])
if args.verbose and args.verbose > 1: logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)
LOG.setLevel(logging.TRACE)
elif args.verbose:
LOG.setLevel(logging.DEBUG)
try: try:
db = sqlite3.connect(args.output) db = sqlite3.connect("data.db")
_init_db(db) _init_db(db)
for r in args.replay: for r in args.replay:
try: try:
replay = fetch(r, cache=not args.no_cache) replay = fetch(r, cache=args.cache)
except Exception as e: except Exception as e:
LOG.error(f"bad replay {r}") error(f"bad replay {r}")
continue continue
LOG.info(f"indexing game {replay.id}")
db.execute( db.execute(
""" """
INSERT INTO games(id, p1, p2, format, uploadtime) INSERT INTO games(id, p1, p2, format, uploadtime)