diff --git a/db.py b/db.py index acb8027d..6978f766 100644 --- a/db.py +++ b/db.py @@ -8,12 +8,13 @@ from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy import Column, BigInteger, Integer, String, DateTime, ForeignKey, Float, Enum, create_engine, UniqueConstraint, PrimaryKeyConstraint, Boolean, or_, LargeBinary, Text, Date, func, desc import requests -from errors import RequestError, NotFoundError, AlreadyExistingError +from errors import NotFoundError, AlreadyExistingError, PrivateError import re import enum from discord import User as DiscordUser from telegram import User as TelegramUser import loldata +from dirty import Dirty # Init the config reader import configparser @@ -133,8 +134,7 @@ class Steam(Base): if s is not None: raise AlreadyExistingError(repr(s)) r = requests.get(f"https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/?key={config['Steam']['api_key']}&steamids={steam_id}") - if r.status_code != 200: - raise RequestError(f"Steam returned {r.status_code}") + r.raise_for_status() j = r.json() if len(j) == 0: raise NotFoundError(f"The steam_id doesn't match any steam account") @@ -165,20 +165,16 @@ class Steam(Base): def update(self, session=None, raise_if_private: bool=False): r = requests.get(f"https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/?key={config['Steam']['api_key']}&steamids={self.steam_id}") - if r.status_code != 200: - raise RequestError(f"Steam returned {r.status_code}") + r.raise_for_status() j = r.json() self.persona_name = j["response"]["players"][0]["personaname"] self.avatar_hex = re.search(r"https://steamcdn-a\.akamaihd\.net/steamcommunity/public/images/avatars/../(.+).jpg", j["response"]["players"][0]["avatar"]).group(1) r = requests.get(f"http://api.steampowered.com/IPlayerService/GetRecentlyPlayedGames/v0001/?key={config['Steam']['api_key']}&steamid={self.steam_id}&format=json") - if r.status_code != 200: - raise RequestError(f"Steam returned {r.status_code}") + r.raise_for_status() j = r.json() - if "response" not in j \ - or "games" not in j["response"] \ - or len(j["response"]["games"]) < 1: + if "response" not in j or "games" not in j["response"] or len(j["response"]["games"]) < 1: if raise_if_private: - raise RequestError(f"Game data is private") + raise PrivateError(f"Game data is private") return self.most_played_game_id = j["response"]["games"][0]["appid"] @@ -301,8 +297,7 @@ class Dota(Base): if d is not None: raise AlreadyExistingError(repr(d)) r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(steam_id)}") - if r.status_code != 200: - raise RequestError("OpenDota returned {r.status_code}") + r.raise_for_status() data = r.json() if "profile" not in data: raise NotFoundError("The specified user has never played Dota or has a private match history") @@ -312,16 +307,13 @@ class Dota(Base): def update(self, session=None) -> bool: r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(self.steam_id)}") - if r.status_code != 200: - raise RequestError("OpenDota / returned {r.status_code}") + r.raise_for_status() data = r.json() r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(self.steam_id)}/wl") - if r.status_code != 200: - raise RequestError("OpenDota /wl returned {r.status_code}") + r.raise_for_status() wl = r.json() r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(self.steam_id)}/heroes") - if r.status_code != 200: - raise RequestError("OpenDota /heroes returned {r.status_code}") + r.raise_for_status() heroes = r.json() changed = self.rank_tier != data["rank_tier"] self.rank_tier = data["rank_tier"] @@ -348,7 +340,6 @@ class RomanNumerals(enum.Enum): II = 2 III = 3 IV = 4 - V = 5 class LeagueOfLegends(Base): @@ -357,8 +348,9 @@ class LeagueOfLegends(Base): royal_id = Column(Integer, ForeignKey("royals.id")) royal = relationship("Royal", backref="lol", lazy="joined") - summoner_id = Column(BigInteger, primary_key=True) - account_id = Column(BigInteger) + icon_id = Column(Integer) + summoner_id = Column(String, primary_key=True) + account_id = Column(String) summoner_name = Column(String) level = Column(Integer) @@ -376,52 +368,53 @@ class LeagueOfLegends(Base): return f"" return f"" - def update(self, session=None) -> bool: - r = requests.get(f"https://euw1.api.riotgames.com/lol/summoner/v3/summoners/{self.summoner_id}?api_key={config['League of Legends']['riot_api_key']}") - if r.status_code != 200: - raise RequestError(f"League of Legends API /summoner returned {r.status_code}") + @staticmethod + def create(royal_id, summoner_name) -> "LeagueOfLegends": + r = requests.get(f"https://euw1.api.riotgames.com/lol/summoner/v4/summoners/by-name/{summoner_name}?api_key={config['League of Legends']['riot_api_key']}") + r.raise_for_status() data = r.json() - r = requests.get(f"https://euw1.api.riotgames.com/lol/league/v3/positions/by-summoner/{self.summoner_id}?api_key={config['League of Legends']['riot_api_key']}") - if r.status_code != 200: - raise RequestError(f"League of Legends API /league returned {r.status_code}") + lol = LeagueOfLegends() + lol.royal_id = royal_id + lol.summoner_name = summoner_name + lol.summoner_id = data["id"] + lol.account_id = data["accountId"] + lol.icon_id = data["profileIconId"] + lol.level = data["summonerLevel"] + lol.update() + return lol + + def update(self, session=None): + r = requests.get(f"https://euw1.api.riotgames.com/lol/summoner/v4/summoners/{self.summoner_id}?api_key={config['League of Legends']['riot_api_key']}") + r.raise_for_status() + data = r.json() + r = requests.get(f"https://euw1.api.riotgames.com/lol/league/v4/positions/by-summoner/{self.summoner_id}?api_key={config['League of Legends']['riot_api_key']}") + r.raise_for_status() rank = r.json() - r = requests.get(f"https://euw1.api.riotgames.com/lol/champion-mastery/v3/champion-masteries/by-summoner/{self.summoner_id}?api_key={config['League of Legends']['riot_api_key']}") - if r.status_code != 200: - raise RequestError(f"League of Legends API /champion-mastery returned {r.status_code}") + r = requests.get(f"https://euw1.api.riotgames.com/lol/champion-mastery/v4/champion-masteries/by-summoner/{self.summoner_id}?api_key={config['League of Legends']['riot_api_key']}") + r.raise_for_status() mastery = r.json() - solo_rank = None - flex_rank = None - twtr_rank = None + solo_q = None + flex_q = None + twtr_q = None for league in rank: if league["queueType"] == "RANKED_SOLO_5x5": - solo_rank = league + solo_q = league elif league["queueType"] == "RANKED_FLEX_SR": - flex_rank = league + flex_q = league elif league["queueType"] == "RANKED_FLEX_TT": - twtr_rank = league + twtr_q = league self.summoner_id = data["id"] self.summoner_name = data["name"] self.account_id = data["accountId"] self.level = data["summonerLevel"] - if solo_rank is not None: - self.solo_division = LeagueOfLegendsRanks[solo_rank["tier"]] - self.solo_rank = RomanNumerals[solo_rank["rank"]] - else: - self.solo_division = None - self.solo_rank = None - if flex_rank is not None: - self.flex_division = LeagueOfLegendsRanks[flex_rank["tier"]] - self.flex_rank = RomanNumerals[flex_rank["rank"]] - else: - self.flex_division = None - self.flex_rank = None - if twtr_rank is not None: - self.twtr_division = LeagueOfLegendsRanks[twtr_rank["tier"]] - self.twtr_rank = RomanNumerals[twtr_rank["rank"]] - else: - self.twtr_division = None - self.twtr_rank = None + solo = Dirty((self.solo_division, self.solo_rank)) + flex = Dirty((self.flex_division, self.flex_rank)) + twtr = Dirty((self.twtr_division, self.twtr_rank)) + solo.value = (None, None) if solo_q is None else (LeagueOfLegendsRanks[solo_q["tier"]], RomanNumerals[solo_q["rank"]]) + flex.value = (None, None) if flex_q is None else (LeagueOfLegendsRanks[flex_q["tier"]], RomanNumerals[flex_q["rank"]]) + twtr.value = (None, None) if twtr_q is None else (LeagueOfLegendsRanks[twtr_q["tier"]], RomanNumerals[twtr_q["rank"]]) self.highest_mastery_champ = mastery[0]["championId"] + return solo, flex, twtr def highest_mastery_champ_name(self): champ = loldata.get_champ_by_key(self.highest_mastery_champ) @@ -452,11 +445,13 @@ class Osu(Base): if o is not None: raise AlreadyExistingError(repr(o)) r0 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={osu_name}&m=0") + r0.raise_for_status() r1 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={osu_name}&m=1") + r1.raise_for_status() r2 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={osu_name}&m=2") + r2.raise_for_status() r3 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={osu_name}&m=3") - if r0.status_code != 200 or r1.status_code != 200 or r2.status_code != 200 or r3.status_code != 200: - raise RequestError(f"Osu! API returned an error ({r0.status_code} {r1.status_code} {r2.status_code} {r3.status_code})") + r3.raise_for_status() j0 = r0.json()[0] j1 = r1.json()[0] j2 = r2.json()[0] @@ -472,12 +467,13 @@ class Osu(Base): def update(self, session=None): r0 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={self.osu_name}&m=0") + r0.raise_for_status() r1 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={self.osu_name}&m=1") + r1.raise_for_status() r2 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={self.osu_name}&m=2") + r2.raise_for_status() r3 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={self.osu_name}&m=3") - if r0.status_code != 200 or r1.status_code != 200 or r2.status_code != 200 or r3.status_code != 200: - raise RequestError( - f"Osu! API returned an error ({r0.status_code} {r1.status_code} {r2.status_code} {r3.status_code})") + r3.raise_for_status() j0 = r0.json()[0] j1 = r1.json()[0] j2 = r2.json()[0] @@ -533,7 +529,7 @@ class Discord(Base): return d def mention(self): - return f"<@{self.id}>" + return f"<@{self.discord_id}>" def avatar_url(self, size=256): if self.avatar_hex is None: @@ -581,8 +577,7 @@ class Overwatch(Base): "User-Agent": "Royal-Bot/4.1", "From": "ste.pigozzi@gmail.com" }) - if r.status_code != 200: - raise RequestError(f"OWAPI.net returned {r.status_code}") + r.raise_for_status() try: j = r.json()["eu"]["stats"].get("competitive") if j is None: diff --git a/dirty.py b/dirty.py new file mode 100644 index 00000000..9a95ede8 --- /dev/null +++ b/dirty.py @@ -0,0 +1,13 @@ +class Dirty: + def __init__(self, initial_value): + self.initial_value = initial_value + self.value = initial_value + + def is_clean(self): + return self.initial_value == self.value + + def is_dirty(self): + return not self.is_clean() + + def __bool__(self): + return self.is_dirty() diff --git a/errors.py b/errors.py index d3cc4bc0..ae5198de 100644 --- a/errors.py +++ b/errors.py @@ -1,7 +1,3 @@ -class RequestError(Exception): - pass - - class NotFoundError(Exception): pass @@ -48,3 +44,7 @@ class VideoInfoUnknown(Exception): class VideoIsNotReady(Exception): pass + + +class PrivateError(Exception): + pass diff --git a/newuser.py b/newuser.py index 66346e9c..334cc500 100644 --- a/newuser.py +++ b/newuser.py @@ -32,7 +32,7 @@ except Exception as e: else: session.add(overwatch) try: - lol = db.LeagueOfLegends.create(session, user.id, input("League summoner name: ")) + lol = db.LeagueOfLegends.create(user.id, input("League summoner name: ")) except Exception as e: print(e) else: diff --git a/statsupdate.py b/statsupdate.py index cd0956a9..bc24deb7 100644 --- a/statsupdate.py +++ b/statsupdate.py @@ -1,5 +1,3 @@ -import requests -import errors import db import time import logging @@ -10,7 +8,7 @@ import typing import telegram import sys import coloredlogs -import datetime +from dirty import Dirty logging.getLogger().disabled = True logger = logging.getLogger(__name__) @@ -44,12 +42,12 @@ def update_block(session: db.Session, block: list, delay: float=0, change_callba sentry.captureException() continue if change: - change_callback(item) + change_callback(item, change) sleep_time = delay - time.clock() + t time.sleep(sleep_time if sleep_time > 0 else 0) -def new_dota_rank(item: db.Dota): +def new_dota_rank(item: db.Dota, change): try: telegram_bot.send_message(config["Telegram"]["main_group"], f"✳️ {item.steam.royal.username} è salito a" @@ -58,14 +56,25 @@ def new_dota_rank(item: db.Dota): logger.warning(f"Couldn't notify on Telegram: {item}") -def new_lol_rank(item: db.LeagueOfLegends): +def new_lol_rank(item, change: typing.Tuple[Dirty]): + # It always gets called, even when there is no change + solo, flex, twtr = change try: - telegram_bot.send_message(config["Telegram"]["main_group"], - f"✳️ {item.royal.username} ha cambiato rank su League of Legends!\n" - f"\n" - f"Solo/Duo: {item.solo_division} {item.solo_rank}\n" - f"Flex: {item.flex_division} {item.flex_rank}\n" - f"3v3: {item.twtr_division} {item.twtr_rank}") + if solo: + telegram_bot.send_message(config["Telegram"]["main_group"], + f"✳️ {item.royal.username} ha cambiato rank su League of Legends!\n" + f"{solo.initial_value[0]} {solo.initial_value[1]} -> **{solo.value[0]} {solo.value[1]}**", + parse_mode="Markdown") + if flex: + telegram_bot.send_message(config["Telegram"]["main_group"], + f"✳️ {item.royal.username} ha cambiato rank su League of Legends!\n" + f"{flex.initial_value[0]} {flex.initial_value[1]} -> **{flex.value[0]} {flex.value[1]}**", + parse_mode="Markdown") + if twtr: + telegram_bot.send_message(config["Telegram"]["main_group"], + f"✳️ {item.royal.username} ha cambiato rank su League of Legends!\n" + f"{twtr.initial_value[0]} {twtr.initial_value[1]} -> **{twtr.value[0]} {twtr.value[1]}**", + parse_mode="Markdown") except Exception: logger.warning(f"Couldn't notify on Telegram: {item}")