2017-10-19 16:28:28 +00:00
import datetime
2017-10-04 16:51:40 +00:00
from sqlalchemy . ext . declarative import declarative_base
from sqlalchemy . orm import sessionmaker , relationship
2018-03-14 10:53:26 +00:00
from sqlalchemy import Column , BigInteger , Integer , String , DateTime , ForeignKey , Float , Enum , create_engine , UniqueConstraint , PrimaryKeyConstraint , Boolean , or_
2017-10-05 07:47:59 +00:00
import requests
2017-10-27 09:53:05 +00:00
from errors import RequestError , NotFoundError , AlreadyExistingError
2017-10-05 07:47:59 +00:00
import re
2017-10-08 18:26:51 +00:00
import enum
2017-10-27 11:38:32 +00:00
from discord import User as DiscordUser
2017-10-30 09:46:37 +00:00
from telegram import User as TelegramUser
2017-10-04 16:51:40 +00:00
# Read the bot settings from "config.ini" (path is relative to the current
# working directory).
import configparser

config = configparser.ConfigParser()
config.read("config.ini")

# Create the SQLAlchemy engine from the URI stored in the [Database] section,
# then bind both the declarative base and the session factory to that engine.
engine = create_engine(config["Database"]["database_uri"])
Base = declarative_base(bind=engine)
Session = sessionmaker(bind=engine)
2018-02-26 09:34:44 +00:00
2017-10-04 16:51:40 +00:00
class Royal(Base):
    """Row of the ``royals`` table; other tables in this module reference ``royals.id``."""

    __tablename__ = "royals"

    id = Column(Integer, primary_key=True)
    username = Column(String, unique=True, nullable=False)

    def __repr__(self):
        return f"<Royal {self.username}>"

    @staticmethod
    def create(session: Session, username: str):
        """Return a new, not yet persisted Royal with the given username.

        The returned object is NOT added to the session.

        Raises:
            AlreadyExistingError: a Royal with that username is already stored.
        """
        existing = session.query(Royal).filter_by(username=username).first()
        if existing is None:
            return Royal(username=username)
        raise AlreadyExistingError(repr(existing))
class Telegram(Base):
    """Telegram account linked to a Royal (``telegram`` table)."""

    __tablename__ = "telegram"

    # Owning Royal, loaded together with the row through a joined relationship
    royal_id = Column(Integer, ForeignKey("royals.id"), nullable=False)
    royal = relationship("Royal", lazy="joined")

    telegram_id = Column(BigInteger, primary_key=True)
    first_name = Column(String, nullable=False)
    last_name = Column(String)
    username = Column(String)

    @staticmethod
    def create(session: Session, royal_username, telegram_user: TelegramUser):
        """Return a new, not yet persisted Telegram row for the named Royal.

        Raises:
            AlreadyExistingError: the Telegram id is already stored, or the
                Royal already has a Telegram account linked.
            NotFoundError: no Royal has the given username.
        """
        same_account = session.query(Telegram).filter_by(telegram_id=telegram_user.id).first()
        if same_account is not None:
            raise AlreadyExistingError(repr(same_account))
        owner = session.query(Royal).filter(Royal.username == royal_username).first()
        if owner is None:
            raise NotFoundError("No Royal exists with that username")
        same_owner = session.query(Telegram).filter(Telegram.royal_id == owner.id).first()
        if same_owner is not None:
            raise AlreadyExistingError(repr(same_owner))
        return Telegram(
            royal=owner,
            telegram_id=telegram_user.id,
            first_name=telegram_user.first_name,
            last_name=telegram_user.last_name,
            username=telegram_user.username,
        )

    def __repr__(self):
        return f"<Telegram {self.telegram_id}>"

    def mention(self):
        """Return ``@username`` when a username is set, else the first name."""
        if self.username is None:
            return self.first_name
        return f"@{self.username}"

    def __str__(self):
        # Preference order: username, then "first last", then first name only
        if self.username is not None:
            return self.username
        if self.last_name is not None:
            return f"{self.first_name} {self.last_name}"
        return self.first_name
