import datetime import logging import os import time import coloredlogs from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy.ext.hybrid import hybrid_property # noinspection PyUnresolvedReferences from sqlalchemy import Column, BigInteger, Integer, String, DateTime, ForeignKey, Float, Enum, create_engine, \ UniqueConstraint, PrimaryKeyConstraint, Boolean, LargeBinary, Text, Date, func from sqlalchemy.inspection import inspect import requests from utils.errors import NotFoundError, AlreadyExistingError, PrivateError import re import enum import loldata from utils.dirty import Dirty, DirtyDelta import sql_queries from flask import escape import configparser import typing from utils import MatchmakingStatus, errors import strings if typing.TYPE_CHECKING: # noinspection PyPackageRequirements from discord import User as DiscordUser # noinspection PyPackageRequirements from telegram import User as TelegramUser # Init the config reader config = configparser.ConfigParser() config.read("config.ini") # Init the sqlalchemy engine engine = create_engine(config["Database"]["database_uri"]) Base = declarative_base(bind=engine) Session = sessionmaker(bind=engine) logging.getLogger().disabled = True logger = logging.getLogger(__name__) os.environ["COLOREDLOGS_LOG_FORMAT"] = "%(asctime)s %(levelname)s %(name)s %(message)s" coloredlogs.install(level="DEBUG", logger=logger) def relationship_name_search(_class, keyword) -> typing.Optional[tuple]: """Recursively find a relationship with a given name.""" inspected = set() def search(_mapper, chain): inspected.add(_mapper) relationships = _mapper.relationships try: return chain + (relationships[keyword],) except KeyError: for _relationship in set(relationships): if _relationship.mapper in inspected: continue result = search(_relationship.mapper, chain + (_relationship,)) if result is not None: return result return None return search(inspect(_class), tuple()) def relationship_link_chain(starting_class, ending_class) -> typing.Optional[tuple]: """Find the path to follow to get from the starting table to the ending table.""" inspected = set() def search(_mapper, chain): inspected.add(_mapper) if _mapper.class_ == ending_class: return chain relationships = _mapper.relationships for _relationship in set(relationships): if _relationship.mapper in inspected: continue try: return search(_relationship.mapper, chain + (_relationship,)) except errors.NotFoundError: continue raise errors.NotFoundError() return search(inspect(starting_class), tuple()) class Mini(object): """Mixin for every table that has an associated mini.""" _mini_full_name = NotImplemented _mini_name = NotImplemented _mini_order = NotImplemented @classmethod def mini_get_all(cls, session: Session) -> list: return session.query(cls).order_by(*cls._mini_order).all() @classmethod def mini_get_single(cls, session: Session, **kwargs): return session.query(cls).filter_by(**kwargs).one_or_none() @classmethod def mini_get_single_from_royal(cls, session: Session, royal: "Royal"): chain = relationship_link_chain(cls, Royal) if chain is None: chain = tuple() start = session.query(cls) for connection in chain: start = start.join(connection.mapper.class_) start = start.filter(Royal.id == royal.id) mini = start.one() return mini class Royal(Base, Mini): __tablename__ = "royals" id = Column(Integer, primary_key=True) username = Column(String, unique=True, nullable=False) password = Column(LargeBinary) role = Column(String) fiorygi = Column(Integer, default=0) member_since = Column(Date) special_title = Column(String) _mini_full_name = "Royalnet" _mini_name = "ryg" _mini_order = [fiorygi.desc()] @staticmethod def create(session: Session, username: str): r = session.query(Royal).filter_by(username=username).first() if r is not None: raise AlreadyExistingError(repr(r)) return Royal(username=username) def __repr__(self): return f"" @classmethod def mini_get_single_from_royal(cls, session: Session, royal: "Royal"): return royal class Telegram(Base, Mini): __tablename__ = "telegram" royal_id = Column(Integer, ForeignKey("royals.id")) royal = relationship("Royal", backref="telegram", lazy="joined") telegram_id = Column(BigInteger, primary_key=True) first_name = Column(String) last_name = Column(String) username = Column(String) _mini_full_name = "Telegram" _mini_name = "tg" _mini_order = [telegram_id] @staticmethod def create(session: Session, royal_username, telegram_user: "TelegramUser"): t = session.query(Telegram).filter_by(telegram_id=telegram_user.id).first() if t is not None: raise AlreadyExistingError(repr(t)) r = session.query(Royal).filter(Royal.username == royal_username).first() if r is None: raise NotFoundError("No Royal exists with that username") t = session.query(Telegram).filter(Telegram.royal_id == r.id).first() if t is not None: raise AlreadyExistingError(repr(t)) return Telegram(royal=r, telegram_id=telegram_user.id, first_name=telegram_user.first_name, last_name=telegram_user.last_name, username=telegram_user.username) def __repr__(self): return f"" def mention(self): if self.username is not None: return f"@{self.username}" else: return self.first_name def __str__(self): if self.username is not None: return self.username elif self.last_name is not None: return f"{self.first_name} {self.last_name}" else: return self.first_name @classmethod def mini_get_single_from_royal(cls, session: Session, royal: "Royal"): return royal.telegram class Steam(Base, Mini): __tablename__ = "steam" royal_id = Column(Integer, ForeignKey("royals.id")) royal = relationship("Royal", backref="steam", lazy="joined") steam_id = Column(String, primary_key=True) persona_name = Column(String) avatar_hex = Column(String) trade_token = Column(String) most_played_game_id = Column(BigInteger) _mini_full_name = "Steam" _mini_name = "steam" _mini_order = [steam_id] def __repr__(self): if not self.persona_name: return f"" return f"" def __str__(self): if self.persona_name is not None: return self.persona_name else: return self.steam_id def most_played_game_url(self): return f"https://steamcdn-a.akamaihd.net/steam/apps/{self.most_played_game_id}/header.jpg" def avatar_url(self): return f"https://steamcdn-a.akamaihd.net/steamcommunity/public/images/avatars/{self.avatar_hex[0:2]}/"\ f"{self.avatar_hex}.jpg" @staticmethod def create(session: Session, royal_id: int, steam_id: str): s = session.query(Steam).get(steam_id) if s is not None: raise AlreadyExistingError(repr(s)) r = requests.get(f"https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/" f"?key={config['Steam']['api_key']}&steamids={steam_id}") r.raise_for_status() j = r.json() if len(j) == 0: raise NotFoundError(f"The steam_id doesn't match any steam account") s = Steam(royal_id=royal_id, steam_id=steam_id, persona_name=j["response"]["players"][0]["personaname"], avatar_hex=re.search(r"https://steamcdn-a\.akamaihd\.net/steamcommunity/public/images/avatars/../" r"(.+).jpg", j["response"]["players"][0]["avatar"]).group(1)) return s @staticmethod def find_trade_token(trade_url): return re.search(r"https://steamcommunity\.com/tradeoffer/new/\?partner=[0-9]+&token=(.{8})", trade_url)\ .group(1) @staticmethod def to_steam_id_2(steam_id): # Got this code from a random github gist. It could be completely wrong. z = (int(steam_id) - 76561197960265728) // 2 y = int(steam_id) % 2 return f"STEAM_0:{y}:{z}" @staticmethod def to_steam_id_3(steam_id, full=False): # Got this code from a random github gist. It could be completely wrong. if full: return f"[U:1:{int(steam_id) - 76561197960265728}]" else: return f"{int(steam_id) - 76561197960265728}" # noinspection PyUnusedLocal def update(self, session=None, raise_if_private: bool=False): r = requests.get(f"https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/" f"?key={config['Steam']['api_key']}&steamids={self.steam_id}") r.raise_for_status() j = r.json() self.persona_name = j["response"]["players"][0]["personaname"] self.avatar_hex = re.search(r"https://steamcdn-a\.akamaihd\.net/steamcommunity/public/images/avatars/../" r"(.+).jpg", j["response"]["players"][0]["avatar"]).group(1) r = requests.get(f"http://api.steampowered.com/IPlayerService/GetRecentlyPlayedGames/v0001/" f"?key={config['Steam']['api_key']}&steamid={self.steam_id}&format=json") r.raise_for_status() j = r.json() if "response" not in j or "games" not in j["response"] or len(j["response"]["games"]) < 1: if raise_if_private: raise PrivateError(f"Game data is private") return self.most_played_game_id = j["response"]["games"][0]["appid"] @classmethod def mini_get_single_from_royal(cls, session: Session, royal: "Royal"): return royal.steam class RocketLeague(Base, Mini): __tablename__ = "rocketleague" steam_id = Column(String, ForeignKey("steam.steam_id"), primary_key=True) steam = relationship("Steam", backref="rl", lazy="joined") season = Column(Integer) single_rank = Column(Integer) single_div = Column(Integer) single_mmr = Column(Integer) doubles_rank = Column(Integer) doubles_div = Column(Integer) doubles_mmr = Column(Integer) standard_rank = Column(Integer) standard_div = Column(Integer) standard_mmr = Column(Integer) solo_std_rank = Column(Integer) solo_std_div = Column(Integer) solo_std_mmr = Column(Integer) wins = Column(Integer) _mini_full_name = "Rocket League" _mini_name = "rl" _mini_order = [solo_std_mmr.desc().nullslast(), doubles_mmr.desc().nullslast(), standard_mmr.desc().nullslast(), single_mmr.desc().nullslast()] def __repr__(self): return f"" def update(self, session=None, data=None): raise NotImplementedError("rlstats API is no longer available.") def solo_rank_image(self): if self.single_rank is None: rank = 0 else: rank = self.single_rank return f"https://rocketleaguestats.com/assets/img/rocket_league/ranked/season_four/{rank}.png" def doubles_rank_image(self): if self.doubles_rank is None: rank = 0 else: rank = self.doubles_rank return f"https://rocketleaguestats.com/assets/img/rocket_league/ranked/season_four/{rank}.png" def standard_rank_image(self): if self.standard_rank is None: rank = 0 else: rank = self.standard_rank return f"https://rocketleaguestats.com/assets/img/rocket_league/ranked/season_four/{rank}.png" def solo_std_rank_image(self): if self.solo_std_rank is None: rank = 0 else: rank = self.solo_std_rank return f"https://rocketleaguestats.com/assets/img/rocket_league/ranked/season_four/{rank}.png" class Dota(Base, Mini): __tablename__ = "dota" steam_id = Column(String, ForeignKey("steam.steam_id"), primary_key=True) steam = relationship("Steam", backref="dota", lazy="joined") rank_tier = Column(Integer) wins = Column(Integer) losses = Column(Integer) most_played_hero = Column(Integer) _mini_full_name = "DOTA 2" _mini_name = "dota" _mini_order = [rank_tier.desc().nullslast(), wins.desc().nullslast()] def __repr__(self): return f"" def get_rank_icon_url(self): # Rank icon is determined by the first digit of the rank tier if self.rank_tier is None: return f"https://www.opendota.com/assets/images/dota2/rank_icons/rank_icon_0.png" return f"https://www.opendota.com/assets/images/dota2/rank_icons/rank_icon_{str(self.rank_tier)[0]}.png" def get_rank_stars_url(self): # Rank stars are determined by the second digit of the rank tier if self.rank_tier is None or str(self.rank_tier)[1] == "0": return "" return f"https://www.opendota.com/assets/images/dota2/rank_icons/rank_star_{str(self.rank_tier)[1]}.png" def get_rank_name(self): # This should probably be an enum, but who cares if self.rank_tier is None or self.rank_tier < 10: return "Unranked" number = str(self.rank_tier)[0] if number == "1": return "Harald" elif number == "2": return "Guardian" elif number == "3": return "Crusader" elif number == "4": return "Archon" elif number == "5": return "Legend" elif number == "6": return "Ancient" elif number == "7": return "Divine" def get_rank_number(self): if self.rank_tier is None or self.rank_tier < 10: return "" return str(self.rank_tier)[1] @staticmethod def create(session: Session, steam_id: int) -> "Dota": d = session.query(Dota).get(steam_id) if d is not None: raise AlreadyExistingError(repr(d)) r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(steam_id)}") r.raise_for_status() data = r.json() if "profile" not in data: raise NotFoundError("The specified user has never played Dota or has a private match history") new_record = Dota(steam_id=str(steam_id)) new_record.update() return new_record # noinspection PyUnusedLocal def update(self, session=None) -> bool: r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(self.steam_id)}") r.raise_for_status() data = r.json() r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(self.steam_id)}/wl") r.raise_for_status() wl = r.json() r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(self.steam_id)}/heroes") r.raise_for_status() heroes = r.json() changed = self.rank_tier != data["rank_tier"] self.rank_tier = data["rank_tier"] self.wins = wl["win"] self.losses = wl["lose"] self.most_played_hero = heroes[0]["hero_id"] return changed class LeagueOfLegendsRanks(enum.Enum): IRON = 0 BRONZE = 1 SILVER = 2 GOLD = 3 PLATINUM = 4 DIAMOND = 5 MASTER = 6 GRANDMASTER = 7 CHALLENGER = 8 def __str__(self): return self.name.capitalize() class RomanNumerals(enum.Enum): I = 1 II = 2 III = 3 IV = 4 def __str__(self): return self.name class LeagueOfLegends(Base, Mini): __tablename__ = "leagueoflegends" royal_id = Column(Integer, ForeignKey("royals.id")) royal = relationship("Royal", backref="lol", lazy="joined") icon_id = Column(Integer) summoner_id = Column(String, primary_key=True) account_id = Column(String) summoner_name = Column(String) level = Column(Integer) solo_division = Column(Enum(LeagueOfLegendsRanks)) solo_rank = Column(Enum(RomanNumerals)) flex_division = Column(Enum(LeagueOfLegendsRanks)) flex_rank = Column(Enum(RomanNumerals)) twtr_division = Column(Enum(LeagueOfLegendsRanks)) twtr_rank = Column(Enum(RomanNumerals)) highest_mastery_champ = Column(Integer) _mini_full_name = "League of Legends" _mini_name = "lol" _mini_order = [solo_division.desc().nullslast(), solo_rank.desc().nullslast(), flex_division.desc().nullslast(), flex_rank.desc().nullslast(), twtr_division.desc().nullslast(), twtr_rank.desc().nullslast()] def __repr__(self): if not self.summoner_name: return f"" return f"" @staticmethod def create(royal_id, summoner_name) -> "LeagueOfLegends": r = requests.get(f"https://euw1.api.riotgames.com/lol/summoner/v4/summoners/by-name/{summoner_name}" f"?api_key={config['League of Legends']['riot_api_key']}") r.raise_for_status() data = r.json() lol = LeagueOfLegends() lol.royal_id = royal_id lol.summoner_name = summoner_name lol.summoner_id = data["id"] lol.account_id = data["accountId"] lol.icon_id = data["profileIconId"] lol.level = data["summonerLevel"] lol.update() return lol # noinspection PyUnusedLocal def update(self, session=None): r = requests.get(f"https://euw1.api.riotgames.com/lol/summoner/v4/summoners/" f"{self.summoner_id}?api_key={config['League of Legends']['riot_api_key']}") r.raise_for_status() data = r.json() r = requests.get(f"https://euw1.api.riotgames.com/lol/league/v4/positions/by-summoner/" f"{self.summoner_id}?api_key={config['League of Legends']['riot_api_key']}") r.raise_for_status() rank = r.json() r = requests.get(f"https://euw1.api.riotgames.com/lol/champion-mastery/v4/champion-masteries/by-summoner/" f"{self.summoner_id}?api_key={config['League of Legends']['riot_api_key']}") r.raise_for_status() mastery = r.json() solo_q = None flex_q = None twtr_q = None for league in rank: if league["queueType"] == "RANKED_SOLO_5x5": solo_q = league elif league["queueType"] == "RANKED_FLEX_SR": flex_q = league elif league["queueType"] == "RANKED_FLEX_TT": twtr_q = league self.summoner_id = data["id"] self.summoner_name = data["name"] self.account_id = data["accountId"] self.level = data["summonerLevel"] solo = Dirty((self.solo_division, self.solo_rank)) flex = Dirty((self.flex_division, self.flex_rank)) twtr = Dirty((self.twtr_division, self.twtr_rank)) solo.value = (None, None) if solo_q is None else (LeagueOfLegendsRanks[solo_q["tier"]], RomanNumerals[solo_q["rank"]]) flex.value = (None, None) if flex_q is None else (LeagueOfLegendsRanks[flex_q["tier"]], RomanNumerals[flex_q["rank"]]) twtr.value = (None, None) if twtr_q is None else (LeagueOfLegendsRanks[twtr_q["tier"]], RomanNumerals[twtr_q["rank"]]) self.highest_mastery_champ = mastery[0]["championId"] self.solo_division = solo.value[0] self.solo_rank = solo.value[1] self.flex_division = flex.value[0] self.flex_rank = flex.value[1] self.twtr_division = twtr.value[0] self.twtr_rank = twtr.value[1] return solo, flex, twtr def highest_mastery_champ_name(self): champ = loldata.get_champ_by_key(self.highest_mastery_champ) return champ["id"] def highest_mastery_champ_image(self): champ = loldata.get_champ_by_key(self.highest_mastery_champ) return loldata.get_champ_icon(champ["id"]) class Osu(Base, Mini): __tablename__ = "osu" royal_id = Column(Integer, ForeignKey("royals.id"), nullable=False) royal = relationship("Royal", backref="osu", lazy="joined") osu_id = Column(Integer, primary_key=True) osu_name = Column(String) std_pp = Column(Float, default=0) std_best_song = Column(BigInteger) taiko_pp = Column(Float, default=0) taiko_best_song = Column(BigInteger) catch_pp = Column(Float, default=0) catch_best_song = Column(BigInteger) mania_pp = Column(Float, default=0) mania_best_song = Column(BigInteger) _mini_full_name = "osu!" _mini_name = "osu" _mini_order = [mania_pp.desc().nullslast(), std_pp.desc().nullslast(), taiko_pp.desc().nullslast(), catch_pp.desc().nullslast()] @staticmethod def create(session: Session, royal_id, osu_name): o = session.query(Osu).filter(Osu.osu_name == osu_name).first() if o is not None: raise AlreadyExistingError(repr(o)) r0 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={osu_name}&m=0") r0.raise_for_status() r1 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={osu_name}&m=1") r1.raise_for_status() r2 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={osu_name}&m=2") r2.raise_for_status() r3 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={osu_name}&m=3") r3.raise_for_status() j0 = r0.json()[0] j1 = r1.json()[0] j2 = r2.json()[0] j3 = r3.json()[0] new_record = Osu(royal_id=royal_id, osu_id=j0["user_id"], osu_name=j0["username"], std_pp=j0["pp_raw"], taiko_pp=j1["pp_raw"], catch_pp=j2["pp_raw"], mania_pp=j3["pp_raw"]) return new_record # noinspection PyUnusedLocal def update(self, session=None): std = DirtyDelta(self.std_pp) taiko = DirtyDelta(self.taiko_pp) catch = DirtyDelta(self.catch_pp) mania = DirtyDelta(self.mania_pp) r0 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={self.osu_name}&m=0") r0.raise_for_status() j0 = r0.json()[0] r1 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={self.osu_name}&m=1") r1.raise_for_status() j1 = r1.json()[0] r2 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={self.osu_name}&m=2") r2.raise_for_status() j2 = r2.json()[0] r3 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={self.osu_name}&m=3") r3.raise_for_status() j3 = r3.json()[0] self.osu_name = j0["username"] std.value = float(j0["pp_raw"] or 0) taiko.value = float(j1["pp_raw"] or 0) catch.value = float(j2["pp_raw"] or 0) mania.value = float(j3["pp_raw"] or 0) self.std_pp = std.value self.taiko_pp = taiko.value self.catch_pp = catch.value self.mania_pp = mania.value return std, taiko, catch, mania def __repr__(self): if not self.osu_name: return f"" return f"" class Discord(Base, Mini): __tablename__ = "discord" __table_args__ = tuple(UniqueConstraint("name", "discriminator")) royal_id = Column(Integer, ForeignKey("royals.id")) royal = relationship("Royal", backref="discord", lazy="joined") discord_id = Column(BigInteger, primary_key=True) name = Column(String) discriminator = Column(Integer) avatar_hex = Column(String) _mini_full_name = "Discord" _mini_name = "discord" _mini_order = [discord_id] def __str__(self): return f"{self.name}#{self.discriminator}" def __repr__(self): return f"" @staticmethod def create(session: Session, royal_username, discord_user: "DiscordUser"): d = session.query(Discord).filter(Discord.discord_id == discord_user.id).first() if d is not None: raise AlreadyExistingError(repr(d)) r = session.query(Royal).filter(Royal.username == royal_username).first() if r is None: raise NotFoundError("No Royal exists with that username") d = session.query(Discord).filter(Discord.royal_id == r.id).first() if d is not None: raise AlreadyExistingError(repr(d)) d = Discord(royal=r, discord_id=discord_user.id, name=discord_user.name, discriminator=discord_user.discriminator, avatar_hex=discord_user.avatar) return d def mention(self): return f"<@{self.discord_id}>" def avatar_url(self, size=256): if self.avatar_hex is None: return "https://discordapp.com/assets/6debd47ed13483642cf09e832ed0bc1b.png" return f"https://cdn.discordapp.com/avatars/{self.discord_id}/{self.avatar_hex}.png?size={size}" @classmethod def mini_get_all(cls, session: Session): return [dict(row) for row in session.execute(sql_queries.all_music_query)] @classmethod def mini_get_single(cls, session: Session, **kwargs): return session.execute(sql_queries.one_music_query, {"royal": kwargs["royal"].id}).fetchone() @classmethod def mini_get_single_from_royal(cls, session: Session, royal: "Royal"): return cls.mini_get_single(session, royal=royal) class Overwatch(Base, Mini): __tablename__ = "overwatch" royal_id = Column(Integer, ForeignKey("royals.id"), nullable=False) royal = relationship("Royal", backref="overwatch", lazy="joined") battletag = Column(String, primary_key=True) discriminator = Column(Integer, primary_key=True) icon = Column(String) level = Column(Integer) rank = Column(Integer) _mini_full_name = "Overwatch" _mini_name = "ow" _mini_order = [rank.desc().nullslast(), level.desc()] def __str__(self, separator="#"): return f"{self.battletag}{separator}{self.discriminator}" def __repr__(self): return f"" @staticmethod def create(session: Session, royal_id, battletag, discriminator=None): raise NotImplementedError() def icon_url(self): return f"https://d1u1mce87gyfbn.cloudfront.net/game/unlocks/{self.icon}.png" # noinspection PyUnusedLocal def update(self, session=None): raise NotImplementedError() def rank_url(self): if self.rank < 1500: n = 1 elif self.rank < 2000: n = 2 elif self.rank < 2500: n = 3 elif self.rank < 3000: n = 4 elif self.rank < 3500: n = 5 elif self.rank < 4000: n = 6 else: n = 7 return f"https://d1u1mce87gyfbn.cloudfront.net/game/rank-icons/season-2/rank-{n}.png" def rank_name(self): if self.rank < 1500: return "Bronze" elif self.rank < 2000: return "Silver" elif self.rank < 2500: return "Gold" elif self.rank < 3000: return "Platinum" elif self.rank < 3500: return "Diamond" elif self.rank < 4000: return "Master" else: return "Grandmaster" class Diario(Base): __tablename__ = "diario" id = Column(Integer, primary_key=True) timestamp = Column(DateTime, nullable=False) saver_id = Column(Integer, ForeignKey("telegram.telegram_id")) saver = relationship("Telegram", foreign_keys=saver_id, backref="diario_saves", lazy="joined") author_id = Column(Integer, ForeignKey("telegram.telegram_id")) author = relationship("Telegram", foreign_keys=author_id, backref="diario_authored", lazy="joined") spoiler = Column(Boolean, default=False) text = Column(String) def __repr__(self): return f"" def __str__(self): return f"{self.id} - {self.timestamp} - {self.author}: {self.text}" def to_telegram(self): return '#{id} di {author}\n{text}'.format( id=self.id, author=f"{self.author}" if self.author is not None else strings.DIARIO.ANONYMOUS, text=escape(self.text)) def to_html(self): return str(escape(self.text)).replace("\n", "
") class BaluRage(Base): __tablename__ = "balurage" id = Column(Integer, primary_key=True) royal_id = Column(Integer, ForeignKey("royals.id")) royal = relationship("Royal", backref="times_raged", lazy="joined") reason = Column(String) def __repr__(self): return f"" class PlayedMusic(Base): __tablename__ = "playedmusic" id = Column(Integer, primary_key=True) enqueuer_id = Column(BigInteger, ForeignKey("discord.discord_id")) enqueuer = relationship("Discord", backref="music_played", lazy="joined") filename = Column(String) timestamp = Column(DateTime, nullable=False) def __repr__(self): return f"" class VoteQuestion(Base): __tablename__ = "votequestion" id = Column(Integer, primary_key=True) message_id = Column(BigInteger) question = Column(String, nullable=False) anonymous = Column(Boolean, nullable=False) open = Column(Boolean, default=True) def __repr__(self): return f"" def generate_text(self, session: Session): text = f"{self.question}\n\n" none, yes, no, abstain = 0, 0, 0, 0 if self.message_id is not None: query = session.execute(sql_queries.vote_answers, {"message_id": self.message_id}) for record in query: if record["username"] == "royalgamesbot": continue elif record["question_id"] is None: text += "⚪️" none += 1 elif record["choice"] == "YES": text += "🔵" yes += 1 elif record["choice"] == "NO": text += "🔴" no += 1 elif record["choice"] == "ABSTAIN": text += "⚫️" abstain += 1 if not self.anonymous: text += f" {str(record['username'])}\n" if self.anonymous: text += "\n" text += f"\n" \ f"⚪ {none}\n" \ f"🔵 {yes}\n" \ f"🔴 {no}\n" \ f"⚫️ {abstain}" return text class VoteChoices(enum.Enum): ABSTAIN = 1 YES = 2 NO = 3 class VoteAnswer(Base): __tablename__ = "voteanswer" question_id = Column(Integer, ForeignKey("votequestion.id")) question = relationship("VoteQuestion", backref="answers", lazy="joined") user_id = Column(BigInteger, ForeignKey("telegram.telegram_id")) user = relationship("Telegram", backref="votes_cast", lazy="joined") choice = Column(Enum(VoteChoices), nullable=False) __table_args__ = (PrimaryKeyConstraint("question_id", "user_id"),) def __repr__(self): return f"" class ProfileData(Base): __tablename__ = "profiledata" royal_id = Column(Integer, ForeignKey("royals.id"), primary_key=True) royal = relationship("Royal", backref="profile_data", uselist=False, lazy="joined") css = Column(Text) bio = Column(Text) def __repr__(self): return f"" class WikiEntry(Base): __tablename__ = "wikientries" key = Column(String, primary_key=True) content = Column(Text, nullable=False) locked = Column(Boolean, default=False) def __repr__(self): return f"" class WikiLog(Base): __tablename__ = "wikilog" edit_id = Column(Integer, primary_key=True) editor_id = Column(Integer, ForeignKey("royals.id"), nullable=False) editor = relationship("Royal", backref="wiki_edits", lazy="joined") edited_key = Column(String, ForeignKey("wikientries.key"), nullable=False) edited = relationship("WikiEntry", backref="edit_logs", lazy="joined") timestamp = Column(DateTime, nullable=False) reason = Column(Text) def __repr__(self): return f"" class Event(Base): __tablename__ = "events" id = Column(Integer, primary_key=True) author_id = Column(Integer, ForeignKey("royals.id"), nullable=False) author = relationship("Royal", lazy="joined") name = Column(String, nullable=False) description = Column(Text) time = Column(DateTime, nullable=False) @hybrid_property def time_left(self) -> datetime.timedelta: return self.time - datetime.datetime.now() @time_left.setter def time_left(self, value): if not isinstance(value, datetime.timedelta): raise TypeError("time_left should be a datetime.timedelta") self.time = datetime.datetime.now() + value def __repr__(self): return f"" class Reddit(Base): __tablename__ = "reddit" royal_id = Column(Integer, ForeignKey("royals.id")) royal = relationship("Royal", backref="reddit", lazy="joined") username = Column(String, primary_key=True) karma = Column(BigInteger) def __repr__(self): return f"" class ParsedRedditPost(Base): __tablename__ = "parsedredditposts" id = Column(String, primary_key=True) author_username = Column(String) def __repr__(self): return f"" class LoginToken(Base): __tablename__ = "logintoken" royal_id = Column(Integer, ForeignKey("royals.id")) royal = relationship("Royal", backref="tokens", lazy="joined") token = Column(String, primary_key=True) expiration = Column(DateTime, nullable=False) def __repr__(self): return f"" class Halloween(Base, Mini): """This is some nice spaghetti, don't you think?""" __tablename__ = "halloween" royal_id = Column(Integer, ForeignKey("royals.id"), primary_key=True) royal = relationship("Royal", backref="halloween", lazy="joined") first_trigger = Column(DateTime) puzzle_piece_a = Column(DateTime) puzzle_piece_b = Column(DateTime) puzzle_piece_c = Column(DateTime) puzzle_piece_d = Column(DateTime) puzzle_piece_e = Column(DateTime) puzzle_piece_f = Column(DateTime) puzzle_piece_g = Column(DateTime) boss_battle = Column(DateTime) _mini_full_name = "Halloween 2018" _mini_name = "halloween2018" _mini_order = [first_trigger] def __getitem__(self, item): if not isinstance(item, int): raise TypeError("The index should be an int") if item == 1: return self.puzzle_piece_a elif item == 2: return self.puzzle_piece_b elif item == 3: return self.puzzle_piece_c elif item == 4: return self.puzzle_piece_d elif item == 5: return self.puzzle_piece_e elif item == 6: return self.puzzle_piece_f elif item == 7: return self.puzzle_piece_g else: raise ValueError("No such puzzle piece") def __setitem__(self, key, value): if not isinstance(key, int): raise TypeError("The index should be an int") if key == 1: self.puzzle_piece_a = value elif key == 2: self.puzzle_piece_b = value elif key == 3: self.puzzle_piece_c = value elif key == 4: self.puzzle_piece_d = value elif key == 5: self.puzzle_piece_e = value elif key == 6: self.puzzle_piece_f = value elif key == 7: self.puzzle_piece_g = value else: raise ValueError("No such puzzle piece") def pieces_completed(self) -> int: count = 0 for i in range(1, 8): if self[i]: count += 1 return count @staticmethod def puzzle_status() -> typing.Tuple[bool, typing.List[bool]]: session = Session() halloweens = session.query(Halloween).all() session.close() completed = [False for _ in range(7)] started = False for h in halloweens: if h.royal.role == "Affiliato": continue if h.royal.username == "Steffo": continue for i in range(7): if h.first_trigger is not None: started = True if h[i+1]: completed[i] = True return started, completed class ActivityReport(Base): __tablename__ = "activityreports" timestamp = Column(DateTime, primary_key=True) discord_members_online = Column(Integer) discord_members_ingame = Column(Integer) discord_cv = Column(Integer) discord_members_cv = Column(Integer) discord_channels_used = Column(Integer) def __repr__(self): return f"" class Quest(Base): __tablename__ = "quests" id = Column(Integer, primary_key=True) title = Column(String) description = Column(Text) reward = Column(Integer) expiration_date = Column(DateTime) def __repr__(self): return f"" class Terraria13(Base, Mini): __tablename__ = "terraria13" game_name = "Terraria 13" royal_id = Column(Integer, ForeignKey("royals.id"), primary_key=True) royal = relationship("Royal", backref="terraria13", lazy="joined") character_name = Column(String) contribution = Column(Integer) _mini_full_name = "Terraria 13" _mini_name = "terraria13" _mini_order = [contribution.desc()] def __repr__(self): return f"" class Minecraft2019(Base, Mini): __tablename__ = "minecraft2019" game_name = "Minecraft 2019" royal_id = Column(Integer, ForeignKey("royals.id"), primary_key=True) royal = relationship("Royal", backref="minecraft2019", lazy="joined") character_name = Column(String) contribution = Column(Integer) _mini_full_name = "Minecraft 2019" _mini_name = "minecraft2019" _mini_order = [contribution.desc()] def __repr__(self): return f"" mini_list = [Royal, Telegram, Steam, Dota, LeagueOfLegends, Osu, Discord, Overwatch, Halloween, Terraria13] class Match(Base): __tablename__ = "matches" id = Column(Integer, primary_key=True) timestamp = Column(DateTime) creator_id = Column(BigInteger, ForeignKey("telegram.telegram_id")) creator = relationship("Telegram", backref="matches_created", lazy="joined") match_title = Column(String) match_desc = Column(Text) min_players = Column(Integer) max_players = Column(Integer) closed = Column(Boolean, default=False) message_id = Column(BigInteger) def active_players_count(self): count = 0 for player in self.players: if player.status == MatchmakingStatus.READY \ or player.status == MatchmakingStatus.WAIT_FOR_ME \ or player.status == MatchmakingStatus.SOMEONE_ELSE: count += 1 return count def generate_text(self, session): player_list = session.query(MatchPartecipation).filter_by(match=self).all() title = f"{self.match_title}" description = f"{self.match_desc}\n" if self.match_desc else "" if self.min_players: minimum = f" (minimo {self.min_players})" else: minimum = "" plist = f"Giocatori{minimum}:\n" ignore_count = 0 for player in player_list: icon = strings.MATCHMAKING.ENUM_TO_EMOJIS[player.status] if player.status == MatchmakingStatus.IGNORED: ignore_count += 1 continue plist += f"{icon} {player.user.royal.username}\n" if ignore_count: ignored = f"❌ {ignore_count} persone non sono interessate.\n" else: ignored = "" if self.max_players: players = f"[{self.active_players_count()}/{self.max_players}]" else: players = f"[{self.active_players_count()}]" close = f"[matchmaking terminato]\n" if self.closed else "" message = f"{title} {players}\n" \ f"{description}\n" \ f"{plist}\n" \ f"{ignored}" \ f"{close}" return message def __repr__(self): return f"" def format_dict(self) -> typing.Dict[str, str]: return { "id": str(self.id), "timestamp": self.timestamp.isoformat(), "creator_id": str(self.creator_id), "creator_name": self.creator.mention(), "match_title": self.match_title, "match_desc": self.match_desc if self.match_desc is not None else "", "min_players": str(self.min_players) if self.min_players is not None else "", "max_players": str(self.max_players) if self.max_players is not None else "", "active_players": str(self.active_players_count()), "players": str(len(self.players)) } class MatchPartecipation(Base): __tablename__ = "matchpartecipations" __table_args__ = (PrimaryKeyConstraint("user_id", "match_id"),) user_id = Column(BigInteger, ForeignKey("telegram.telegram_id")) user = relationship("Telegram", backref="match_partecipations", lazy="joined") match_id = Column(Integer, ForeignKey("matches.id")) match = relationship("Match", backref="players", lazy="joined") status = Column(Integer) def __repr__(self): return f"" class BindingOfIsaac(Base, Mini): __tablename__ = "bindingofisaac" steam_id = Column(String, ForeignKey("steam.steam_id"), primary_key=True) steam = relationship("Steam", backref="binding_of_isaac", lazy="joined") daily_victories = Column(Integer, default=0) def __repr__(self): return f"" def recalc_victories(self): raise NotImplementedError() # TODO class BindingOfIsaacRun(Base): __tablename__ = "bindingofisaacruns" __table_args__ = (PrimaryKeyConstraint("date", "player_id"),) date = Column(Date) player_id = Column(String, ForeignKey("bindingofisaac.steam_id")) player = relationship("BindingOfIsaac", backref="runs", lazy="joined") score = Column(BigInteger) # time = Column(???) def __repr__(self): return f"" class Brawlhalla(Base, Mini): __tablename__ = "brawlhalla" steam_id = Column(String, ForeignKey("steam.steam_id"), primary_key=True) steam = relationship("Steam", backref="brawlhalla", lazy="joined") brawlhalla_id = Column(BigInteger) name = Column(String) level = Column(Integer) main_legend_name = Column(String) main_legend_level = Column(Integer) ranked_plays = Column(Integer) ranked_wins = Column(Integer) rating = Column(Integer) best_team_partner_id = Column(BigInteger) best_team_rating = Column(Integer) _mini_full_name = "Brawlhalla" _mini_name = "brawlhalla" _mini_order = [rating.desc(), level.desc()] def __repr__(self): return f"" @hybrid_property def best_team_partner(self) -> typing.Optional["Brawlhalla"]: # FIXME: dirty hack here session = Session() b = session.query(Brawlhalla).filter_by(brawlhalla_id=self.best_team_partner_id).one_or_none() session.close() return b @staticmethod def init_table(): session = Session() steam = session.query(Steam).all() for user in steam: j = requests.get("https://api.brawlhalla.com/search", params={ "steamid": user.steam_id, "api_key": config["Brawlhalla"]["brawlhalla_api_key"] }).json() if not j: continue b = session.query(Brawlhalla).filter_by(steam=user).one_or_none() if b is None: b = Brawlhalla(steam_id=user.steam_id, brawlhalla_id=j["brawlhalla_id"], name=j["name"]) session.add(b) time.sleep(1) session.commit() def update(self, session=None): j = requests.get(f"https://api.brawlhalla.com/player/{self.brawlhalla_id}/stats?api_key={config['Brawlhalla']['brawlhalla_api_key']}").json() self.name = j.get("name", "unknown") self.level = j.get("level", 0) try: main_legend = max(j.get("legends", []), key=lambda l: l.get("level", 0)) self.main_legend_level = main_legend.get("level", 0) self.main_legend_name = main_legend.get("legend_name_key", "unknown") except ValueError: pass j = requests.get(f"https://api.brawlhalla.com/player/{self.brawlhalla_id}/ranked?api_key={config['Brawlhalla']['brawlhalla_api_key']}").json() self.ranked_plays = j.get("games", 0) self.ranked_wins = j.get("wins", 0) rating = DirtyDelta(self.rating) rating.value = j.get("rating") self.rating = rating.value best_team_data = Dirty((self.best_team_partner_id, self.best_team_rating)) try: current_best_team = max(j.get("2v2", []), key=lambda t: t.get("rating", 0)) if current_best_team["brawlhalla_id_one"] == self.brawlhalla_id: self.best_team_partner_id = current_best_team["brawlhalla_id_two"] else: self.best_team_partner_id = current_best_team["brawlhalla_id_one"] self.best_team_rating = current_best_team["rating"] best_team_data.value = (self.best_team_partner_id, self.best_team_rating) except ValueError: pass return rating, best_team_data # If run as script, create all the tables in the db if __name__ == "__main__": print("Creating new tables...") Base.metadata.create_all(bind=engine) print("Done!")