mirror of
https://github.com/RYGhub/royalnet.git
synced 2024-11-23 19:44:20 +00:00
439 lines
No EOL
17 KiB
Python
439 lines
No EOL
17 KiB
Python
import time
|
|
from sqlalchemy.ext.declarative import declarative_base
|
|
from sqlalchemy.orm import sessionmaker, relationship
|
|
from sqlalchemy import Column, BigInteger, Integer, String, Numeric, DateTime, ForeignKey, Float, Enum, create_engine
|
|
import requests
|
|
from errors import RequestError, NotFoundError
|
|
import re
|
|
import enum
|
|
|
|
# Init the config reader
|
|
import configparser
|
|
# Load the bot settings; "config.ini" is a relative path, so it is looked up
# from the process's working directory.
config = configparser.ConfigParser()
config.read("config.ini")

# Database plumbing: one engine built from the configured URI, with the
# declarative base and the session factory both bound to it.
engine = create_engine(config["Database"]["database_uri"])
Base = declarative_base(bind=engine)
Session = sessionmaker(bind=engine)

# Default session shared by the get_or_create helpers of the models below
session = Session()
|
|
|
|
|
|
class Royal(Base):
    """A community member.

    The Telegram, Steam, LeagueOfLegends and Osu tables in this module point
    back to this table through a ``royals.id`` foreign key.
    """

    __tablename__ = "royals"

    # Surrogate primary key
    id = Column(Integer, primary_key=True)
    # Unique, mandatory display name
    username = Column(String, unique=True, nullable=False)

    def __repr__(self):
        """Return a short debug representation containing the username."""
        return f"<Royal {self.username}>"
|
|
|
|
|
|
class Telegram(Base):
    """A Telegram account linked to a Royal."""

    __tablename__ = "telegram"

    # Owner of this Telegram account
    royal_id = Column(Integer, ForeignKey("royals.id"), nullable=False)
    royal = relationship("Royal")

    # The Telegram user id doubles as the primary key
    telegram_id = Column(BigInteger, primary_key=True)
    first_name = Column(String, nullable=False)
    last_name = Column(String)
    username = Column(String)

    def __repr__(self):
        """Return a short debug representation containing the Telegram id."""
        # The model has no ``id`` attribute; the key column is ``telegram_id``.
        return f"<Telegram {self.telegram_id}>"

    def __str__(self):
        """Return the most specific display name available.

        Preference order: ``@username``, then ``first_name last_name``,
        then ``first_name`` alone.
        """
        if self.username is not None:
            return f"@{self.username}"
        elif self.last_name is not None:
            return f"{self.first_name} {self.last_name}"
        else:
            return self.first_name
