1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-23 19:44:20 +00:00

si inizia a ragionare

This commit is contained in:
Steffo 2017-10-08 20:26:51 +02:00
parent 9380226408
commit c4e29c6ab6
WARNING! Although there is a key with this ID in the database it does not verify this commit! This commit is SUSPICIOUS.
GPG key ID: C27544372FBB445D
2 changed files with 219 additions and 43 deletions

253
db.py
View file

@ -1,9 +1,10 @@
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import Column, BigInteger, Integer, String, Numeric, DateTime, ForeignKey, Float, create_engine from sqlalchemy import Column, BigInteger, Integer, String, Numeric, DateTime, ForeignKey, Float, Enum, create_engine
import requests import requests
from errors import RequestError from errors import RequestError, NotFoundError
import re import re
import enum
# Init the config reader # Init the config reader
import configparser import configparser
@ -72,21 +73,41 @@ class Steam(Base):
else: else:
return self.steam_id return self.steam_id
def steam_id_1(self): @staticmethod
return self.steam_id def get_or_create(royal_id, steam_id):
s = session.query(Steam).get(steam_id)
if s is not None:
return s
r = requests.get(f"https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/?key={config['Steam']['api_key']}&steamids={steam_id}")
if r.status_code != 200:
raise RequestError(f"Steam returned {r.status_code}")
j = r.json()
if len(j) == 0:
raise NotFoundError(f"The steam_id doesn't match any steam account")
s = Steam(royal_id=royal_id,
steam_id=steam_id,
persona_name=j["response"]["players"][0]["personaname"],
avatar_hex=re.search("https://steamcdn-a.akamaihd.net/steamcommunity/public/images/avatars/../(.+).jpg", j["response"]["players"][0]["avatar"]).group(1))
return s
def steam_id_2(self): @staticmethod
def find_trade_token(trade_url):
return re.search("https://steamcommunity\.com/tradeoffer/new/\?partner=[0-9]+&token=(.{8})", trade_url).group(1)
@staticmethod
def to_steam_id_2(steam_id):
# Got this code from a random github gist. It could be completely wrong. # Got this code from a random github gist. It could be completely wrong.
z = (int(self.steam_id) - 76561197960265728) // 2 z = (int(steam_id) - 76561197960265728) // 2
y = int(self.steam_id) % 2 y = int(steam_id) % 2
return f"STEAM_0:{y}:{z}" return f"STEAM_0:{y}:{z}"
def steam_id_3(self, full=False): @staticmethod
def to_steam_id_3(steam_id, full=False):
# Got this code from a random github gist. It could be completely wrong. # Got this code from a random github gist. It could be completely wrong.
if full: if full:
return f"[U:1:{int(self.steam_id) - 76561197960265728}]" return f"[U:1:{int(steam_id) - 76561197960265728}]"
else: else:
return f"{int(self.steam_id) - 76561197960265728}" return f"{int(steam_id) - 76561197960265728}"
def update(self): def update(self):
r = requests.get(f"https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/?key={config['Steam']['api_key']}&steamids={self.steam_id}") r = requests.get(f"https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/?key={config['Steam']['api_key']}&steamids={self.steam_id}")
@ -94,7 +115,7 @@ class Steam(Base):
raise RequestError(f"Steam returned {r.status_code}") raise RequestError(f"Steam returned {r.status_code}")
j = r.json() j = r.json()
self.persona_name = j["response"]["players"][0]["personaname"] self.persona_name = j["response"]["players"][0]["personaname"]
self.avatar_hex = re.search("https://steamcdn-a.akamaihd.net/steamcommunity/public/images/avatars/../(.+).jpg", j["response"]["players"][0]["avatar"]).group(1) self.avatar_hex = re.search("https://steamcdn-a\.akamaihd\.net/steamcommunity/public/images/avatars/../(.+).jpg", j["response"]["players"][0]["avatar"]).group(1)
class RocketLeague(Base): class RocketLeague(Base):
@ -127,27 +148,28 @@ class RocketLeague(Base):
return f"<RocketLeague {self.steam_id}>" return f"<RocketLeague {self.steam_id}>"
@staticmethod @staticmethod
def check_and_create(steam_id): def get_or_create(steam_id):
rl = session.query(RocketLeague).filter(RocketLeague.steam_id == steam_id).first() rl = session.query(RocketLeague).get(steam_id)
if rl is not None: if rl is not None:
return None return rl
r = requests.get(f"https://api.rocketleaguestats.com/v1/player?apikey={config['Rocket League']['rlstats_api_key']}&unique_id={str(steam_id)}&platform_id=1") r = requests.get(f"https://api.rocketleaguestats.com/v1/player?apikey={config['Rocket League']['rlstats_api_key']}&unique_id={str(steam_id)}&platform_id=1")
if r.status_code == 404: if r.status_code == 404:
return None raise NotFoundError("The specified user has never played Rocket League")
elif r.status_code == 500: elif r.status_code != 200:
raise RequestError("Rocket League Stats returned {r.status_code}") raise RequestError("Rocket League Stats returned {r.status_code}")
new_record = RocketLeague(steam_id=steam_id) new_record = RocketLeague(steam_id=steam_id)
new_record.update() new_record.update(data=r.json())
return new_record return new_record
def update(self): def update(self, data=None):
if data is None:
r = requests.get(f"https://api.rocketleaguestats.com/v1/player?apikey={config['Rocket League']['rlstats_api_key']}&unique_id={self.steam_id}&platform_id=1") r = requests.get(f"https://api.rocketleaguestats.com/v1/player?apikey={config['Rocket League']['rlstats_api_key']}&unique_id={self.steam_id}&platform_id=1")
if r.status_code != 200: if r.status_code != 200:
raise RequestError(f"Rocket League Stats returned {r.status_code}") raise RequestError(f"Rocket League Stats returned {r.status_code}")
j = r.json() data = r.json()
# Get current season # Get current season
current_season = 0 current_season = 0
for season in j["rankedSeasons"]: for season in data["rankedSeasons"]:
if int(season) > current_season: if int(season) > current_season:
current_season = int(season) current_season = int(season)
self.season = current_season self.season = current_season
@ -156,43 +178,194 @@ class RocketLeague(Base):
current_season = str(current_season) current_season = str(current_season)
# Get ranked data # Get ranked data
# Single 1v1 # Single 1v1
if "10" in j["rankedSeasons"][current_season]: if "10" in data["rankedSeasons"][current_season]:
self.single_mmr = j["rankedSeasons"][current_season]["10"]["rankPoints"] self.single_mmr = data["rankedSeasons"][current_season]["10"]["rankPoints"]
if j["rankedSeasons"][current_season]["10"]["matchesPlayed"] >= 10: if data["rankedSeasons"][current_season]["10"]["matchesPlayed"] >= 10:
self.single_rank = j["rankedSeasons"][current_season]["10"]["tier"] self.single_rank = data["rankedSeasons"][current_season]["10"]["tier"]
self.single_div = j["rankedSeasons"][current_season]["10"]["division"] self.single_div = data["rankedSeasons"][current_season]["10"]["division"]
else: else:
self.single_rank = None self.single_rank = None
self.single_div = None self.single_div = None
# Doubles 2v2 # Doubles 2v2
if "11" in j["rankedSeasons"][current_season]: if "11" in data["rankedSeasons"][current_season]:
self.doubles_mmr = j["rankedSeasons"][current_season]["11"]["rankPoints"] self.doubles_mmr = data["rankedSeasons"][current_season]["11"]["rankPoints"]
if j["rankedSeasons"][current_season]["11"]["matchesPlayed"] >= 10: if data["rankedSeasons"][current_season]["11"]["matchesPlayed"] >= 10:
self.doubles_rank = j["rankedSeasons"][current_season]["11"]["tier"] self.doubles_rank = data["rankedSeasons"][current_season]["11"]["tier"]
self.doubles_div = j["rankedSeasons"][current_season]["11"]["division"] self.doubles_div = data["rankedSeasons"][current_season]["11"]["division"]
else: else:
self.doubles_rank = None self.doubles_rank = None
self.doubles_div = None self.doubles_div = None
# Standard 3v3 # Standard 3v3
if "13" in j["rankedSeasons"][current_season]: if "13" in data["rankedSeasons"][current_season]:
self.standard_mmr = j["rankedSeasons"][current_season]["13"]["rankPoints"] self.standard_mmr = data["rankedSeasons"][current_season]["13"]["rankPoints"]
if j["rankedSeasons"][current_season]["13"]["matchesPlayed"] >= 10: if data["rankedSeasons"][current_season]["13"]["matchesPlayed"] >= 10:
self.standard_rank = j["rankedSeasons"][current_season]["13"]["tier"] self.standard_rank = data["rankedSeasons"][current_season]["13"]["tier"]
self.standard_div = j["rankedSeasons"][current_season]["13"]["division"] self.standard_div = data["rankedSeasons"][current_season]["13"]["division"]
else: else:
self.standard_rank = None self.standard_rank = None
self.standard_div = None self.standard_div = None
# Solo Standard 3v3 # Solo Standard 3v3
if "12" in j["rankedSeasons"][current_season]: if "12" in data["rankedSeasons"][current_season]:
self.solo_std_mmr = j["rankedSeasons"][current_season]["12"]["rankPoints"] self.solo_std_mmr = data["rankedSeasons"][current_season]["12"]["rankPoints"]
if j["rankedSeasons"][current_season]["12"]["matchesPlayed"] >= 10: if data["rankedSeasons"][current_season]["12"]["matchesPlayed"] >= 10:
self.solo_std_rank = j["rankedSeasons"][current_season]["12"]["tier"] self.solo_std_rank = data["rankedSeasons"][current_season]["12"]["tier"]
self.solo_std_div = j["rankedSeasons"][current_season]["12"]["division"] self.solo_std_div = data["rankedSeasons"][current_season]["12"]["division"]
else: else:
self.solo_std_rank = None self.solo_std_rank = None
self.solo_std_div = None self.solo_std_div = None
class Dota(Base):
__tablename__ = "dota"
steam_id = Column(String, ForeignKey("steam.steam_id"), primary_key=True)
steam = relationship("Steam")
solo_mmr = Column(Integer)
party_mmr = Column(Integer)
wins = Column(Integer)
losses = Column(Integer)
@staticmethod
def get_or_create(steam_id):
d = session.query(Dota).get(steam_id)
if d is not None:
return d
r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(steam_id)}")
if r.status_code != 200:
raise RequestError("OpenDota returned {r.status_code}")
data = r.json()
if "profile" not in data:
raise NotFoundError("The specified user has never played Dota or has a private match history")
r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(steam_id)}/wl")
if r.status_code != 200:
raise RequestError("OpenDota returned {r.status_code}")
wl = r.json()
new_record = Dota(steam_id=steam_id,
solo_mmr=data["solo_competitive_rank"],
party_mmr=data["competitive_rank"],
wins=wl["win"],
losses=wl["lose"])
return new_record
def update(self):
r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(self.steam_id)}")
if r.status_code != 200:
raise RequestError("OpenDota returned {r.status_code}")
data = r.json()
r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(steam_id)}/wl")
if r.status_code != 200:
raise RequestError("OpenDota returned {r.status_code}")
wl = r.json()
self.solo_mmr = data["solo_competitive_rank"]
self.party_mmr = data["competitive_rank"]
self.wins = wl["win"]
self.losses = wl["lose"]
class LeagueOfLegendsRanks(enum.Enum):
BRONZE = 0
SILVER = 1
GOLD = 2
PLATINUM = 3
DIAMOND = 4
MASTER = 5
CHALLENGER = 6
class RomanNumerals(enum.Enum):
I = 1
II = 2
III = 3
IV = 4
V = 5
class LeagueOfLegends(Base):
__tablename__ = "leagueoflegends"
royal_id = Column(Integer, ForeignKey("royals.id"))
royal = relationship("Royal")
summoner_id = Column(BigInteger, primary_key=True)
summoner_name = Column(String)
level = Column(Integer, nullable=False)
solo_division = Column(Enum(LeagueOfLegendsRanks))
solo_rank = Column(Enum(RomanNumerals))
flex_division = Column(Enum(LeagueOfLegendsRanks))
flex_rank = Column(Enum(RomanNumerals))
twtr_division = Column(Enum(LeagueOfLegendsRanks))
twtr_rank = Column(Enum(RomanNumerals))
@staticmethod
def get_or_create(royal_id, summoner_name=None, summoner_id=None):
if summoner_name:
lol = session.query(LeagueOfLegends).filter(LeagueOfLegends.summoner_name == summoner_name).first()
elif summoner_id:
lol = session.query(LeagueOfLegends).get(summoner_id)
else:
raise SyntaxError("Neither summoner_name or summoner_id are specified")
if lol is not None:
return lol
# Get the summoner_id
if summoner_name:
r = requests.get(f"https://euw1.api.riotgames.com/lol/summoner/v3/summoners/by-name/{summoner_name}?api_key={config['League of Legends']['riot_api_key']}")
else:
r = requests.get(f"https://euw1.api.riotgames.com/lol/summoner/v3/summoners/{summoner_id}?api_key={config['League of Legends']['riot_api_key']}")
if r.status_code != 200:
return RequestError(f"League of Legends API returned {r.status_code}")
data = r.json()
lol = LeagueOfLegends(royal_id=royal_id,
summoner_id=data["id"],
summoner_name=data["name"],
level=data["summonerLevel"])
lol.update()
return lol
def update(self):
r = requests.get(f"https://euw1.api.riotgames.com/lol/summoner/v3/summoners/{self.summoner_id}?api_key={config['League of Legends']['riot_api_key']}")
if r.status_code != 200:
return RequestError(f"League of Legends API returned {r.status_code}")
data = r.json()
r = requests.get(f"https://euw1.api.riotgames.com/lol/league/v3/positions/by-summoner/{self.summoner_id}?api_key={config['League of Legends']['riot_api_key']}")
if r.status_code != 200:
return RequestError(f"League of Legends API returned {r.status_code}")
rank = r.json()
solo_rank = None
flex_rank = None
twtr_rank = None
for league in rank:
if league["queueType"] == "RANKED_SOLO_5x5":
solo_rank = league
elif league["queueType"] == "RANKED_FLEX_SR":
flex_rank = league
elif league["queueType"] == "RANKED_FLEX_TT":
twtr_rank = league
self.summoner_id = data["id"]
self.summoner_name = data["name"]
self.level = data["summonerLevel"]
if solo_rank is not None:
self.solo_division = LeagueOfLegendsRanks[solo_rank["tier"]]
self.solo_rank = RomanNumerals[solo_rank["rank"]]
else:
self.solo_division = None
self.solo_rank = None
if flex_rank is not None:
self.flex_division = LeagueOfLegendsRanks[flex_rank["tier"]]
self.flex_rank = RomanNumerals[flex_rank["rank"]]
else:
self.flex_division = None
self.flex_rank = None
if twtr_rank is not None:
self.twtr_division = LeagueOfLegendsRanks[twtr_rank["tier"]]
self.twtr_rank = RomanNumerals[twtr_rank["rank"]]
else:
self.twtr_division = None
self.twtr_rank = None
# If run as script, create all the tables in the db # If run as script, create all the tables in the db
if __name__ == "__main__": if __name__ == "__main__":
Base.metadata.create_all(bind=engine) Base.metadata.create_all(bind=engine)

View file

@ -1,2 +1,5 @@
class RequestError(Exception): class RequestError(Exception):
pass pass
class NotFoundError(Exception):
pass