2017-10-04 21:29:02 +00:00
class Steam(Base):
    """Steam account linked to a Royal (``steam`` table).

    ``persona_name`` and ``avatar_hex`` are copies of what the Steam Web API
    ``GetPlayerSummaries`` endpoint returned at creation / last update.
    """

    __tablename__ = "steam"

    royal_id = Column(Integer, ForeignKey("royals.id"), nullable=False)
    royal = relationship("Royal", lazy="joined")

    steam_id = Column(String, primary_key=True)
    persona_name = Column(String, nullable=False)
    avatar_hex = Column(String, nullable=False)
    trade_token = Column(String)

    def __repr__(self):
        return f"<Steam {self.steam_id}>"

    def __str__(self):
        if self.persona_name is not None:
            return self.persona_name
        return self.steam_id

    def avatar_url(self):
        """Return the avatar image URL rebuilt from the stored ``avatar_hex``."""
        return f"https://steamcdn-a.akamaihd.net/steamcommunity/public/images/avatars/{self.avatar_hex[0:2]}/{self.avatar_hex}.jpg"

    @staticmethod
    def _fetch_player(steam_id):
        """Request the player summary for ``steam_id`` and return its dict.

        Raises:
            RequestError: Steam answered with a non-200 status code.
            NotFoundError: the response contains no player for that id.
        """
        r = requests.get(f"https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/?key={config['Steam']['api_key']}&steamids={steam_id}")
        if r.status_code != 200:
            raise RequestError(f"Steam returned {r.status_code}")
        # The payload is always wrapped in {"response": {"players": [...]}};
        # an unknown id yields an empty players list, not an empty document.
        players = r.json()["response"]["players"]
        if not players:
            raise NotFoundError("The steam_id doesn't match any steam account")
        return players[0]

    @staticmethod
    def _parse_avatar_hex(avatar_url):
        """Extract the avatar hash from an avatar URL returned by Steam.

        Raises:
            RequestError: the URL does not have the expected layout.
        """
        match = re.search(r"https://steamcdn-a\.akamaihd\.net/steamcommunity/public/images/avatars/../(.+).jpg", avatar_url)
        if match is None:
            raise RequestError("Steam returned an avatar URL in an unexpected format")
        return match.group(1)

    @staticmethod
    def create(session: Session, royal_id: int, steam_id: str):
        """Return a new, not yet persisted Steam row filled from the Steam API.

        Raises:
            AlreadyExistingError: a row with that ``steam_id`` already exists.
            RequestError: the Steam API request failed.
            NotFoundError: the ``steam_id`` matches no Steam account.
        """
        s = session.query(Steam).get(steam_id)
        if s is not None:
            raise AlreadyExistingError(repr(s))
        player = Steam._fetch_player(steam_id)
        return Steam(royal_id=royal_id,
                     steam_id=steam_id,
                     persona_name=player["personaname"],
                     avatar_hex=Steam._parse_avatar_hex(player["avatar"]))

    @staticmethod
    def find_trade_token(trade_url):
        """Return the 8-character token captured from a Steam trade offer URL."""
        return re.search(r"https://steamcommunity\.com/tradeoffer/new/\?partner=[0-9]+&token=(.{8})", trade_url).group(1)

    @staticmethod
    def to_steam_id_2(steam_id):
        """Convert a 64-bit Steam id to the ``STEAM_0:Y:Z`` text form."""
        # Got this code from a random github gist. It could be completely wrong.
        z = (int(steam_id) - 76561197960265728) // 2
        y = int(steam_id) % 2
        return f"STEAM_0:{y}:{z}"

    @staticmethod
    def to_steam_id_3(steam_id, full=False):
        """Convert a 64-bit Steam id to its account number.

        With ``full=True`` the number is wrapped as ``[U:1:<number>]``.
        """
        # Got this code from a random github gist. It could be completely wrong.
        if full:
            return f"[U:1:{int(steam_id) - 76561197960265728}]"
        return f"{int(steam_id) - 76561197960265728}"

    def update(self):
        """Refresh ``persona_name`` and ``avatar_hex`` from the Steam API.

        Raises:
            RequestError: the Steam API request failed.
            NotFoundError: the stored ``steam_id`` matches no Steam account.
        """
        player = Steam._fetch_player(self.steam_id)
        persona_name = player["personaname"]
        avatar_hex = Steam._parse_avatar_hex(player["avatar"])
        # Assign only after both values were obtained, so a failure cannot
        # leave the row half-updated.
        self.persona_name = persona_name
        self.avatar_hex = avatar_hex
