Compare commits

..

No commits in common. "1e04b57a508473d212adfebb58ca7569a7204a42" and "5f9ae3e145a22235df8f71be169b134266a33cb0" have entirely different histories.

2 changed files with 20 additions and 144 deletions

View File

@ -12,26 +12,12 @@ fuck.
```sh
$ ./main.py -h
usage: hhirlstats [-h] [-v] [-C] [-o FILE] replay [replay ...]
extracts stats from a Showdown replay
positional arguments:
replay replay ID or URL
options:
-h, --help show this help message and exit
-v, --verbose add debugging info (default: None)
-C, --no-cache fetch replays instead of using cache (default: False)
-o FILE, --output FILE
output data file (default: data.db)
usage: hhirlstats [-h] [-v] [-c] [-Q {gametime,moves,nicknames,playtime,usage}] [replay ...]
```
Replay files are cached after they're downloaded for the first time save issuing
requests to Showdown on every new run.
Recommended to cache replays (`-c`) to save issuing requests to Showdown on every query.
Run the program once with all your replays (or once for each replay), then run
whatever SQL queries against the data file (default `data.db`) you want.
Run the program once with all your replays (or once for each replay), then start querying.
## Future work

144
main.py
View File

@ -2,7 +2,6 @@
from collections import namedtuple
from dataclasses import dataclass
from functools import partial, partialmethod
from pathlib import Path
import argparse
import json
@ -13,35 +12,15 @@ import sys
import typing as t
logging.TRACE = 5
logging.addLevelName(logging.TRACE, "TRACE")
logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE)
logging.trace = partial(logging.log, logging.TRACE)
class LogFormatter(logging.Formatter):
format = "%(name)s [%(levelname)s] %(message)s"
FORMATS = {
logging.TRACE: f"\x1b[30;20m{format}\x1b[0m",
logging.DEBUG: f"\x1b[38;20m{format}\x1b[0m",
logging.INFO: f"\x1b[34;20m{format}\x1b[0m",
logging.WARNING: f"\x1b[33;20m{format}\x1b[0m",
logging.ERROR: f"\x1b[31;20m{format}\x1b[0m",
logging.CRITICAL: f"\x1b[31;1m{format}\x1b[0m",
}
def format(self, record):
fmt = self.FORMATS.get(record.levelno)
formatter = logging.Formatter(fmt)
return formatter.format(record)
APP = "hhirlstats"
LOG = logging.getLogger(APP)
_ch = logging.StreamHandler()
_ch.setFormatter(LogFormatter())
LOG.addHandler(_ch)
def error(*args, **kwargs):
logging.getLogger(APP).error(*args, **kwargs)
def debug(*args, **kwargs):
logging.getLogger(APP).debug(*args, **kwargs)
class safelist(list):
@ -78,10 +57,6 @@ def _init_db(conn: sqlite3.Connection):
game, turn, player, name,
UNIQUE(game, turn, player)
);
CREATE TABLE IF NOT EXISTS indirect_knockouts(
game, turn, player, name, source, source_user, source_player,
UNIQUE(game, turn, player)
);
CREATE TABLE IF NOT EXISTS games(
id, p1, p2, format, uploadtime,
UNIQUE(id)
@ -96,17 +71,6 @@ def parse_log(game: str, log: str, into: sqlite3.Connection):
turn = 0
players = {}
# ("p2a: Edward", "p1a: Meteo")
# memorises the user of the move that causes environment setting or status,
# and its target
last_move: t.Optional[tuple[str, str]]
# ("p1", "Spikes") => "p2a: Frosslas"
last_env_set: dict[tuple[str, str], str] = {}
# ("p1a: Meteo", "brn") => "p2a: Edward"
last_status_set: dict[tuple[str, str], str] = {}
def resolve_mon(user: str) -> tuple[str, str]:
[player, name] = user.split(": ")
return players[player.strip("ab")], name
@ -116,15 +80,12 @@ def parse_log(game: str, log: str, into: sqlite3.Connection):
if not chunks:
continue
LOG.trace(line)
match chunks:
case ["player", id, username, *rest]:
players[id] = username
case ["turn", turn]:
turn = int(turn)
case ["move", user, move, target]:
last_move = (user, target)
player, user = resolve_mon(user)
_, target = resolve_mon(target)
conn.execute(
@ -163,62 +124,8 @@ def parse_log(game: str, log: str, into: sqlite3.Connection):
""",
(game, turn, player, mon),
)
case ["-sidestart", side, env]:
if not last_move:
LOG.warning(f"missing previous move for {line}")
continue
LOG.debug(f"{line} <- {last_move}")
last_env_set[(side[0:1], env.replace("move: ", ""))] = last_move[0]
case ["-status", mon, cond]:
if not last_move or last_move[1] != mon:
LOG.warning(f"missing previous move for {line}")
continue
LOG.debug(f"{line} <- {last_move}")
last_status_set[(mon, cond)] = last_move[0]
case ["-damage", mon, *rest]:
# rest is new_hp and sometimes a source (if not from a move)
# in a knockout, new_hp is "0 fnt"
if rest[0] == "0 fnt" and len(rest) > 1:
LOG.debug(f"tracing source for {line}")
source = rest[1].replace("[from] ", "")
source_user = None
test_hazard = last_env_set.get((mon[0:1], source))
if test_hazard:
source_user = test_hazard
LOG.debug(f"identified hazard source {source_user}")
test_status = last_status_set.get((mon, source))
if test_status:
source_user = test_status
LOG.debug(f"identified move source {source_user}")
if source == "Recoil" or source.startswith("item: "):
LOG.debug(f"identified special source {source}")
source = source.replace("item: ", "")
source_user = "self"
if not source_user:
LOG.error(f"missing source for {line}")
continue
player, mon = resolve_mon(mon)
if source_user.startswith("p1") or source_user.startswith("p2"):
source_player, source_user = resolve_mon(source_user)
else:
source_player = None
conn.execute(
"""
INSERT INTO indirect_knockouts(game, turn, player, name, source, source_user, source_player)
VALUES(?, ?, ?, ?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, turn, player, mon, source, source_user, source_player),
)
case _:
# LOG.debug(f"unhandled message {chunks[0]}")
pass
debug(f"unhandled message {chunks[0]}")
@dataclass(frozen=True)
@ -238,7 +145,7 @@ class Replay:
password: t.Optional[str]
def fetch(replay: str, cache: bool = True) -> Replay:
def fetch(replay: str, cache: bool = False) -> Replay:
replay = replay.replace("https://replay.pokemonshowdown.com/", "")
replay_file = Path.cwd() / "cache" / f"{replay}.json"
@ -261,45 +168,28 @@ def fetch(replay: str, cache: bool = True) -> Replay:
def main():
parser = argparse.ArgumentParser(
prog=APP,
description="extracts stats from a Showdown replay",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument("-v", "--verbose", action="count", help="add debugging info")
parser.add_argument(
"-C",
"--no-cache",
action="store_true",
help="fetch replays instead of using cache",
prog=APP, description="extracts stats from a Showdown replay"
)
parser.add_argument(
"-o",
"--output",
action="store",
metavar="FILE",
default="data.db",
help="output data file",
"-v", "--verbose", action="store_true", help="add debugging info"
)
parser.add_argument("-c", "--cache", action="store_true", help="cache replays")
parser.add_argument("replay", nargs="+", help="replay ID or URL")
args = parser.parse_args(sys.argv[1:])
if args.verbose and args.verbose > 1:
LOG.setLevel(logging.TRACE)
elif args.verbose:
LOG.setLevel(logging.DEBUG)
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)
try:
db = sqlite3.connect(args.output)
db = sqlite3.connect("data.db")
_init_db(db)
for r in args.replay:
try:
replay = fetch(r, cache=not args.no_cache)
replay = fetch(r, cache=args.cache)
except Exception as e:
LOG.error(f"bad replay {r}")
error(f"bad replay {r}")
continue
LOG.info(f"indexing game {replay.id}")
db.execute(
"""
INSERT INTO games(id, p1, p2, format, uploadtime)