303 lines
8.8 KiB
Python
Executable File
303 lines
8.8 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
from collections import namedtuple
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import requests
|
|
import sqlite3
|
|
import sys
|
|
import typing as t
|
|
|
|
|
|
# Application name: used as the logger name by error()/debug() and as the
# argparse program name in main().
APP = "hhirlstats"
|
|
|
|
|
|
def error(*args, **kwargs):
    """Emit an ERROR-level record on the application's logger.

    Every positional and keyword argument is handed unchanged to
    ``logging.Logger.error``.
    """
    app_logger = logging.getLogger(APP)
    app_logger.error(*args, **kwargs)
|
|
|
|
|
|
def debug(*args, **kwargs):
    """Emit a DEBUG-level record on the application's logger.

    Every positional and keyword argument is handed unchanged to
    ``logging.Logger.debug``.
    """
    app_logger = logging.getLogger(APP)
    app_logger.debug(*args, **kwargs)
|
|
|
|
|
|
class safelist(list):
    """A ``list`` with a ``dict``-style ``get`` that tolerates bad indexes."""

    def get(self, index, default=None):
        """Return ``self[index]``, or *default* if that raises ``IndexError``."""
        try:
            item = self[index]
        except IndexError:
            item = default
        return item
|
|
|
|
|
|
def _init_db(conn: sqlite3.Connection):
|
|
def namedtuple_factory(cursor, row):
|
|
fields = [column[0] for column in cursor.description]
|
|
cls = namedtuple("Row", fields)
|
|
return cls._make(row)
|
|
|
|
conn.row_factory = namedtuple_factory
|
|
|
|
conn.executescript(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS moves(
|
|
game, turn, player, name, user, target,
|
|
UNIQUE(game, turn, player, user)
|
|
);
|
|
CREATE TABLE IF NOT EXISTS switches(
|
|
game, turn, name,
|
|
UNIQUE(game, turn, name)
|
|
);
|
|
CREATE TABLE IF NOT EXISTS nicknames(
|
|
game, player, name, specie,
|
|
UNIQUE(game,player, specie)
|
|
);
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
"""
|
|
)
|
|
|
|
|
|
def parse_log(game: str, log: str, into: sqlite3.Connection):
|
|
conn = into
|
|
|
|
turn = 0
|
|
players = {}
|
|
|
|
def resolve_mon(user: str) -> tuple[str, str]:
|
|
[player, name] = user.split(": ")
|
|
return players[player.strip("ab")], name
|
|
|
|
for line in log.split("\n"):
|
|
chunks = line.split("|")[1:]
|
|
if not chunks:
|
|
continue
|
|
|
|
match chunks:
|
|
case ["player", id, username, *rest]:
|
|
players[id] = username
|
|
case ["turn", turn]:
|
|
turn = int(turn)
|
|
case ["move", user, move, target]:
|
|
player, user = resolve_mon(user)
|
|
_, target = resolve_mon(target)
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO moves(game, turn, player, name, user, target)
|
|
VALUES (?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT DO NOTHING
|
|
""",
|
|
(game, turn, player, move, user, target),
|
|
)
|
|
case ["switch", name, specie, *rest]:
|
|
player, name = resolve_mon(name)
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO switches(game, turn, name)
|
|
VALUES (?, ?, ?)
|
|
ON CONFLICT DO NOTHING
|
|
""",
|
|
(game, turn, name),
|
|
)
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO nicknames(game, player, name, specie)
|
|
VALUES(?, ?, ?, ?)
|
|
ON CONFLICT DO NOTHING
|
|
""",
|
|
(game, player, name, specie.split(", ")[0]),
|
|
)
|
|
case _:
|
|
debug(f"unhandled message {chunks[0]}")
|
|
|
|
|
|
# Report names accepted by the -Q/--query command-line option; each one has a
# matching case in query().
QUERIES = ["gametime", "moves", "nicknames", "playtime", "usage"]
|
|
|
|
|
|
def query(type: str, conn: sqlite3.Connection):
|
|
match type:
|
|
case "gametime":
|
|
print("Longest games")
|
|
print("=============")
|
|
for row in conn.execute(
|
|
"""
|
|
SELECT game, MAX(turn) AS n
|
|
FROM moves
|
|
GROUP BY game
|
|
ORDER BY n DESC
|
|
LIMIT 5
|
|
"""
|
|
):
|
|
replay = fetch(row.game, cache=True)
|
|
print(f"{replay.p1} vs {replay.p2}: {row.n} turns")
|
|
|
|
case "moves":
|
|
print("Move usage overall")
|
|
print("==================")
|
|
for row in conn.execute(
|
|
"""
|
|
SELECT name, COUNT(*) AS n
|
|
FROM moves
|
|
GROUP BY name
|
|
ORDER BY n DESC, name
|
|
LIMIT 10
|
|
"""
|
|
):
|
|
print(f"{row.name}: {row.n}")
|
|
|
|
case "nicknames":
|
|
print("Nickname usage per player")
|
|
print("=========================")
|
|
for row_p in conn.execute("SELECT DISTINCT player FROM nicknames"):
|
|
print(row_p.player)
|
|
for row_s in conn.execute(
|
|
"SELECT DISTINCT specie FROM nicknames WHERE player = ?",
|
|
(row_p.player,),
|
|
):
|
|
print(f" {row_s.specie}: ", end="")
|
|
names = []
|
|
for row in conn.execute(
|
|
"""
|
|
SELECT player, specie, name, count(game) AS n
|
|
FROM nicknames
|
|
WHERE player = ? AND specie = ?
|
|
GROUP BY player, specie, name
|
|
ORDER BY player, specie, name
|
|
""",
|
|
(row_p.player, row_s.specie),
|
|
):
|
|
names.append(f"{row.name} (x{row.n})")
|
|
print(*names, sep=", ")
|
|
|
|
case "playtime":
|
|
print("Active playtime per Pokemon")
|
|
print("===========================")
|
|
for row in conn.execute(
|
|
"""
|
|
SELECT m.player, k.specie, COUNT(m.name) AS n
|
|
FROM moves m
|
|
LEFT JOIN nicknames k ON (m.game, m.player, m.user) = (k.game, k.player, k.name)
|
|
GROUP BY k.specie, m.player
|
|
ORDER BY n DESC, k.specie, m.player
|
|
LIMIT 10
|
|
"""
|
|
):
|
|
print(f"{row.specie} ({row.player}): {row.n} turns")
|
|
|
|
case "usage":
|
|
print("Pokemon usage per player")
|
|
print("========================")
|
|
games = {
|
|
r.player: r.n
|
|
for r in conn.execute(
|
|
"""
|
|
SELECT player, COUNT(m.game) AS n
|
|
FROM (SELECT DISTINCT player, game FROM moves) m
|
|
GROUP BY player
|
|
"""
|
|
)
|
|
}
|
|
for row_p in conn.execute("SELECT DISTINCT player FROM nicknames"):
|
|
print(row_p.player)
|
|
for row_s in conn.execute(
|
|
"""
|
|
SELECT specie, COUNT(game) AS n
|
|
FROM nicknames
|
|
WHERE player = ?
|
|
GROUP BY specie
|
|
ORDER BY n DESC, specie
|
|
""",
|
|
(row_p.player,),
|
|
):
|
|
print(
|
|
f" {row_s.specie}: {row_s.n}"
|
|
f" ({row_s.n / games[row_p.player] * 100:.2f}%)"
|
|
)
|
|
case _:
|
|
error(f"unknown query {type}")
|
|
|
|
|
|
@dataclass(frozen=True)
class Replay:
    """Immutable record of one replay, built from its JSON document.

    ``fetch`` creates instances with ``Replay(**data)``, so the field names
    must match the keys of that JSON object exactly.
    """

    # NOTE(review): only ``id``, ``log``, ``p1`` and ``p2`` are read in this
    # file; the meaning of the other fields is presumably defined by the
    # upstream replay JSON - confirm against its documentation.
    id: str  # stored as the ``game`` key when main() parses the log
    p1: str  # printed as the first name by the "gametime" report
    p2: str  # printed as the second name by the "gametime" report
    format: str
    log: str  # battle log text handed to parse_log() by main()
    uploadtime: int
    views: int
    p1id: str
    p2id: str
    formatid: str
    rating: int
    private: int
    password: t.Optional[str]