2017-10-04 21:29:02 +00:00
2017-10-05 18:21:47 +00:00
class RocketLeague(Base):
    """Rocket League ranked stats for a Steam account (``rocketleague`` table).

    For each of the four ranked playlists the row stores an ``*_mmr`` value
    plus ``*_rank`` / ``*_div``, which are set to ``None`` while fewer than 10
    matches have been played in that playlist.
    """

    __tablename__ = "rocketleague"

    steam_id = Column(String, ForeignKey("steam.steam_id"), primary_key=True)
    steam = relationship("Steam", lazy="joined")

    season = Column(Integer)

    single_rank = Column(Integer)
    single_div = Column(Integer)
    single_mmr = Column(Integer)

    doubles_rank = Column(Integer)
    doubles_div = Column(Integer)
    doubles_mmr = Column(Integer)

    standard_rank = Column(Integer)
    standard_div = Column(Integer)
    standard_mmr = Column(Integer)

    solo_std_rank = Column(Integer)
    solo_std_div = Column(Integer)
    solo_std_mmr = Column(Integer)

    wins = Column(Integer)

    def __repr__(self):
        return f"<RocketLeague {self.steam_id}>"

    @staticmethod
    def create(session: Session, steam_id: str):
        """Return a new, not yet persisted row filled from the stats API.

        Raises:
            AlreadyExistingError: a row for that ``steam_id`` already exists.
            NotFoundError: the API answered 404 for that player.
            RequestError: the API answered with any other non-200 status.
        """
        rl = session.query(RocketLeague).get(steam_id)
        if rl is not None:
            raise AlreadyExistingError(repr(rl))
        r = requests.get(f"https://api.rocketleaguestats.com/v1/player?apikey={config['Rocket League']['rlstats_api_key']}&unique_id={steam_id}&platform_id=1")
        if r.status_code == 404:
            raise NotFoundError("The specified user has never played Rocket League")
        elif r.status_code != 200:
            # This message was missing its f-prefix and printed the literal
            # "{r.status_code}" placeholder.
            raise RequestError(f"Rocket League Stats returned {r.status_code}")
        new_record = RocketLeague(steam_id=steam_id)
        new_record.update(data=r.json())
        return new_record

    def update(self, data=None):
        """Refresh the stats, downloading them unless ``data`` is supplied.

        Args:
            data: already decoded player JSON; when ``None`` it is requested
                from the stats API.

        Raises:
            RequestError: the API answered with a non-200 status.
        """
        if data is None:
            r = requests.get(f"https://api.rocketleaguestats.com/v1/player?apikey={config['Rocket League']['rlstats_api_key']}&unique_id={self.steam_id}&platform_id=1")
            if r.status_code != 200:
                raise RequestError(f"Rocket League Stats returned {r.status_code}")
            data = r.json()
        # The current season is the highest season number in the payload
        current_season = max((int(season) for season in data["rankedSeasons"]), default=0)
        if current_season <= 0:
            # No usable ranked season: leave the row untouched
            return
        self.season = current_season
        self.wins = data["stats"]["wins"]
        season_data = data["rankedSeasons"][str(current_season)]
        # Playlist id in the payload -> prefix of the matching column group
        playlists = (
            ("10", "single"),    # Single 1v1
            ("11", "doubles"),   # Doubles 2v2
            ("13", "standard"),  # Standard 3v3
            ("12", "solo_std"),  # Solo Standard 3v3
        )
        for playlist_id, prefix in playlists:
            if playlist_id not in season_data:
                # Playlist absent from this season: keep the stored values
                continue
            playlist = season_data[playlist_id]
            setattr(self, f"{prefix}_mmr", playlist["rankPoints"])
            if playlist["matchesPlayed"] >= 10:
                setattr(self, f"{prefix}_rank", playlist["tier"])
                setattr(self, f"{prefix}_div", playlist["division"])
            else:
                setattr(self, f"{prefix}_rank", None)
                setattr(self, f"{prefix}_div", None)
