diff --git a/db.py b/db.py index a01a2c33..c412c81e 100644 --- a/db.py +++ b/db.py @@ -1,9 +1,10 @@ from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker, relationship -from sqlalchemy import Column, BigInteger, Integer, String, Numeric, DateTime, ForeignKey, Float, create_engine +from sqlalchemy import Column, BigInteger, Integer, String, Numeric, DateTime, ForeignKey, Float, Enum, create_engine import requests -from errors import RequestError +from errors import RequestError, NotFoundError import re +import enum # Init the config reader import configparser @@ -72,21 +73,41 @@ class Steam(Base): else: return self.steam_id - def steam_id_1(self): - return self.steam_id + @staticmethod + def get_or_create(royal_id, steam_id): + s = session.query(Steam).get(steam_id) + if s is not None: + return s + r = requests.get(f"https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/?key={config['Steam']['api_key']}&steamids={steam_id}") + if r.status_code != 200: + raise RequestError(f"Steam returned {r.status_code}") + j = r.json() + if len(j) == 0: + raise NotFoundError(f"The steam_id doesn't match any steam account") + s = Steam(royal_id=royal_id, + steam_id=steam_id, + persona_name=j["response"]["players"][0]["personaname"], + avatar_hex=re.search("https://steamcdn-a.akamaihd.net/steamcommunity/public/images/avatars/../(.+).jpg", j["response"]["players"][0]["avatar"]).group(1)) + return s - def steam_id_2(self): + @staticmethod + def find_trade_token(trade_url): + return re.search("https://steamcommunity\.com/tradeoffer/new/\?partner=[0-9]+&token=(.{8})", trade_url).group(1) + + @staticmethod + def to_steam_id_2(steam_id): # Got this code from a random github gist. It could be completely wrong. - z = (int(self.steam_id) - 76561197960265728) // 2 - y = int(self.steam_id) % 2 + z = (int(steam_id) - 76561197960265728) // 2 + y = int(steam_id) % 2 return f"STEAM_0:{y}:{z}" - def steam_id_3(self, full=False): + @staticmethod + def to_steam_id_3(steam_id, full=False): # Got this code from a random github gist. It could be completely wrong. if full: - return f"[U:1:{int(self.steam_id) - 76561197960265728}]" + return f"[U:1:{int(steam_id) - 76561197960265728}]" else: - return f"{int(self.steam_id) - 76561197960265728}" + return f"{int(steam_id) - 76561197960265728}" def update(self): r = requests.get(f"https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/?key={config['Steam']['api_key']}&steamids={self.steam_id}") @@ -94,7 +115,7 @@ class Steam(Base): raise RequestError(f"Steam returned {r.status_code}") j = r.json() self.persona_name = j["response"]["players"][0]["personaname"] - self.avatar_hex = re.search("https://steamcdn-a.akamaihd.net/steamcommunity/public/images/avatars/../(.+).jpg", j["response"]["players"][0]["avatar"]).group(1) + self.avatar_hex = re.search("https://steamcdn-a\.akamaihd\.net/steamcommunity/public/images/avatars/../(.+).jpg", j["response"]["players"][0]["avatar"]).group(1) class RocketLeague(Base): @@ -127,27 +148,28 @@ class RocketLeague(Base): return f"" @staticmethod - def check_and_create(steam_id): - rl = session.query(RocketLeague).filter(RocketLeague.steam_id == steam_id).first() + def get_or_create(steam_id): + rl = session.query(RocketLeague).get(steam_id) if rl is not None: - return None + return rl r = requests.get(f"https://api.rocketleaguestats.com/v1/player?apikey={config['Rocket League']['rlstats_api_key']}&unique_id={str(steam_id)}&platform_id=1") if r.status_code == 404: - return None - elif r.status_code == 500: + raise NotFoundError("The specified user has never played Rocket League") + elif r.status_code != 200: raise RequestError("Rocket League Stats returned {r.status_code}") new_record = RocketLeague(steam_id=steam_id) - new_record.update() + new_record.update(data=r.json()) return new_record - def update(self): - r = requests.get(f"https://api.rocketleaguestats.com/v1/player?apikey={config['Rocket League']['rlstats_api_key']}&unique_id={self.steam_id}&platform_id=1") - if r.status_code != 200: - raise RequestError(f"Rocket League Stats returned {r.status_code}") - j = r.json() + def update(self, data=None): + if data is None: + r = requests.get(f"https://api.rocketleaguestats.com/v1/player?apikey={config['Rocket League']['rlstats_api_key']}&unique_id={self.steam_id}&platform_id=1") + if r.status_code != 200: + raise RequestError(f"Rocket League Stats returned {r.status_code}") + data = r.json() # Get current season current_season = 0 - for season in j["rankedSeasons"]: + for season in data["rankedSeasons"]: if int(season) > current_season: current_season = int(season) self.season = current_season @@ -156,43 +178,194 @@ class RocketLeague(Base): current_season = str(current_season) # Get ranked data # Single 1v1 - if "10" in j["rankedSeasons"][current_season]: - self.single_mmr = j["rankedSeasons"][current_season]["10"]["rankPoints"] - if j["rankedSeasons"][current_season]["10"]["matchesPlayed"] >= 10: - self.single_rank = j["rankedSeasons"][current_season]["10"]["tier"] - self.single_div = j["rankedSeasons"][current_season]["10"]["division"] + if "10" in data["rankedSeasons"][current_season]: + self.single_mmr = data["rankedSeasons"][current_season]["10"]["rankPoints"] + if data["rankedSeasons"][current_season]["10"]["matchesPlayed"] >= 10: + self.single_rank = data["rankedSeasons"][current_season]["10"]["tier"] + self.single_div = data["rankedSeasons"][current_season]["10"]["division"] else: self.single_rank = None self.single_div = None # Doubles 2v2 - if "11" in j["rankedSeasons"][current_season]: - self.doubles_mmr = j["rankedSeasons"][current_season]["11"]["rankPoints"] - if j["rankedSeasons"][current_season]["11"]["matchesPlayed"] >= 10: - self.doubles_rank = j["rankedSeasons"][current_season]["11"]["tier"] - self.doubles_div = j["rankedSeasons"][current_season]["11"]["division"] + if "11" in data["rankedSeasons"][current_season]: + self.doubles_mmr = data["rankedSeasons"][current_season]["11"]["rankPoints"] + if data["rankedSeasons"][current_season]["11"]["matchesPlayed"] >= 10: + self.doubles_rank = data["rankedSeasons"][current_season]["11"]["tier"] + self.doubles_div = data["rankedSeasons"][current_season]["11"]["division"] else: self.doubles_rank = None self.doubles_div = None # Standard 3v3 - if "13" in j["rankedSeasons"][current_season]: - self.standard_mmr = j["rankedSeasons"][current_season]["13"]["rankPoints"] - if j["rankedSeasons"][current_season]["13"]["matchesPlayed"] >= 10: - self.standard_rank = j["rankedSeasons"][current_season]["13"]["tier"] - self.standard_div = j["rankedSeasons"][current_season]["13"]["division"] + if "13" in data["rankedSeasons"][current_season]: + self.standard_mmr = data["rankedSeasons"][current_season]["13"]["rankPoints"] + if data["rankedSeasons"][current_season]["13"]["matchesPlayed"] >= 10: + self.standard_rank = data["rankedSeasons"][current_season]["13"]["tier"] + self.standard_div = data["rankedSeasons"][current_season]["13"]["division"] else: self.standard_rank = None self.standard_div = None # Solo Standard 3v3 - if "12" in j["rankedSeasons"][current_season]: - self.solo_std_mmr = j["rankedSeasons"][current_season]["12"]["rankPoints"] - if j["rankedSeasons"][current_season]["12"]["matchesPlayed"] >= 10: - self.solo_std_rank = j["rankedSeasons"][current_season]["12"]["tier"] - self.solo_std_div = j["rankedSeasons"][current_season]["12"]["division"] + if "12" in data["rankedSeasons"][current_season]: + self.solo_std_mmr = data["rankedSeasons"][current_season]["12"]["rankPoints"] + if data["rankedSeasons"][current_season]["12"]["matchesPlayed"] >= 10: + self.solo_std_rank = data["rankedSeasons"][current_season]["12"]["tier"] + self.solo_std_div = data["rankedSeasons"][current_season]["12"]["division"] else: self.solo_std_rank = None self.solo_std_div = None +class Dota(Base): + __tablename__ = "dota" + + steam_id = Column(String, ForeignKey("steam.steam_id"), primary_key=True) + steam = relationship("Steam") + + solo_mmr = Column(Integer) + party_mmr = Column(Integer) + + wins = Column(Integer) + losses = Column(Integer) + + @staticmethod + def get_or_create(steam_id): + d = session.query(Dota).get(steam_id) + if d is not None: + return d + r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(steam_id)}") + if r.status_code != 200: + raise RequestError("OpenDota returned {r.status_code}") + data = r.json() + if "profile" not in data: + raise NotFoundError("The specified user has never played Dota or has a private match history") + r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(steam_id)}/wl") + if r.status_code != 200: + raise RequestError("OpenDota returned {r.status_code}") + wl = r.json() + new_record = Dota(steam_id=steam_id, + solo_mmr=data["solo_competitive_rank"], + party_mmr=data["competitive_rank"], + wins=wl["win"], + losses=wl["lose"]) + return new_record + + def update(self): + r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(self.steam_id)}") + if r.status_code != 200: + raise RequestError("OpenDota returned {r.status_code}") + data = r.json() + r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(steam_id)}/wl") + if r.status_code != 200: + raise RequestError("OpenDota returned {r.status_code}") + wl = r.json() + self.solo_mmr = data["solo_competitive_rank"] + self.party_mmr = data["competitive_rank"] + self.wins = wl["win"] + self.losses = wl["lose"] + + +class LeagueOfLegendsRanks(enum.Enum): + BRONZE = 0 + SILVER = 1 + GOLD = 2 + PLATINUM = 3 + DIAMOND = 4 + MASTER = 5 + CHALLENGER = 6 + + +class RomanNumerals(enum.Enum): + I = 1 + II = 2 + III = 3 + IV = 4 + V = 5 + + +class LeagueOfLegends(Base): + __tablename__ = "leagueoflegends" + + royal_id = Column(Integer, ForeignKey("royals.id")) + royal = relationship("Royal") + + summoner_id = Column(BigInteger, primary_key=True) + summoner_name = Column(String) + + level = Column(Integer, nullable=False) + solo_division = Column(Enum(LeagueOfLegendsRanks)) + solo_rank = Column(Enum(RomanNumerals)) + flex_division = Column(Enum(LeagueOfLegendsRanks)) + flex_rank = Column(Enum(RomanNumerals)) + twtr_division = Column(Enum(LeagueOfLegendsRanks)) + twtr_rank = Column(Enum(RomanNumerals)) + + @staticmethod + def get_or_create(royal_id, summoner_name=None, summoner_id=None): + if summoner_name: + lol = session.query(LeagueOfLegends).filter(LeagueOfLegends.summoner_name == summoner_name).first() + elif summoner_id: + lol = session.query(LeagueOfLegends).get(summoner_id) + else: + raise SyntaxError("Neither summoner_name or summoner_id are specified") + if lol is not None: + return lol + # Get the summoner_id + if summoner_name: + r = requests.get(f"https://euw1.api.riotgames.com/lol/summoner/v3/summoners/by-name/{summoner_name}?api_key={config['League of Legends']['riot_api_key']}") + else: + r = requests.get(f"https://euw1.api.riotgames.com/lol/summoner/v3/summoners/{summoner_id}?api_key={config['League of Legends']['riot_api_key']}") + if r.status_code != 200: + return RequestError(f"League of Legends API returned {r.status_code}") + data = r.json() + lol = LeagueOfLegends(royal_id=royal_id, + summoner_id=data["id"], + summoner_name=data["name"], + level=data["summonerLevel"]) + lol.update() + return lol + + def update(self): + r = requests.get(f"https://euw1.api.riotgames.com/lol/summoner/v3/summoners/{self.summoner_id}?api_key={config['League of Legends']['riot_api_key']}") + if r.status_code != 200: + return RequestError(f"League of Legends API returned {r.status_code}") + data = r.json() + r = requests.get(f"https://euw1.api.riotgames.com/lol/league/v3/positions/by-summoner/{self.summoner_id}?api_key={config['League of Legends']['riot_api_key']}") + if r.status_code != 200: + return RequestError(f"League of Legends API returned {r.status_code}") + rank = r.json() + solo_rank = None + flex_rank = None + twtr_rank = None + for league in rank: + if league["queueType"] == "RANKED_SOLO_5x5": + solo_rank = league + elif league["queueType"] == "RANKED_FLEX_SR": + flex_rank = league + elif league["queueType"] == "RANKED_FLEX_TT": + twtr_rank = league + self.summoner_id = data["id"] + self.summoner_name = data["name"] + self.level = data["summonerLevel"] + if solo_rank is not None: + self.solo_division = LeagueOfLegendsRanks[solo_rank["tier"]] + self.solo_rank = RomanNumerals[solo_rank["rank"]] + else: + self.solo_division = None + self.solo_rank = None + if flex_rank is not None: + self.flex_division = LeagueOfLegendsRanks[flex_rank["tier"]] + self.flex_rank = RomanNumerals[flex_rank["rank"]] + else: + self.flex_division = None + self.flex_rank = None + if twtr_rank is not None: + self.twtr_division = LeagueOfLegendsRanks[twtr_rank["tier"]] + self.twtr_rank = RomanNumerals[twtr_rank["rank"]] + else: + self.twtr_division = None + self.twtr_rank = None + + # If run as script, create all the tables in the db if __name__ == "__main__": Base.metadata.create_all(bind=engine) \ No newline at end of file diff --git a/errors.py b/errors.py index f267ff24..a4e187f3 100644 --- a/errors.py +++ b/errors.py @@ -1,2 +1,5 @@ class RequestError(Exception): + pass + +class NotFoundError(Exception): pass \ No newline at end of file