Compare commits
11 Commits
dont-resol
...
trunk
Author | SHA1 | Date | |
---|---|---|---|
e421bd1de9 | |||
132096b427 | |||
c3df316aa4 | |||
008ddf6298 | |||
145f220a07 | |||
a839460e9a | |||
8f8b96937d | |||
87114b1d1d | |||
e8149e7c3b | |||
37a2df3039 | |||
8a348eca84 |
3
.gitignore
vendored
3
.gitignore
vendored
@ -8,5 +8,6 @@ tmp/
|
|||||||
build
|
build
|
||||||
htmlcov
|
htmlcov
|
||||||
|
|
||||||
data.db
|
*.db
|
||||||
cache/
|
cache/
|
||||||
|
token
|
||||||
|
@ -11,8 +11,8 @@ fuck.
|
|||||||
## Usage
|
## Usage
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
$ ./main.py -h
|
$ ./index.py -h
|
||||||
usage: hhirlstats [-h] [-v] [-C] [-o FILE] replay [replay ...]
|
usage: hhirlstats [-h] [-v] [-C] [-t FILE] [-o FILE] replay [replay ...]
|
||||||
|
|
||||||
extracts stats from a Showdown replay
|
extracts stats from a Showdown replay
|
||||||
|
|
||||||
@ -23,6 +23,8 @@ options:
|
|||||||
-h, --help show this help message and exit
|
-h, --help show this help message and exit
|
||||||
-v, --verbose add debugging info (default: None)
|
-v, --verbose add debugging info (default: None)
|
||||||
-C, --no-cache fetch replays instead of using cache (default: False)
|
-C, --no-cache fetch replays instead of using cache (default: False)
|
||||||
|
-t FILE, --teams FILE
|
||||||
|
JSON file defining players to teams (default: teams.json)
|
||||||
-o FILE, --output FILE
|
-o FILE, --output FILE
|
||||||
output data file (default: data.db)
|
output data file (default: data.db)
|
||||||
```
|
```
|
||||||
|
74
bot.py
Executable file
74
bot.py
Executable file
@ -0,0 +1,74 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
from discord.utils import setup_logging
|
||||||
|
import argparse
|
||||||
|
import discord
|
||||||
|
import logging
|
||||||
|
import re
|
||||||
|
import shutil
|
||||||
|
import subprocess as sp
|
||||||
|
|
||||||
|
discord.utils.setup_logging()
|
||||||
|
_log = logging.getLogger("statbot")
|
||||||
|
|
||||||
|
_GAMES = "games.txt"
|
||||||
|
_DB = "holy-heck2.db"
|
||||||
|
_DB_DEST = f"/var/lib/grafana/{_DB}"
|
||||||
|
|
||||||
|
|
||||||
|
def _write_game(content: str):
|
||||||
|
try:
|
||||||
|
with open(_GAMES, "a") as f:
|
||||||
|
f.write(content)
|
||||||
|
f.write("\n")
|
||||||
|
except:
|
||||||
|
_log.exception(f"failed writing game {content}")
|
||||||
|
|
||||||
|
|
||||||
|
def _update_db():
|
||||||
|
try:
|
||||||
|
games = []
|
||||||
|
with open(_GAMES) as f:
|
||||||
|
for line in f:
|
||||||
|
games.append(line.strip())
|
||||||
|
sp.run(["./index.py", "-o", _DB] + games)
|
||||||
|
shutil.move(_DB, _DB_DEST)
|
||||||
|
_log.info("updated db")
|
||||||
|
except:
|
||||||
|
_log.exception(f"failed updating db")
|
||||||
|
|
||||||
|
|
||||||
|
class BotClient(discord.Client):
|
||||||
|
async def on_ready(self):
|
||||||
|
_log.info(f"ready as {self.user}")
|
||||||
|
|
||||||
|
async def on_message(self, message: discord.Message):
|
||||||
|
content = message.content
|
||||||
|
if re.match("https://replay.pokemonshowdown.com/dl-.*", content):
|
||||||
|
_log.info(f"Recognised {content} as a League game")
|
||||||
|
_write_game(content)
|
||||||
|
_update_db()
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
parser = argparse.ArgumentParser()
|
||||||
|
parser.add_argument(
|
||||||
|
"-t",
|
||||||
|
"--token-file",
|
||||||
|
metavar="FILE",
|
||||||
|
default="token",
|
||||||
|
help="file containing Discord API token",
|
||||||
|
)
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
intents = discord.Intents.default()
|
||||||
|
intents.message_content = True
|
||||||
|
client = BotClient(intents=intents)
|
||||||
|
with open(args.token_file) as f:
|
||||||
|
token = f.read().strip()
|
||||||
|
client.run(token, log_handler=None)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
10
flake.nix
10
flake.nix
@ -13,10 +13,18 @@
|
|||||||
buildInputs =
|
buildInputs =
|
||||||
let
|
let
|
||||||
python = pkgs.python3.withPackages (ps: [
|
python = pkgs.python3.withPackages (ps: [
|
||||||
|
ps.discordpy
|
||||||
|
ps.mypy
|
||||||
ps.requests
|
ps.requests
|
||||||
|
ps.types-requests
|
||||||
]);
|
]);
|
||||||
in
|
in
|
||||||
[ python pkgs.sqlite ];
|
[
|
||||||
|
python
|
||||||
|
pkgs.sqlite
|
||||||
|
python.pkgs.python-lsp-server
|
||||||
|
python.pkgs.pylsp-mypy
|
||||||
|
];
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
573
index.py
Executable file
573
index.py
Executable file
@ -0,0 +1,573 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
from collections import namedtuple
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from functools import partial, partialmethod
|
||||||
|
from pathlib import Path
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import re
|
||||||
|
import requests
|
||||||
|
import sqlite3
|
||||||
|
import sys
|
||||||
|
import typing as t
|
||||||
|
|
||||||
|
|
||||||
|
logging.TRACE = 5 # type: ignore
|
||||||
|
logging.addLevelName(logging.TRACE, "TRACE") # type: ignore
|
||||||
|
logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE) # type: ignore
|
||||||
|
logging.trace = partial(logging.log, logging.TRACE) # type: ignore
|
||||||
|
|
||||||
|
|
||||||
|
class LogFormatter(logging.Formatter):
|
||||||
|
|
||||||
|
_format = "%(name)s [%(levelname)s] %(message)s"
|
||||||
|
FORMATS = {
|
||||||
|
logging.TRACE: f"\x1b[30;20m{_format}\x1b[0m", # type: ignore
|
||||||
|
logging.DEBUG: f"\x1b[38;20m{_format}\x1b[0m",
|
||||||
|
logging.INFO: f"\x1b[34;20m{_format}\x1b[0m",
|
||||||
|
logging.WARNING: f"\x1b[33;20m{_format}\x1b[0m",
|
||||||
|
logging.ERROR: f"\x1b[31;20m{_format}\x1b[0m",
|
||||||
|
logging.CRITICAL: f"\x1b[31;1m{_format}\x1b[0m",
|
||||||
|
}
|
||||||
|
|
||||||
|
def format(self, record):
|
||||||
|
fmt = self.FORMATS.get(record.levelno)
|
||||||
|
formatter = logging.Formatter(fmt)
|
||||||
|
return formatter.format(record)
|
||||||
|
|
||||||
|
|
||||||
|
APP = "hhirlstats"
|
||||||
|
LOG = logging.getLogger(APP)
|
||||||
|
_ch = logging.StreamHandler()
|
||||||
|
_ch.setFormatter(LogFormatter())
|
||||||
|
LOG.addHandler(_ch)
|
||||||
|
|
||||||
|
|
||||||
|
def _init_db(conn: sqlite3.Connection):
|
||||||
|
def namedtuple_factory(cursor, row):
|
||||||
|
fields = [column[0] for column in cursor.description]
|
||||||
|
cls = namedtuple("Row", fields)
|
||||||
|
return cls._make(row)
|
||||||
|
|
||||||
|
conn.row_factory = namedtuple_factory
|
||||||
|
|
||||||
|
conn.executescript(
|
||||||
|
"""
|
||||||
|
CREATE TABLE IF NOT EXISTS moves(
|
||||||
|
game, turn, player, pokemon, move, target,
|
||||||
|
UNIQUE(game, turn, player, pokemon)
|
||||||
|
);
|
||||||
|
CREATE TABLE IF NOT EXISTS switches(
|
||||||
|
game, turn, player, pokemon,
|
||||||
|
UNIQUE(game, turn, player, pokemon)
|
||||||
|
);
|
||||||
|
CREATE TABLE IF NOT EXISTS nicknames(
|
||||||
|
game, player, pokemon, specie,
|
||||||
|
UNIQUE(game, player, specie)
|
||||||
|
);
|
||||||
|
CREATE TABLE IF NOT EXISTS knockouts(
|
||||||
|
game, turn, player, pokemon,
|
||||||
|
UNIQUE(game, turn, player)
|
||||||
|
);
|
||||||
|
CREATE TABLE IF NOT EXISTS indirect_knockouts(
|
||||||
|
game, turn, player, pokemon,
|
||||||
|
reason, source, source_player,
|
||||||
|
UNIQUE(game, turn, player)
|
||||||
|
);
|
||||||
|
CREATE TABLE IF NOT EXISTS games(
|
||||||
|
id, p1, p2, format, uploadtime, winner,
|
||||||
|
UNIQUE(id)
|
||||||
|
);
|
||||||
|
-- No good way to ensure idempotence for damage; just re-build it.
|
||||||
|
DROP TABLE IF EXISTS damage;
|
||||||
|
CREATE TABLE damage(game, player, pokemon, value);
|
||||||
|
DROP TABLE IF EXISTS indirect_damage;
|
||||||
|
CREATE TABLE indirect_damage(game, player, pokemon, value);
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# Either the value "p1" or "p2"
|
||||||
|
PlayerTag = t.NewType("PlayerTag", str)
|
||||||
|
|
||||||
|
# A player's name
|
||||||
|
Player = t.NewType("Player", str)
|
||||||
|
|
||||||
|
# A player prefixed with a PlayerTag
|
||||||
|
TaggedPlayer = t.NewType("TaggedPlayer", str)
|
||||||
|
|
||||||
|
# A Pokemon identified by its nickname, if any
|
||||||
|
Pokemon = t.NewType("Pokemon", str)
|
||||||
|
|
||||||
|
# A Pokemon specie
|
||||||
|
PokemonSpecie = t.NewType("PokemonSpecie", str)
|
||||||
|
|
||||||
|
# A Pokemon prefixed with a PlayerTag
|
||||||
|
TaggedPokemon = t.NewType("TaggedPokemon", str)
|
||||||
|
|
||||||
|
|
||||||
|
def tag(tagged: TaggedPlayer | TaggedPokemon) -> PlayerTag:
|
||||||
|
return PlayerTag(tagged[0:1])
|
||||||
|
|
||||||
|
|
||||||
|
TEAMS: dict[Player, Player] = {}
|
||||||
|
_logged_teams: set[Player] = set()
|
||||||
|
|
||||||
|
|
||||||
|
def team(player: Player) -> Player:
|
||||||
|
"""Maps a username to a defined team."""
|
||||||
|
if player in TEAMS:
|
||||||
|
return TEAMS[player]
|
||||||
|
else:
|
||||||
|
if not player in _logged_teams and player:
|
||||||
|
LOG.warning(f"missing team mapping for {player}")
|
||||||
|
_logged_teams.add(player)
|
||||||
|
return player
|
||||||
|
|
||||||
|
|
||||||
|
class LogParser:
|
||||||
|
|
||||||
|
turn = 0
|
||||||
|
players: dict[PlayerTag, Player] = {}
|
||||||
|
hp: dict[TaggedPokemon, int] = {}
|
||||||
|
|
||||||
|
# Memorises the user of the move that causes environment setting or status,
|
||||||
|
# its target, and the move name (for debugging).
|
||||||
|
last_move: t.Optional[tuple[TaggedPokemon, TaggedPokemon, str]] = None
|
||||||
|
|
||||||
|
# Memorises the last hazard set against a player and the causing user.
|
||||||
|
last_env_set: dict[tuple[PlayerTag, str], TaggedPokemon] = {}
|
||||||
|
|
||||||
|
# Memorises statuses set on a pokemon and the causing user.
|
||||||
|
last_status_set: dict[tuple[TaggedPokemon, str], TaggedPokemon] = {}
|
||||||
|
|
||||||
|
def __init__(self, game: str, into: sqlite3.Connection):
|
||||||
|
self.game = game
|
||||||
|
self.conn: sqlite3.Connection = into
|
||||||
|
|
||||||
|
def split_pokemon(self, user: TaggedPokemon) -> tuple[Player, Pokemon]:
|
||||||
|
"""Splits a TaggedPokemon into the owning player and the Pokemon."""
|
||||||
|
[player, pokemon] = user.split(": ")
|
||||||
|
return self.players[PlayerTag(player.strip("ab"))], Pokemon(pokemon)
|
||||||
|
|
||||||
|
@t.overload
|
||||||
|
def specie(self, pokemon: Pokemon, player: Player) -> PokemonSpecie:
|
||||||
|
"""Resolves the species of a nicknamed Pokemon."""
|
||||||
|
...
|
||||||
|
|
||||||
|
@t.overload
|
||||||
|
def specie(self, pokemon: TaggedPokemon) -> PokemonSpecie:
|
||||||
|
"""Resolves the species of a Pokemon given its Showdown identifier (used
|
||||||
|
in split_pokemon)."""
|
||||||
|
...
|
||||||
|
|
||||||
|
def specie(
|
||||||
|
self, pokemon: Pokemon | TaggedPokemon, player: t.Optional[Player] = None
|
||||||
|
) -> PokemonSpecie:
|
||||||
|
if not player:
|
||||||
|
[player, pokemon] = self.split_pokemon(TaggedPokemon(pokemon))
|
||||||
|
return (
|
||||||
|
self.conn.execute(
|
||||||
|
"""
|
||||||
|
SELECT specie
|
||||||
|
FROM nicknames
|
||||||
|
WHERE (game, player, pokemon) = (?, ?, ?)
|
||||||
|
LIMIT 1
|
||||||
|
""",
|
||||||
|
(self.game, team(player), pokemon),
|
||||||
|
)
|
||||||
|
.fetchall()[0]
|
||||||
|
.specie
|
||||||
|
)
|
||||||
|
|
||||||
|
def _reset(self):
|
||||||
|
self.turn = 0
|
||||||
|
self.players.clear()
|
||||||
|
self.hp.clear()
|
||||||
|
self.last_move = None
|
||||||
|
self.last_env_set.clear()
|
||||||
|
self.last_status_set.clear()
|
||||||
|
|
||||||
|
def _log_appearance(self, name: TaggedPokemon, specie: str):
|
||||||
|
|
||||||
|
# Also includes gender and formes.
|
||||||
|
trimmed_specie = PokemonSpecie(specie.split(", ")[0])
|
||||||
|
player, nickname = self.split_pokemon(name)
|
||||||
|
|
||||||
|
self.conn.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO nicknames(game, player, pokemon, specie)
|
||||||
|
VALUES(?, ?, ?, ?)
|
||||||
|
ON CONFLICT DO NOTHING
|
||||||
|
""",
|
||||||
|
(self.game, team(player), nickname, trimmed_specie),
|
||||||
|
)
|
||||||
|
|
||||||
|
def parse(self, log: str):
|
||||||
|
self._reset()
|
||||||
|
|
||||||
|
for line in log.split("\n"):
|
||||||
|
chunks = line.split("|")[1:]
|
||||||
|
if not chunks:
|
||||||
|
continue
|
||||||
|
|
||||||
|
LOG.trace(line) # type: ignore
|
||||||
|
|
||||||
|
match chunks:
|
||||||
|
|
||||||
|
# t.Literal, TaggedPokemon, str, str
|
||||||
|
case ["drag", name_, specie, status, *rest]:
|
||||||
|
name = TaggedPokemon(name_)
|
||||||
|
|
||||||
|
self.hp[name] = int(status.split("/")[0])
|
||||||
|
self._log_appearance(name, specie)
|
||||||
|
|
||||||
|
# t.Literal, TaggedPokemon
|
||||||
|
case ["faint", pokemon_]:
|
||||||
|
pokemon = TaggedPokemon(pokemon_)
|
||||||
|
|
||||||
|
player, _ = self.split_pokemon(pokemon)
|
||||||
|
self.conn.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO knockouts(game, turn, player, pokemon)
|
||||||
|
VALUES(?, ?, ?, ?)
|
||||||
|
ON CONFLICT DO NOTHING
|
||||||
|
""",
|
||||||
|
(self.game, self.turn, team(player), self.specie(pokemon)),
|
||||||
|
)
|
||||||
|
|
||||||
|
# t.Literal, TaggedPokemon, str, TaggedPokemon
|
||||||
|
case ["move", user_, move, target_]:
|
||||||
|
user = TaggedPokemon(user_)
|
||||||
|
target = TaggedPokemon(target_)
|
||||||
|
|
||||||
|
last_move = (user, target, move)
|
||||||
|
player, _ = self.split_pokemon(user)
|
||||||
|
self.conn.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO moves(game, turn, player, pokemon, move, target)
|
||||||
|
VALUES (?, ?, ?, ?, ?, ?)
|
||||||
|
ON CONFLICT DO NOTHING
|
||||||
|
""",
|
||||||
|
(
|
||||||
|
self.game,
|
||||||
|
self.turn,
|
||||||
|
team(player),
|
||||||
|
self.specie(user),
|
||||||
|
move,
|
||||||
|
self.specie(target),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
# t.Literal, PlayerTag, Player
|
||||||
|
case ["player", id, username, *rest]:
|
||||||
|
self.players[PlayerTag(id)] = Player(username)
|
||||||
|
|
||||||
|
# t.Literal, TaggedPokemon, str
|
||||||
|
case ["replace", name, specie]:
|
||||||
|
self._log_appearance(name, specie)
|
||||||
|
|
||||||
|
# t.Literal, TaggedPokemon, str, str, t.Optional[str]
|
||||||
|
case ["switch", name, specie, status, *rest]:
|
||||||
|
self.hp[name] = int(status.split("/")[0])
|
||||||
|
|
||||||
|
# Also includes gender and formes.
|
||||||
|
trimmed_specie = specie.split(", ")[0]
|
||||||
|
player, nickname = self.split_pokemon(name)
|
||||||
|
|
||||||
|
self._log_appearance(name, specie)
|
||||||
|
self.conn.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO switches(game, turn, player, pokemon)
|
||||||
|
VALUES (?, ?, ?, ?)
|
||||||
|
ON CONFLICT DO NOTHING
|
||||||
|
""",
|
||||||
|
(self.game, self.turn, team(player), trimmed_specie),
|
||||||
|
)
|
||||||
|
|
||||||
|
# t.Literal, str
|
||||||
|
case ["turn", turn]:
|
||||||
|
self.turn = int(turn)
|
||||||
|
|
||||||
|
# t.Literal, Player
|
||||||
|
case ["win", player]:
|
||||||
|
self.conn.execute(
|
||||||
|
"""
|
||||||
|
UPDATE games
|
||||||
|
SET winner = ?
|
||||||
|
WHERE id = ?
|
||||||
|
""",
|
||||||
|
(team(player), self.game),
|
||||||
|
)
|
||||||
|
|
||||||
|
case ["-heal", pokemon, status, *rest]:
|
||||||
|
self.hp[pokemon] = int(status.split("/")[0])
|
||||||
|
|
||||||
|
# TODO: track healing done
|
||||||
|
|
||||||
|
# t.Literal, TaggedPokemon, str
|
||||||
|
case ["-damage", pokemon, status]:
|
||||||
|
# Pokemon takes direct (non-hazard/condition) damage; status
|
||||||
|
# can be a percentage "70/100" with or without condition, or
|
||||||
|
# "0 fnt"
|
||||||
|
new_hp = int(re.split("[/ ]", status)[0])
|
||||||
|
LOG.debug(f"{pokemon} dropped to {new_hp} from {self.hp[pokemon]}")
|
||||||
|
LOG.debug(f"source: {last_move}")
|
||||||
|
|
||||||
|
# resolve to damage source
|
||||||
|
if last_move[1] != pokemon:
|
||||||
|
LOG.warning(
|
||||||
|
f"{pokemon} took direct damage but last move"
|
||||||
|
f" {last_move[2]} was not targeted at them"
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
damage_source = last_move[0]
|
||||||
|
source_player, source_nickname = self.split_pokemon(damage_source)
|
||||||
|
|
||||||
|
self.conn.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO damage(game, player, pokemon, value)
|
||||||
|
VALUES(?, ?, ?, ?)
|
||||||
|
ON CONFLICT DO NOTHING
|
||||||
|
""",
|
||||||
|
(
|
||||||
|
self.game,
|
||||||
|
team(source_player),
|
||||||
|
self.specie(damage_source),
|
||||||
|
self.hp[pokemon] - new_hp,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
self.hp[pokemon] = new_hp
|
||||||
|
|
||||||
|
# t.Literal, TaggedPokemon, str, str
|
||||||
|
case ["-damage", pokemon_, status, from_]:
|
||||||
|
pokemon = TaggedPokemon(pokemon_)
|
||||||
|
|
||||||
|
# Pokemon takes indirect damage; status can be a percentage
|
||||||
|
# "70/100" with or without condition, or "0 fnt"
|
||||||
|
new_hp = int(re.split("[/ ]", status)[0])
|
||||||
|
LOG.debug(f"{pokemon} dropped to {new_hp} from {from_}")
|
||||||
|
|
||||||
|
LOG.debug(f"tracing reason for {line}")
|
||||||
|
reason = from_.replace("[from] ", "")
|
||||||
|
|
||||||
|
source: TaggedPokemon | str | None = None
|
||||||
|
source_is_pokemon = True
|
||||||
|
|
||||||
|
test_hazard = self.last_env_set.get((tag(pokemon), reason))
|
||||||
|
if test_hazard:
|
||||||
|
source = test_hazard
|
||||||
|
LOG.debug(f"identified hazard source {source}")
|
||||||
|
|
||||||
|
test_status = self.last_status_set.get((pokemon, reason))
|
||||||
|
if test_status:
|
||||||
|
source = test_status
|
||||||
|
LOG.debug(f"identified move source {source}")
|
||||||
|
|
||||||
|
if reason == "Recoil" or reason.startswith("item: "):
|
||||||
|
LOG.debug(f"identified special source {reason}")
|
||||||
|
reason = reason.replace("item: ", "")
|
||||||
|
source = "self"
|
||||||
|
source_is_pokemon = False
|
||||||
|
|
||||||
|
if not source:
|
||||||
|
LOG.error(f"missing reason for {line}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
player, nickname = self.split_pokemon(pokemon)
|
||||||
|
if source.startswith("p1") or source.startswith("p2"):
|
||||||
|
source_player, _ = self.split_pokemon(TaggedPokemon(source))
|
||||||
|
else:
|
||||||
|
source_player = None # type: ignore
|
||||||
|
source_is_pokemon = False
|
||||||
|
|
||||||
|
if source_player:
|
||||||
|
self.conn.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO indirect_damage(game, player, pokemon, value)
|
||||||
|
VALUES(?, ?, ?, ?)
|
||||||
|
""",
|
||||||
|
(
|
||||||
|
self.game,
|
||||||
|
team(source_player),
|
||||||
|
self.specie(TaggedPokemon(source)),
|
||||||
|
self.hp[pokemon] - new_hp,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
if status == "0 fnt":
|
||||||
|
self.conn.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO indirect_knockouts(
|
||||||
|
game, turn, player, pokemon,
|
||||||
|
reason, source, source_player)
|
||||||
|
VALUES(?, ?, ?, ?, ?, ?, ?)
|
||||||
|
ON CONFLICT DO NOTHING
|
||||||
|
""",
|
||||||
|
(
|
||||||
|
self.game,
|
||||||
|
self.turn,
|
||||||
|
team(player),
|
||||||
|
self.specie(pokemon),
|
||||||
|
reason,
|
||||||
|
self.specie(TaggedPokemon(source))
|
||||||
|
if source_is_pokemon
|
||||||
|
else source,
|
||||||
|
team(source_player),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
# t.Literal, TaggedPlayer, str
|
||||||
|
case ["-sidestart", side_, env]:
|
||||||
|
side = TaggedPlayer(side_)
|
||||||
|
|
||||||
|
if not last_move:
|
||||||
|
LOG.warning(f"missing previous move for {line}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
LOG.debug(f"{line} <- {last_move}")
|
||||||
|
self.last_env_set[
|
||||||
|
(tag(side), env.replace("move: ", ""))
|
||||||
|
] = last_move[0]
|
||||||
|
|
||||||
|
# t.Literal, TaggedPokemon, str
|
||||||
|
case ["-status", pokemon_, cond]:
|
||||||
|
pokemon = TaggedPokemon(pokemon_)
|
||||||
|
|
||||||
|
if not last_move or last_move[1] != pokemon:
|
||||||
|
LOG.warning(f"missing previous move for {line}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
LOG.debug(f"{line} <- {last_move}")
|
||||||
|
self.last_status_set[(pokemon, cond)] = last_move[0]
|
||||||
|
|
||||||
|
# t.Literal, TaggedPokemon, str
|
||||||
|
case ["-terastallize", pokemon_, type]:
|
||||||
|
pokemon = TaggedPokemon(pokemon_)
|
||||||
|
# TODO
|
||||||
|
pass
|
||||||
|
|
||||||
|
case _:
|
||||||
|
# LOG.debug(f"unhandled message {chunks[0]}")
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class Replay:
|
||||||
|
id: str
|
||||||
|
p1: str
|
||||||
|
p2: str
|
||||||
|
format: str
|
||||||
|
log: str
|
||||||
|
uploadtime: int
|
||||||
|
views: int
|
||||||
|
p1id: str
|
||||||
|
p2id: str
|
||||||
|
formatid: str
|
||||||
|
rating: int
|
||||||
|
private: int
|
||||||
|
password: t.Optional[str]
|
||||||
|
|
||||||
|
|
||||||
|
def fetch(replay: str, cache: bool = True) -> Replay:
|
||||||
|
replay = replay.replace("https://replay.pokemonshowdown.com/", "")
|
||||||
|
replay_file = Path.cwd() / "cache" / f"{replay}.json"
|
||||||
|
|
||||||
|
if cache and replay_file.exists():
|
||||||
|
with replay_file.open() as f:
|
||||||
|
return Replay(**json.load(f))
|
||||||
|
|
||||||
|
data = requests.get(f"https://replay.pokemonshowdown.com/{replay}.json")
|
||||||
|
if data.status_code != 200:
|
||||||
|
raise Exception(data.text)
|
||||||
|
data = data.json()
|
||||||
|
|
||||||
|
if cache:
|
||||||
|
replay_file.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
|
||||||
|
with replay_file.open(mode="w") as f:
|
||||||
|
json.dump(data, f)
|
||||||
|
|
||||||
|
return Replay(**data) # type: ignore
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
prog=APP,
|
||||||
|
description="extracts stats from a Showdown replay",
|
||||||
|
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
|
||||||
|
)
|
||||||
|
parser.add_argument("-v", "--verbose", action="count", help="add debugging info")
|
||||||
|
parser.add_argument(
|
||||||
|
"-C",
|
||||||
|
"--no-cache",
|
||||||
|
action="store_true",
|
||||||
|
help="fetch replays instead of using cache",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"-t",
|
||||||
|
"--teams",
|
||||||
|
action="store",
|
||||||
|
metavar="FILE",
|
||||||
|
default="teams.json",
|
||||||
|
help="JSON file defining players to teams",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"-o",
|
||||||
|
"--output",
|
||||||
|
action="store",
|
||||||
|
metavar="FILE",
|
||||||
|
default="data.db",
|
||||||
|
help="output data file",
|
||||||
|
)
|
||||||
|
parser.add_argument("replay", nargs="+", help="replay ID or URL")
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
if args.verbose and args.verbose > 1:
|
||||||
|
LOG.setLevel(logging.TRACE)
|
||||||
|
elif args.verbose:
|
||||||
|
LOG.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
|
if args.teams:
|
||||||
|
with open(args.teams) as f:
|
||||||
|
global TEAMS
|
||||||
|
TEAMS = json.load(f)
|
||||||
|
|
||||||
|
try:
|
||||||
|
db = sqlite3.connect(args.output)
|
||||||
|
_init_db(db)
|
||||||
|
|
||||||
|
for r in args.replay:
|
||||||
|
try:
|
||||||
|
replay = fetch(r, cache=not args.no_cache)
|
||||||
|
except Exception as e:
|
||||||
|
LOG.error(f"bad replay {r}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
LOG.info(f"indexing game {replay.id}")
|
||||||
|
db.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO games(id, p1, p2, format, uploadtime)
|
||||||
|
VALUES (?, ?, ?, ?, ?)
|
||||||
|
ON CONFLICT DO NOTHING
|
||||||
|
""",
|
||||||
|
(
|
||||||
|
replay.id,
|
||||||
|
team(replay.p1),
|
||||||
|
team(replay.p2),
|
||||||
|
replay.format,
|
||||||
|
replay.uploadtime,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
LogParser(replay.id, db).parse(replay.log)
|
||||||
|
db.commit()
|
||||||
|
|
||||||
|
finally:
|
||||||
|
db.close()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
438
main.py
438
main.py
@ -1,438 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
|
|
||||||
from collections import namedtuple
|
|
||||||
from dataclasses import dataclass
|
|
||||||
from functools import partial, partialmethod
|
|
||||||
from pathlib import Path
|
|
||||||
import argparse
|
|
||||||
import json
|
|
||||||
import logging
|
|
||||||
import re
|
|
||||||
import requests
|
|
||||||
import sqlite3
|
|
||||||
import sys
|
|
||||||
import typing as t
|
|
||||||
|
|
||||||
|
|
||||||
logging.TRACE = 5
|
|
||||||
logging.addLevelName(logging.TRACE, "TRACE")
|
|
||||||
logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE)
|
|
||||||
logging.trace = partial(logging.log, logging.TRACE)
|
|
||||||
|
|
||||||
|
|
||||||
class LogFormatter(logging.Formatter):
|
|
||||||
|
|
||||||
format = "%(name)s [%(levelname)s] %(message)s"
|
|
||||||
FORMATS = {
|
|
||||||
logging.TRACE: f"\x1b[30;20m{format}\x1b[0m",
|
|
||||||
logging.DEBUG: f"\x1b[38;20m{format}\x1b[0m",
|
|
||||||
logging.INFO: f"\x1b[34;20m{format}\x1b[0m",
|
|
||||||
logging.WARNING: f"\x1b[33;20m{format}\x1b[0m",
|
|
||||||
logging.ERROR: f"\x1b[31;20m{format}\x1b[0m",
|
|
||||||
logging.CRITICAL: f"\x1b[31;1m{format}\x1b[0m",
|
|
||||||
}
|
|
||||||
|
|
||||||
def format(self, record):
|
|
||||||
fmt = self.FORMATS.get(record.levelno)
|
|
||||||
formatter = logging.Formatter(fmt)
|
|
||||||
return formatter.format(record)
|
|
||||||
|
|
||||||
|
|
||||||
APP = "hhirlstats"
|
|
||||||
LOG = logging.getLogger(APP)
|
|
||||||
_ch = logging.StreamHandler()
|
|
||||||
_ch.setFormatter(LogFormatter())
|
|
||||||
LOG.addHandler(_ch)
|
|
||||||
|
|
||||||
|
|
||||||
TEAMS = {}
|
|
||||||
_logged_teams = []
|
|
||||||
|
|
||||||
|
|
||||||
def team(player: str) -> str:
|
|
||||||
"""Maps a username to a defined team."""
|
|
||||||
if player in TEAMS:
|
|
||||||
return TEAMS[player]
|
|
||||||
else:
|
|
||||||
if not player in _logged_teams and player:
|
|
||||||
LOG.warning(f"missing team mapping for {player}")
|
|
||||||
_logged_teams.append(player)
|
|
||||||
return player
|
|
||||||
|
|
||||||
|
|
||||||
class safelist(list):
|
|
||||||
def get(self, index, default=None):
|
|
||||||
try:
|
|
||||||
return self.__getitem__(index)
|
|
||||||
except IndexError:
|
|
||||||
return default
|
|
||||||
|
|
||||||
|
|
||||||
def _init_db(conn: sqlite3.Connection):
|
|
||||||
def namedtuple_factory(cursor, row):
|
|
||||||
fields = [column[0] for column in cursor.description]
|
|
||||||
cls = namedtuple("Row", fields)
|
|
||||||
return cls._make(row)
|
|
||||||
|
|
||||||
conn.row_factory = namedtuple_factory
|
|
||||||
|
|
||||||
conn.executescript(
|
|
||||||
"""
|
|
||||||
CREATE TABLE IF NOT EXISTS moves(
|
|
||||||
game, turn, player, name, user, target,
|
|
||||||
UNIQUE(game, turn, player, user)
|
|
||||||
);
|
|
||||||
CREATE TABLE IF NOT EXISTS switches(
|
|
||||||
game, turn, player, name,
|
|
||||||
UNIQUE(game, turn, player, name)
|
|
||||||
);
|
|
||||||
CREATE TABLE IF NOT EXISTS nicknames(
|
|
||||||
game, player, name, specie,
|
|
||||||
UNIQUE(game, player, specie)
|
|
||||||
);
|
|
||||||
CREATE TABLE IF NOT EXISTS knockouts(
|
|
||||||
game, turn, player, name,
|
|
||||||
UNIQUE(game, turn, player)
|
|
||||||
);
|
|
||||||
CREATE TABLE IF NOT EXISTS indirect_knockouts(
|
|
||||||
game, turn, player, name, source, source_user, source_player,
|
|
||||||
UNIQUE(game, turn, player)
|
|
||||||
);
|
|
||||||
CREATE TABLE IF NOT EXISTS games(
|
|
||||||
id, p1, p2, format, uploadtime, winner,
|
|
||||||
UNIQUE(id)
|
|
||||||
);
|
|
||||||
-- No good way to ensure idempotence for damage; just re-build it.
|
|
||||||
DROP TABLE IF EXISTS damage;
|
|
||||||
CREATE TABLE damage(game, player, name, value);
|
|
||||||
DROP TABLE IF EXISTS indirect_damage;
|
|
||||||
CREATE TABLE indirect_damage(game, player, name, value);
|
|
||||||
"""
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def parse_log(game: str, log: str, into: sqlite3.Connection):
|
|
||||||
conn = into
|
|
||||||
|
|
||||||
turn = 0
|
|
||||||
players = {}
|
|
||||||
hp = {}
|
|
||||||
|
|
||||||
# ("p2a: Edward", "p1a: Meteo")
|
|
||||||
# memorises the user of the move that causes environment setting or status,
|
|
||||||
# and its target
|
|
||||||
last_move: t.Optional[tuple[str, str]]
|
|
||||||
|
|
||||||
# ("p1", "Spikes") => "p2a: Frosslas"
|
|
||||||
last_env_set: dict[tuple[str, str], str] = {}
|
|
||||||
|
|
||||||
# ("p1a: Meteo", "brn") => "p2a: Edward"
|
|
||||||
last_status_set: dict[tuple[str, str], str] = {}
|
|
||||||
|
|
||||||
def resolve_mon(user: str) -> tuple[str, str]:
|
|
||||||
[player, name] = user.split(": ")
|
|
||||||
return players[player.strip("ab")], name
|
|
||||||
|
|
||||||
for line in log.split("\n"):
|
|
||||||
chunks = line.split("|")[1:]
|
|
||||||
if not chunks:
|
|
||||||
continue
|
|
||||||
|
|
||||||
LOG.trace(line)
|
|
||||||
|
|
||||||
match chunks:
|
|
||||||
case ["player", id, username, *rest]:
|
|
||||||
players[id] = username
|
|
||||||
|
|
||||||
case ["turn", turn]:
|
|
||||||
turn = int(turn)
|
|
||||||
|
|
||||||
case ["move", user, move, target]:
|
|
||||||
last_move = (user, target)
|
|
||||||
player, user = resolve_mon(user)
|
|
||||||
_, target = resolve_mon(target)
|
|
||||||
conn.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO moves(game, turn, player, name, user, target)
|
|
||||||
VALUES (?, ?, ?, ?, ?, ?)
|
|
||||||
ON CONFLICT DO NOTHING
|
|
||||||
""",
|
|
||||||
(game, turn, team(player), move, user, target),
|
|
||||||
)
|
|
||||||
|
|
||||||
case ["drag", name, specie, status, *rest]:
|
|
||||||
hp[name] = int(status.split("/")[0])
|
|
||||||
|
|
||||||
case ["switch", name, specie, status, *rest]:
|
|
||||||
hp[name] = int(status.split("/")[0])
|
|
||||||
|
|
||||||
player, name = resolve_mon(name)
|
|
||||||
conn.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO switches(game, turn, player, name)
|
|
||||||
VALUES (?, ?, ?, ?)
|
|
||||||
ON CONFLICT DO NOTHING
|
|
||||||
""",
|
|
||||||
(game, turn, team(player), name),
|
|
||||||
)
|
|
||||||
conn.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO nicknames(game, player, name, specie)
|
|
||||||
VALUES(?, ?, ?, ?)
|
|
||||||
ON CONFLICT DO NOTHING
|
|
||||||
""",
|
|
||||||
(game, team(player), name, specie.split(", ")[0]),
|
|
||||||
)
|
|
||||||
|
|
||||||
case ["faint", mon]:
|
|
||||||
player, mon = resolve_mon(mon)
|
|
||||||
conn.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO knockouts(game, turn, player, name)
|
|
||||||
VALUES(?, ?, ?, ?)
|
|
||||||
ON CONFLICT DO NOTHING
|
|
||||||
""",
|
|
||||||
(game, turn, team(player), mon),
|
|
||||||
)
|
|
||||||
|
|
||||||
case ["win", player]:
|
|
||||||
conn.execute(
|
|
||||||
"""
|
|
||||||
UPDATE games
|
|
||||||
SET winner = ?
|
|
||||||
WHERE id = ?
|
|
||||||
""",
|
|
||||||
(team(player), game),
|
|
||||||
)
|
|
||||||
|
|
||||||
case ["-sidestart", side, env]:
|
|
||||||
if not last_move:
|
|
||||||
LOG.warning(f"missing previous move for {line}")
|
|
||||||
continue
|
|
||||||
LOG.debug(f"{line} <- {last_move}")
|
|
||||||
last_env_set[(side[0:1], env.replace("move: ", ""))] = last_move[0]
|
|
||||||
|
|
||||||
case ["-status", mon, cond]:
|
|
||||||
if not last_move or last_move[1] != mon:
|
|
||||||
LOG.warning(f"missing previous move for {line}")
|
|
||||||
continue
|
|
||||||
LOG.debug(f"{line} <- {last_move}")
|
|
||||||
last_status_set[(mon, cond)] = last_move[0]
|
|
||||||
|
|
||||||
case ["-damage", mon, status]:
|
|
||||||
# mon takes direct (non-hazard/condition) damage
|
|
||||||
# status can be a percentage 70/100 with or without condition,
|
|
||||||
# or "0 fnt"
|
|
||||||
new_hp = int(re.split("[/ ]", status)[0])
|
|
||||||
LOG.debug(f"{mon} dropped to {new_hp} from {hp[mon]}")
|
|
||||||
LOG.debug(f"source: {last_move}")
|
|
||||||
|
|
||||||
# resolve to damage source
|
|
||||||
if last_move[1] != mon:
|
|
||||||
LOG.warn(
|
|
||||||
f"{mon} took direct damage but last move was not targeted at them"
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
user = last_move[0]
|
|
||||||
source_player, source_mon = resolve_mon(user)
|
|
||||||
|
|
||||||
conn.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO damage(game, player, name, value)
|
|
||||||
VALUES(?, ?, ?, ?)
|
|
||||||
ON CONFLICT DO NOTHING
|
|
||||||
""",
|
|
||||||
(game, team(source_player), source_mon, hp[mon] - new_hp),
|
|
||||||
)
|
|
||||||
|
|
||||||
hp[mon] = new_hp
|
|
||||||
|
|
||||||
case ["-damage", mon, status, from_]:
|
|
||||||
# mon takes indirect damage
|
|
||||||
# status can be a percentage 70/100 with or without condition,
|
|
||||||
# or "0 fnt"
|
|
||||||
# mon has fainted from an indirect damage source
|
|
||||||
#
|
|
||||||
new_hp = int(re.split("[/ ]", status)[0])
|
|
||||||
LOG.debug(f"{mon} dropped to {new_hp} from {from_}")
|
|
||||||
|
|
||||||
LOG.debug(f"tracing source for {line}")
|
|
||||||
source = from_.replace("[from] ", "")
|
|
||||||
source_user = None
|
|
||||||
|
|
||||||
test_hazard = last_env_set.get((mon[0:1], source))
|
|
||||||
if test_hazard:
|
|
||||||
source_user = test_hazard
|
|
||||||
LOG.debug(f"identified hazard source {source_user}")
|
|
||||||
|
|
||||||
test_status = last_status_set.get((mon, source))
|
|
||||||
if test_status:
|
|
||||||
source_user = test_status
|
|
||||||
LOG.debug(f"identified move source {source_user}")
|
|
||||||
|
|
||||||
if source == "Recoil" or source.startswith("item: "):
|
|
||||||
LOG.debug(f"identified special source {source}")
|
|
||||||
source = source.replace("item: ", "")
|
|
||||||
source_user = "self"
|
|
||||||
|
|
||||||
if not source_user:
|
|
||||||
LOG.error(f"missing source for {line}")
|
|
||||||
continue
|
|
||||||
|
|
||||||
player, pkmn = resolve_mon(mon)
|
|
||||||
if source_user.startswith("p1") or source_user.startswith("p2"):
|
|
||||||
source_player, source_mon = resolve_mon(source_user)
|
|
||||||
else:
|
|
||||||
source_player = None
|
|
||||||
|
|
||||||
if source_player:
|
|
||||||
conn.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO indirect_damage(game, player, name, value)
|
|
||||||
VALUES(?, ?, ?, ?)
|
|
||||||
""",
|
|
||||||
(game, team(source_player), source_mon, hp[mon] - new_hp),
|
|
||||||
)
|
|
||||||
|
|
||||||
if status == "0 fnt":
|
|
||||||
conn.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO indirect_knockouts(game, turn, player, name, source, source_user, source_player)
|
|
||||||
VALUES(?, ?, ?, ?, ?, ?, ?)
|
|
||||||
ON CONFLICT DO NOTHING
|
|
||||||
""",
|
|
||||||
(
|
|
||||||
game,
|
|
||||||
turn,
|
|
||||||
team(player),
|
|
||||||
pkmn,
|
|
||||||
source,
|
|
||||||
source_mon,
|
|
||||||
team(source_player),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
case ["-heal", mon, status, *rest]:
|
|
||||||
hp[mon] = int(status.split("/")[0])
|
|
||||||
|
|
||||||
case _:
|
|
||||||
# LOG.debug(f"unhandled message {chunks[0]}")
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True)
|
|
||||||
class Replay:
|
|
||||||
id: str
|
|
||||||
p1: str
|
|
||||||
p2: str
|
|
||||||
format: str
|
|
||||||
log: str
|
|
||||||
uploadtime: int
|
|
||||||
views: int
|
|
||||||
p1id: str
|
|
||||||
p2id: str
|
|
||||||
formatid: str
|
|
||||||
rating: int
|
|
||||||
private: int
|
|
||||||
password: t.Optional[str]
|
|
||||||
|
|
||||||
|
|
||||||
def fetch(replay: str, cache: bool = True) -> Replay:
|
|
||||||
replay = replay.replace("https://replay.pokemonshowdown.com/", "")
|
|
||||||
replay_file = Path.cwd() / "cache" / f"{replay}.json"
|
|
||||||
|
|
||||||
if cache and replay_file.exists():
|
|
||||||
with replay_file.open() as f:
|
|
||||||
return Replay(**json.load(f))
|
|
||||||
|
|
||||||
data = requests.get(f"https://replay.pokemonshowdown.com/{replay}.json")
|
|
||||||
if data.status_code != 200:
|
|
||||||
raise Exception(data.text)
|
|
||||||
data = data.json()
|
|
||||||
|
|
||||||
if cache:
|
|
||||||
replay_file.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
|
|
||||||
with replay_file.open(mode="w") as f:
|
|
||||||
json.dump(data, f)
|
|
||||||
|
|
||||||
return Replay(**data)
|
|
||||||
|
|
||||||
|
|
||||||
def main(args):
|
|
||||||
parser = argparse.ArgumentParser(
|
|
||||||
prog=APP,
|
|
||||||
description="extracts stats from a Showdown replay",
|
|
||||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
|
|
||||||
)
|
|
||||||
parser.add_argument("-v", "--verbose", action="count", help="add debugging info")
|
|
||||||
parser.add_argument(
|
|
||||||
"-C",
|
|
||||||
"--no-cache",
|
|
||||||
action="store_true",
|
|
||||||
help="fetch replays instead of using cache",
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
"-t",
|
|
||||||
"--teams",
|
|
||||||
action="store",
|
|
||||||
metavar="FILE",
|
|
||||||
default="teams.json",
|
|
||||||
help="JSON file defining players to teams",
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
"-o",
|
|
||||||
"--output",
|
|
||||||
action="store",
|
|
||||||
metavar="FILE",
|
|
||||||
default="data.db",
|
|
||||||
help="output data file",
|
|
||||||
)
|
|
||||||
parser.add_argument("replay", nargs="+", help="replay ID or URL")
|
|
||||||
|
|
||||||
args = parser.parse_args(args)
|
|
||||||
if args.verbose and args.verbose > 1:
|
|
||||||
LOG.setLevel(logging.TRACE)
|
|
||||||
elif args.verbose:
|
|
||||||
LOG.setLevel(logging.DEBUG)
|
|
||||||
|
|
||||||
if args.teams:
|
|
||||||
with open(args.teams) as f:
|
|
||||||
global TEAMS
|
|
||||||
TEAMS = json.load(f)
|
|
||||||
|
|
||||||
try:
|
|
||||||
db = sqlite3.connect(args.output)
|
|
||||||
_init_db(db)
|
|
||||||
|
|
||||||
for r in args.replay:
|
|
||||||
try:
|
|
||||||
replay = fetch(r, cache=not args.no_cache)
|
|
||||||
except Exception as e:
|
|
||||||
LOG.error(f"bad replay {r}")
|
|
||||||
continue
|
|
||||||
|
|
||||||
LOG.info(f"indexing game {replay.id}")
|
|
||||||
db.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO games(id, p1, p2, format, uploadtime)
|
|
||||||
VALUES (?, ?, ?, ?, ?)
|
|
||||||
ON CONFLICT DO NOTHING
|
|
||||||
""",
|
|
||||||
(
|
|
||||||
replay.id,
|
|
||||||
team(replay.p1),
|
|
||||||
team(replay.p2),
|
|
||||||
replay.format,
|
|
||||||
replay.uploadtime,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
parse_log(replay.id, replay.log, into=db)
|
|
||||||
db.commit()
|
|
||||||
|
|
||||||
finally:
|
|
||||||
db.close()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main(sys.argv[1:])
|
|
Loading…
Reference in New Issue
Block a user