holy-heck-i-really-like-stats/main.py

321 lines
9.6 KiB
Python
Raw Normal View History

2023-03-26 18:47:11 +11:00
#!/usr/bin/env python3
from collections import namedtuple
from dataclasses import dataclass
from pathlib import Path
import argparse
import json
import logging
import requests
import sqlite3
import sys
import typing as t
# Application name: used as the logger name by `error`/`debug` and as the
# argparse program name in `main`.
APP = "hhirlstats"
def error(*args, **kwargs):
    """Log a message at ERROR level on the application logger.

    All arguments are forwarded unchanged to ``logging.Logger.error``.
    """
    app_logger = logging.getLogger(APP)
    app_logger.error(*args, **kwargs)
def debug(*args, **kwargs):
    """Log a message at DEBUG level on the application logger.

    All arguments are forwarded unchanged to ``logging.Logger.debug``.
    """
    app_logger = logging.getLogger(APP)
    app_logger.debug(*args, **kwargs)
class safelist(list):
    """A ``list`` with a ``dict.get``-style accessor for missing indexes."""

    def get(self, index, default=None):
        """Return ``self[index]``, or *default* if that raises ``IndexError``."""
        try:
            item = self[index]
        except IndexError:
            return default
        return item
def _init_db(conn: sqlite3.Connection):
def namedtuple_factory(cursor, row):
fields = [column[0] for column in cursor.description]
cls = namedtuple("Row", fields)
return cls._make(row)
conn.row_factory = namedtuple_factory
conn.executescript(
"""
CREATE TABLE IF NOT EXISTS moves(
game, turn, player, name, user, target,
UNIQUE(game, turn, player, user)
);
CREATE TABLE IF NOT EXISTS switches(
2023-03-27 20:19:58 +11:00
game, turn, player, name,
UNIQUE(game, turn, player, name)
2023-03-26 18:47:11 +11:00
);
CREATE TABLE IF NOT EXISTS nicknames(
game, player, name, specie,
2023-03-27 20:17:18 +11:00
UNIQUE(game, player, specie)
2023-03-26 18:47:11 +11:00
);
2023-03-27 20:17:46 +11:00
CREATE TABLE IF NOT EXISTS knockouts(
game, turn, player, name,
UNIQUE(game, turn, player)
);
CREATE TABLE IF NOT EXISTS games(
id, p1, p2, format, uploadtime,
UNIQUE(id)
)
2023-03-26 18:47:11 +11:00
"""
)
def parse_log(game: str, log: str, into: sqlite3.Connection):
conn = into
turn = 0
players = {}
def resolve_mon(user: str) -> tuple[str, str]:
[player, name] = user.split(": ")
return players[player.strip("ab")], name
for line in log.split("\n"):
chunks = line.split("|")[1:]
if not chunks:
continue
match chunks:
case ["player", id, username, *rest]:
players[id] = username
case ["turn", turn]:
turn = int(turn)
case ["move", user, move, target]:
player, user = resolve_mon(user)
_, target = resolve_mon(target)
conn.execute(
"""
INSERT INTO moves(game, turn, player, name, user, target)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, turn, player, move, user, target),
)
case ["switch", name, specie, *rest]:
player, name = resolve_mon(name)
conn.execute(
"""
2023-03-27 20:19:58 +11:00
INSERT INTO switches(game, turn, player, name)
VALUES (?, ?, ?, ?)
2023-03-26 18:47:11 +11:00
ON CONFLICT DO NOTHING
""",
2023-03-27 20:19:58 +11:00
(game, turn, player, name),
2023-03-26 18:47:11 +11:00
)
conn.execute(
"""
INSERT INTO nicknames(game, player, name, specie)
VALUES(?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
(game, player, name, specie.split(", ")[0]),
)
case _:
debug(f"unhandled message {chunks[0]}")
# Report names accepted by the -Q/--query command-line option; each one is
# handled by `query`.
QUERIES = ["gametime", "moves", "nicknames", "playtime", "usage"]
def _query_gametime(conn: sqlite3.Connection):
    """Print the five games with the highest recorded turn number."""
    print("Longest games")
    print("=============")
    for row in conn.execute(
        """
        SELECT game, MAX(turn) AS n
        FROM moves
        GROUP BY game
        ORDER BY n DESC
        LIMIT 5
        """
    ):
        # Player names come from the replay itself (cached on disk).
        replay = fetch(row.game, cache=True)
        print(f"{replay.p1} vs {replay.p2}: {row.n} turns")


def _query_moves(conn: sqlite3.Connection):
    """Print the ten most frequently used moves across all games."""
    print("Move usage overall")
    print("==================")
    for row in conn.execute(
        """
        SELECT name, COUNT(*) AS n
        FROM moves
        GROUP BY name
        ORDER BY n DESC, name
        LIMIT 10
        """
    ):
        print(f"{row.name}: {row.n}")


def _query_nicknames(conn: sqlite3.Connection):
    """Print, per player and species, each nickname and how often it was used."""
    print("Nickname usage per player")
    print("=========================")
    for row_p in conn.execute("SELECT DISTINCT player FROM nicknames"):
        print(row_p.player)
        for row_s in conn.execute(
            "SELECT DISTINCT specie FROM nicknames WHERE player = ?",
            (row_p.player,),
        ):
            print(f" {row_s.specie}: ", end="")
            names = [
                f"{row.name} (x{row.n})"
                for row in conn.execute(
                    """
                    SELECT player, specie, name, count(game) AS n
                    FROM nicknames
                    WHERE player = ? AND specie = ?
                    GROUP BY player, specie, name
                    ORDER BY player, specie, name
                    """,
                    (row_p.player, row_s.specie),
                )
            ]
            print(*names, sep=", ")


def _query_playtime(conn: sqlite3.Connection):
    """Print the ten (species, player) pairs with the most recorded moves."""
    print("Active playtime per Pokemon")
    print("===========================")
    for row in conn.execute(
        """
        SELECT m.player, k.specie, COUNT(m.name) AS n
        FROM moves m
        LEFT JOIN nicknames k ON (m.game, m.player, m.user) = (k.game, k.player, k.name)
        GROUP BY k.specie, m.player
        ORDER BY n DESC, k.specie, m.player
        LIMIT 10
        """
    ):
        print(f"{row.specie} ({row.player}): {row.n} turns")


def _query_usage(conn: sqlite3.Connection):
    """Print, per player, how many games each species appeared in."""
    print("Pokemon usage per player")
    print("========================")
    # Number of distinct games in which each player made at least one move.
    games = {
        r.player: r.n
        for r in conn.execute(
            """
            SELECT player, COUNT(m.game) AS n
            FROM (SELECT DISTINCT player, game FROM moves) m
            GROUP BY player
            """
        )
    }
    for row_p in conn.execute("SELECT DISTINCT player FROM nicknames"):
        print(row_p.player)
        # A player can have nickname rows (from switches) without any move
        # rows; in that case there is no denominator, so omit the percentage
        # instead of failing with KeyError.
        played = games.get(row_p.player)
        for row_s in conn.execute(
            """
            SELECT specie, COUNT(game) AS n
            FROM nicknames
            WHERE player = ?
            GROUP BY specie
            ORDER BY n DESC, specie
            """,
            (row_p.player,),
        ):
            share = f" ({row_s.n / played * 100:.2f}%)" if played else ""
            print(f" {row_s.specie}: {row_s.n}{share}")


# Maps each report name to the function that prints it.
_QUERY_HANDLERS = {
    "gametime": _query_gametime,
    "moves": _query_moves,
    "nicknames": _query_nicknames,
    "playtime": _query_playtime,
    "usage": _query_usage,
}


def query(type: str, conn: sqlite3.Connection):
    """Print the report named *type* to standard output.

    Args:
        type: One of "gametime", "moves", "nicknames", "playtime", "usage".
            Any other value is reported through ``error`` and nothing is
            printed.
        conn: Open connection whose rows support attribute access by column
            name (the reports read e.g. ``row.player``).
    """
    handler = _QUERY_HANDLERS.get(type)
    if handler is None:
        error(f"unknown query {type}")
        return
    handler(conn)
@dataclass(frozen=True)
class Replay:
    """Immutable record describing one replay.

    `fetch` builds instances with ``Replay(**data)`` from a decoded JSON
    object, so the field names here must match that object's keys exactly:
    a missing or extra key makes construction fail with ``TypeError``.
    """

    # Stored as the game identifier in the database tables.
    id: str
    # Player names; printed by the "gametime" report and stored in `games`.
    p1: str
    p2: str
    # Stored in the `games` table.
    format: str
    # Raw log text handed to `parse_log`.
    log: str
    # Stored in the `games` table. NOTE(review): presumably a Unix timestamp
    # - confirm against the replay service.
    uploadtime: int
    # The remaining fields are not read anywhere in this file; they exist so
    # that the JSON object can be unpacked as-is.
    views: int
    p1id: str
    p2id: str
    formatid: str
    rating: int
    private: int
    password: t.Optional[str]
def fetch(replay: str, cache: bool = False, timeout: float = 30.0) -> Replay:
    """Return the replay identified by *replay*, downloading it if necessary.

    Args:
        replay: Replay id, or a full ``https://replay.pokemonshowdown.com/``
            URL (the prefix is stripped to obtain the id).
        cache: When true, read from ``./cache/<id>.json`` if that file exists
            and write the downloaded document there otherwise.
        timeout: Seconds to wait for the server before giving up, so a stalled
            connection cannot hang the program indefinitely.

    Returns:
        The replay as a ``Replay`` instance.

    Raises:
        Exception: If the server answers with a status other than 200 (the
            response body is the message). Network errors from ``requests``
            and a ``TypeError`` for a document whose keys do not match
            ``Replay`` propagate as well.
    """
    replay_id = replay.removeprefix("https://replay.pokemonshowdown.com/")
    replay_file = Path.cwd() / "cache" / f"{replay_id}.json"

    if cache and replay_file.exists():
        with replay_file.open() as f:
            return Replay(**json.load(f))

    response = requests.get(
        f"https://replay.pokemonshowdown.com/{replay_id}.json",
        timeout=timeout,
    )
    if response.status_code != 200:
        raise Exception(response.text)
    data = response.json()

    # Validate before caching: writing first would leave an unusable document
    # on disk that every later cached lookup would fail on.
    result = Replay(**data)

    if cache:
        replay_file.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
        with replay_file.open(mode="w") as f:
            json.dump(data, f)
    return result
def main():
    """Command-line entry point.

    With ``-Q/--query`` the named report is printed from the existing
    database. Otherwise every positional argument is treated as a replay id
    or URL: it is fetched, recorded in the ``games`` table and its log is
    parsed into the database. Replays that cannot be fetched are logged and
    skipped. Exits with status 1 when neither a query nor a replay is given.

    The database lives in ``data.db`` in the current working directory.
    """
    parser = argparse.ArgumentParser(
        prog=APP, description="extracts stats from a Showdown replay"
    )
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="add debugging info"
    )
    parser.add_argument("-c", "--cache", action="store_true", help="cache replays")
    parser.add_argument(
        "-Q",
        "--query",
        choices=QUERIES,
        help="run query instead of download",
    )
    parser.add_argument("replay", nargs="*", help="replay ID or URL")
    args = parser.parse_args()

    logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)

    # Connect outside the try block: if connecting fails there is nothing to
    # close, and the cleanup below must not hide the real error behind a
    # NameError on an unbound `db`.
    db = sqlite3.connect("data.db")
    try:
        _init_db(db)

        if args.query:
            query(args.query, db)
        else:
            if not args.replay:
                parser.print_usage()
                print(f"{APP}: error: either query or replay arguments are required")
                sys.exit(1)

            for r in args.replay:
                try:
                    replay = fetch(r, cache=args.cache)
                except Exception as e:
                    # Best effort: report why this replay failed and carry on
                    # with the remaining ones.
                    error(f"bad replay {r}: {e}")
                    continue

                db.execute(
                    """
                    INSERT INTO games(id, p1, p2, format, uploadtime)
                    VALUES (?, ?, ?, ?, ?)
                    ON CONFLICT DO NOTHING
                    """,
                    (replay.id, replay.p1, replay.p2, replay.format, replay.uploadtime),
                )
                parse_log(replay.id, replay.log, into=db)

        db.commit()
    finally:
        db.close()
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()