2017-10-05 18:21:47 +00:00
2017-10-08 18:26:51 +00:00
class Dota(Base):
    """Dota 2 stats for a Steam account, fetched from OpenDota (``dota`` table).

    ``rank_tier`` is treated as a two-digit number: the first digit selects
    the medal, the second the number of stars.
    """

    __tablename__ = "dota"

    steam_id = Column(String, ForeignKey("steam.steam_id"), primary_key=True)
    steam = relationship("Steam", lazy="joined")

    rank_tier = Column(Integer)

    wins = Column(Integer, nullable=False)
    losses = Column(Integer, nullable=False)

    def get_rank_icon_url(self):
        """Return the OpenDota medal icon URL (icon ``0`` when unranked)."""
        # Rank icon is determined by the first digit of the rank tier
        return f"https://www.opendota.com/assets/images/dota2/rank_icons/rank_icon_{str(self.rank_tier)[0] if self.rank_tier is not None else '0'}.png"

    def get_rank_stars_url(self):
        """Return the OpenDota star overlay URL, or ``""`` when there are no stars."""
        # Rank stars are determined by the second digit of the rank tier.
        # A single-digit tier has no second digit, so it must be filtered out
        # before indexing (this used to raise IndexError).
        if self.rank_tier is None or self.rank_tier < 10:
            return ""
        stars = str(self.rank_tier)[1]
        if stars == "0":
            return ""
        return f"https://www.opendota.com/assets/images/dota2/rank_icons/rank_star_{stars}.png"

    def get_rank_name(self):
        """Return the medal name, ``"Unranked"`` for tiers below 10.

        Returns ``None`` when the first digit matches no known medal.
        """
        # This should probably be an enum, but who cares
        if self.rank_tier is None or self.rank_tier < 10:
            return "Unranked"
        names = {
            "1": "Herald",  # was misspelled "Harald"
            "2": "Guardian",
            "3": "Crusader",
            "4": "Archon",
            "5": "Legend",
            "6": "Ancient",
            "7": "Divine",
            "8": "Immortal",
        }
        return names.get(str(self.rank_tier)[0])

    def get_rank_number(self):
        """Return the star count as a string, ``""`` for tiers below 10."""
        if self.rank_tier is None or self.rank_tier < 10:
            return ""
        return str(self.rank_tier)[1]

    @staticmethod
    def _get_player_json(steam_id, suffix=""):
        """GET ``/api/players/<account id><suffix>`` from OpenDota and decode it.

        Raises:
            RequestError: OpenDota answered with a non-200 status.
        """
        r = requests.get(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(steam_id)}{suffix}")
        if r.status_code != 200:
            # These messages were missing their f-prefix and printed the
            # literal "{r.status_code}" placeholder.
            raise RequestError(f"OpenDota returned {r.status_code}")
        return r.json()

    @staticmethod
    def create(session: Session, steam_id: int):
        """Return a new, not yet persisted Dota row filled from OpenDota.

        Raises:
            AlreadyExistingError: a row for that ``steam_id`` already exists.
            RequestError: an OpenDota request failed.
            NotFoundError: the player payload has no ``profile`` entry.
        """
        d = session.query(Dota).get(steam_id)
        if d is not None:
            raise AlreadyExistingError(repr(d))
        data = Dota._get_player_json(steam_id)
        if "profile" not in data:
            raise NotFoundError("The specified user has never played Dota or has a private match history")
        wl = Dota._get_player_json(steam_id, "/wl")
        return Dota(steam_id=steam_id,
                    rank_tier=data["rank_tier"],
                    wins=wl["win"],
                    losses=wl["lose"])

    def update(self):
        """Refresh ``rank_tier``, ``wins`` and ``losses`` from OpenDota.

        Raises:
            RequestError: an OpenDota request failed.
        """
        data = Dota._get_player_json(self.steam_id)
        wl = Dota._get_player_json(self.steam_id, "/wl")
        self.rank_tier = data["rank_tier"]
        self.wins = wl["win"]
        self.losses = wl["lose"]
class LeagueOfLegendsRanks(enum.Enum):
    """League of Legends tiers; members are looked up by name elsewhere in this module."""

    BRONZE = 0
    SILVER = 1
    GOLD = 2
    PLATINUM = 3
    DIAMOND = 4
    MASTER = 5
    CHALLENGER = 6
class RomanNumerals(enum.Enum):
    """Roman numerals I to V mapped to their integer values."""

    I = 1
    II = 2
    III = 3
    IV = 4
    V = 5