|
|
|
|
|
|
class Steam(Base):
    """A Steam account linked to a Royal, with profile data fetched from the Steam Web API."""

    __tablename__ = "steam"

    # Owner of this Steam account
    royal_id = Column(Integer, ForeignKey("royals.id"), nullable=False)
    royal = relationship("Royal")

    # Stored as a string; the conversion helpers below parse it with int()
    steam_id = Column(String, primary_key=True)
    persona_name = Column(String)
    avatar_hex = Column(String)
    trade_token = Column(String)

    def __repr__(self):
        """Return a short debug representation containing the steam_id."""
        return f"<Steam {self.steam_id}>"

    def __str__(self):
        """Return the persona name, falling back to the steam_id when it is unset."""
        if self.persona_name is not None:
            return self.persona_name
        else:
            return self.steam_id

    @staticmethod
    def _fetch_player(steam_id):
        """Request the player summary for ``steam_id`` and return the first player entry.

        Raises:
            RequestError: if Steam answers with a status code other than 200.
            NotFoundError: if the response contains no player entry.
        """
        r = requests.get(f"https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/?key={config['Steam']['api_key']}&steamids={steam_id}")
        if r.status_code != 200:
            raise RequestError(f"Steam returned {r.status_code}")
        # The payload is wrapped as {"response": {"players": [...]}}, so the
        # emptiness check has to look at the players list, not at the top-level
        # dict (which always has one key).
        players = r.json().get("response", {}).get("players", [])
        if not players:
            raise NotFoundError("The steam_id doesn't match any steam account")
        return players[0]

    @staticmethod
    def _avatar_hex_from_url(avatar_url):
        """Extract the avatar file name (without extension) from an avatar URL.

        Raises:
            AttributeError: if the URL does not match the expected pattern
                (``re.search`` returns None).
        """
        return re.search(r"https://steamcdn-a\.akamaihd\.net/steamcommunity/public/images/avatars/../(.+)\.jpg", avatar_url).group(1)

    @staticmethod
    def get_or_create(royal_id, steam_id):
        """Return the stored Steam record for ``steam_id``, or build a new one from the API.

        A newly built record is returned without being added to the session.

        Raises:
            RequestError: if Steam answers with a status code other than 200.
            NotFoundError: if no Steam account matches ``steam_id``.
        """
        s = session.query(Steam).get(steam_id)
        if s is not None:
            return s
        player = Steam._fetch_player(steam_id)
        return Steam(royal_id=royal_id,
                     steam_id=steam_id,
                     persona_name=player["personaname"],
                     avatar_hex=Steam._avatar_hex_from_url(player["avatar"]))

    @staticmethod
    def find_trade_token(trade_url):
        """Extract the 8-character token from a Steam trade offer URL.

        Raises:
            AttributeError: if ``trade_url`` does not match the expected pattern.
        """
        return re.search(r"https://steamcommunity\.com/tradeoffer/new/\?partner=[0-9]+&token=(.{8})", trade_url).group(1)

    @staticmethod
    def to_steam_id_2(steam_id):
        """Convert a 64-bit steam_id to the textual ``STEAM_0:Y:Z`` form.

        The account number is ``steam_id - 76561197960265728``; Y is its
        lowest bit and Z the remaining bits.
        """
        z = (int(steam_id) - 76561197960265728) // 2
        y = int(steam_id) % 2
        return f"STEAM_0:{y}:{z}"

    @staticmethod
    def to_steam_id_3(steam_id, full=False):
        """Convert a 64-bit steam_id to its account number.

        Args:
            steam_id: the 64-bit id, as int or numeric string.
            full: when true, wrap the number as ``[U:1:<number>]``;
                otherwise return the bare number as a string.
        """
        account_number = int(steam_id) - 76561197960265728
        if full:
            return f"[U:1:{account_number}]"
        else:
            return f"{account_number}"

    def update(self):
        """Refresh ``persona_name`` and ``avatar_hex`` from the Steam Web API.

        Raises:
            RequestError: if Steam answers with a status code other than 200.
            NotFoundError: if the response contains no player entry.
        """
        player = Steam._fetch_player(self.steam_id)
        self.persona_name = player["personaname"]
        self.avatar_hex = Steam._avatar_hex_from_url(player["avatar"])
|
|
|
|
|
|
class RocketLeague(Base):
    """Rocket League ranked stats for a Steam account, fetched from rocketleaguestats.com."""

    __tablename__ = "rocketleague"

    steam_id = Column(String, ForeignKey("steam.steam_id"), primary_key=True)
    steam = relationship("Steam")

    # Number of the ranked season the values below refer to
    season = Column(Integer)

    # Playlist "10"
    single_rank = Column(Integer)
    single_div = Column(Integer)
    single_mmr = Column(Integer)

    # Playlist "11"
    doubles_rank = Column(Integer)
    doubles_div = Column(Integer)
    doubles_mmr = Column(Integer)

    # Playlist "13"
    standard_rank = Column(Integer)
    standard_div = Column(Integer)
    standard_mmr = Column(Integer)

    # Playlist "12"
    solo_std_rank = Column(Integer)
    solo_std_div = Column(Integer)
    solo_std_mmr = Column(Integer)

    wins = Column(Integer)

    def __repr__(self):
        """Return a short debug representation containing the steam_id."""
        return f"<RocketLeague {self.steam_id}>"

    @staticmethod
    def get_or_create(steam_id):
        """Return the stored record for ``steam_id``, or build a new one from the API.

        A newly built record is returned without being added to the session.

        Raises:
            NotFoundError: if the API answers 404 for this player.
            RequestError: if the API answers with any other non-200 status.
        """
        rl = session.query(RocketLeague).get(steam_id)
        if rl is not None:
            return rl
        r = requests.get(f"https://api.rocketleaguestats.com/v1/player?apikey={config['Rocket League']['rlstats_api_key']}&unique_id={str(steam_id)}&platform_id=1")
        if r.status_code == 404:
            raise NotFoundError("The specified user has never played Rocket League")
        elif r.status_code != 200:
            # f-string so the message carries the actual status code
            raise RequestError(f"Rocket League Stats returned {r.status_code}")
        new_record = RocketLeague(steam_id=steam_id)
        new_record.update(data=r.json())
        return new_record

    def update(self, data=None):
        """Refresh the stats from ``data``, or from the API when ``data`` is None.

        Only the highest-numbered ranked season is read. If the player has no
        ranked season, nothing is changed. For each playlist present in that
        season the MMR is always stored; rank and division are stored only
        after at least 10 matches played, and reset to None otherwise.
        Playlists absent from the season data keep their previous values.

        Raises:
            RequestError: if the API has to be queried and answers with a
                status code other than 200.
        """
        if data is None:
            r = requests.get(f"https://api.rocketleaguestats.com/v1/player?apikey={config['Rocket League']['rlstats_api_key']}&unique_id={self.steam_id}&platform_id=1")
            if r.status_code != 200:
                raise RequestError(f"Rocket League Stats returned {r.status_code}")
            data = r.json()
        # Pick the key of the highest-numbered season; keeping the original
        # key avoids an int -> str round-trip for the lookup below.
        seasons = data["rankedSeasons"]
        current_season = max(seasons, key=int, default=None)
        if current_season is None or int(current_season) <= 0:
            return
        self.season = int(current_season)
        self.wins = data["stats"]["wins"]
        season_data = seasons[current_season]
        # (playlist id in the API payload, column name prefix on this model)
        playlists = (("10", "single"),
                     ("11", "doubles"),
                     ("13", "standard"),
                     ("12", "solo_std"))
        for playlist_id, prefix in playlists:
            if playlist_id not in season_data:
                continue
            stats = season_data[playlist_id]
            setattr(self, f"{prefix}_mmr", stats["rankPoints"])
            placed = stats["matchesPlayed"] >= 10
            setattr(self, f"{prefix}_rank", stats["tier"] if placed else None)
            setattr(self, f"{prefix}_div", stats["division"] if placed else None)
