#!/usr/bin/env python3 from collections import namedtuple from dataclasses import dataclass from pathlib import Path import argparse import json import logging import requests import sqlite3 import sys import typing as t APP = "hhirlstats" def error(*args, **kwargs): logging.getLogger(APP).error(*args, **kwargs) def debug(*args, **kwargs): logging.getLogger(APP).debug(*args, **kwargs) class safelist(list): def get(self, index, default=None): try: return self.__getitem__(index) except IndexError: return default def _init_db(conn: sqlite3.Connection): def namedtuple_factory(cursor, row): fields = [column[0] for column in cursor.description] cls = namedtuple("Row", fields) return cls._make(row) conn.row_factory = namedtuple_factory conn.executescript( """ CREATE TABLE IF NOT EXISTS moves( game, turn, player, name, user, target, UNIQUE(game, turn, player, user) ); CREATE TABLE IF NOT EXISTS switches( game, turn, player, name, UNIQUE(game, turn, player, name) ); CREATE TABLE IF NOT EXISTS nicknames( game, player, name, specie, UNIQUE(game, player, specie) ); CREATE TABLE IF NOT EXISTS knockouts( game, turn, player, name, UNIQUE(game, turn, player) ); CREATE TABLE IF NOT EXISTS games( id, p1, p2, format, uploadtime, UNIQUE(id) ) """ ) def parse_log(game: str, log: str, into: sqlite3.Connection): conn = into turn = 0 players = {} def resolve_mon(user: str) -> tuple[str, str]: [player, name] = user.split(": ") return players[player.strip("ab")], name for line in log.split("\n"): chunks = line.split("|")[1:] if not chunks: continue match chunks: case ["player", id, username, *rest]: players[id] = username case ["turn", turn]: turn = int(turn) case ["move", user, move, target]: player, user = resolve_mon(user) _, target = resolve_mon(target) conn.execute( """ INSERT INTO moves(game, turn, player, name, user, target) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING """, (game, turn, player, move, user, target), ) case ["switch", name, specie, *rest]: player, name = resolve_mon(name) conn.execute( """ INSERT INTO switches(game, turn, player, name) VALUES (?, ?, ?, ?) ON CONFLICT DO NOTHING """, (game, turn, player, name), ) conn.execute( """ INSERT INTO nicknames(game, player, name, specie) VALUES(?, ?, ?, ?) ON CONFLICT DO NOTHING """, (game, player, name, specie.split(", ")[0]), ) case ["faint", mon]: player, mon = resolve_mon(mon) conn.execute( """ INSERT INTO knockouts(game, turn, player, name) VALUES(?, ?, ?, ?) ON CONFLICT DO NOTHING """, (game, turn, player, mon), ) case _: debug(f"unhandled message {chunks[0]}") @dataclass(frozen=True) class Replay: id: str p1: str p2: str format: str log: str uploadtime: int views: int p1id: str p2id: str formatid: str rating: int private: int password: t.Optional[str] def fetch(replay: str, cache: bool = False) -> Replay: replay = replay.replace("https://replay.pokemonshowdown.com/", "") replay_file = Path.cwd() / "cache" / f"{replay}.json" if cache and replay_file.exists(): with replay_file.open() as f: return Replay(**json.load(f)) data = requests.get(f"https://replay.pokemonshowdown.com/{replay}.json") if data.status_code != 200: raise Exception(data.text) data = data.json() if cache: replay_file.parent.mkdir(mode=0o755, parents=True, exist_ok=True) with replay_file.open(mode="w") as f: json.dump(data, f) return Replay(**data) def main(): parser = argparse.ArgumentParser( prog=APP, description="extracts stats from a Showdown replay" ) parser.add_argument( "-v", "--verbose", action="store_true", help="add debugging info" ) parser.add_argument("-c", "--cache", action="store_true", help="cache replays") parser.add_argument("replay", nargs="+", help="replay ID or URL") args = parser.parse_args(sys.argv[1:]) logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO) try: db = sqlite3.connect("data.db") _init_db(db) for r in args.replay: try: replay = fetch(r, cache=args.cache) except Exception as e: error(f"bad replay {r}") continue db.execute( """ INSERT INTO games(id, p1, p2, format, uploadtime) VALUES (?, ?, ?, ?, ?) ON CONFLICT DO NOTHING """, (replay.id, replay.p1, replay.p2, replay.format, replay.uploadtime), ) parse_log(replay.id, replay.log, into=db) db.commit() finally: db.close() if __name__ == "__main__": main()