class LeagueOfLegends(Base):
    """League of Legends summoner linked to a Royal (``leagueoflegends`` table).

    Stores the summoner identity and level plus division/rank for the solo,
    flex and twisted-treeline ("twtr") ranked queues; the rank columns are
    ``None`` when the API reports no entry for that queue.
    """

    __tablename__ = "leagueoflegends"

    royal_id = Column(Integer, ForeignKey("royals.id"), nullable=False)
    royal = relationship("Royal", lazy="joined")

    summoner_id = Column(BigInteger, primary_key=True)
    summoner_name = Column(String, nullable=False)

    level = Column(Integer, nullable=False)
    solo_division = Column(Enum(LeagueOfLegendsRanks))
    solo_rank = Column(Enum(RomanNumerals))
    flex_division = Column(Enum(LeagueOfLegendsRanks))
    flex_rank = Column(Enum(RomanNumerals))
    twtr_division = Column(Enum(LeagueOfLegendsRanks))
    twtr_rank = Column(Enum(RomanNumerals))

    @staticmethod
    def _get_json(path):
        """GET ``https://euw1.api.riotgames.com/lol/<path>`` and decode the JSON.

        Raises:
            RequestError: the API answered with a non-200 status.
        """
        r = requests.get(f"https://euw1.api.riotgames.com/lol/{path}?api_key={config['League of Legends']['riot_api_key']}")
        if r.status_code != 200:
            # The original code *returned* the exception object instead of
            # raising it, so failures went unnoticed by callers.
            raise RequestError(f"League of Legends API returned {r.status_code}")
        return r.json()

    @staticmethod
    def create(session: Session, royal_id, summoner_name=None, summoner_id=None):
        """Return a new, not yet persisted summoner row filled from the Riot API.

        The summoner is looked up by ``summoner_name`` when given, otherwise
        by ``summoner_id``.

        Raises:
            SyntaxError: neither ``summoner_name`` nor ``summoner_id`` is set
                (exception type kept for backward compatibility).
            AlreadyExistingError: the summoner is already stored.
            RequestError: a Riot API request failed.
        """
        if summoner_name:
            lol = session.query(LeagueOfLegends).filter(LeagueOfLegends.summoner_name == summoner_name).first()
        elif summoner_id:
            lol = session.query(LeagueOfLegends).get(summoner_id)
        else:
            raise SyntaxError("Neither summoner_name or summoner_id are specified")
        if lol is not None:
            raise AlreadyExistingError(repr(lol))
        # Get the summoner_id
        if summoner_name:
            data = LeagueOfLegends._get_json(f"summoner/v3/summoners/by-name/{summoner_name}")
        else:
            data = LeagueOfLegends._get_json(f"summoner/v3/summoners/{summoner_id}")
        lol = LeagueOfLegends(royal_id=royal_id,
                              summoner_id=data["id"],
                              summoner_name=data["name"],
                              level=data["summonerLevel"])
        lol.update()
        return lol

    def update(self):
        """Refresh identity, level and ranked positions from the Riot API.

        Raises:
            RequestError: a Riot API request failed.
        """
        data = LeagueOfLegends._get_json(f"summoner/v3/summoners/{self.summoner_id}")
        positions = LeagueOfLegends._get_json(f"league/v3/positions/by-summoner/{self.summoner_id}")
        # Index the positions by queue; a later duplicate overrides an earlier one
        leagues = {league["queueType"]: league for league in positions}
        self.summoner_id = data["id"]
        self.summoner_name = data["name"]
        self.level = data["summonerLevel"]
        # Queue type reported by the API -> prefix of the matching column pair
        queues = (
            ("RANKED_SOLO_5x5", "solo"),
            ("RANKED_FLEX_SR", "flex"),
            ("RANKED_FLEX_TT", "twtr"),
        )
        for queue_type, prefix in queues:
            league = leagues.get(queue_type)
            if league is not None:
                setattr(self, f"{prefix}_division", LeagueOfLegendsRanks[league["tier"]])
                setattr(self, f"{prefix}_rank", RomanNumerals[league["rank"]])
            else:
                setattr(self, f"{prefix}_division", None)
                setattr(self, f"{prefix}_rank", None)
2017-10-09 11:45:42 +00:00
class Osu(Base):
    """osu! account linked to a Royal, with raw pp for modes 0-3 (``osu`` table)."""

    __tablename__ = "osu"

    royal_id = Column(Integer, ForeignKey("royals.id"), nullable=False)
    royal = relationship("Royal", lazy="joined")

    osu_id = Column(Integer, primary_key=True)
    osu_name = Column(String, nullable=False)

    std_pp = Column(Float)
    taiko_pp = Column(Float)
    catch_pp = Column(Float)
    mania_pp = Column(Float)

    @staticmethod
    def _fetch_all_modes(osu_name):
        """Request the user once per mode (``m=0`` .. ``m=3``).

        Returns:
            A list with the four user dicts, ordered by mode number.

        Raises:
            RequestError: at least one request returned a non-200 status.
            NotFoundError: a response contained no user.
        """
        responses = [
            requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={osu_name}&m={mode}")
            for mode in range(4)
        ]
        if any(r.status_code != 200 for r in responses):
            codes = " ".join(str(r.status_code) for r in responses)
            raise RequestError(f"Osu! API returned an error ({codes})")
        users = []
        for r in responses:
            payload = r.json()
            # An empty list used to surface as a bare IndexError on [0]
            if not payload:
                raise NotFoundError("No osu! user exists with that name")
            users.append(payload[0])
        return users

    @staticmethod
    def create(session: Session, royal_id, osu_name):
        """Return a new, not yet persisted Osu row filled from the osu! API.

        Raises:
            AlreadyExistingError: a row with that ``osu_name`` already exists.
            RequestError: an osu! API request failed.
            NotFoundError: the API returned no user for that name.
        """
        o = session.query(Osu).filter(Osu.osu_name == osu_name).first()
        if o is not None:
            raise AlreadyExistingError(repr(o))
        j0, j1, j2, j3 = Osu._fetch_all_modes(osu_name)
        return Osu(royal_id=royal_id,
                   osu_id=j0["user_id"],
                   osu_name=j0["username"],
                   std_pp=j0["pp_raw"],
                   taiko_pp=j1["pp_raw"],
                   catch_pp=j2["pp_raw"],
                   mania_pp=j3["pp_raw"])

    def update(self):
        """Refresh the username and the four pp values from the osu! API.

        Raises:
            RequestError: an osu! API request failed.
            NotFoundError: the API returned no user for the stored name.
        """
        j0, j1, j2, j3 = Osu._fetch_all_modes(self.osu_name)
        self.osu_name = j0["username"]
        self.std_pp = j0["pp_raw"]
        self.taiko_pp = j1["pp_raw"]
        self.catch_pp = j2["pp_raw"]
        self.mania_pp = j3["pp_raw"]
