import datetime import logging import os import typing import coloredlogs from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy import Column, BigInteger, Integer, String, DateTime, ForeignKey, Float, Enum, create_engine, UniqueConstraint, PrimaryKeyConstraint, Boolean, or_, LargeBinary, Text, Date, func, desc import requests from errors import NotFoundError, AlreadyExistingError, PrivateError import re import enum from discord import User as DiscordUser from telegram import User as TelegramUser import loldata from dirty import Dirty # Init the config reader import configparser config = configparser.ConfigParser() config.read("config.ini") # Init the sqlalchemy engine engine = create_engine(config["Database"]["database_uri"]) Base = declarative_base(bind=engine) Session = sessionmaker(bind=engine) logging.getLogger().disabled = True logger = logging.getLogger(__name__) os.environ["COLOREDLOGS_LOG_FORMAT"] = "%(asctime)s %(levelname)s %(name)s %(message)s" coloredlogs.install(level="DEBUG", logger=logger) class Royal(Base): __tablename__ = "royals" id = Column(Integer, primary_key=True) username = Column(String, unique=True, nullable=False) password = Column(LargeBinary) role = Column(String) fiorygi = Column(Integer, default=0) member_since = Column(Date) @staticmethod def create(session: Session, username: str): r = session.query(Royal).filter_by(username=username).first() if r is not None: raise AlreadyExistingError(repr(r)) return Royal(username=username) def __repr__(self): return f"" class Telegram(Base): __tablename__ = "telegram" royal_id = Column(Integer, ForeignKey("royals.id")) royal = relationship("Royal", backref="telegram", lazy="joined") telegram_id = Column(BigInteger, primary_key=True) first_name = Column(String) last_name = Column(String) username = Column(String) @staticmethod def create(session: Session, royal_username, telegram_user: TelegramUser): t = session.query(Telegram).filter_by(telegram_id=telegram_user.id).first() if t is not None: raise AlreadyExistingError(repr(t)) r = session.query(Royal).filter(Royal.username == royal_username).first() if r is None: raise NotFoundError("No Royal exists with that username") t = session.query(Telegram).filter(Telegram.royal_id == r.id).first() if t is not None: raise AlreadyExistingError(repr(t)) return Telegram(royal=r, telegram_id=telegram_user.id, first_name=telegram_user.first_name, last_name=telegram_user.last_name, username=telegram_user.username) def __repr__(self): return f"" def mention(self): if self.username is not None: return f"@{self.username}" else: return self.first_name def __str__(self): if self.username is not None: return self.username elif self.last_name is not None: return f"{self.first_name} {self.last_name}" else: return self.first_name class Steam(Base): __tablename__ = "steam" royal_id = Column(Integer, ForeignKey("royals.id")) royal = relationship("Royal", backref="steam", lazy="joined") steam_id = Column(String, primary_key=True) persona_name = Column(String) avatar_hex = Column(String) trade_token = Column(String) most_played_game_id = Column(BigInteger) def __repr__(self): if not self.persona_name: return f"" return f"" def __str__(self): if self.persona_name is not None: return self.persona_name else: return self.steam_id def most_played_game_url(self): return f"https://steamcdn-a.akamaihd.net/steam/apps/{self.most_played_game_id}/header.jpg" def avatar_url(self): return f"https://steamcdn-a.akamaihd.net/steamcommunity/public/images/avatars/{self.avatar_hex[0:2]}/{self.avatar_hex}.jpg" @staticmethod def create(session: Session, royal_id: int, steam_id: str): s = session.query(Steam).get(steam_id) if s is not None: raise AlreadyExistingError(repr(s)) r = requests.get(f"https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/?key={config['Steam']['api_key']}&steamids={steam_id}") r.raise_for_status() j = r.json() if len(j) == 0: raise NotFoundError(f"The steam_id doesn't match any steam account") s = Steam(royal_id=royal_id, steam_id=steam_id, persona_name=j["response"]["players"][0]["personaname"], avatar_hex=re.search(r"https://steamcdn-a.akamaihd.net/steamcommunity/public/images/avatars/../(.+).jpg", j["response"]["players"][0]["avatar"]).group(1)) return s @staticmethod def find_trade_token(trade_url): return re.search(r"https://steamcommunity\.com/tradeoffer/new/\?partner=[0-9]+&token=(.{8})", trade_url).group(1) @staticmethod def to_steam_id_2(steam_id): # Got this code from a random github gist. It could be completely wrong. z = (int(steam_id) - 76561197960265728) // 2 y = int(steam_id) % 2 return f"STEAM_0:{y}:{z}" @staticmethod def to_steam_id_3(steam_id, full=False): # Got this code from a random github gist. It could be completely wrong. if full: return f"[U:1:{int(steam_id) - 76561197960265728}]" else: return f"{int(steam_id) - 76561197960265728}" def update(self, session=None, raise_if_private: bool=False): r = requests.get(f"https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/?key={config['Steam']['api_key']}&steamids={self.steam_id}") r.raise_for_status() j = r.json() self.persona_name = j["response"]["players"][0]["personaname"] self.avatar_hex = re.search(r"https://steamcdn-a\.akamaihd\.net/steamcommunity/public/images/avatars/../(.+).jpg", j["response"]["players"][0]["avatar"]).group(1) r = requests.get(f"http://api.steampowered.com/IPlayerService/GetRecentlyPlayedGames/v0001/?key={config['Steam']['api_key']}&steamid={self.steam_id}&format=json") r.raise_for_status() j = r.json() if "response" not in j or "games" not in j["response"] or len(j["response"]["games"]) < 1: if raise_if_private: raise PrivateError(f"Game data is private") return self.most_played_game_id = j["response"]["games"][0]["appid"] class RocketLeague(Base): __tablename__ = "rocketleague" steam_id = Column(String, ForeignKey("steam.steam_id"), primary_key=True) steam = relationship("Steam", backref="rl", lazy="joined") season = Column(Integer) single_rank = Column(Integer) single_div = Column(Integer) single_mmr = Column(Integer) doubles_rank = Column(Integer) doubles_div = Column(Integer) doubles_mmr = Column(Integer) standard_rank = Column(Integer) standard_div = Column(Integer) standard_mmr = Column(Integer) solo_std_rank = Column(Integer) solo_std_div = Column(Integer) solo_std_mmr = Column(Integer) wins = Column(Integer) def __repr__(self): return f"" def update(self, session=None, data=None): raise NotImplementedError("rlstats API is no longer available.") def solo_rank_image(self): if self.single_rank is None: rank = 0 else: rank = self.single_rank return f"https://rocketleaguestats.com/assets/img/rocket_league/ranked/season_four/{rank}.png" def doubles_rank_image(self): if self.doubles_rank is None: rank = 0 else: rank = self.doubles_rank return f"https://rocketleaguestats.com/assets/img/rocket_league/ranked/season_four/{rank}.png" def standard_rank_image(self): if self.standard_rank is None: rank = 0 else: rank = self.standard_rank return f"https://rocketleaguestats.com/assets/img/rocket_league/ranked/season_four/{rank}.png" def solo_std_rank_image(self): if self.solo_std_rank is None: rank = 0 else: rank = self.solo_std_rank return f"https://rocketleaguestats.com/assets/img/rocket_league/ranked/season_four/{rank}.png" class Dota(Base): __tablename__ = "dota" steam_id = Column(String, ForeignKey("steam.steam_id"), primary_key=True) steam = relationship("Steam", backref="dota", lazy="joined") rank_tier = Column(Integer) wins = Column(Integer) losses = Column(Integer) most_played_hero = Column(Integer) def __repr__(self): return f"" def get_rank_icon_url(self): # Rank icon is determined by the first digit of the rank tier return f"https://www.opendota.com/assets/images/dota2/rank_icons/rank_icon_{str(self.rank_tier)[0] if self.rank_tier is not None else '0'}.png" def get_rank_stars_url(self): # Rank stars are determined by the second digit of the rank tier if self.rank_tier is None or str(self.rank_tier)[1] == "0": return "" return f"https://www.opendota.com/assets/images/dota2/rank_icons/rank_star_{str(self.rank_tier)[1]}.png" def get_rank_name(self): # This should probably be an enum, but who cares if self.rank_tier is None or self.rank_tier < 10: return "Unranked" number = str(self.rank_tier)[0] if number == "1": return "Harald" elif number == "2": return "Guardian" elif number == "3": return "Crusader" elif number == "4": return "Archon" elif number == "5": return "Legend" elif number == "6": return "Ancient" elif number == "7": return "Divine" def get_rank_number(self): if self.rank_tier is None or self.rank_tier < 10: return "" return str(self.rank_tier)[1] @staticmethod def create(session: Session, steam_id: int) -> "Dota": d = session.query(Dota).get(steam_id) if d is not None: raise AlreadyExistingError(repr(d)) r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(steam_id)}") r.raise_for_status() data = r.json() if "profile" not in data: raise NotFoundError("The specified user has never played Dota or has a private match history") new_record = Dota(steam_id=str(steam_id)) new_record.update() return new_record def update(self, session=None) -> bool: r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(self.steam_id)}") r.raise_for_status() data = r.json() r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(self.steam_id)}/wl") r.raise_for_status() wl = r.json() r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(self.steam_id)}/heroes") r.raise_for_status() heroes = r.json() changed = self.rank_tier != data["rank_tier"] self.rank_tier = data["rank_tier"] self.wins = wl["win"] self.losses = wl["lose"] self.most_played_hero = heroes[0]["hero_id"] return changed class LeagueOfLegendsRanks(enum.Enum): IRON = 0 BRONZE = 1 SILVER = 2 GOLD = 3 PLATINUM = 4 DIAMOND = 5 MASTER = 6 GRANDMASTER = 7 CHALLENGER = 8 def __str__(self): return self.name.capitalize() class RomanNumerals(enum.Enum): I = 1 II = 2 III = 3 IV = 4 def __str__(self): return self.name class LeagueOfLegends(Base): __tablename__ = "leagueoflegends" royal_id = Column(Integer, ForeignKey("royals.id")) royal = relationship("Royal", backref="lol", lazy="joined") icon_id = Column(Integer) summoner_id = Column(String, primary_key=True) account_id = Column(String) summoner_name = Column(String) level = Column(Integer) solo_division = Column(Enum(LeagueOfLegendsRanks)) solo_rank = Column(Enum(RomanNumerals)) flex_division = Column(Enum(LeagueOfLegendsRanks)) flex_rank = Column(Enum(RomanNumerals)) twtr_division = Column(Enum(LeagueOfLegendsRanks)) twtr_rank = Column(Enum(RomanNumerals)) highest_mastery_champ = Column(Integer) def __repr__(self): if not self.summoner_name: return f"" return f"" @staticmethod def create(royal_id, summoner_name) -> "LeagueOfLegends": r = requests.get(f"https://euw1.api.riotgames.com/lol/summoner/v4/summoners/by-name/{summoner_name}?api_key={config['League of Legends']['riot_api_key']}") r.raise_for_status() data = r.json() lol = LeagueOfLegends() lol.royal_id = royal_id lol.summoner_name = summoner_name lol.summoner_id = data["id"] lol.account_id = data["accountId"] lol.icon_id = data["profileIconId"] lol.level = data["summonerLevel"] lol.update() return lol def update(self, session=None): r = requests.get(f"https://euw1.api.riotgames.com/lol/summoner/v4/summoners/{self.summoner_id}?api_key={config['League of Legends']['riot_api_key']}") r.raise_for_status() data = r.json() r = requests.get(f"https://euw1.api.riotgames.com/lol/league/v4/positions/by-summoner/{self.summoner_id}?api_key={config['League of Legends']['riot_api_key']}") r.raise_for_status() rank = r.json() r = requests.get(f"https://euw1.api.riotgames.com/lol/champion-mastery/v4/champion-masteries/by-summoner/{self.summoner_id}?api_key={config['League of Legends']['riot_api_key']}") r.raise_for_status() mastery = r.json() solo_q = None flex_q = None twtr_q = None for league in rank: if league["queueType"] == "RANKED_SOLO_5x5": solo_q = league elif league["queueType"] == "RANKED_FLEX_SR": flex_q = league elif league["queueType"] == "RANKED_FLEX_TT": twtr_q = league self.summoner_id = data["id"] self.summoner_name = data["name"] self.account_id = data["accountId"] self.level = data["summonerLevel"] solo = Dirty((self.solo_division, self.solo_rank)) flex = Dirty((self.flex_division, self.flex_rank)) twtr = Dirty((self.twtr_division, self.twtr_rank)) solo.value = (None, None) if solo_q is None else (LeagueOfLegendsRanks[solo_q["tier"]], RomanNumerals[solo_q["rank"]]) flex.value = (None, None) if flex_q is None else (LeagueOfLegendsRanks[flex_q["tier"]], RomanNumerals[flex_q["rank"]]) twtr.value = (None, None) if twtr_q is None else (LeagueOfLegendsRanks[twtr_q["tier"]], RomanNumerals[twtr_q["rank"]]) self.highest_mastery_champ = mastery[0]["championId"] self.solo_division = solo.value[0] self.solo_rank = solo.value[1] self.flex_division = flex.value[0] self.flex_rank = flex.value[1] self.twtr_division = twtr.value[0] self.twtr_rank = twtr.value[1] return solo, flex, twtr def highest_mastery_champ_name(self): champ = loldata.get_champ_by_key(self.highest_mastery_champ) return champ["name"] def highest_mastery_champ_image(self): champ = loldata.get_champ_by_key(self.highest_mastery_champ) return loldata.get_champ_icon(champ["name"]) class Osu(Base): __tablename__ = "osu" royal_id = Column(Integer, ForeignKey("royals.id"), nullable=False) royal = relationship("Royal", backref="osu", lazy="joined") osu_id = Column(Integer, primary_key=True) osu_name = Column(String) std_pp = Column(Float) taiko_pp = Column(Float) catch_pp = Column(Float) mania_pp = Column(Float) @staticmethod def create(session: Session, royal_id, osu_name): o = session.query(Osu).filter(Osu.osu_name == osu_name).first() if o is not None: raise AlreadyExistingError(repr(o)) r0 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={osu_name}&m=0") r0.raise_for_status() r1 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={osu_name}&m=1") r1.raise_for_status() r2 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={osu_name}&m=2") r2.raise_for_status() r3 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={osu_name}&m=3") r3.raise_for_status() j0 = r0.json()[0] j1 = r1.json()[0] j2 = r2.json()[0] j3 = r3.json()[0] new_record = Osu(royal_id=royal_id, osu_id=j0["user_id"], osu_name=j0["username"], std_pp=j0["pp_raw"], taiko_pp=j1["pp_raw"], catch_pp=j2["pp_raw"], mania_pp=j3["pp_raw"]) return new_record def update(self, session=None): r0 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={self.osu_name}&m=0") r0.raise_for_status() r1 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={self.osu_name}&m=1") r1.raise_for_status() r2 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={self.osu_name}&m=2") r2.raise_for_status() r3 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={self.osu_name}&m=3") r3.raise_for_status() j0 = r0.json()[0] j1 = r1.json()[0] j2 = r2.json()[0] j3 = r3.json()[0] self.osu_name = j0["username"] self.std_pp = j0["pp_raw"] self.taiko_pp = j1["pp_raw"] self.catch_pp = j2["pp_raw"] self.mania_pp = j3["pp_raw"] def __repr__(self): if not self.osu_name: return f"" return f"" class Discord(Base): __tablename__ = "discord" __table_args__ = tuple(UniqueConstraint("name", "discriminator")) royal_id = Column(Integer, ForeignKey("royals.id")) royal = relationship("Royal", backref="discord", lazy="joined") discord_id = Column(BigInteger, primary_key=True) name = Column(String) discriminator = Column(Integer) avatar_hex = Column(String) def __str__(self): return f"{self.name}#{self.discriminator}" def __repr__(self): return f"" @staticmethod def create(session: Session, royal_username, discord_user: DiscordUser): # TODO: remove this print("Discord.create is deprecated and should be removed soon.") d = session.query(Discord).filter(Discord.discord_id == discord_user.id).first() if d is not None: raise AlreadyExistingError(repr(d)) r = session.query(Royal).filter(Royal.username == royal_username).first() if r is None: raise NotFoundError("No Royal exists with that username") d = session.query(Discord).filter(Discord.royal_id == r.id).first() if d is not None: raise AlreadyExistingError(repr(d)) d = Discord(royal=r, discord_id=discord_user.id, name=discord_user.name, discriminator=discord_user.discriminator, avatar_hex=discord_user.avatar) return d def mention(self): return f"<@{self.discord_id}>" def avatar_url(self, size=256): if self.avatar_hex is None: return "https://discordapp.com/assets/6debd47ed13483642cf09e832ed0bc1b.png" return f"https://cdn.discordapp.com/avatars/{self.discord_id}/{self.avatar_hex}.png?size={size}" class Overwatch(Base): __tablename__ = "overwatch" royal_id = Column(Integer, ForeignKey("royals.id"), nullable=False) royal = relationship("Royal", backref="overwatch", lazy="joined") battletag = Column(String, primary_key=True) discriminator = Column(Integer, primary_key=True) icon = Column(String) level = Column(Integer) rank = Column(Integer) def __str__(self, separator="#"): return f"{self.battletag}{separator}{self.discriminator}" def __repr__(self): return f"" @staticmethod def create(session: Session, royal_id, battletag, discriminator=None): if discriminator is None: battletag, discriminator = battletag.split("#", 1) o = session.query(Overwatch).filter_by(battletag=battletag, discriminator=discriminator).first() if o is not None: raise AlreadyExistingError(repr(o)) o = Overwatch(royal_id=royal_id, battletag=battletag, discriminator=discriminator) o.update() return o def icon_url(self): return f"https://d1u1mce87gyfbn.cloudfront.net/game/unlocks/{self.icon}.png" def update(self, session=None): r = requests.get(f"https://owapi.net/api/v3/u/{self.battletag}-{self.discriminator}/stats", headers={ "User-Agent": "Royal-Bot/4.1", "From": "ste.pigozzi@gmail.com" }) r.raise_for_status() try: j = r.json()["eu"]["stats"].get("competitive") if j is None: logger.debug(f"No stats for {repr(self)}, skipping...") return if not j["game_stats"]: logger.debug(f"No stats for {repr(self)}, skipping...") return j = j["overall_stats"] except TypeError: logger.debug(f"No stats for {repr(self)}, skipping...") return try: self.icon = re.search(r"https://.+\.cloudfront\.net/game/unlocks/(0x[0-9A-F]+)\.png", j["avatar"]).group(1) except AttributeError: logger.debug(f"No icon available for {repr(self)}.") self.level = j["prestige"] * 100 + j["level"] self.rank = j["comprank"] def rank_url(self): if self.rank < 1500: n = 1 elif self.rank < 2000: n = 2 elif self.rank < 2500: n = 3 elif self.rank < 3000: n = 4 elif self.rank < 3500: n = 5 elif self.rank < 4000: n = 6 else: n = 7 return f"https://d1u1mce87gyfbn.cloudfront.net/game/rank-icons/season-2/rank-{n}.png" class Diario(Base): __tablename__ = "diario" id = Column(Integer, primary_key=True) timestamp = Column(DateTime, nullable=False) saver_id = Column(Integer, ForeignKey("telegram.telegram_id")) saver = relationship("Telegram", foreign_keys=saver_id, backref="diario_saves", lazy="joined") author_id = Column(Integer, ForeignKey("telegram.telegram_id")) author = relationship("Telegram", foreign_keys=author_id, backref="diario_authored", lazy="joined") spoiler = Column(Boolean, default=False) text = Column(String) def __repr__(self): return f"" def __str__(self): return f"{self.id} - {self.timestamp} - {self.author}: {self.text}" @staticmethod def import_from_json(file): import json session = Session() file = open(file, "r") j = json.load(file) author_ids = { "@Steffo": 25167391, "@GoodBalu": 19611986, "@gattopandacorno": 200821462, "@Albertino04": 131057096, "@Francesco_Cuoghi": 48371848, "@VenomousDoc": 48371848, "@MaxSensei": 1258401, "@Protoh": 125711787, "@McspKap": 304117728, "@FrankRekt": 31436195, "@EvilBalu": 26842090, "@Dailir": 135816455, "@Paltri": 186843362, "@Doom_darth_vader": 165792255, "@httpIma": 292086686, "@DavidoMessori": 509208316, "@DavidoNiichan": 509208316, "@Peraemela99": 63804599, "@infopz": 20403805, "@Baithoven": 121537369, "@Tauei": 102833717 } for n, entry in enumerate(j): author = author_ids[entry["sender"]] if "sender" in entry and entry["sender"] in author_ids else None d = Diario(timestamp=datetime.datetime.fromtimestamp(float(entry["timestamp"])), author_id=author, text=entry["text"]) print(f"{n} - {d}") session.add(d) session.commit() session.close() class BaluRage(Base): __tablename__ = "balurage" id = Column(Integer, primary_key=True) royal_id = Column(Integer, ForeignKey("royals.id")) royal = relationship("Royal", backref="times_raged", lazy="joined") reason = Column(String) def __repr__(self): return f"" class PlayedMusic(Base): __tablename__ = "playedmusic" id = Column(Integer, primary_key=True) enqueuer_id = Column(BigInteger, ForeignKey("discord.discord_id")) enqueuer = relationship("Discord", backref="music_played", lazy="joined") filename = Column(String) timestamp = Column(DateTime, nullable=False) def __repr__(self): return f"" class VoteQuestion(Base): __tablename__ = "votequestion" id = Column(Integer, primary_key=True) message_id = Column(BigInteger) question = Column(String, nullable=False) anonymous = Column(Boolean, nullable=False) open = Column(Boolean, default=True) def __repr__(self): return f"" def generate_text(self, session: Session): text = f"{self.question}\n\n" none, yes, no, abstain = 0, 0, 0, 0 if self.message_id is not None: query = session.execute("SELECT * FROM telegram LEFT JOIN (SELECT voteanswer.question_id, voteanswer.user_id, voteanswer.choice FROM votequestion JOIN voteanswer ON votequestion.id = voteanswer.question_id WHERE votequestion.message_id = " + str(self.message_id) + ") answer ON telegram.telegram_id = answer.user_id ORDER BY answer.choice;") for record in query: if record["username"] == "royalgamesbot": continue elif record["question_id"] is None: text += "⚪️" none += 1 elif record["choice"] == "YES": text += "🔵" yes += 1 elif record["choice"] == "NO": text += "🔴" no += 1 elif record["choice"] == "ABSTAIN": text += "⚫️" abstain += 1 if not self.anonymous: text += f" {str(record['username'])}\n" if self.anonymous: text += "\n" text += f"\n" \ f"⚪ {none}\n" \ f"🔵 {yes}\n" \ f"🔴 {no}\n" \ f"⚫️ {abstain}" return text class VoteChoices(enum.Enum): ABSTAIN = 1 YES = 2 NO = 3 class VoteAnswer(Base): __tablename__ = "voteanswer" question_id = Column(Integer, ForeignKey("votequestion.id")) question = relationship("VoteQuestion", backref="answers", lazy="joined") user_id = Column(BigInteger, ForeignKey("telegram.telegram_id")) user = relationship("Telegram", backref="votes_cast", lazy="joined") choice = Column(Enum(VoteChoices), nullable=False) __table_args__ = (PrimaryKeyConstraint("question_id", "user_id"),) def __repr__(self): return f"" class ProfileData(Base): __tablename__ = "profiledata" royal_id = Column(Integer, ForeignKey("royals.id"), primary_key=True) royal = relationship("Royal", backref="profile_data", uselist=False, lazy="joined") css = Column(Text) bio = Column(Text) def __repr__(self): return f"" class WikiEntry(Base): __tablename__ = "wikientries" key = Column(String, primary_key=True) content = Column(Text, nullable=False) def __repr__(self): return f"" class WikiLog(Base): __tablename__ = "wikilog" edit_id = Column(Integer, primary_key=True) editor_id = Column(Integer, ForeignKey("royals.id"), nullable=False) editor = relationship("Royal", backref="wiki_edits", lazy="joined") edited_key = Column(String, ForeignKey("wikientries.key"), nullable=False) edited = relationship("WikiEntry", backref="edit_logs", lazy="joined") timestamp = Column(DateTime, nullable=False) reason = Column(Text) def __repr__(self): return f"" class Event(Base): __tablename__ = "events" id = Column(Integer, primary_key=True) author_id = Column(Integer, ForeignKey("royals.id"), nullable=False) author = relationship("Royal", lazy="joined") name = Column(String, nullable=False) description = Column(Text) time = Column(DateTime, nullable=False) @hybrid_property def time_left(self) -> datetime.timedelta: return self.time - datetime.datetime.now() @time_left.setter def time_left(self, value): if not isinstance(value, datetime.timedelta): raise TypeError("time_left should be a datetime.timedelta") self.time = datetime.datetime.now() + value def __repr__(self): return f"" class Reddit(Base): __tablename__ = "reddit" royal_id = Column(Integer, ForeignKey("royals.id")) royal = relationship("Royal", backref="reddit", lazy="joined") username = Column(String, primary_key=True) karma = Column(BigInteger) def __repr__(self): return f"" class GameLog(Base): __tablename__ = "gamelog" royal_id = Column(Integer, ForeignKey("royals.id")) royal = relationship("Royal", backref="gamelog", lazy="joined") username = Column(String, primary_key=True) owned_games = Column(Integer) unfinished_games = Column(Integer) beaten_games = Column(Integer) completed_games = Column(Integer) mastered_games = Column(Integer) def __repr__(self): return f"" class ParsedRedditPost(Base): __tablename__ = "parsedredditposts" id = Column(String, primary_key=True) author_username = Column(String) def __repr__(self): return f"" class LoginToken(Base): __tablename__ = "logintoken" royal_id = Column(Integer, ForeignKey("royals.id")) royal = relationship("Royal", backref="tokens", lazy="joined") token = Column(String, primary_key=True) expiration = Column(DateTime, nullable=False) def __repr__(self): return f"" class Halloween(Base): """This is some nice spaghetti, don't you think?""" __tablename__ = "halloween" royal_id = Column(Integer, ForeignKey("royals.id"), primary_key=True) royal = relationship("Royal", backref="halloween", lazy="joined") first_trigger = Column(DateTime) puzzle_piece_a = Column(DateTime) puzzle_piece_b = Column(DateTime) puzzle_piece_c = Column(DateTime) puzzle_piece_d = Column(DateTime) puzzle_piece_e = Column(DateTime) puzzle_piece_f = Column(DateTime) puzzle_piece_g = Column(DateTime) boss_battle = Column(DateTime) def __getitem__(self, item): if not isinstance(item, int): raise TypeError("The index should be an int") if item == 1: return self.puzzle_piece_a elif item == 2: return self.puzzle_piece_b elif item == 3: return self.puzzle_piece_c elif item == 4: return self.puzzle_piece_d elif item == 5: return self.puzzle_piece_e elif item == 6: return self.puzzle_piece_f elif item == 7: return self.puzzle_piece_g else: raise ValueError("No such puzzle piece") def __setitem__(self, key, value): if not isinstance(key, int): raise TypeError("The index should be an int") if key == 1: self.puzzle_piece_a = value elif key == 2: self.puzzle_piece_b = value elif key == 3: self.puzzle_piece_c = value elif key == 4: self.puzzle_piece_d = value elif key == 5: self.puzzle_piece_e = value elif key == 6: self.puzzle_piece_f = value elif key == 7: self.puzzle_piece_g = value else: raise ValueError("No such puzzle piece") def pieces_completed(self) -> int: count = 0 for i in range(1, 8): if self[i]: count += 1 return count @staticmethod def puzzle_status() -> typing.Tuple[bool, typing.List[bool]]: session = Session() halloweens = session.query(Halloween).all() session.close() completed = [False for _ in range(7)] started = False for h in halloweens: if h.royal.role == "Affiliato": continue if h.royal.username == "Steffo": continue for i in range(7): if h.first_trigger is not None: started = True if h[i+1]: completed[i] = True return started, completed class ActivityReport(Base): __tablename__ = "activityreports" timestamp = Column(DateTime, primary_key=True) discord_members_online = Column(Integer) discord_members_ingame = Column(Integer) discord_cv = Column(Integer) discord_members_cv = Column(Integer) discord_channels_used = Column(Integer) def __repr__(self): return f"" # If run as script, create all the tables in the db if __name__ == "__main__": print("Creating new tables...") Base.metadata.create_all(bind=engine) print("Done!")