|
|
|
|
|
|
class Dota(Base):
    """Dota 2 stats for a Steam account, fetched from the OpenDota API."""

    __tablename__ = "dota"

    steam_id = Column(String, ForeignKey("steam.steam_id"), primary_key=True)
    steam = relationship("Steam")

    solo_mmr = Column(Integer)
    party_mmr = Column(Integer)

    wins = Column(Integer)
    losses = Column(Integer)

    def __repr__(self):
        """Return a short debug representation containing the steam_id."""
        return f"<Dota {self.steam_id}>"

    @staticmethod
    def _opendota_get(path):
        """GET ``/api/players/<path>`` from OpenDota and return the decoded JSON.

        Raises:
            RequestError: if OpenDota answers with a status code other than 200.
        """
        r = requests.get(f"https://api.opendota.com/api/players/{path}")
        if r.status_code != 200:
            # f-string so the message carries the actual status code
            raise RequestError(f"OpenDota returned {r.status_code}")
        return r.json()

    @staticmethod
    def get_or_create(steam_id):
        """Return the stored record for ``steam_id``, or build a new one from the API.

        A newly built record is returned without being added to the session.

        Raises:
            RequestError: if OpenDota answers with a status code other than 200.
            NotFoundError: if the player payload has no ``profile`` key.
        """
        d = session.query(Dota).get(steam_id)
        if d is not None:
            return d
        # OpenDota is queried by account number, not by 64-bit steam_id
        account_id = Steam.to_steam_id_3(steam_id)
        data = Dota._opendota_get(account_id)
        if "profile" not in data:
            raise NotFoundError("The specified user has never played Dota or has a private match history")
        wl = Dota._opendota_get(f"{account_id}/wl")
        new_record = Dota(steam_id=steam_id,
                          solo_mmr=data["solo_competitive_rank"],
                          party_mmr=data["competitive_rank"],
                          wins=wl["win"],
                          losses=wl["lose"])
        return new_record

    def update(self):
        """Refresh MMR and win/loss counts from the OpenDota API.

        Raises:
            RequestError: if OpenDota answers with a status code other than 200.
        """
        # Both requests must use this record's own steam_id
        account_id = Steam.to_steam_id_3(self.steam_id)
        data = Dota._opendota_get(account_id)
        wl = Dota._opendota_get(f"{account_id}/wl")
        self.solo_mmr = data["solo_competitive_rank"]
        self.party_mmr = data["competitive_rank"]
        self.wins = wl["win"]
        self.losses = wl["lose"]