2017-10-17 18:49:28 +00:00
class Discord(Base):
    """Discord account linked to a Royal (``discord`` table)."""

    __tablename__ = "discord"
    # Must be a real one-element tuple: wrapping the constraint in tuple()
    # iterates over it instead of registering it on the table.
    __table_args__ = (UniqueConstraint("name", "discriminator"),)

    royal_id = Column(Integer, ForeignKey("royals.id"), nullable=False)
    royal = relationship("Royal", lazy="joined")

    discord_id = Column(BigInteger, primary_key=True)
    name = Column(String, nullable=False)
    discriminator = Column(Integer, nullable=False)
    avatar_hex = Column(String)

    def __str__(self):
        return f"{self.name}#{self.discriminator}"

    def __repr__(self):
        return f"<Discord user {self.discord_id}>"

    @staticmethod
    def create(session: Session, royal_username, discord_user: DiscordUser):
        """Return a new, not yet persisted Discord row for the named Royal.

        Raises:
            AlreadyExistingError: the Discord id is already stored, or the
                Royal already has a Discord account linked.
            NotFoundError: no Royal has the given username.
        """
        d = session.query(Discord).filter(Discord.discord_id == discord_user.id).first()
        if d is not None:
            raise AlreadyExistingError(repr(d))
        r = session.query(Royal).filter(Royal.username == royal_username).first()
        if r is None:
            raise NotFoundError("No Royal exists with that username")
        d = session.query(Discord).filter(Discord.royal_id == r.id).first()
        if d is not None:
            raise AlreadyExistingError(repr(d))
        return Discord(royal=r,
                       discord_id=discord_user.id,
                       name=discord_user.name,
                       discriminator=discord_user.discriminator,
                       avatar_hex=discord_user.avatar)

    def mention(self):
        """Return the ``<@id>`` mention markup for this user."""
        # The model has no ``id`` attribute; the key column is ``discord_id``
        return f"<@{self.discord_id}>"

    def avatar_url(self, size=256):
        """Return the CDN avatar URL, or a fixed default image when no avatar is stored."""
        if self.avatar_hex is None:
            return "https://discordapp.com/assets/6debd47ed13483642cf09e832ed0bc1b.png"
        return f"https://cdn.discordapp.com/avatars/{self.discord_id}/{self.avatar_hex}.png?size={size}"
2017-10-17 18:49:28 +00:00
class Overwatch(Base):
    """Overwatch account linked to a Royal, fetched from OWAPI (``overwatch`` table).

    ``level`` is stored as ``prestige * 100 + level`` from the API payload.
    """

    __tablename__ = "overwatch"

    royal_id = Column(Integer, ForeignKey("royals.id"), nullable=False)
    royal = relationship("Royal", lazy="joined")

    battletag = Column(String, primary_key=True)
    discriminator = Column(Integer, primary_key=True)
    icon = Column(String, nullable=False)

    level = Column(Integer, nullable=False)
    rank = Column(Integer)

    def __str__(self, separator="#"):
        return f"{self.battletag}{separator}{self.discriminator}"

    def __repr__(self):
        return f"<Overwatch {self}>"

    @staticmethod
    def _fetch_overall_stats(battletag, discriminator):
        """Download the EU quickplay ``overall_stats`` dict for a battletag.

        Raises:
            RequestError: OWAPI answered with a non-200 status, or the payload
                lacks the expected ``eu`` / ``stats`` / ``quickplay`` path.
        """
        r = requests.get(f"https://owapi.net/api/v3/u/{battletag}-{discriminator}/stats", headers={
            "User-Agent": "Royal-Bot/4.0",
            "From": "ste.pigozzi@gmail.com"
        })
        if r.status_code != 200:
            raise RequestError(f"OWAPI.net returned {r.status_code}")
        try:
            return r.json()["eu"]["stats"]["quickplay"]["overall_stats"]
        except (TypeError, KeyError):
            # TypeError: an intermediate value is null; KeyError: it is missing
            raise RequestError("Something went wrong when retrieving the stats.")

    @staticmethod
    def _parse_icon(avatar_url):
        """Extract the ``0x...`` unlock id from an avatar URL.

        Raises:
            RequestError: the URL does not have the expected layout.
        """
        match = re.search(r"https://.+\.cloudfront\.net/game/unlocks/(0x[0-9A-F]+)\.png", avatar_url)
        if match is None:
            raise RequestError("OWAPI.net returned an avatar URL in an unexpected format")
        return match.group(1)

    @staticmethod
    def create(session: Session, royal_id, battletag, discriminator=None):
        """Return a new, not yet persisted Overwatch row filled from OWAPI.

        Args:
            battletag: the name alone, or ``name#discriminator`` when
                ``discriminator`` is not passed separately.

        Raises:
            AlreadyExistingError: that battletag is already stored.
            RequestError: the OWAPI request failed or returned unusable data.
        """
        if discriminator is None:
            battletag, discriminator = battletag.split("#", 1)
        o = session.query(Overwatch).filter_by(battletag=battletag, discriminator=discriminator).first()
        if o is not None:
            raise AlreadyExistingError(repr(o))
        j = Overwatch._fetch_overall_stats(battletag, discriminator)
        return Overwatch(royal_id=royal_id,
                         battletag=battletag,
                         discriminator=discriminator,
                         icon=Overwatch._parse_icon(j["avatar"]),
                         level=j["prestige"] * 100 + j["level"],
                         rank=j["comprank"])

    def icon_url(self):
        """Return the icon image URL rebuilt from the stored ``icon`` id."""
        return f"https://d1u1mce87gyfbn.cloudfront.net/game/unlocks/{self.icon}.png"

    def update(self):
        """Refresh ``icon``, ``level`` and ``rank`` from OWAPI.

        Raises:
            RequestError: the OWAPI request failed or returned unusable data.
        """
        j = Overwatch._fetch_overall_stats(self.battletag, self.discriminator)
        self.icon = Overwatch._parse_icon(j["avatar"])
        self.level = j["prestige"] * 100 + j["level"]
        self.rank = j["comprank"]
