1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-12-17 23:24:20 +00:00
royalnet/db.py

890 lines
32 KiB
Python
Raw Normal View History

2017-10-19 16:28:28 +00:00
import datetime
2017-10-04 16:51:40 +00:00
from sqlalchemy.ext.declarative import declarative_base
2018-07-26 17:26:03 +00:00
from sqlalchemy.orm import sessionmaker, relationship, joinedload
2018-07-23 16:45:43 +00:00
from sqlalchemy.ext.hybrid import hybrid_property
2018-07-26 17:26:03 +00:00
from sqlalchemy import Column, BigInteger, Integer, String, DateTime, ForeignKey, Float, Enum, create_engine, UniqueConstraint, PrimaryKeyConstraint, Boolean, or_, LargeBinary, Text, Date, func, desc
2017-10-05 07:47:59 +00:00
import requests
2017-10-27 09:53:05 +00:00
from errors import RequestError, NotFoundError, AlreadyExistingError
2017-10-05 07:47:59 +00:00
import re
2017-10-08 18:26:51 +00:00
import enum
2017-10-27 11:38:32 +00:00
from discord import User as DiscordUser
2017-10-30 09:46:37 +00:00
from telegram import User as TelegramUser
2017-10-04 16:51:40 +00:00
# Init the config reader
import configparser
config = configparser.ConfigParser()
config.read("config.ini")
# Init the sqlalchemy engine
engine = create_engine(config["Database"]["database_uri"])
Base = declarative_base(bind=engine)
Session = sessionmaker(bind=engine)
2018-02-26 09:34:44 +00:00
2017-10-04 16:51:40 +00:00
class Royal(Base):
__tablename__ = "royals"
id = Column(Integer, primary_key=True)
username = Column(String, unique=True, nullable=False)
2018-05-07 10:51:24 +00:00
password = Column(LargeBinary)
2018-06-08 18:13:20 +00:00
role = Column(String)
2018-05-07 10:51:24 +00:00
fiorygi = Column(Integer, default=0)
2018-06-08 18:54:58 +00:00
member_since = Column(Date)
2017-10-04 16:51:40 +00:00
2017-10-17 18:49:28 +00:00
@staticmethod
2017-10-30 09:46:37 +00:00
def create(session: Session, username: str):
2017-10-17 18:49:28 +00:00
r = session.query(Royal).filter_by(username=username).first()
if r is not None:
2017-10-27 09:53:05 +00:00
raise AlreadyExistingError(repr(r))
2017-10-17 18:49:28 +00:00
return Royal(username=username)
2017-10-04 16:51:40 +00:00
def __repr__(self):
2018-09-17 22:07:00 +00:00
return f"<db.Royal {self.username}>"
2017-10-04 16:51:40 +00:00
class Telegram(Base):
__tablename__ = "telegram"
2018-08-28 13:14:29 +00:00
royal_id = Column(Integer, ForeignKey("royals.id"))
2018-07-15 12:41:42 +00:00
royal = relationship("Royal", backref="telegram", lazy="joined")
2017-10-04 16:51:40 +00:00
telegram_id = Column(BigInteger, primary_key=True)
2018-08-28 13:14:29 +00:00
first_name = Column(String)
2017-10-04 16:51:40 +00:00
last_name = Column(String)
username = Column(String)
2017-10-30 09:46:37 +00:00
@staticmethod
def create(session: Session, royal_username, telegram_user: TelegramUser):
t = session.query(Telegram).filter_by(telegram_id=telegram_user.id).first()
if t is not None:
raise AlreadyExistingError(repr(t))
r = session.query(Royal).filter(Royal.username == royal_username).first()
if r is None:
raise NotFoundError("No Royal exists with that username")
t = session.query(Telegram).filter(Telegram.royal_id == r.id).first()
if t is not None:
raise AlreadyExistingError(repr(t))
return Telegram(royal=r,
telegram_id=telegram_user.id,
first_name=telegram_user.first_name,
last_name=telegram_user.last_name,
username=telegram_user.username)
2017-10-04 16:51:40 +00:00
def __repr__(self):
2018-09-17 22:07:00 +00:00
return f"<db.Telegram {self.telegram_id}>"
2017-10-04 16:51:40 +00:00
2018-03-14 10:53:26 +00:00
def mention(self):
2017-10-04 16:51:40 +00:00
if self.username is not None:
return f"@{self.username}"
2018-03-14 10:53:26 +00:00
else:
return self.first_name
def __str__(self):
if self.username is not None:
return self.username
2017-10-04 16:51:40 +00:00
elif self.last_name is not None:
return f"{self.first_name} {self.last_name}"
else:
return self.first_name
2017-10-04 21:29:02 +00:00
class Steam(Base):
__tablename__ = "steam"
2018-08-28 13:14:29 +00:00
royal_id = Column(Integer, ForeignKey("royals.id"))
2018-07-15 12:41:42 +00:00
royal = relationship("Royal", backref="steam", lazy="joined")
2017-10-04 21:29:02 +00:00
steam_id = Column(String, primary_key=True)
2018-06-05 08:34:59 +00:00
persona_name = Column(String)
avatar_hex = Column(String)
2017-10-07 22:43:41 +00:00
trade_token = Column(String)
2018-06-05 08:34:59 +00:00
most_played_game_id = Column(BigInteger)
2017-10-04 21:29:02 +00:00
def __repr__(self):
2018-09-17 22:07:00 +00:00
if not self.persona_name:
return f"<db.Steam {self.steam_id}>"
return f"<db.Steam {self.persona_name}>"
2017-10-04 21:29:02 +00:00
def __str__(self):
2017-10-05 18:21:47 +00:00
if self.persona_name is not None:
return self.persona_name
2017-10-04 21:29:02 +00:00
else:
return self.steam_id
2018-06-05 08:34:59 +00:00
def most_played_game_url(self):
return f"https://steamcdn-a.akamaihd.net/steam/apps/{self.most_played_game_id}/header.jpg"
2017-10-25 09:09:06 +00:00
def avatar_url(self):
return f"https://steamcdn-a.akamaihd.net/steamcommunity/public/images/avatars/{self.avatar_hex[0:2]}/{self.avatar_hex}.jpg"
2017-10-08 18:26:51 +00:00
@staticmethod
2017-10-30 09:46:37 +00:00
def create(session: Session, royal_id: int, steam_id: str):
2017-10-08 18:26:51 +00:00
s = session.query(Steam).get(steam_id)
if s is not None:
2017-10-27 09:53:05 +00:00
raise AlreadyExistingError(repr(s))
2017-10-08 18:26:51 +00:00
r = requests.get(f"https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/?key={config['Steam']['api_key']}&steamids={steam_id}")
if r.status_code != 200:
raise RequestError(f"Steam returned {r.status_code}")
j = r.json()
if len(j) == 0:
raise NotFoundError(f"The steam_id doesn't match any steam account")
s = Steam(royal_id=royal_id,
steam_id=steam_id,
persona_name=j["response"]["players"][0]["personaname"],
2017-10-17 18:49:28 +00:00
avatar_hex=re.search(r"https://steamcdn-a.akamaihd.net/steamcommunity/public/images/avatars/../(.+).jpg", j["response"]["players"][0]["avatar"]).group(1))
2017-10-08 18:26:51 +00:00
return s
2017-10-07 23:29:01 +00:00
2017-10-08 18:26:51 +00:00
@staticmethod
def find_trade_token(trade_url):
2017-10-17 18:49:28 +00:00
return re.search(r"https://steamcommunity\.com/tradeoffer/new/\?partner=[0-9]+&token=(.{8})", trade_url).group(1)
2017-10-08 18:26:51 +00:00
@staticmethod
def to_steam_id_2(steam_id):
2017-10-07 23:29:01 +00:00
# Got this code from a random github gist. It could be completely wrong.
2017-10-08 18:26:51 +00:00
z = (int(steam_id) - 76561197960265728) // 2
y = int(steam_id) % 2
2017-10-07 23:29:01 +00:00
return f"STEAM_0:{y}:{z}"
2017-10-08 18:26:51 +00:00
@staticmethod
def to_steam_id_3(steam_id, full=False):
2017-10-07 23:29:01 +00:00
# Got this code from a random github gist. It could be completely wrong.
if full:
2017-10-08 18:26:51 +00:00
return f"[U:1:{int(steam_id) - 76561197960265728}]"
2017-10-07 23:29:01 +00:00
else:
2017-10-08 18:26:51 +00:00
return f"{int(steam_id) - 76561197960265728}"
2017-10-07 23:29:01 +00:00
2017-10-05 07:47:59 +00:00
def update(self):
r = requests.get(f"https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/?key={config['Steam']['api_key']}&steamids={self.steam_id}")
if r.status_code != 200:
raise RequestError(f"Steam returned {r.status_code}")
j = r.json()
self.persona_name = j["response"]["players"][0]["personaname"]
2017-10-17 18:49:28 +00:00
self.avatar_hex = re.search(r"https://steamcdn-a\.akamaihd\.net/steamcommunity/public/images/avatars/../(.+).jpg", j["response"]["players"][0]["avatar"]).group(1)
2018-06-05 08:34:59 +00:00
r = requests.get(f"http://api.steampowered.com/IPlayerService/GetRecentlyPlayedGames/v0001/?key={config['Steam']['api_key']}&steamid={self.steam_id}&format=json")
if r.status_code != 200:
raise RequestError(f"Steam returned {r.status_code}")
j = r.json()
if "response" not in j \
or "games" not in j["response"] \
or len(j["response"]["games"]) < 1:
raise RequestError(f"Game data is private")
self.most_played_game_id = j["response"]["games"][0]["appid"]
2017-10-05 18:21:47 +00:00
2018-06-13 21:32:26 +00:00
2017-10-05 18:21:47 +00:00
class RocketLeague(Base):
__tablename__ = "rocketleague"
steam_id = Column(String, ForeignKey("steam.steam_id"), primary_key=True)
2018-07-15 12:41:42 +00:00
steam = relationship("Steam", backref="rl", lazy="joined")
2017-10-05 18:21:47 +00:00
season = Column(Integer)
single_rank = Column(Integer)
single_div = Column(Integer)
single_mmr = Column(Integer)
doubles_rank = Column(Integer)
doubles_div = Column(Integer)
doubles_mmr = Column(Integer)
standard_rank = Column(Integer)
standard_div = Column(Integer)
standard_mmr = Column(Integer)
solo_std_rank = Column(Integer)
solo_std_div = Column(Integer)
solo_std_mmr = Column(Integer)
wins = Column(Integer)
def __repr__(self):
2018-09-17 22:07:00 +00:00
return f"<db.RocketLeague {self.steam_id}>"
2017-10-05 18:21:47 +00:00
2018-08-28 13:14:29 +00:00
def update(self, data=None):
raise NotImplementedError("rlstats API is no longer available.")
2017-10-05 18:21:47 +00:00
2018-06-04 11:13:59 +00:00
def solo_rank_image(self):
if self.single_rank is None:
rank = 0
else:
rank = self.single_rank
return f"https://rocketleaguestats.com/assets/img/rocket_league/ranked/season_four/{rank}.png"
def doubles_rank_image(self):
if self.doubles_rank is None:
rank = 0
else:
rank = self.doubles_rank
return f"https://rocketleaguestats.com/assets/img/rocket_league/ranked/season_four/{rank}.png"
def standard_rank_image(self):
if self.standard_rank is None:
rank = 0
else:
rank = self.standard_rank
return f"https://rocketleaguestats.com/assets/img/rocket_league/ranked/season_four/{rank}.png"
def solo_std_rank_image(self):
if self.solo_std_rank is None:
rank = 0
else:
rank = self.solo_std_rank
return f"https://rocketleaguestats.com/assets/img/rocket_league/ranked/season_four/{rank}.png"
2017-10-05 18:21:47 +00:00
2017-10-08 18:26:51 +00:00
class Dota(Base):
__tablename__ = "dota"
steam_id = Column(String, ForeignKey("steam.steam_id"), primary_key=True)
2018-07-15 12:41:42 +00:00
steam = relationship("Steam", backref="dota", lazy="joined")
2017-10-08 18:26:51 +00:00
2018-01-19 08:52:01 +00:00
rank_tier = Column(Integer)
2017-10-08 18:26:51 +00:00
2018-08-28 13:14:29 +00:00
wins = Column(Integer)
losses = Column(Integer)
most_played_hero = Column(Integer)
2017-10-08 18:26:51 +00:00
2018-09-17 22:07:00 +00:00
def __repr__(self):
return f"<db.Dota {self.steam_id}>"
2018-01-19 08:52:01 +00:00
def get_rank_icon_url(self):
2018-01-19 14:51:54 +00:00
# Rank icon is determined by the first digit of the rank tier
return f"https://www.opendota.com/assets/images/dota2/rank_icons/rank_icon_{str(self.rank_tier)[0] if self.rank_tier is not None else '0'}.png"
def get_rank_stars_url(self):
# Rank stars are determined by the second digit of the rank tier
if self.rank_tier is None or str(self.rank_tier)[1] == "0":
return ""
return f"https://www.opendota.com/assets/images/dota2/rank_icons/rank_star_{str(self.rank_tier)[1]}.png"
def get_rank_name(self):
# This should probably be an enum, but who cares
if self.rank_tier is None or self.rank_tier < 10:
return "Unranked"
number = str(self.rank_tier)[0]
if number == "1":
return "Harald"
elif number == "2":
return "Guardian"
elif number == "3":
return "Crusader"
elif number == "4":
return "Archon"
elif number == "5":
return "Legend"
elif number == "6":
return "Ancient"
elif number == "7":
return "Divine"
def get_rank_number(self):
2018-01-19 08:52:01 +00:00
if self.rank_tier is None or self.rank_tier < 10:
2018-01-19 14:51:54 +00:00
return ""
return str(self.rank_tier)[1]
2018-01-19 08:52:01 +00:00
2017-10-08 18:26:51 +00:00
@staticmethod
2018-08-28 13:14:29 +00:00
def create(session: Session, steam_id: int) -> "Dota":
2017-10-08 18:26:51 +00:00
d = session.query(Dota).get(steam_id)
if d is not None:
2017-10-27 09:53:05 +00:00
raise AlreadyExistingError(repr(d))
2017-10-08 18:26:51 +00:00
r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(steam_id)}")
if r.status_code != 200:
raise RequestError("OpenDota returned {r.status_code}")
data = r.json()
if "profile" not in data:
raise NotFoundError("The specified user has never played Dota or has a private match history")
2018-08-28 13:14:29 +00:00
new_record = Dota(steam_id=str(steam_id))
new_record.update()
2017-10-08 18:26:51 +00:00
return new_record
2018-09-17 22:07:00 +00:00
def update(self) -> bool:
2017-10-08 18:26:51 +00:00
r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(self.steam_id)}")
if r.status_code != 200:
2018-08-28 13:14:29 +00:00
raise RequestError("OpenDota / returned {r.status_code}")
2017-10-08 18:26:51 +00:00
data = r.json()
2017-10-10 21:30:06 +00:00
r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(self.steam_id)}/wl")
2017-10-08 18:26:51 +00:00
if r.status_code != 200:
2018-08-28 13:14:29 +00:00
raise RequestError("OpenDota /wl returned {r.status_code}")
2017-10-08 18:26:51 +00:00
wl = r.json()
2018-08-28 13:14:29 +00:00
r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(self.steam_id)}/heroes")
if r.status_code != 200:
raise RequestError("OpenDota /heroes returned {r.status_code}")
heroes = r.json()
2018-09-17 22:07:00 +00:00
changed = self.rank_tier != data["rank_tier"]
2018-01-19 08:52:01 +00:00
self.rank_tier = data["rank_tier"]
2017-10-08 18:26:51 +00:00
self.wins = wl["win"]
self.losses = wl["lose"]
2018-08-28 15:38:17 +00:00
self.most_played_hero = heroes[0]["hero_id"]
2018-09-17 22:07:00 +00:00
return changed
2017-10-08 18:26:51 +00:00
class LeagueOfLegendsRanks(enum.Enum):
BRONZE = 0
SILVER = 1
GOLD = 2
PLATINUM = 3
DIAMOND = 4
MASTER = 5
CHALLENGER = 6
class RomanNumerals(enum.Enum):
I = 1
II = 2
III = 3
IV = 4
V = 5
class LeagueOfLegends(Base):
__tablename__ = "leagueoflegends"
2018-08-28 13:14:29 +00:00
royal_id = Column(Integer, ForeignKey("royals.id"))
2018-07-15 12:41:42 +00:00
royal = relationship("Royal", backref="lol", lazy="joined")
2017-10-08 18:26:51 +00:00
summoner_id = Column(BigInteger, primary_key=True)
2018-08-28 13:14:29 +00:00
summoner_name = Column(String)
2017-10-08 18:26:51 +00:00
2018-08-28 13:14:29 +00:00
level = Column(Integer)
2017-10-08 18:26:51 +00:00
solo_division = Column(Enum(LeagueOfLegendsRanks))
solo_rank = Column(Enum(RomanNumerals))
flex_division = Column(Enum(LeagueOfLegendsRanks))
flex_rank = Column(Enum(RomanNumerals))
twtr_division = Column(Enum(LeagueOfLegendsRanks))
twtr_rank = Column(Enum(RomanNumerals))
2018-08-28 13:14:29 +00:00
highest_mastery_champ = Column(Integer)
2018-09-17 22:07:00 +00:00
def __repr__(self):
if not self.summoner_name:
return f"<LeagueOfLegends {self.summoner_id}>"
return f"<LeagueOfLegends {(''.join([x if x.isalnum else '' for x in self.summoner_name]))}>"
2017-10-08 18:26:51 +00:00
@staticmethod
2017-10-30 09:46:37 +00:00
def create(session: Session, royal_id, summoner_name=None, summoner_id=None):
2017-10-08 18:26:51 +00:00
if summoner_name:
lol = session.query(LeagueOfLegends).filter(LeagueOfLegends.summoner_name == summoner_name).first()
elif summoner_id:
lol = session.query(LeagueOfLegends).get(summoner_id)
else:
2018-08-28 13:14:29 +00:00
raise SyntaxError("Neither summoner_name or summoner_id were specified.")
2017-10-08 18:26:51 +00:00
if lol is not None:
2017-10-27 09:53:05 +00:00
raise AlreadyExistingError(repr(lol))
2017-10-08 18:26:51 +00:00
# Get the summoner_id
if summoner_name:
r = requests.get(f"https://euw1.api.riotgames.com/lol/summoner/v3/summoners/by-name/{summoner_name}?api_key={config['League of Legends']['riot_api_key']}")
else:
r = requests.get(f"https://euw1.api.riotgames.com/lol/summoner/v3/summoners/{summoner_id}?api_key={config['League of Legends']['riot_api_key']}")
if r.status_code != 200:
return RequestError(f"League of Legends API returned {r.status_code}")
data = r.json()
lol = LeagueOfLegends(royal_id=royal_id,
summoner_id=data["id"],
summoner_name=data["name"],
level=data["summonerLevel"])
lol.update()
return lol
2018-09-17 22:07:00 +00:00
def update(self) -> bool:
2017-10-08 18:26:51 +00:00
r = requests.get(f"https://euw1.api.riotgames.com/lol/summoner/v3/summoners/{self.summoner_id}?api_key={config['League of Legends']['riot_api_key']}")
if r.status_code != 200:
2018-09-17 22:07:00 +00:00
raise RequestError(f"League of Legends API /summoner returned {r.status_code}")
2017-10-08 18:26:51 +00:00
data = r.json()
r = requests.get(f"https://euw1.api.riotgames.com/lol/league/v3/positions/by-summoner/{self.summoner_id}?api_key={config['League of Legends']['riot_api_key']}")
if r.status_code != 200:
2018-09-17 22:07:00 +00:00
raise RequestError(f"League of Legends API /league returned {r.status_code}")
2017-10-08 18:26:51 +00:00
rank = r.json()
2018-08-28 13:14:29 +00:00
r = requests.get(f"https://euw1.api.riotgames.com/lol/champion-mastery/v3/champion-masteries/by-summoner/{self.summoner_id}?api_key={config['League of Legends']['riot_api_key']}")
if r.status_code != 200:
2018-09-17 22:07:00 +00:00
raise RequestError(f"League of Legends API /champion-mastery returned {r.status_code}")
2018-08-28 13:14:29 +00:00
mastery = r.json()
2017-10-08 18:26:51 +00:00
solo_rank = None
flex_rank = None
twtr_rank = None
for league in rank:
if league["queueType"] == "RANKED_SOLO_5x5":
solo_rank = league
elif league["queueType"] == "RANKED_FLEX_SR":
flex_rank = league
elif league["queueType"] == "RANKED_FLEX_TT":
twtr_rank = league
self.summoner_id = data["id"]
self.summoner_name = data["name"]
self.level = data["summonerLevel"]
if solo_rank is not None:
self.solo_division = LeagueOfLegendsRanks[solo_rank["tier"]]
self.solo_rank = RomanNumerals[solo_rank["rank"]]
else:
self.solo_division = None
self.solo_rank = None
if flex_rank is not None:
self.flex_division = LeagueOfLegendsRanks[flex_rank["tier"]]
self.flex_rank = RomanNumerals[flex_rank["rank"]]
else:
self.flex_division = None
self.flex_rank = None
if twtr_rank is not None:
self.twtr_division = LeagueOfLegendsRanks[twtr_rank["tier"]]
self.twtr_rank = RomanNumerals[twtr_rank["rank"]]
else:
self.twtr_division = None
self.twtr_rank = None
2018-08-28 13:14:29 +00:00
self.highest_mastery_champ = mastery[0]["championId"]
2017-10-08 18:26:51 +00:00
2017-10-09 11:45:42 +00:00
class Osu(Base):
__tablename__ = "osu"
2017-10-17 18:49:28 +00:00
royal_id = Column(Integer, ForeignKey("royals.id"), nullable=False)
2018-07-15 12:41:42 +00:00
royal = relationship("Royal", backref="osu", lazy="joined")
2017-10-09 11:45:42 +00:00
osu_id = Column(Integer, primary_key=True)
2018-08-28 13:14:29 +00:00
osu_name = Column(String)
2017-10-09 11:45:42 +00:00
std_pp = Column(Float)
taiko_pp = Column(Float)
catch_pp = Column(Float)
mania_pp = Column(Float)
@staticmethod
2017-10-30 09:46:37 +00:00
def create(session: Session, royal_id, osu_name):
2017-10-17 18:49:28 +00:00
o = session.query(Osu).filter(Osu.osu_name == osu_name).first()
2017-10-09 11:45:42 +00:00
if o is not None:
2017-10-27 09:53:05 +00:00
raise AlreadyExistingError(repr(o))
2017-10-09 11:45:42 +00:00
r0 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={osu_name}&m=0")
r1 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={osu_name}&m=1")
r2 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={osu_name}&m=2")
r3 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={osu_name}&m=3")
if r0.status_code != 200 or r1.status_code != 200 or r2.status_code != 200 or r3.status_code != 200:
raise RequestError(f"Osu! API returned an error ({r0.status_code} {r1.status_code} {r2.status_code} {r3.status_code})")
j0 = r0.json()[0]
j1 = r1.json()[0]
j2 = r2.json()[0]
j3 = r3.json()[0]
new_record = Osu(royal_id=royal_id,
osu_id=j0["user_id"],
osu_name=j0["username"],
std_pp=j0["pp_raw"],
taiko_pp=j1["pp_raw"],
catch_pp=j2["pp_raw"],
mania_pp=j3["pp_raw"])
return new_record
def update(self):
2017-10-10 21:30:06 +00:00
r0 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={self.osu_name}&m=0")
r1 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={self.osu_name}&m=1")
r2 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={self.osu_name}&m=2")
r3 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={self.osu_name}&m=3")
2017-10-09 11:45:42 +00:00
if r0.status_code != 200 or r1.status_code != 200 or r2.status_code != 200 or r3.status_code != 200:
raise RequestError(
f"Osu! API returned an error ({r0.status_code} {r1.status_code} {r2.status_code} {r3.status_code})")
j0 = r0.json()[0]
j1 = r1.json()[0]
j2 = r2.json()[0]
j3 = r3.json()[0]
self.osu_name = j0["username"]
self.std_pp = j0["pp_raw"]
self.taiko_pp = j1["pp_raw"]
self.catch_pp = j2["pp_raw"]
self.mania_pp = j3["pp_raw"]
2018-09-17 22:07:00 +00:00
def __repr__(self):
if not self.osu_name:
return f"<db.Osu {self.osu_id}>"
return f"<db.Osu {self.osu_name}>"
2017-10-09 11:45:42 +00:00
2017-10-17 18:49:28 +00:00
class Discord(Base):
__tablename__ = "discord"
__table_args__ = tuple(UniqueConstraint("name", "discriminator"))
2018-08-28 13:14:29 +00:00
royal_id = Column(Integer, ForeignKey("royals.id"))
2018-07-15 12:41:42 +00:00
royal = relationship("Royal", backref="discord", lazy="joined")
2017-10-17 18:49:28 +00:00
discord_id = Column(BigInteger, primary_key=True)
2018-08-28 13:14:29 +00:00
name = Column(String)
discriminator = Column(Integer)
2017-10-17 18:49:28 +00:00
avatar_hex = Column(String)
def __str__(self):
2018-03-12 12:29:12 +00:00
return f"{self.name}#{self.discriminator}"
2017-10-17 18:49:28 +00:00
def __repr__(self):
2018-09-17 22:07:00 +00:00
return f"<db.Discord {self.discord_id}>"
2017-10-27 11:38:32 +00:00
@staticmethod
2017-10-30 09:46:37 +00:00
def create(session: Session, royal_username, discord_user: DiscordUser):
2018-08-28 13:14:29 +00:00
# TODO: remove this
print("Discord.create is deprecated and should be removed soon.")
2017-10-27 11:38:32 +00:00
d = session.query(Discord).filter(Discord.discord_id == discord_user.id).first()
if d is not None:
raise AlreadyExistingError(repr(d))
r = session.query(Royal).filter(Royal.username == royal_username).first()
if r is None:
raise NotFoundError("No Royal exists with that username")
d = session.query(Discord).filter(Discord.royal_id == r.id).first()
if d is not None:
raise AlreadyExistingError(repr(d))
d = Discord(royal=r,
discord_id=discord_user.id,
name=discord_user.name,
discriminator=discord_user.discriminator,
avatar_hex=discord_user.avatar)
return d
2017-10-17 18:49:28 +00:00
def mention(self):
return f"<@{self.id}>"
def avatar_url(self, size=256):
if self.avatar_hex is None:
return "https://discordapp.com/assets/6debd47ed13483642cf09e832ed0bc1b.png"
2018-03-12 12:29:12 +00:00
return f"https://cdn.discordapp.com/avatars/{self.discord_id}/{self.avatar_hex}.png?size={size}"
2017-10-17 18:49:28 +00:00
class Overwatch(Base):
__tablename__ = "overwatch"
royal_id = Column(Integer, ForeignKey("royals.id"), nullable=False)
2018-07-15 12:41:42 +00:00
royal = relationship("Royal", backref="overwatch", lazy="joined")
2017-10-17 18:49:28 +00:00
battletag = Column(String, primary_key=True)
discriminator = Column(Integer, primary_key=True)
2018-08-28 13:14:29 +00:00
icon = Column(String)
2017-10-17 18:49:28 +00:00
2018-08-28 13:14:29 +00:00
level = Column(Integer)
2017-10-17 18:49:28 +00:00
rank = Column(Integer)
def __str__(self, separator="#"):
return f"{self.battletag}{separator}{self.discriminator}"
def __repr__(self):
2018-09-17 22:07:00 +00:00
return f"<db.Overwatch {self}>"
2017-10-17 18:49:28 +00:00
@staticmethod
2017-10-30 09:46:37 +00:00
def create(session: Session, royal_id, battletag, discriminator=None):
2017-10-17 18:49:28 +00:00
if discriminator is None:
battletag, discriminator = battletag.split("#", 1)
o = session.query(Overwatch).filter_by(battletag=battletag, discriminator=discriminator).first()
if o is not None:
2017-10-27 09:53:05 +00:00
raise AlreadyExistingError(repr(o))
2017-10-17 18:49:28 +00:00
o = Overwatch(royal_id=royal_id,
battletag=battletag,
2018-08-28 13:14:29 +00:00
discriminator=discriminator)
o.update()
2017-10-17 18:49:28 +00:00
return o
2017-10-25 09:09:06 +00:00
def icon_url(self):
return f"https://d1u1mce87gyfbn.cloudfront.net/game/unlocks/{self.icon}.png"
2017-10-17 18:49:28 +00:00
def update(self):
r = requests.get(f"https://owapi.net/api/v3/u/{self.battletag}-{self.discriminator}/stats", headers={
2018-08-28 13:14:29 +00:00
"User-Agent": "Royal-Bot/4.1",
2017-10-17 18:49:28 +00:00
"From": "ste.pigozzi@gmail.com"
})
if r.status_code != 200:
raise RequestError(f"OWAPI.net returned {r.status_code}")
try:
2018-08-28 15:45:07 +00:00
j = r.json()["eu"]["stats"].get("competitive")
if j is None:
raise RequestError("Something went wrong when retrieving the stats.")
2018-08-28 15:53:58 +00:00
if not j["game_stats"]:
raise RequestError("Something went wrong when retrieving the stats.")
2018-08-28 15:45:07 +00:00
j = j["overall_stats"]
2017-10-17 18:49:28 +00:00
except TypeError:
raise RequestError("Something went wrong when retrieving the stats.")
2018-06-13 21:32:26 +00:00
try:
2018-08-28 15:53:58 +00:00
self.icon = re.search(r"https://.+\.cloudfront\.net/game/unlocks/(0x[0-9A-F]+)\.png", j["avatar"]).group(1)
2018-06-13 21:32:26 +00:00
except AttributeError:
pass
2017-10-17 18:49:28 +00:00
self.level = j["prestige"] * 100 + j["level"]
self.rank = j["comprank"]
def rank_url(self):
if self.rank < 1500:
n = 1
elif self.rank < 2000:
n = 2
elif self.rank < 2500:
n = 3
elif self.rank < 3000:
n = 4
elif self.rank < 3500:
n = 5
elif self.rank < 4000:
n = 6
else:
n = 7
return f"https://d1u1mce87gyfbn.cloudfront.net/game/rank-icons/season-2/rank-{n}.png"
2017-10-19 16:28:28 +00:00
class Diario(Base):
__tablename__ = "diario"
id = Column(Integer, primary_key=True)
timestamp = Column(DateTime, nullable=False)
saver_id = Column(Integer, ForeignKey("telegram.telegram_id"))
2018-07-15 12:41:42 +00:00
saver = relationship("Telegram", foreign_keys=saver_id, backref="diario_saves", lazy="joined")
2017-10-19 16:28:28 +00:00
author_id = Column(Integer, ForeignKey("telegram.telegram_id"))
2018-07-15 12:41:42 +00:00
author = relationship("Telegram", foreign_keys=author_id, backref="diario_authored", lazy="joined")
2018-07-23 20:21:52 +00:00
spoiler = Column(Boolean, default=False)
2017-10-19 16:28:28 +00:00
text = Column(String)
def __repr__(self):
2018-09-17 22:07:00 +00:00
return f"<db.Diario {self.id}>"
2017-10-19 16:28:28 +00:00
def __str__(self):
return f"{self.id} - {self.timestamp} - {self.author}: {self.text}"
@staticmethod
def import_from_json(file):
import json
2017-10-30 09:46:37 +00:00
session = Session()
2017-10-19 16:28:28 +00:00
file = open(file, "r")
j = json.load(file)
2018-01-25 14:24:17 +00:00
author_ids = {
"@Steffo": 25167391,
"@GoodBalu": 19611986,
"@gattopandacorno": 200821462,
"@Albertino04": 131057096,
"@Francesco_Cuoghi": 48371848,
"@VenomousDoc": 48371848,
"@MaxSensei": 1258401,
"@Protoh": 125711787,
"@McspKap": 304117728,
"@FrankRekt": 31436195,
"@EvilBalu": 26842090,
"@Dailir": 135816455,
"@Paltri": 186843362,
"@Doom_darth_vader": 165792255,
"@httpIma": 292086686,
"@DavidoMessori": 509208316,
"@DavidoNiichan": 509208316,
"@Peraemela99": 63804599,
"@infopz": 20403805,
"@Baithoven": 121537369,
"@Tauei": 102833717
}
for n, entry in enumerate(j):
author = author_ids[entry["sender"]] if "sender" in entry and entry["sender"] in author_ids else None
2017-10-19 16:28:28 +00:00
d = Diario(timestamp=datetime.datetime.fromtimestamp(float(entry["timestamp"])),
2018-01-25 14:24:17 +00:00
author_id=author,
2017-10-19 16:28:28 +00:00
text=entry["text"])
2018-01-25 14:24:17 +00:00
print(f"{n} - {d}")
2017-10-19 16:28:28 +00:00
session.add(d)
session.commit()
2017-10-30 09:46:37 +00:00
session.close()
2017-10-19 16:28:28 +00:00
2018-01-25 14:24:17 +00:00
class BaluRage(Base):
__tablename__ = "balurage"
id = Column(Integer, primary_key=True)
royal_id = Column(Integer, ForeignKey("royals.id"))
2018-07-15 12:41:42 +00:00
royal = relationship("Royal", backref="times_raged", lazy="joined")
2018-01-25 14:24:17 +00:00
reason = Column(String)
def __repr__(self):
2018-09-17 22:07:00 +00:00
return f"<db.BaluRage {self.id}>"
2018-01-25 14:24:17 +00:00
2018-03-11 19:03:21 +00:00
class PlayedMusic(Base):
__tablename__ = "playedmusic"
id = Column(Integer, primary_key=True)
2018-03-12 12:29:12 +00:00
enqueuer_id = Column(BigInteger, ForeignKey("discord.discord_id"))
2018-07-15 12:41:42 +00:00
enqueuer = relationship("Discord", backref="music_played", lazy="joined")
2018-03-11 19:03:21 +00:00
filename = Column(String)
2018-07-26 17:26:03 +00:00
timestamp = Column(DateTime, nullable=False)
2018-03-11 19:03:21 +00:00
def __repr__(self):
2018-09-17 22:07:00 +00:00
return f"<db.PlayedMusic {self.filename}>"
2018-03-11 19:03:21 +00:00
2018-03-14 10:53:26 +00:00
class VoteQuestion(Base):
__tablename__ = "votequestion"
id = Column(Integer, primary_key=True)
message_id = Column(BigInteger)
question = Column(String, nullable=False)
anonymous = Column(Boolean, nullable=False)
open = Column(Boolean, default=True)
def __repr__(self):
2018-09-17 22:07:00 +00:00
return f"<db.Vote {self.id}>"
2018-03-14 10:53:26 +00:00
def generate_text(self, session: Session):
text = f"<b>{self.question}</b>\n\n"
none, yes, no, abstain = 0, 0, 0, 0
2018-03-14 15:28:49 +00:00
if self.message_id is not None:
query = session.execute("SELECT * FROM telegram LEFT JOIN (SELECT voteanswer.question_id, voteanswer.user_id, voteanswer.choice FROM votequestion JOIN voteanswer ON votequestion.id = voteanswer.question_id WHERE votequestion.message_id = " + str(self.message_id) + ") answer ON telegram.telegram_id = answer.user_id ORDER BY answer.choice;")
for record in query:
if record["username"] == "royalgamesbot":
continue
elif record["question_id"] is None:
text += "⚪️"
none += 1
elif record["choice"] == "YES":
text += "🔵"
yes += 1
elif record["choice"] == "NO":
text += "🔴"
no += 1
elif record["choice"] == "ABSTAIN":
text += "⚫️"
abstain += 1
if not self.anonymous:
text += f" {str(record['username'])}\n"
if self.anonymous:
text += "\n"
text += f"\n" \
f"{none}\n" \
f"🔵 {yes}\n" \
f"🔴 {no}\n" \
f"⚫️ {abstain}"
2018-03-14 10:53:26 +00:00
return text
class VoteChoices(enum.Enum):
ABSTAIN = 1
YES = 2
NO = 3
class VoteAnswer(Base):
__tablename__ = "voteanswer"
question_id = Column(Integer, ForeignKey("votequestion.id"))
2018-07-15 12:41:42 +00:00
question = relationship("VoteQuestion", backref="answers", lazy="joined")
2018-03-14 10:53:26 +00:00
user_id = Column(BigInteger, ForeignKey("telegram.telegram_id"))
2018-07-15 12:41:42 +00:00
user = relationship("Telegram", backref="votes_cast", lazy="joined")
2018-03-14 10:53:26 +00:00
choice = Column(Enum(VoteChoices), nullable=False)
__table_args__ = (PrimaryKeyConstraint("question_id", "user_id"),)
def __repr__(self):
2018-09-17 22:07:00 +00:00
return f"<db.VoteAnswer {self.question_id} {self.user} {self.choice}>"
2018-03-14 10:53:26 +00:00
2018-08-01 16:05:18 +00:00
class ProfileData(Base):
__tablename__ = "profiledata"
2018-06-05 10:31:11 +00:00
royal_id = Column(Integer, ForeignKey("royals.id"), primary_key=True)
2018-08-01 16:05:18 +00:00
royal = relationship("Royal", backref="profile_data", uselist=False, lazy="joined")
2018-06-05 10:31:11 +00:00
2018-08-01 16:05:18 +00:00
css = Column(Text)
bio = Column(Text)
2018-06-05 10:31:11 +00:00
2018-09-17 22:07:00 +00:00
def __repr__(self):
return f"<ProfileData for {self.royal.username}>"
2018-06-05 10:31:11 +00:00
2018-07-15 12:41:42 +00:00
class WikiEntry(Base):
__tablename__ = "wikientries"
key = Column(String, primary_key=True)
content = Column(Text, nullable=False)
2018-09-17 22:07:00 +00:00
def __repr__(self):
return f"<WikiEntry {self.key}>"
2018-07-15 12:41:42 +00:00
class WikiLog(Base):
__tablename__ = "wikilog"
edit_id = Column(Integer, primary_key=True)
editor_id = Column(Integer, ForeignKey("royals.id"), nullable=False)
editor = relationship("Royal", backref="wiki_edits", lazy="joined")
edited_key = Column(String, ForeignKey("wikientries.key"), nullable=False)
edited = relationship("WikiEntry", backref="edit_logs", lazy="joined")
timestamp = Column(DateTime, nullable=False)
reason = Column(Text)
2018-09-17 22:07:00 +00:00
def __repr__(self):
return f"<WikiLog {self.edit_id}>"
2018-07-15 12:41:42 +00:00
2018-07-23 16:45:43 +00:00
class Event(Base):
__tablename__ = "events"
id = Column(Integer, primary_key=True)
author_id = Column(Integer, ForeignKey("royals.id"), nullable=False)
2018-07-24 17:23:19 +00:00
author = relationship("Royal", lazy="joined")
2018-07-23 16:45:43 +00:00
name = Column(String, nullable=False)
description = Column(Text)
time = Column(DateTime, nullable=False)
@hybrid_property
def time_left(self) -> datetime.timedelta:
return self.time - datetime.datetime.now()
@time_left.setter
def time_left(self, value):
if not isinstance(value, datetime.timedelta):
raise TypeError("time_left should be a datetime.timedelta")
self.time = datetime.datetime.now() + value
2018-09-17 22:07:00 +00:00
def __repr__(self):
return f"<Event {self.name}>"
2018-07-23 16:45:43 +00:00
2018-09-10 23:06:08 +00:00
class Reddit(Base):
__tablename__ = "reddit"
royal_id = Column(Integer, ForeignKey("royals.id"))
royal = relationship("Royal", backref="reddit", lazy="joined")
2018-09-11 00:02:09 +00:00
username = Column(String, primary_key=True)
2018-09-10 23:06:08 +00:00
karma = Column(BigInteger)
2018-09-17 22:07:00 +00:00
def __repr__(self):
return f"<Reddit u/{self.username}>"
2018-09-10 23:06:08 +00:00
class GameLog(Base):
__tablename__ = "gamelog"
royal_id = Column(Integer, ForeignKey("royals.id"))
royal = relationship("Royal", backref="gamelog", lazy="joined")
2018-09-11 00:02:09 +00:00
username = Column(String, primary_key=True)
2018-09-10 23:06:08 +00:00
owned_games = Column(Integer)
unfinished_games = Column(Integer)
beaten_games = Column(Integer)
completed_games = Column(Integer)
mastered_games = Column(Integer)
2018-09-17 22:07:00 +00:00
def __repr__(self):
return f"<GameLog {self.username}>"
2018-09-10 23:06:08 +00:00
2018-09-05 17:48:34 +00:00
class ParsedRedditPost(Base):
__tablename__ = "parsedredditposts"
id = Column(String, primary_key=True)
2018-09-10 23:06:08 +00:00
author_username = Column(String)
2018-09-05 17:48:34 +00:00
2018-09-17 22:07:00 +00:00
def __repr__(self):
return f"<ParsedRedditPost {self.id}>"
2018-09-05 17:48:34 +00:00
2018-09-13 21:51:06 +00:00
class LoginToken(Base):
__tablename__ = "logintoken"
royal_id = Column(Integer, ForeignKey("royals.id"))
royal = relationship("Royal", backref="tokens", lazy="joined")
token = Column(String, primary_key=True)
expiration = Column(DateTime, nullable=False)
2018-09-17 22:07:00 +00:00
def __repr__(self):
return f"<LoginToken for {self.royal.username}>"
2018-09-13 21:51:06 +00:00
2017-10-04 16:51:40 +00:00
# If run as script, create all the tables in the db
if __name__ == "__main__":
print("Creating new tables...")
Base.metadata.create_all(bind=engine)
2018-07-15 12:41:42 +00:00
print("Done!")