1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-23 19:44:20 +00:00

Updated bot

This commit is contained in:
Steffo 2018-12-18 17:34:34 +01:00
parent c58c309faa
commit 3328ed7296
5 changed files with 98 additions and 81 deletions

123
db.py
View file

@ -8,12 +8,13 @@ from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy import Column, BigInteger, Integer, String, DateTime, ForeignKey, Float, Enum, create_engine, UniqueConstraint, PrimaryKeyConstraint, Boolean, or_, LargeBinary, Text, Date, func, desc
import requests
from errors import RequestError, NotFoundError, AlreadyExistingError
from errors import NotFoundError, AlreadyExistingError, PrivateError
import re
import enum
from discord import User as DiscordUser
from telegram import User as TelegramUser
import loldata
from dirty import Dirty
# Init the config reader
import configparser
@ -133,8 +134,7 @@ class Steam(Base):
if s is not None:
raise AlreadyExistingError(repr(s))
r = requests.get(f"https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/?key={config['Steam']['api_key']}&steamids={steam_id}")
if r.status_code != 200:
raise RequestError(f"Steam returned {r.status_code}")
r.raise_for_status()
j = r.json()
if len(j) == 0:
raise NotFoundError(f"The steam_id doesn't match any steam account")
@ -165,20 +165,16 @@ class Steam(Base):
def update(self, session=None, raise_if_private: bool=False):
r = requests.get(f"https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/?key={config['Steam']['api_key']}&steamids={self.steam_id}")
if r.status_code != 200:
raise RequestError(f"Steam returned {r.status_code}")
r.raise_for_status()
j = r.json()
self.persona_name = j["response"]["players"][0]["personaname"]
self.avatar_hex = re.search(r"https://steamcdn-a\.akamaihd\.net/steamcommunity/public/images/avatars/../(.+).jpg", j["response"]["players"][0]["avatar"]).group(1)
r = requests.get(f"http://api.steampowered.com/IPlayerService/GetRecentlyPlayedGames/v0001/?key={config['Steam']['api_key']}&steamid={self.steam_id}&format=json")
if r.status_code != 200:
raise RequestError(f"Steam returned {r.status_code}")
r.raise_for_status()
j = r.json()
if "response" not in j \
or "games" not in j["response"] \
or len(j["response"]["games"]) < 1:
if "response" not in j or "games" not in j["response"] or len(j["response"]["games"]) < 1:
if raise_if_private:
raise RequestError(f"Game data is private")
raise PrivateError(f"Game data is private")
return
self.most_played_game_id = j["response"]["games"][0]["appid"]
@ -301,8 +297,7 @@ class Dota(Base):
if d is not None:
raise AlreadyExistingError(repr(d))
r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(steam_id)}")
if r.status_code != 200:
raise RequestError("OpenDota returned {r.status_code}")
r.raise_for_status()
data = r.json()
if "profile" not in data:
raise NotFoundError("The specified user has never played Dota or has a private match history")
@ -312,16 +307,13 @@ class Dota(Base):
def update(self, session=None) -> bool:
r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(self.steam_id)}")
if r.status_code != 200:
raise RequestError("OpenDota / returned {r.status_code}")
r.raise_for_status()
data = r.json()
r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(self.steam_id)}/wl")
if r.status_code != 200:
raise RequestError("OpenDota /wl returned {r.status_code}")
r.raise_for_status()
wl = r.json()
r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(self.steam_id)}/heroes")
if r.status_code != 200:
raise RequestError("OpenDota /heroes returned {r.status_code}")
r.raise_for_status()
heroes = r.json()
changed = self.rank_tier != data["rank_tier"]
self.rank_tier = data["rank_tier"]
@ -348,7 +340,6 @@ class RomanNumerals(enum.Enum):
II = 2
III = 3
IV = 4
V = 5
class LeagueOfLegends(Base):
@ -357,8 +348,9 @@ class LeagueOfLegends(Base):
royal_id = Column(Integer, ForeignKey("royals.id"))
royal = relationship("Royal", backref="lol", lazy="joined")
summoner_id = Column(BigInteger, primary_key=True)
account_id = Column(BigInteger)
icon_id = Column(Integer)
summoner_id = Column(String, primary_key=True)
account_id = Column(String)
summoner_name = Column(String)
level = Column(Integer)
@ -376,52 +368,53 @@ class LeagueOfLegends(Base):
return f"<LeagueOfLegends {self.summoner_id}>"
return f"<LeagueOfLegends {(''.join([x if x.isalnum else '' for x in self.summoner_name]))}>"
def update(self, session=None) -> bool:
r = requests.get(f"https://euw1.api.riotgames.com/lol/summoner/v3/summoners/{self.summoner_id}?api_key={config['League of Legends']['riot_api_key']}")
if r.status_code != 200:
raise RequestError(f"League of Legends API /summoner returned {r.status_code}")
@staticmethod
def create(royal_id, summoner_name) -> "LeagueOfLegends":
r = requests.get(f"https://euw1.api.riotgames.com/lol/summoner/v4/summoners/by-name/{summoner_name}?api_key={config['League of Legends']['riot_api_key']}")
r.raise_for_status()
data = r.json()
r = requests.get(f"https://euw1.api.riotgames.com/lol/league/v3/positions/by-summoner/{self.summoner_id}?api_key={config['League of Legends']['riot_api_key']}")
if r.status_code != 200:
raise RequestError(f"League of Legends API /league returned {r.status_code}")
lol = LeagueOfLegends()
lol.royal_id = royal_id
lol.summoner_name = summoner_name
lol.summoner_id = data["id"]
lol.account_id = data["accountId"]
lol.icon_id = data["profileIconId"]
lol.level = data["summonerLevel"]
lol.update()
return lol
def update(self, session=None):
r = requests.get(f"https://euw1.api.riotgames.com/lol/summoner/v4/summoners/{self.summoner_id}?api_key={config['League of Legends']['riot_api_key']}")
r.raise_for_status()
data = r.json()
r = requests.get(f"https://euw1.api.riotgames.com/lol/league/v4/positions/by-summoner/{self.summoner_id}?api_key={config['League of Legends']['riot_api_key']}")
r.raise_for_status()
rank = r.json()
r = requests.get(f"https://euw1.api.riotgames.com/lol/champion-mastery/v3/champion-masteries/by-summoner/{self.summoner_id}?api_key={config['League of Legends']['riot_api_key']}")
if r.status_code != 200:
raise RequestError(f"League of Legends API /champion-mastery returned {r.status_code}")
r = requests.get(f"https://euw1.api.riotgames.com/lol/champion-mastery/v4/champion-masteries/by-summoner/{self.summoner_id}?api_key={config['League of Legends']['riot_api_key']}")
r.raise_for_status()
mastery = r.json()
solo_rank = None
flex_rank = None
twtr_rank = None
solo_q = None
flex_q = None
twtr_q = None
for league in rank:
if league["queueType"] == "RANKED_SOLO_5x5":
solo_rank = league
solo_q = league
elif league["queueType"] == "RANKED_FLEX_SR":
flex_rank = league
flex_q = league
elif league["queueType"] == "RANKED_FLEX_TT":
twtr_rank = league
twtr_q = league
self.summoner_id = data["id"]
self.summoner_name = data["name"]
self.account_id = data["accountId"]
self.level = data["summonerLevel"]
if solo_rank is not None:
self.solo_division = LeagueOfLegendsRanks[solo_rank["tier"]]
self.solo_rank = RomanNumerals[solo_rank["rank"]]
else:
self.solo_division = None
self.solo_rank = None
if flex_rank is not None:
self.flex_division = LeagueOfLegendsRanks[flex_rank["tier"]]
self.flex_rank = RomanNumerals[flex_rank["rank"]]
else:
self.flex_division = None
self.flex_rank = None
if twtr_rank is not None:
self.twtr_division = LeagueOfLegendsRanks[twtr_rank["tier"]]
self.twtr_rank = RomanNumerals[twtr_rank["rank"]]
else:
self.twtr_division = None
self.twtr_rank = None
solo = Dirty((self.solo_division, self.solo_rank))
flex = Dirty((self.flex_division, self.flex_rank))
twtr = Dirty((self.twtr_division, self.twtr_rank))
solo.value = (None, None) if solo_q is None else (LeagueOfLegendsRanks[solo_q["tier"]], RomanNumerals[solo_q["rank"]])
flex.value = (None, None) if flex_q is None else (LeagueOfLegendsRanks[flex_q["tier"]], RomanNumerals[flex_q["rank"]])
twtr.value = (None, None) if twtr_q is None else (LeagueOfLegendsRanks[twtr_q["tier"]], RomanNumerals[twtr_q["rank"]])
self.highest_mastery_champ = mastery[0]["championId"]
return solo, flex, twtr
def highest_mastery_champ_name(self):
champ = loldata.get_champ_by_key(self.highest_mastery_champ)
@ -452,11 +445,13 @@ class Osu(Base):
if o is not None:
raise AlreadyExistingError(repr(o))
r0 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={osu_name}&m=0")
r0.raise_for_status()
r1 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={osu_name}&m=1")
r1.raise_for_status()
r2 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={osu_name}&m=2")
r2.raise_for_status()
r3 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={osu_name}&m=3")
if r0.status_code != 200 or r1.status_code != 200 or r2.status_code != 200 or r3.status_code != 200:
raise RequestError(f"Osu! API returned an error ({r0.status_code} {r1.status_code} {r2.status_code} {r3.status_code})")
r3.raise_for_status()
j0 = r0.json()[0]
j1 = r1.json()[0]
j2 = r2.json()[0]
@ -472,12 +467,13 @@ class Osu(Base):
def update(self, session=None):
r0 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={self.osu_name}&m=0")
r0.raise_for_status()
r1 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={self.osu_name}&m=1")
r1.raise_for_status()
r2 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={self.osu_name}&m=2")
r2.raise_for_status()
r3 = requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={self.osu_name}&m=3")
if r0.status_code != 200 or r1.status_code != 200 or r2.status_code != 200 or r3.status_code != 200:
raise RequestError(
f"Osu! API returned an error ({r0.status_code} {r1.status_code} {r2.status_code} {r3.status_code})")
r3.raise_for_status()
j0 = r0.json()[0]
j1 = r1.json()[0]
j2 = r2.json()[0]
@ -533,7 +529,7 @@ class Discord(Base):
return d
def mention(self):
return f"<@{self.id}>"
return f"<@{self.discord_id}>"
def avatar_url(self, size=256):
if self.avatar_hex is None:
@ -581,8 +577,7 @@ class Overwatch(Base):
"User-Agent": "Royal-Bot/4.1",
"From": "ste.pigozzi@gmail.com"
})
if r.status_code != 200:
raise RequestError(f"OWAPI.net returned {r.status_code}")
r.raise_for_status()
try:
j = r.json()["eu"]["stats"].get("competitive")
if j is None:

13
dirty.py Normal file
View file

@ -0,0 +1,13 @@
class Dirty:
def __init__(self, initial_value):
self.initial_value = initial_value
self.value = initial_value
def is_clean(self):
return self.initial_value == self.value
def is_dirty(self):
return not self.is_clean()
def __bool__(self):
return self.is_dirty()

View file

@ -1,7 +1,3 @@
class RequestError(Exception):
pass
class NotFoundError(Exception):
pass
@ -48,3 +44,7 @@ class VideoInfoUnknown(Exception):
class VideoIsNotReady(Exception):
pass
class PrivateError(Exception):
pass

View file

@ -32,7 +32,7 @@ except Exception as e:
else:
session.add(overwatch)
try:
lol = db.LeagueOfLegends.create(session, user.id, input("League summoner name: "))
lol = db.LeagueOfLegends.create(user.id, input("League summoner name: "))
except Exception as e:
print(e)
else:

View file

@ -1,5 +1,3 @@
import requests
import errors
import db
import time
import logging
@ -10,7 +8,7 @@ import typing
import telegram
import sys
import coloredlogs
import datetime
from dirty import Dirty
logging.getLogger().disabled = True
logger = logging.getLogger(__name__)
@ -44,12 +42,12 @@ def update_block(session: db.Session, block: list, delay: float=0, change_callba
sentry.captureException()
continue
if change:
change_callback(item)
change_callback(item, change)
sleep_time = delay - time.clock() + t
time.sleep(sleep_time if sleep_time > 0 else 0)
def new_dota_rank(item: db.Dota):
def new_dota_rank(item: db.Dota, change):
try:
telegram_bot.send_message(config["Telegram"]["main_group"],
f"✳️ {item.steam.royal.username} è salito a"
@ -58,14 +56,25 @@ def new_dota_rank(item: db.Dota):
logger.warning(f"Couldn't notify on Telegram: {item}")
def new_lol_rank(item: db.LeagueOfLegends):
def new_lol_rank(item, change: typing.Tuple[Dirty]):
# It always gets called, even when there is no change
solo, flex, twtr = change
try:
if solo:
telegram_bot.send_message(config["Telegram"]["main_group"],
f"✳️ {item.royal.username} ha cambiato rank su League of Legends!\n"
f"\n"
f"Solo/Duo: {item.solo_division} {item.solo_rank}\n"
f"Flex: {item.flex_division} {item.flex_rank}\n"
f"3v3: {item.twtr_division} {item.twtr_rank}")
f"{solo.initial_value[0]} {solo.initial_value[1]} -> **{solo.value[0]} {solo.value[1]}**",
parse_mode="Markdown")
if flex:
telegram_bot.send_message(config["Telegram"]["main_group"],
f"✳️ {item.royal.username} ha cambiato rank su League of Legends!\n"
f"{flex.initial_value[0]} {flex.initial_value[1]} -> **{flex.value[0]} {flex.value[1]}**",
parse_mode="Markdown")
if twtr:
telegram_bot.send_message(config["Telegram"]["main_group"],
f"✳️ {item.royal.username} ha cambiato rank su League of Legends!\n"
f"{twtr.initial_value[0]} {twtr.initial_value[1]} -> **{twtr.value[0]} {twtr.value[1]}**",
parse_mode="Markdown")
except Exception:
logger.warning(f"Couldn't notify on Telegram: {item}")