2017-10-19 16:28:28 +00:00
class Diario(Base):
    """A saved diary entry: text, timestamp, and optional saver/author (``diario`` table)."""

    __tablename__ = "diario"

    id = Column(Integer, primary_key=True)
    timestamp = Column(DateTime, nullable=False)
    # BigInteger to match the type of telegram.telegram_id, which these
    # columns reference (they were declared as Integer).
    saver_id = Column(BigInteger, ForeignKey("telegram.telegram_id"))
    saver = relationship("Telegram", foreign_keys=saver_id, lazy="joined")
    author_id = Column(BigInteger, ForeignKey("telegram.telegram_id"))
    author = relationship("Telegram", foreign_keys=author_id, lazy="joined")
    text = Column(String)

    def __repr__(self):
        return f"<Diario {self.id}>"

    def __str__(self):
        return f"{self.id} - {self.timestamp} - {self.author}: {self.text}"

    @staticmethod
    def import_from_json(file):
        """Import diary entries from a JSON file and commit them.

        The file must decode to a sequence of objects with ``timestamp`` and
        ``text`` keys and an optional ``sender`` key. Each entry is printed as
        it is added; one commit is issued after all entries were added.

        Args:
            file: path of the JSON file to read.
        """
        import json
        # Sender handle -> Telegram user id. Senders not listed here are
        # imported without an author.
        # NOTE(review): some handles share the same id - presumably renamed
        # accounts; confirm.
        author_ids = {
            "@Steffo": 25167391,
            "@GoodBalu": 19611986,
            "@gattopandacorno": 200821462,
            "@Albertino04": 131057096,
            "@Francesco_Cuoghi": 48371848,
            "@VenomousDoc": 48371848,
            "@MaxSensei": 1258401,
            "@Protoh": 125711787,
            "@McspKap": 304117728,
            "@FrankRekt": 31436195,
            "@EvilBalu": 26842090,
            "@Dailir": 135816455,
            "@Paltri": 186843362,
            "@Doom_darth_vader": 165792255,
            "@httpIma": 292086686,
            "@DavidoMessori": 509208316,
            "@DavidoNiichan": 509208316,
            "@Peraemela99": 63804599,
            "@infopz": 20403805,
            "@Baithoven": 121537369,
            "@Tauei": 102833717
        }
        # Read the whole file first so the handle is closed before any
        # database work starts (it used to be left open).
        with open(file, "r") as json_file:
            entries = json.load(json_file)
        session = Session()
        try:
            for n, entry in enumerate(entries):
                author = author_ids.get(entry.get("sender"))
                d = Diario(timestamp=datetime.datetime.fromtimestamp(float(entry["timestamp"])),
                           author_id=author,
                           text=entry["text"])
                print(f"{n} - {d}")
                session.add(d)
            session.commit()
        finally:
            # Release the session even when an entry is malformed
            session.close()