|
|
|
|
|
|
def fetch(replay: str, cache: bool = False, timeout: float = 30.0) -> Replay:
    """Return the replay identified by *replay*, downloading it if necessary.

    Args:
        replay: a replay id, or a full ``https://replay.pokemonshowdown.com/``
            URL (the prefix is removed to obtain the id).
        cache: when true, read the replay from ``./cache/<id>.json`` if that
            file exists, and write it there after a successful download.
        timeout: seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the program forever.

    Returns:
        A ``Replay`` built from the JSON document's top-level keys.

    Raises:
        Exception: the server answered with a status other than 200; the
            message is the response body.
        requests.RequestException: the request itself failed or timed out.
        TypeError: the JSON keys do not match the fields of ``Replay``.
    """
    replay = replay.removeprefix("https://replay.pokemonshowdown.com/")
    replay_file = Path.cwd() / "cache" / f"{replay}.json"

    if cache and replay_file.exists():
        with replay_file.open(encoding="utf-8") as f:
            return Replay(**json.load(f))

    response = requests.get(
        f"https://replay.pokemonshowdown.com/{replay}.json", timeout=timeout
    )
    if response.status_code != 200:
        raise Exception(response.text)
    data = response.json()

    if cache:
        replay_file.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
        # Explicit encoding so the cache is readable regardless of the
        # platform's default locale.
        with replay_file.open(mode="w", encoding="utf-8") as f:
            json.dump(data, f)

    return Replay(**data)
|
|
|
|
|
|
def main():
    """Command-line entry point.

    Opens (and initialises) ``data.db`` in the current directory, then either
    prints the report chosen with ``-Q/--query`` or downloads and imports
    every replay named on the command line. A replay that cannot be fetched
    is reported and skipped; each successfully parsed replay is committed on
    its own.
    """
    parser = argparse.ArgumentParser(
        prog=APP, description="extracts stats from a Showdown replay"
    )
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="add debugging info"
    )
    parser.add_argument("-c", "--cache", action="store_true", help="cache replays")
    parser.add_argument(
        "-Q",
        "--query",
        choices=QUERIES,
        help="run query instead of download",
    )
    parser.add_argument("replay", nargs="*", help="replay ID or URL")

    args = parser.parse_args(sys.argv[1:])
    logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)

    # Connect BEFORE entering the try block: if connecting fails there is no
    # connection to close, and the finally clause must not hide that error
    # behind an unbound-variable error.
    db = sqlite3.connect("data.db")
    try:
        _init_db(db)

        if args.query:
            query(args.query, db)
        else:
            for r in args.replay:
                try:
                    replay = fetch(r, cache=args.cache)
                except Exception as e:
                    error(f"bad replay {r}")
                    # Keep the cause available to anyone running with -v.
                    debug(f"could not fetch {r}", exc_info=e)
                    continue

                parse_log(replay.id, replay.log, into=db)
                db.commit()
    finally:
        db.close()
|
|
|
|
|
|
# Run the command-line interface only when executed as a script, not when the
# module is imported.
if __name__ == "__main__":
    main()
|