Compare commits
8 Commits
5f9ae3e145
...
1e04b57a50
Author | SHA1 | Date | |
---|---|---|---|
1e04b57a50 | |||
872abf2e6c | |||
89508608f0 | |||
ff15dec1cc | |||
c8f5803c83 | |||
a9f52351e7 | |||
b162f63ab2 | |||
693029fd21 |
20
README.md
20
README.md
@ -12,12 +12,26 @@ fuck.
|
|||||||
|
|
||||||
```sh
|
```sh
|
||||||
$ ./main.py -h
|
$ ./main.py -h
|
||||||
usage: hhirlstats [-h] [-v] [-c] [-Q {gametime,moves,nicknames,playtime,usage}] [replay ...]
|
usage: hhirlstats [-h] [-v] [-C] [-o FILE] replay [replay ...]
|
||||||
|
|
||||||
|
extracts stats from a Showdown replay
|
||||||
|
|
||||||
|
positional arguments:
|
||||||
|
replay replay ID or URL
|
||||||
|
|
||||||
|
options:
|
||||||
|
-h, --help show this help message and exit
|
||||||
|
-v, --verbose add debugging info (default: None)
|
||||||
|
-C, --no-cache fetch replays instead of using cache (default: False)
|
||||||
|
-o FILE, --output FILE
|
||||||
|
output data file (default: data.db)
|
||||||
```
|
```
|
||||||
|
|
||||||
Recommended to cache replays (`-c`) to save issuing requests to Showdown on every query.
|
Replay files are cached after they're downloaded for the first time save issuing
|
||||||
|
requests to Showdown on every new run.
|
||||||
|
|
||||||
Run the program once with all your replays (or once for each replay), then start querying.
|
Run the program once with all your replays (or once for each replay), then run
|
||||||
|
whatever SQL queries against the data file (default `data.db`) you want.
|
||||||
|
|
||||||
## Future work
|
## Future work
|
||||||
|
|
||||||
|
144
main.py
144
main.py
@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
from functools import partial, partialmethod
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import argparse
|
import argparse
|
||||||
import json
|
import json
|
||||||
@ -12,15 +13,35 @@ import sys
|
|||||||
import typing as t
|
import typing as t
|
||||||
|
|
||||||
|
|
||||||
|
logging.TRACE = 5
|
||||||
|
logging.addLevelName(logging.TRACE, "TRACE")
|
||||||
|
logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE)
|
||||||
|
logging.trace = partial(logging.log, logging.TRACE)
|
||||||
|
|
||||||
|
|
||||||
|
class LogFormatter(logging.Formatter):
|
||||||
|
|
||||||
|
format = "%(name)s [%(levelname)s] %(message)s"
|
||||||
|
FORMATS = {
|
||||||
|
logging.TRACE: f"\x1b[30;20m{format}\x1b[0m",
|
||||||
|
logging.DEBUG: f"\x1b[38;20m{format}\x1b[0m",
|
||||||
|
logging.INFO: f"\x1b[34;20m{format}\x1b[0m",
|
||||||
|
logging.WARNING: f"\x1b[33;20m{format}\x1b[0m",
|
||||||
|
logging.ERROR: f"\x1b[31;20m{format}\x1b[0m",
|
||||||
|
logging.CRITICAL: f"\x1b[31;1m{format}\x1b[0m",
|
||||||
|
}
|
||||||
|
|
||||||
|
def format(self, record):
|
||||||
|
fmt = self.FORMATS.get(record.levelno)
|
||||||
|
formatter = logging.Formatter(fmt)
|
||||||
|
return formatter.format(record)
|
||||||
|
|
||||||
|
|
||||||
APP = "hhirlstats"
|
APP = "hhirlstats"
|
||||||
|
LOG = logging.getLogger(APP)
|
||||||
|
_ch = logging.StreamHandler()
|
||||||
def error(*args, **kwargs):
|
_ch.setFormatter(LogFormatter())
|
||||||
logging.getLogger(APP).error(*args, **kwargs)
|
LOG.addHandler(_ch)
|
||||||
|
|
||||||
|
|
||||||
def debug(*args, **kwargs):
|
|
||||||
logging.getLogger(APP).debug(*args, **kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
class safelist(list):
|
class safelist(list):
|
||||||
@ -57,6 +78,10 @@ def _init_db(conn: sqlite3.Connection):
|
|||||||
game, turn, player, name,
|
game, turn, player, name,
|
||||||
UNIQUE(game, turn, player)
|
UNIQUE(game, turn, player)
|
||||||
);
|
);
|
||||||
|
CREATE TABLE IF NOT EXISTS indirect_knockouts(
|
||||||
|
game, turn, player, name, source, source_user, source_player,
|
||||||
|
UNIQUE(game, turn, player)
|
||||||
|
);
|
||||||
CREATE TABLE IF NOT EXISTS games(
|
CREATE TABLE IF NOT EXISTS games(
|
||||||
id, p1, p2, format, uploadtime,
|
id, p1, p2, format, uploadtime,
|
||||||
UNIQUE(id)
|
UNIQUE(id)
|
||||||
@ -71,6 +96,17 @@ def parse_log(game: str, log: str, into: sqlite3.Connection):
|
|||||||
turn = 0
|
turn = 0
|
||||||
players = {}
|
players = {}
|
||||||
|
|
||||||
|
# ("p2a: Edward", "p1a: Meteo")
|
||||||
|
# memorises the user of the move that causes environment setting or status,
|
||||||
|
# and its target
|
||||||
|
last_move: t.Optional[tuple[str, str]]
|
||||||
|
|
||||||
|
# ("p1", "Spikes") => "p2a: Frosslas"
|
||||||
|
last_env_set: dict[tuple[str, str], str] = {}
|
||||||
|
|
||||||
|
# ("p1a: Meteo", "brn") => "p2a: Edward"
|
||||||
|
last_status_set: dict[tuple[str, str], str] = {}
|
||||||
|
|
||||||
def resolve_mon(user: str) -> tuple[str, str]:
|
def resolve_mon(user: str) -> tuple[str, str]:
|
||||||
[player, name] = user.split(": ")
|
[player, name] = user.split(": ")
|
||||||
return players[player.strip("ab")], name
|
return players[player.strip("ab")], name
|
||||||
@ -80,12 +116,15 @@ def parse_log(game: str, log: str, into: sqlite3.Connection):
|
|||||||
if not chunks:
|
if not chunks:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
LOG.trace(line)
|
||||||
|
|
||||||
match chunks:
|
match chunks:
|
||||||
case ["player", id, username, *rest]:
|
case ["player", id, username, *rest]:
|
||||||
players[id] = username
|
players[id] = username
|
||||||
case ["turn", turn]:
|
case ["turn", turn]:
|
||||||
turn = int(turn)
|
turn = int(turn)
|
||||||
case ["move", user, move, target]:
|
case ["move", user, move, target]:
|
||||||
|
last_move = (user, target)
|
||||||
player, user = resolve_mon(user)
|
player, user = resolve_mon(user)
|
||||||
_, target = resolve_mon(target)
|
_, target = resolve_mon(target)
|
||||||
conn.execute(
|
conn.execute(
|
||||||
@ -124,8 +163,62 @@ def parse_log(game: str, log: str, into: sqlite3.Connection):
|
|||||||
""",
|
""",
|
||||||
(game, turn, player, mon),
|
(game, turn, player, mon),
|
||||||
)
|
)
|
||||||
|
case ["-sidestart", side, env]:
|
||||||
|
if not last_move:
|
||||||
|
LOG.warning(f"missing previous move for {line}")
|
||||||
|
continue
|
||||||
|
LOG.debug(f"{line} <- {last_move}")
|
||||||
|
last_env_set[(side[0:1], env.replace("move: ", ""))] = last_move[0]
|
||||||
|
case ["-status", mon, cond]:
|
||||||
|
if not last_move or last_move[1] != mon:
|
||||||
|
LOG.warning(f"missing previous move for {line}")
|
||||||
|
continue
|
||||||
|
LOG.debug(f"{line} <- {last_move}")
|
||||||
|
last_status_set[(mon, cond)] = last_move[0]
|
||||||
|
case ["-damage", mon, *rest]:
|
||||||
|
# rest is new_hp and sometimes a source (if not from a move)
|
||||||
|
# in a knockout, new_hp is "0 fnt"
|
||||||
|
if rest[0] == "0 fnt" and len(rest) > 1:
|
||||||
|
LOG.debug(f"tracing source for {line}")
|
||||||
|
source = rest[1].replace("[from] ", "")
|
||||||
|
source_user = None
|
||||||
|
|
||||||
|
test_hazard = last_env_set.get((mon[0:1], source))
|
||||||
|
if test_hazard:
|
||||||
|
source_user = test_hazard
|
||||||
|
LOG.debug(f"identified hazard source {source_user}")
|
||||||
|
|
||||||
|
test_status = last_status_set.get((mon, source))
|
||||||
|
if test_status:
|
||||||
|
source_user = test_status
|
||||||
|
LOG.debug(f"identified move source {source_user}")
|
||||||
|
|
||||||
|
if source == "Recoil" or source.startswith("item: "):
|
||||||
|
LOG.debug(f"identified special source {source}")
|
||||||
|
source = source.replace("item: ", "")
|
||||||
|
source_user = "self"
|
||||||
|
|
||||||
|
if not source_user:
|
||||||
|
LOG.error(f"missing source for {line}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
player, mon = resolve_mon(mon)
|
||||||
|
if source_user.startswith("p1") or source_user.startswith("p2"):
|
||||||
|
source_player, source_user = resolve_mon(source_user)
|
||||||
|
else:
|
||||||
|
source_player = None
|
||||||
|
|
||||||
|
conn.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO indirect_knockouts(game, turn, player, name, source, source_user, source_player)
|
||||||
|
VALUES(?, ?, ?, ?, ?, ?, ?)
|
||||||
|
ON CONFLICT DO NOTHING
|
||||||
|
""",
|
||||||
|
(game, turn, player, mon, source, source_user, source_player),
|
||||||
|
)
|
||||||
case _:
|
case _:
|
||||||
debug(f"unhandled message {chunks[0]}")
|
# LOG.debug(f"unhandled message {chunks[0]}")
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True)
|
@dataclass(frozen=True)
|
||||||
@ -145,7 +238,7 @@ class Replay:
|
|||||||
password: t.Optional[str]
|
password: t.Optional[str]
|
||||||
|
|
||||||
|
|
||||||
def fetch(replay: str, cache: bool = False) -> Replay:
|
def fetch(replay: str, cache: bool = True) -> Replay:
|
||||||
replay = replay.replace("https://replay.pokemonshowdown.com/", "")
|
replay = replay.replace("https://replay.pokemonshowdown.com/", "")
|
||||||
replay_file = Path.cwd() / "cache" / f"{replay}.json"
|
replay_file = Path.cwd() / "cache" / f"{replay}.json"
|
||||||
|
|
||||||
@ -168,28 +261,45 @@ def fetch(replay: str, cache: bool = False) -> Replay:
|
|||||||
|
|
||||||
def main():
|
def main():
|
||||||
parser = argparse.ArgumentParser(
|
parser = argparse.ArgumentParser(
|
||||||
prog=APP, description="extracts stats from a Showdown replay"
|
prog=APP,
|
||||||
|
description="extracts stats from a Showdown replay",
|
||||||
|
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
|
||||||
|
)
|
||||||
|
parser.add_argument("-v", "--verbose", action="count", help="add debugging info")
|
||||||
|
parser.add_argument(
|
||||||
|
"-C",
|
||||||
|
"--no-cache",
|
||||||
|
action="store_true",
|
||||||
|
help="fetch replays instead of using cache",
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"-v", "--verbose", action="store_true", help="add debugging info"
|
"-o",
|
||||||
|
"--output",
|
||||||
|
action="store",
|
||||||
|
metavar="FILE",
|
||||||
|
default="data.db",
|
||||||
|
help="output data file",
|
||||||
)
|
)
|
||||||
parser.add_argument("-c", "--cache", action="store_true", help="cache replays")
|
|
||||||
parser.add_argument("replay", nargs="+", help="replay ID or URL")
|
parser.add_argument("replay", nargs="+", help="replay ID or URL")
|
||||||
|
|
||||||
args = parser.parse_args(sys.argv[1:])
|
args = parser.parse_args(sys.argv[1:])
|
||||||
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)
|
if args.verbose and args.verbose > 1:
|
||||||
|
LOG.setLevel(logging.TRACE)
|
||||||
|
elif args.verbose:
|
||||||
|
LOG.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
db = sqlite3.connect("data.db")
|
db = sqlite3.connect(args.output)
|
||||||
_init_db(db)
|
_init_db(db)
|
||||||
|
|
||||||
for r in args.replay:
|
for r in args.replay:
|
||||||
try:
|
try:
|
||||||
replay = fetch(r, cache=args.cache)
|
replay = fetch(r, cache=not args.no_cache)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
error(f"bad replay {r}")
|
LOG.error(f"bad replay {r}")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
LOG.info(f"indexing game {replay.id}")
|
||||||
db.execute(
|
db.execute(
|
||||||
"""
|
"""
|
||||||
INSERT INTO games(id, p1, p2, format, uploadtime)
|
INSERT INTO games(id, p1, p2, format, uploadtime)
|
||||||
|
Loading…
Reference in New Issue
Block a user