2017-10-19 16:28:28 +00:00
2018-01-25 14:24:17 +00:00
class BaluRage(Base):
    """Row of the ``balurage`` table: an optional Royal plus a free-text reason."""

    __tablename__ = "balurage"

    id = Column(Integer, primary_key=True)
    # Nullable link to the Royal this row refers to
    royal_id = Column(Integer, ForeignKey("royals.id"))
    royal = relationship("Royal", lazy="joined")
    reason = Column(String)

    def __repr__(self):
        return f"<BaluRage {self.id}>"
2018-03-11 19:03:21 +00:00
class PlayedMusic(Base):
    """Row of the ``playedmusic`` table: a filename and the Discord user who enqueued it."""

    __tablename__ = "playedmusic"

    id = Column(Integer, primary_key=True)
    # Nullable link to the Discord account that enqueued the file
    enqueuer_id = Column(BigInteger, ForeignKey("discord.discord_id"))
    enqueuer = relationship("Discord", lazy="joined")
    filename = Column(String)

    def __repr__(self):
        return f"<PlayedMusic {self.filename}>"
2018-03-14 10:53:26 +00:00
class VoteQuestion(Base):
    """A poll question (``votequestion`` table); answers live in ``voteanswer``."""

    __tablename__ = "votequestion"

    id = Column(Integer, primary_key=True)
    message_id = Column(BigInteger)
    question = Column(String, nullable=False)
    anonymous = Column(Boolean, nullable=False)
    open = Column(Boolean, default=True)

    def __repr__(self):
        return f"<Vote {self.id}>"

    def generate_text(self, session: Session):
        """Render this question and its current answers as an HTML-tagged string.

        Every row of the ``telegram`` table (except the ``royalgamesbot``
        user) gets one marker: white when no answer exists, blue for YES,
        red for NO, black for ABSTAIN. Usernames are listed only when the
        question is not anonymous. A totals section closes the text.

        Args:
            session: session used to run the query.

        Returns:
            The rendered text.
        """
        # The question id is passed as a bound parameter; the query used to
        # hard-code id 8, so every question rendered the answers of that one.
        query = session.execute(
            "SELECT * FROM telegram LEFT JOIN ("
            "SELECT voteanswer.question_id, voteanswer.user_id, voteanswer.choice "
            "FROM votequestion JOIN voteanswer ON votequestion.id = voteanswer.question_id "
            "WHERE votequestion.id = :question_id"
            ") answer ON telegram.telegram_id = answer.user_id ORDER BY answer.choice;",
            {"question_id": self.id},
        )
        text = f"<b>{self.question}</b>\n\n"
        none, yes, no, abstain = 0, 0, 0, 0
        for record in query:
            if record["username"] == "royalgamesbot":
                continue
            elif record["question_id"] is None:
                text += "⚪️"
                none += 1
            elif record["choice"] == "YES":
                text += "🔵"
                yes += 1
            elif record["choice"] == "NO":
                text += "🔴"
                no += 1
            elif record["choice"] == "ABSTAIN":
                text += "⚫️"
                abstain += 1
            if not self.anonymous:
                text += f" {str(record['username'])}\n"
        if self.anonymous:
            # Anonymous markers share one line; terminate it before the totals
            text += "\n"
        text += f"\n" \
                f"⚪ {none}\n" \
                f"🔵 {yes}\n" \
                f"🔴 {no}\n" \
                f"⚫️ {abstain}"
        return text
class VoteChoices(enum.Enum):
    """Possible answers to a vote question."""

    ABSTAIN = 1
    YES = 2
    NO = 3
class VoteAnswer(Base):
    """One Telegram user's answer to one vote question (``voteanswer`` table)."""

    __tablename__ = "voteanswer"

    question_id = Column(Integer, ForeignKey("votequestion.id"))
    question = relationship("VoteQuestion")
    user_id = Column(BigInteger, ForeignKey("telegram.telegram_id"))
    user = relationship("Telegram")
    choice = Column(Enum(VoteChoices), nullable=False)

    # Composite primary key: at most one answer per (question, user) pair
    __table_args__ = (PrimaryKeyConstraint("question_id", "user_id"),)

    def __repr__(self):
        return f"<VoteAnswer {self.question_id} {self.user} {self.choice}>"
2017-10-04 16:51:40 +00:00
# If run as a script, render the first stored vote question and print it.
# The table-creation statements at the bottom are currently commented out.
if __name__ == "__main__":
    session = Session()
    try:
        first_question = session.query(VoteQuestion).first()
        # .first() returns None on an empty table; guard before calling into it
        if first_question is None:
            print("No vote question found.")
        else:
            print(first_question.generate_text(session))
    finally:
        session.close()
    #print("Creating new tables...")
    #Base.metadata.create_all(bind=engine)
    #print("Done!")