|
|
|
|
|
|
@enum.unique
class LeagueOfLegendsRanks(enum.Enum):
    """League of Legends tiers, numbered 0-6 in the order listed.

    Members are looked up by name from the ``tier`` string of a league entry
    in ``LeagueOfLegends.update``.
    """

    BRONZE = 0
    SILVER = 1
    GOLD = 2
    PLATINUM = 3
    DIAMOND = 4
    MASTER = 5
    CHALLENGER = 6
|
|
|
|
|
|
@enum.unique
class RomanNumerals(enum.Enum):
    """Roman numerals I to V mapped to the integers 1 to 5.

    Members are looked up by name from the ``rank`` string of a league entry
    in ``LeagueOfLegends.update``.
    """

    I = 1
    II = 2
    III = 3
    IV = 4
    V = 5
|
|
|
|
|
|
class LeagueOfLegends(Base):
    """A League of Legends summoner (EUW) linked to a Royal, with ranked positions."""

    __tablename__ = "leagueoflegends"

    # Owner of this summoner
    royal_id = Column(Integer, ForeignKey("royals.id"), nullable=False)
    royal = relationship("Royal")

    summoner_id = Column(BigInteger, primary_key=True)
    summoner_name = Column(String)

    level = Column(Integer, nullable=False)
    # Filled from the RANKED_SOLO_5x5 queue entry
    solo_division = Column(Enum(LeagueOfLegendsRanks))
    solo_rank = Column(Enum(RomanNumerals))
    # Filled from the RANKED_FLEX_SR queue entry
    flex_division = Column(Enum(LeagueOfLegendsRanks))
    flex_rank = Column(Enum(RomanNumerals))
    # Filled from the RANKED_FLEX_TT queue entry
    twtr_division = Column(Enum(LeagueOfLegendsRanks))
    twtr_rank = Column(Enum(RomanNumerals))

    def __repr__(self):
        """Return a short debug representation containing the summoner id."""
        return f"<LeagueOfLegends {self.summoner_id}>"

    @staticmethod
    def _riot_get(path):
        """GET ``/lol/<path>`` from the EUW Riot API and return the decoded JSON.

        Raises:
            RequestError: if the API answers with a status code other than 200.
        """
        r = requests.get(f"https://euw1.api.riotgames.com/lol/{path}?api_key={config['League of Legends']['riot_api_key']}")
        if r.status_code != 200:
            # Raised, not returned: callers must never receive an exception
            # object in place of data.
            raise RequestError(f"League of Legends API returned {r.status_code}")
        return r.json()

    @staticmethod
    def get_or_create(royal_id, summoner_name=None, summoner_id=None):
        """Return the stored summoner, or build a new one from the Riot API.

        The summoner is looked up by ``summoner_name`` when given, otherwise
        by ``summoner_id``. A newly built record is returned without being
        added to the session.

        Raises:
            SyntaxError: if neither ``summoner_name`` nor ``summoner_id`` is given.
            RequestError: if the Riot API answers with a status code other than 200.
        """
        if summoner_name:
            lol = session.query(LeagueOfLegends).filter(LeagueOfLegends.summoner_name == summoner_name).first()
        elif summoner_id:
            lol = session.query(LeagueOfLegends).get(summoner_id)
        else:
            # NOTE(review): ValueError would describe this better, but the
            # exception type is kept so existing callers still catch it.
            raise SyntaxError("Neither summoner_name or summoner_id are specified")
        if lol is not None:
            return lol
        # Resolve the summoner through whichever identifier was supplied
        if summoner_name:
            data = LeagueOfLegends._riot_get(f"summoner/v3/summoners/by-name/{summoner_name}")
        else:
            data = LeagueOfLegends._riot_get(f"summoner/v3/summoners/{summoner_id}")
        lol = LeagueOfLegends(royal_id=royal_id,
                              summoner_id=data["id"],
                              summoner_name=data["name"],
                              level=data["summonerLevel"])
        lol.update()
        return lol

    def update(self):
        """Refresh name, level and ranked positions from the Riot API.

        A queue with no entry in the positions response has its division and
        rank reset to None.

        Raises:
            RequestError: if the Riot API answers with a status code other than 200.
            KeyError: if a returned tier or rank is not a member of
                LeagueOfLegendsRanks / RomanNumerals.
        """
        data = LeagueOfLegends._riot_get(f"summoner/v3/summoners/{self.summoner_id}")
        rank = LeagueOfLegends._riot_get(f"league/v3/positions/by-summoner/{self.summoner_id}")
        # Index the entries by queue; a later duplicate overrides an earlier one
        leagues = {league["queueType"]: league for league in rank}
        self.summoner_id = data["id"]
        self.summoner_name = data["name"]
        self.level = data["summonerLevel"]
        # (queue type in the API payload, column name prefix on this model)
        queues = (("RANKED_SOLO_5x5", "solo"),
                  ("RANKED_FLEX_SR", "flex"),
                  ("RANKED_FLEX_TT", "twtr"))
        for queue_type, prefix in queues:
            league = leagues.get(queue_type)
            if league is not None:
                setattr(self, f"{prefix}_division", LeagueOfLegendsRanks[league["tier"]])
                setattr(self, f"{prefix}_rank", RomanNumerals[league["rank"]])
            else:
                setattr(self, f"{prefix}_division", None)
                setattr(self, f"{prefix}_rank", None)
