Sun Mar 26 18:47:11 2023 +1100 757a17a initial commit [xeals] diff --git a/README.md b/README.md new file mode 100644 index 0000000..5364110 --- /dev/null +++ b/README.md @@ -0,0 +1,4 @@ +# Holy Heck I Really Like Stats + +Pokemon Showdown data processing, mostly for HHIRLLL's Pokemon league. Ugly as +fuck. diff --git a/flake.lock b/flake.lock new file mode 100644 index 0000000..fdafac3 --- /dev/null +++ b/flake.lock @@ -0,0 +1,42 @@ +{ + "nodes": { + "flake-utils": { + "locked": { + "lastModified": 1678901627, + "narHash": "sha256-U02riOqrKKzwjsxc/400XnElV+UtPUQWpANPlyazjH0=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "93a2b84fc4b70d9e089d029deacc3583435c2ed6", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + }, + "nixpkgs": { + "locked": { + "lastModified": 1679802151, + "narHash": "sha256-TqQLDw3L4EyF5s+e9vNt7mC+j9IPSPie2gqgHgLMTRk=", + "owner": "nixos", + "repo": "nixpkgs", + "rev": "e3eeeecadd5e6ef055a83c5eaed8a660b5b464dd", + "type": "github" + }, + "original": { + "owner": "nixos", + "repo": "nixpkgs", + "type": "github" + } + }, + "root": { + "inputs": { + "flake-utils": "flake-utils", + "nixpkgs": "nixpkgs" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/flake.nix b/flake.nix new file mode 100644 index 0000000..5e245ee --- /dev/null +++ b/flake.nix @@ -0,0 +1,22 @@ +{ + description = ""; + + # Also consider nixpkgs/nixos-[RELEASE] + inputs.nixpkgs.url = "github:nixos/nixpkgs"; + inputs.flake-utils.url = "github:numtide/flake-utils"; + + outputs = { self, nixpkgs, flake-utils }: + flake-utils.lib.eachDefaultSystem + (system: + let pkgs = import nixpkgs { inherit system; }; in { + devShells.default = pkgs.mkShellNoCC { + buildInputs = + let + python = pkgs.python3.withPackages (ps: [ + ps.requests + ]); + in + [ python pkgs.sqlite ]; + }; + }); +} diff --git a/games.txt b/games.txt new file mode 100644 index 0000000..4b62ec4 --- /dev/null +++ b/games.txt @@ -0,0 +1,14 @@ +https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-24102 +https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-24106 +https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-24215 +https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-24733 +https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-25876 +https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-27137 +https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-28100 +https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-28370 +https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-28751 +https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-29063 +https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-29698 +https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-30488 +https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-30858 +https://replay.pokemonshowdown.com/dl-gen9paldeadexposthomedraft-31241 diff --git a/main.py b/main.py new file mode 100755 index 0000000..4ceba45 --- /dev/null +++ b/main.py @@ -0,0 +1,302 @@ +#!/usr/bin/env python3 + +from collections import namedtuple +from dataclasses import dataclass +from pathlib import Path +import argparse +import json +import logging +import requests +import sqlite3 +import sys +import typing as t + + +APP = "hhirlstats" + + +def error(*args, **kwargs): + logging.getLogger(APP).error(*args, **kwargs) + + +def debug(*args, **kwargs): + logging.getLogger(APP).debug(*args, **kwargs) + + +class safelist(list): + def get(self, index, default=None): + try: + return self.__getitem__(index) + except IndexError: + return default + + +def _init_db(conn: sqlite3.Connection): + def namedtuple_factory(cursor, row): + fields = [column[0] for column in cursor.description] + cls = namedtuple("Row", fields) + return cls._make(row) + + conn.row_factory = namedtuple_factory + + conn.executescript( + """ + CREATE TABLE IF NOT EXISTS moves( + game, turn, player, name, user, target, + UNIQUE(game, turn, player, user) + ); + CREATE TABLE IF NOT EXISTS switches( + game, turn, name, + UNIQUE(game, turn, name) + ); + CREATE TABLE IF NOT EXISTS nicknames( + game, player, name, specie, + UNIQUE(game,player, specie) + ); + """ + ) + conn.execute( + """ + """ + ) + + +def parse_log(game: str, log: str, into: sqlite3.Connection): + conn = into + + turn = 0 + players = {} + + def resolve_mon(user: str) -> tuple[str, str]: + [player, name] = user.split(": ") + return players[player.strip("ab")], name + + for line in log.split("\n"): + chunks = line.split("|")[1:] + if not chunks: + continue + + match chunks: + case ["player", id, username, *rest]: + players[id] = username + case ["turn", turn]: + turn = int(turn) + case ["move", user, move, target]: + player, user = resolve_mon(user) + _, target = resolve_mon(target) + conn.execute( + """ + INSERT INTO moves(game, turn, player, name, user, target) + VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT DO NOTHING + """, + (game, turn, player, move, user, target), + ) + case ["switch", name, specie, *rest]: + player, name = resolve_mon(name) + conn.execute( + """ + INSERT INTO switches(game, turn, name) + VALUES (?, ?, ?) + ON CONFLICT DO NOTHING + """, + (game, turn, name), + ) + conn.execute( + """ + INSERT INTO nicknames(game, player, name, specie) + VALUES(?, ?, ?, ?) + ON CONFLICT DO NOTHING + """, + (game, player, name, specie.split(", ")[0]), + ) + case _: + debug(f"unhandled message {chunks[0]}") + + +QUERIES = ["gametime", "moves", "nicknames", "playtime", "usage"] + + +def query(type: str, conn: sqlite3.Connection): + match type: + case "gametime": + print("Longest games") + print("=============") + for row in conn.execute( + """ + SELECT game, MAX(turn) AS n + FROM moves + GROUP BY game + ORDER BY n DESC + LIMIT 5 + """ + ): + replay = fetch(row.game, cache=True) + print(f"{replay.p1} vs {replay.p2}: {row.n} turns") + + case "moves": + print("Move usage overall") + print("==================") + for row in conn.execute( + """ + SELECT name, COUNT(*) AS n + FROM moves + GROUP BY name + ORDER BY n DESC, name + LIMIT 10 + """ + ): + print(f"{row.name}: {row.n}") + + case "nicknames": + print("Nickname usage per player") + print("=========================") + for row_p in conn.execute("SELECT DISTINCT player FROM nicknames"): + print(row_p.player) + for row_s in conn.execute( + "SELECT DISTINCT specie FROM nicknames WHERE player = ?", + (row_p.player,), + ): + print(f" {row_s.specie}: ", end="") + names = [] + for row in conn.execute( + """ + SELECT player, specie, name, count(game) AS n + FROM nicknames + WHERE player = ? AND specie = ? + GROUP BY player, specie, name + ORDER BY player, specie, name + """, + (row_p.player, row_s.specie), + ): + names.append(f"{row.name} (x{row.n})") + print(*names, sep=", ") + + case "playtime": + print("Active playtime per Pokemon") + print("===========================") + for row in conn.execute( + """ + SELECT m.player, k.specie, COUNT(m.name) AS n + FROM moves m + LEFT JOIN nicknames k ON (m.game, m.player, m.user) = (k.game, k.player, k.name) + GROUP BY k.specie, m.player + ORDER BY n DESC, k.specie, m.player + LIMIT 10 + """ + ): + print(f"{row.specie} ({row.player}): {row.n} turns") + + case "usage": + print("Pokemon usage per player") + print("========================") + games = { + r.player: r.n + for r in conn.execute( + """ + SELECT player, COUNT(m.game) AS n + FROM (SELECT DISTINCT player, game FROM moves) m + GROUP BY player + """ + ) + } + for row_p in conn.execute("SELECT DISTINCT player FROM nicknames"): + print(row_p.player) + for row_s in conn.execute( + """ + SELECT specie, COUNT(game) AS n + FROM nicknames + WHERE player = ? + GROUP BY specie + ORDER BY n DESC, specie + """, + (row_p.player,), + ): + print( + f" {row_s.specie}: {row_s.n}" + f" ({row_s.n / games[row_p.player] * 100:.2f}%)" + ) + case _: + error(f"unknown query {type}") + + +@dataclass(frozen=True) +class Replay: + id: str + p1: str + p2: str + format: str + log: str + uploadtime: int + views: int + p1id: str + p2id: str + formatid: str + rating: int + private: int + password: t.Optional[str] + + +def fetch(replay: str, cache: bool = False) -> Replay: + replay = replay.replace("https://replay.pokemonshowdown.com/", "") + replay_file = Path.cwd() / "cache" / f"{replay}.json" + + if cache and replay_file.exists(): + with replay_file.open() as f: + return Replay(**json.load(f)) + + data = requests.get(f"https://replay.pokemonshowdown.com/{replay}.json") + if data.status_code != 200: + raise Exception(data.text) + data = data.json() + + if cache: + replay_file.parent.mkdir(mode=0o755, parents=True, exist_ok=True) + with replay_file.open(mode="w") as f: + json.dump(data, f) + + return Replay(**data) + + +def main(): + parser = argparse.ArgumentParser( + prog=APP, description="extracts stats from a Showdown replay" + ) + parser.add_argument( + "-v", "--verbose", action="store_true", help="add debugging info" + ) + parser.add_argument("-c", "--cache", action="store_true", help="cache replays") + parser.add_argument( + "-Q", + "--query", + choices=QUERIES, + help="run query instead of download", + ) + parser.add_argument("replay", nargs="*", help="replay ID or URL") + + args = parser.parse_args(sys.argv[1:]) + logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO) + + try: + db = sqlite3.connect("data.db") + _init_db(db) + + if args.query: + query(args.query, db) + else: + for r in args.replay: + try: + replay = fetch(r, cache=args.cache) + except Exception as e: + error(f"bad replay {r}") + continue + + parse_log(replay.id, replay.log, into=db) + db.commit() + + finally: + db.close() + + +if __name__ == "__main__": + main()