#!/usr/bin/env python3 from collections import namedtuple from dataclasses import dataclass from pathlib import Path import argparse import json import logging import requests import sqlite3 import sys import typing as t APP = "hhirlstats" def error(*args, **kwargs): logging.getLogger(APP).error(*args, **kwargs) def debug(*args, **kwargs): logging.getLogger(APP).debug(*args, **kwargs) class safelist(list): def get(self, index, default=None): try: return self.__getitem__(index) except IndexError: return default def _init_db(conn: sqlite3.Connection): def namedtuple_factory(cursor, row): fields = [column[0] for column in cursor.description] cls = namedtuple("Row", fields) return cls._make(row) conn.row_factory = namedtuple_factory conn.executescript( """ CREATE TABLE IF NOT EXISTS moves( game, turn, player, name, user, target, UNIQUE(game, turn, player, user) ); CREATE TABLE IF NOT EXISTS switches( game, turn, name, UNIQUE(game, turn, name) ); CREATE TABLE IF NOT EXISTS nicknames( game, player, name, specie, UNIQUE(game, player, specie) ); """ ) conn.execute( """ """ ) def parse_log(game: str, log: str, into: sqlite3.Connection): conn = into turn = 0 players = {} def resolve_mon(user: str) -> tuple[str, str]: [player, name] = user.split(": ") return players[player.strip("ab")], name for line in log.split("\n"): chunks = line.split("|")[1:] if not chunks: continue match chunks: case ["player", id, username, *rest]: players[id] = username case ["turn", turn]: turn = int(turn) case ["move", user, move, target]: player, user = resolve_mon(user) _, target = resolve_mon(target) conn.execute( """ INSERT INTO moves(game, turn, player, name, user, target) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING """, (game, turn, player, move, user, target), ) case ["switch", name, specie, *rest]: player, name = resolve_mon(name) conn.execute( """ INSERT INTO switches(game, turn, name) VALUES (?, ?, ?) ON CONFLICT DO NOTHING """, (game, turn, name), ) conn.execute( """ INSERT INTO nicknames(game, player, name, specie) VALUES(?, ?, ?, ?) ON CONFLICT DO NOTHING """, (game, player, name, specie.split(", ")[0]), ) case _: debug(f"unhandled message {chunks[0]}") QUERIES = ["gametime", "moves", "nicknames", "playtime", "usage"] def query(type: str, conn: sqlite3.Connection): match type: case "gametime": print("Longest games") print("=============") for row in conn.execute( """ SELECT game, MAX(turn) AS n FROM moves GROUP BY game ORDER BY n DESC LIMIT 5 """ ): replay = fetch(row.game, cache=True) print(f"{replay.p1} vs {replay.p2}: {row.n} turns") case "moves": print("Move usage overall") print("==================") for row in conn.execute( """ SELECT name, COUNT(*) AS n FROM moves GROUP BY name ORDER BY n DESC, name LIMIT 10 """ ): print(f"{row.name}: {row.n}") case "nicknames": print("Nickname usage per player") print("=========================") for row_p in conn.execute("SELECT DISTINCT player FROM nicknames"): print(row_p.player) for row_s in conn.execute( "SELECT DISTINCT specie FROM nicknames WHERE player = ?", (row_p.player,), ): print(f" {row_s.specie}: ", end="") names = [] for row in conn.execute( """ SELECT player, specie, name, count(game) AS n FROM nicknames WHERE player = ? AND specie = ? GROUP BY player, specie, name ORDER BY player, specie, name """, (row_p.player, row_s.specie), ): names.append(f"{row.name} (x{row.n})") print(*names, sep=", ") case "playtime": print("Active playtime per Pokemon") print("===========================") for row in conn.execute( """ SELECT m.player, k.specie, COUNT(m.name) AS n FROM moves m LEFT JOIN nicknames k ON (m.game, m.player, m.user) = (k.game, k.player, k.name) GROUP BY k.specie, m.player ORDER BY n DESC, k.specie, m.player LIMIT 10 """ ): print(f"{row.specie} ({row.player}): {row.n} turns") case "usage": print("Pokemon usage per player") print("========================") games = { r.player: r.n for r in conn.execute( """ SELECT player, COUNT(m.game) AS n FROM (SELECT DISTINCT player, game FROM moves) m GROUP BY player """ ) } for row_p in conn.execute("SELECT DISTINCT player FROM nicknames"): print(row_p.player) for row_s in conn.execute( """ SELECT specie, COUNT(game) AS n FROM nicknames WHERE player = ? GROUP BY specie ORDER BY n DESC, specie """, (row_p.player,), ): print( f" {row_s.specie}: {row_s.n}" f" ({row_s.n / games[row_p.player] * 100:.2f}%)" ) case _: error(f"unknown query {type}") @dataclass(frozen=True) class Replay: id: str p1: str p2: str format: str log: str uploadtime: int views: int p1id: str p2id: str formatid: str rating: int private: int password: t.Optional[str] def fetch(replay: str, cache: bool = False) -> Replay: replay = replay.replace("https://replay.pokemonshowdown.com/", "") replay_file = Path.cwd() / "cache" / f"{replay}.json" if cache and replay_file.exists(): with replay_file.open() as f: return Replay(**json.load(f)) data = requests.get(f"https://replay.pokemonshowdown.com/{replay}.json") if data.status_code != 200: raise Exception(data.text) data = data.json() if cache: replay_file.parent.mkdir(mode=0o755, parents=True, exist_ok=True) with replay_file.open(mode="w") as f: json.dump(data, f) return Replay(**data) def main(): parser = argparse.ArgumentParser( prog=APP, description="extracts stats from a Showdown replay" ) parser.add_argument( "-v", "--verbose", action="store_true", help="add debugging info" ) parser.add_argument("-c", "--cache", action="store_true", help="cache replays") parser.add_argument( "-Q", "--query", choices=QUERIES, help="run query instead of download", ) parser.add_argument("replay", nargs="*", help="replay ID or URL") args = parser.parse_args(sys.argv[1:]) logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO) try: db = sqlite3.connect("data.db") _init_db(db) if args.query: query(args.query, db) else: if not args.replay: parser.print_usage() print(f"{APP}: error: either query or replay arguments are required") sys.exit(1) for r in args.replay: try: replay = fetch(r, cache=args.cache) except Exception as e: error(f"bad replay {r}") continue db.execute( """ INSERT INTO games(id, p1, p2, format, uploadtime) VALUES (?, ?, ?, ?, ?) ON CONFLICT DO NOTHING """, (replay.id, replay.p1, replay.p2, replay.format, replay.uploadtime), ) parse_log(replay.id, replay.log, into=db) db.commit() finally: db.close() if __name__ == "__main__": main()