|
|
|
|
|
|
class Osu(Base):
    """An osu! account linked to a Royal, with the raw pp of each game mode."""

    __tablename__ = "osu"

    # Owner of this osu! account
    royal_id = Column(Integer, ForeignKey("royals.id"))
    royal = relationship("Royal")

    osu_id = Column(Integer, primary_key=True)
    osu_name = Column(String)

    # pp_raw from the requests with m=0, m=1, m=2 and m=3 respectively
    std_pp = Column(Float)
    taiko_pp = Column(Float)
    catch_pp = Column(Float)
    mania_pp = Column(Float)

    def __repr__(self):
        """Return a short debug representation containing the osu! username."""
        return f"<Osu {self.osu_name}>"

    @staticmethod
    def _fetch_all_modes(osu_name):
        """Request the user ``osu_name`` once per mode (m=0..3).

        Returns:
            A list of four dicts, one per mode, in mode order.

        Raises:
            RequestError: if any of the four requests does not return 200.
            NotFoundError: if any of the four responses is an empty list.
        """
        responses = [requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={osu_name}&m={mode}")
                     for mode in range(4)]
        if any(r.status_code != 200 for r in responses):
            status_codes = " ".join(str(r.status_code) for r in responses)
            raise RequestError(f"Osu! API returned an error ({status_codes})")
        results = [r.json() for r in responses]
        # Each response is a list; an empty one would otherwise surface as a
        # bare IndexError on the [0] below.
        if not all(results):
            raise NotFoundError("The specified user doesn't match any osu! account")
        return [result[0] for result in results]

    @staticmethod
    def get_or_create(royal_id, osu_name):
        """Return the Osu record stored for ``royal_id``, or build a new one from the API.

        The stored record is looked up by ``royal_id`` only; ``osu_name`` is
        used just for the API requests. A newly built record is returned
        without being added to the session.

        Raises:
            RequestError: if any API request does not return 200.
            NotFoundError: if the API returns no user for ``osu_name``.
        """
        o = session.query(Osu).filter(Osu.royal_id == royal_id).first()
        if o is not None:
            return o
        j0, j1, j2, j3 = Osu._fetch_all_modes(osu_name)
        new_record = Osu(royal_id=royal_id,
                         osu_id=j0["user_id"],
                         osu_name=j0["username"],
                         std_pp=j0["pp_raw"],
                         taiko_pp=j1["pp_raw"],
                         catch_pp=j2["pp_raw"],
                         mania_pp=j3["pp_raw"])
        return new_record

    def update(self):
        """Refresh the username and the four pp values from the osu! API.

        Raises:
            RequestError: if any API request does not return 200.
            NotFoundError: if the API returns no user for the stored name.
        """
        # The lookup must use this record's stored name; there is no
        # ``osu_name`` variable in this scope.
        # NOTE(review): querying by self.osu_id would survive renames - confirm
        # how the API distinguishes ids from names before switching.
        j0, j1, j2, j3 = Osu._fetch_all_modes(self.osu_name)
        self.osu_name = j0["username"]
        self.std_pp = j0["pp_raw"]
        self.taiko_pp = j1["pp_raw"]
        self.catch_pp = j2["pp_raw"]
        self.mania_pp = j3["pp_raw"]
|
|
|
|
|
|
# If run as script, create all the tables in the db
|
|
if __name__ == "__main__":
    # Make sure every table declared on Base exists
    Base.metadata.create_all(bind=engine)
    # Interactively ask for the osu! username of each Royal; an empty answer
    # skips that Royal.
    for player in session.query(Royal).all():
        name = input(f"{player}: ")
        if not name:
            continue
        record = Osu.get_or_create(player.id, name)
        print(record)
        session.add(record)
        # Persist each record as soon as it has been added
        session.commit()