2017-10-09 11:45:42 +00:00
import time
2017-10-19 16:28:28 +00:00
import datetime
2017-10-04 16:51:40 +00:00
from sqlalchemy . ext . declarative import declarative_base
from sqlalchemy . orm import sessionmaker , relationship
2017-10-17 18:49:28 +00:00
from sqlalchemy import Column , BigInteger , Integer , String , Numeric , DateTime , ForeignKey , Float , Enum , create_engine , UniqueConstraint
2017-10-05 07:47:59 +00:00
import requests
2017-10-27 09:53:05 +00:00
from errors import RequestError , NotFoundError , AlreadyExistingError
2017-10-05 07:47:59 +00:00
import re
2017-10-08 18:26:51 +00:00
import enum
2017-10-27 11:38:32 +00:00
from discord import User as DiscordUser
2017-10-30 09:46:37 +00:00
from telegram import User as TelegramUser
2017-10-04 16:51:40 +00:00
# Init the config reader: settings are loaded from config.ini (relative path,
# so it is resolved against the current working directory).
import configparser
config = configparser . ConfigParser ( )
# NOTE: ConfigParser.read() silently skips files it cannot open, so a missing
# config.ini only surfaces further down as a KeyError on a missing section.
config . read ( " config.ini " )
# Init the sqlalchemy engine from the [Database] database_uri setting.
engine = create_engine ( config [ " Database " ] [ " database_uri " ] )
# Declarative base shared by every model class in this module, bound to the engine.
Base = declarative_base ( bind = engine )
# Session factory; call Session() to obtain a new session bound to the engine.
Session = sessionmaker ( bind = engine )
class Royal(Base):
    """A community member; account tables in this module reference it through ``royals.id``."""

    __tablename__ = "royals"

    id = Column(Integer, primary_key=True)
    username = Column(String, unique=True, nullable=False)

    @staticmethod
    def create(session: Session, username: str):
        """Build a new Royal with the given username.

        The returned object is NOT added to the session; the caller does that.

        Raises:
            AlreadyExistingError: a Royal with this username is already stored.
        """
        existing = session.query(Royal).filter_by(username=username).first()
        if existing is None:
            return Royal(username=username)
        raise AlreadyExistingError(repr(existing))

    def __repr__(self):
        return f"<Royal {self.username}>"
class Telegram(Base):
    """A Telegram account linked to a Royal."""

    __tablename__ = "telegram"

    royal_id = Column(Integer, ForeignKey("royals.id"), nullable=False)
    royal = relationship("Royal")

    telegram_id = Column(BigInteger, primary_key=True)
    first_name = Column(String, nullable=False)
    last_name = Column(String)
    username = Column(String)

    @staticmethod
    def create(session: Session, royal_username, telegram_user: TelegramUser):
        """Build a Telegram record linking ``telegram_user`` to the named Royal.

        The returned object is not added to the session.

        Raises:
            AlreadyExistingError: the Telegram account is already stored, or
                the Royal already has a Telegram account linked.
            NotFoundError: no Royal has the username ``royal_username``.
        """
        t = session.query(Telegram).filter_by(telegram_id=telegram_user.id).first()
        if t is not None:
            raise AlreadyExistingError(repr(t))
        r = session.query(Royal).filter(Royal.username == royal_username).first()
        if r is None:
            raise NotFoundError("No Royal exists with that username")
        # A Royal may be linked to at most one Telegram account.
        t = session.query(Telegram).filter(Telegram.royal_id == r.id).first()
        if t is not None:
            raise AlreadyExistingError(repr(t))
        return Telegram(royal=r,
                        telegram_id=telegram_user.id,
                        first_name=telegram_user.first_name,
                        last_name=telegram_user.last_name,
                        username=telegram_user.username)

    def __repr__(self):
        # The primary key is telegram_id; this model has no "id" attribute.
        return f"<Telegram {self.telegram_id}>"

    def __str__(self):
        """Return the most specific display name available: @username, full name, or first name."""
        if self.username is not None:
            return f"@{self.username}"
        elif self.last_name is not None:
            return f"{self.first_name} {self.last_name}"
        else:
            return self.first_name
2017-10-04 21:29:02 +00:00
class Steam(Base):
    """A Steam account linked to a Royal."""

    __tablename__ = "steam"

    royal_id = Column(Integer, ForeignKey("royals.id"), nullable=False)
    royal = relationship("Royal")

    steam_id = Column(String, primary_key=True)
    persona_name = Column(String, nullable=False)
    avatar_hex = Column(String, nullable=False)
    trade_token = Column(String)

    def __repr__(self):
        return f"<Steam {self.steam_id}>"

    def __str__(self):
        if self.persona_name is not None:
            return self.persona_name
        else:
            return self.steam_id

    def avatar_url(self):
        """Return the avatar image URL rebuilt from the stored ``avatar_hex``."""
        return f"https://steamcdn-a.akamaihd.net/steamcommunity/public/images/avatars/{self.avatar_hex[0:2]}/{self.avatar_hex}.jpg"

    @staticmethod
    def _fetch_profile(steam_id):
        """Query GetPlayerSummaries and return ``(persona_name, avatar_hex)``.

        Raises:
            RequestError: Steam answered with a non-200 status, or the avatar
                URL is not in the expected format.
            NotFoundError: the response lists no player for ``steam_id``.
        """
        r = requests.get(f"https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/?key={config['Steam']['api_key']}&steamids={steam_id}")
        if r.status_code != 200:
            raise RequestError(f"Steam returned {r.status_code}")
        # The body is always a dict; "not found" shows up as an empty players list.
        players = r.json().get("response", {}).get("players", [])
        if not players:
            raise NotFoundError("The steam_id doesn't match any steam account")
        player = players[0]
        match = re.search(r"https://steamcdn-a\.akamaihd\.net/steamcommunity/public/images/avatars/../(.+)\.jpg", player["avatar"])
        if match is None:
            raise RequestError("Steam returned an avatar URL in an unexpected format")
        return player["personaname"], match.group(1)

    @staticmethod
    def create(session: Session, royal_id: int, steam_id: str):
        """Build a Steam record for ``steam_id`` owned by ``royal_id``.

        The returned object is not added to the session.

        Raises:
            AlreadyExistingError: the steam_id is already stored.
            RequestError, NotFoundError: see ``_fetch_profile``.
        """
        s = session.query(Steam).get(steam_id)
        if s is not None:
            raise AlreadyExistingError(repr(s))
        persona_name, avatar_hex = Steam._fetch_profile(steam_id)
        return Steam(royal_id=royal_id,
                     steam_id=steam_id,
                     persona_name=persona_name,
                     avatar_hex=avatar_hex)

    @staticmethod
    def find_trade_token(trade_url):
        """Extract the 8-character token from a Steam trade offer URL.

        Raises AttributeError when ``trade_url`` does not match the expected pattern.
        """
        return re.search(r"https://steamcommunity\.com/tradeoffer/new/\?partner=[0-9]+&token=(.{8})", trade_url).group(1)

    @staticmethod
    def to_steam_id_2(steam_id):
        """Convert a 64-bit steam id to the ``STEAM_0:Y:Z`` text form."""
        # Got this code from a random github gist. It could be completely wrong.
        z = (int(steam_id) - 76561197960265728) // 2
        y = int(steam_id) % 2
        return f"STEAM_0:{y}:{z}"

    @staticmethod
    def to_steam_id_3(steam_id, full=False):
        """Convert a 64-bit steam id to its account number, wrapped as ``[U:1:N]`` when ``full``."""
        # Got this code from a random github gist. It could be completely wrong.
        if full:
            return f"[U:1:{int(steam_id) - 76561197960265728}]"
        else:
            return f"{int(steam_id) - 76561197960265728}"

    def update(self):
        """Refresh ``persona_name`` and ``avatar_hex`` from the Steam API.

        Raises RequestError / NotFoundError as described in ``_fetch_profile``;
        the record is left untouched when either is raised.
        """
        self.persona_name, self.avatar_hex = Steam._fetch_profile(self.steam_id)
2017-10-04 21:29:02 +00:00
2017-10-05 18:21:47 +00:00
class RocketLeague(Base):
    """Rocket League ranked statistics for a Steam account."""

    __tablename__ = "rocketleague"

    steam_id = Column(String, ForeignKey("steam.steam_id"), primary_key=True)
    steam = relationship("Steam")

    season = Column(Integer)

    single_rank = Column(Integer)
    single_div = Column(Integer)
    single_mmr = Column(Integer)

    doubles_rank = Column(Integer)
    doubles_div = Column(Integer)
    doubles_mmr = Column(Integer)

    standard_rank = Column(Integer)
    standard_div = Column(Integer)
    standard_mmr = Column(Integer)

    solo_std_rank = Column(Integer)
    solo_std_div = Column(Integer)
    solo_std_mmr = Column(Integer)

    wins = Column(Integer)

    def __repr__(self):
        return f"<RocketLeague {self.steam_id}>"

    @staticmethod
    def create(session: Session, steam_id: str):
        """Build a RocketLeague record for ``steam_id`` filled with current stats.

        The returned object is not added to the session.

        Raises:
            AlreadyExistingError: a record for this steam_id is already stored.
            NotFoundError: the stats API answered 404 for this player.
            RequestError: the stats API answered with any other non-200 status.
        """
        rl = session.query(RocketLeague).get(steam_id)
        if rl is not None:
            raise AlreadyExistingError(repr(rl))
        r = requests.get(f"https://api.rocketleaguestats.com/v1/player?apikey={config['Rocket League']['rlstats_api_key']}&unique_id={str(steam_id)}&platform_id=1")
        if r.status_code == 404:
            raise NotFoundError("The specified user has never played Rocket League")
        elif r.status_code != 200:
            raise RequestError(f"Rocket League Stats returned {r.status_code}")
        new_record = RocketLeague(steam_id=steam_id)
        new_record.update(data=r.json())
        return new_record

    def update(self, data=None):
        """Refresh this record from ``data``, fetching it from the API when omitted.

        Only the highest-numbered ranked season is considered; when the player
        has no ranked season at all the record is left unchanged.

        Raises:
            RequestError: ``data`` was not supplied and the API answered non-200.
        """
        if data is None:
            r = requests.get(f"https://api.rocketleaguestats.com/v1/player?apikey={config['Rocket League']['rlstats_api_key']}&unique_id={self.steam_id}&platform_id=1")
            if r.status_code != 200:
                raise RequestError(f"Rocket League Stats returned {r.status_code}")
            data = r.json()
        # Get current season (season keys are numeric strings; 0 means "none found")
        current_season = max([0] + [int(season) for season in data["rankedSeasons"]])
        if current_season == 0:
            return
        self.season = current_season
        # Get wins
        self.wins = data["stats"]["wins"]
        # Get ranked data: API playlist id -> column name prefix
        playlists = (
            ("10", "single"),    # Single 1v1
            ("11", "doubles"),   # Doubles 2v2
            ("13", "standard"),  # Standard 3v3
            ("12", "solo_std"),  # Solo Standard 3v3
        )
        ranked = data["rankedSeasons"][str(current_season)]
        for playlist_id, prefix in playlists:
            if playlist_id not in ranked:
                # Playlist absent from the season data: keep the stored values.
                continue
            playlist = ranked[playlist_id]
            setattr(self, f"{prefix}_mmr", playlist["rankPoints"])
            # Rank and division are only recorded once at least 10 matches were played.
            placed = playlist["matchesPlayed"] >= 10
            setattr(self, f"{prefix}_rank", playlist["tier"] if placed else None)
            setattr(self, f"{prefix}_div", playlist["division"] if placed else None)
2017-10-05 18:21:47 +00:00
2017-10-08 18:26:51 +00:00
class Dota(Base):
    """Dota 2 statistics for a Steam account, sourced from OpenDota."""

    __tablename__ = "dota"

    steam_id = Column(String, ForeignKey("steam.steam_id"), primary_key=True)
    steam = relationship("Steam")

    solo_mmr = Column(Integer)
    party_mmr = Column(Integer)

    wins = Column(Integer, nullable=False)
    losses = Column(Integer, nullable=False)

    @staticmethod
    def _get_json(url):
        """GET ``url`` and return the decoded JSON body; raise RequestError on a non-200 status."""
        r = requests.get(url)
        if r.status_code != 200:
            raise RequestError(f"OpenDota returned {r.status_code}")
        return r.json()

    @staticmethod
    def create(session: Session, steam_id: int):
        """Build a Dota record for ``steam_id`` filled with current stats.

        The returned object is not added to the session.

        Raises:
            AlreadyExistingError: a record for this steam_id is already stored.
            RequestError: OpenDota answered with a non-200 status.
            NotFoundError: the player data has no "profile" entry.
        """
        d = session.query(Dota).get(steam_id)
        if d is not None:
            raise AlreadyExistingError(repr(d))
        data = Dota._get_json(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(steam_id)}")
        if "profile" not in data:
            raise NotFoundError("The specified user has never played Dota or has a private match history")
        wl = Dota._get_json(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(steam_id)}/wl")
        new_record = Dota(steam_id=steam_id,
                          solo_mmr=data["solo_competitive_rank"],
                          party_mmr=data["competitive_rank"],
                          wins=wl["win"],
                          losses=wl["lose"])
        return new_record

    def update(self):
        """Refresh MMR and win/loss counts from OpenDota.

        Both requests complete before any attribute is written, so a
        RequestError leaves the record unchanged.
        """
        data = Dota._get_json(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(self.steam_id)}")
        wl = Dota._get_json(f"https://api.opendota.com/api/players/{Steam.to_steam_id_3(self.steam_id)}/wl")
        self.solo_mmr = data["solo_competitive_rank"]
        self.party_mmr = data["competitive_rank"]
        self.wins = wl["win"]
        self.losses = wl["lose"]
class LeagueOfLegendsRanks(enum.Enum):
    """League of Legends tiers, numbered from 0 in ascending order."""

    # Each name becomes a member whose value is its position in this list.
    BRONZE, SILVER, GOLD, PLATINUM, DIAMOND, MASTER, CHALLENGER = range(7)
class RomanNumerals(enum.Enum):
    """Roman numerals I to V, each valued as the integer it denotes."""

    # Each name becomes a member; values run 1..5 in declaration order.
    I, II, III, IV, V = range(1, 6)
class LeagueOfLegends(Base):
    """A League of Legends (EUW) summoner linked to a Royal."""

    __tablename__ = "leagueoflegends"

    royal_id = Column(Integer, ForeignKey("royals.id"), nullable=False)
    royal = relationship("Royal")

    summoner_id = Column(BigInteger, primary_key=True)
    summoner_name = Column(String, nullable=False)

    level = Column(Integer, nullable=False)
    solo_division = Column(Enum(LeagueOfLegendsRanks))
    solo_rank = Column(Enum(RomanNumerals))
    flex_division = Column(Enum(LeagueOfLegendsRanks))
    flex_rank = Column(Enum(RomanNumerals))
    twtr_division = Column(Enum(LeagueOfLegendsRanks))
    twtr_rank = Column(Enum(RomanNumerals))

    @staticmethod
    def _get_json(url):
        """GET ``url`` and return the decoded JSON body; raise RequestError on a non-200 status."""
        r = requests.get(url)
        if r.status_code != 200:
            # Must be raised, not returned: callers index into the result.
            raise RequestError(f"League of Legends API returned {r.status_code}")
        return r.json()

    @staticmethod
    def create(session: Session, royal_id, summoner_name=None, summoner_id=None):
        """Build a record for the summoner identified by name or by id.

        ``summoner_name`` takes precedence when both are given. The returned
        object is not added to the session.

        Raises:
            SyntaxError: neither ``summoner_name`` nor ``summoner_id`` is given
                (exception type kept for backward compatibility).
            AlreadyExistingError: the summoner is already stored.
            RequestError: the Riot API answered with a non-200 status.
        """
        if summoner_name:
            lol = session.query(LeagueOfLegends).filter(LeagueOfLegends.summoner_name == summoner_name).first()
        elif summoner_id:
            lol = session.query(LeagueOfLegends).get(summoner_id)
        else:
            raise SyntaxError("Neither summoner_name or summoner_id are specified")
        if lol is not None:
            raise AlreadyExistingError(repr(lol))
        # Get the summoner_id
        if summoner_name:
            data = LeagueOfLegends._get_json(f"https://euw1.api.riotgames.com/lol/summoner/v3/summoners/by-name/{summoner_name}?api_key={config['League of Legends']['riot_api_key']}")
        else:
            data = LeagueOfLegends._get_json(f"https://euw1.api.riotgames.com/lol/summoner/v3/summoners/{summoner_id}?api_key={config['League of Legends']['riot_api_key']}")
        lol = LeagueOfLegends(royal_id=royal_id,
                              summoner_id=data["id"],
                              summoner_name=data["name"],
                              level=data["summonerLevel"])
        # Fills in the per-queue division/rank columns.
        lol.update()
        return lol

    def update(self):
        """Refresh name, level and per-queue ranks from the Riot API.

        Both requests complete before any attribute is written, so a
        RequestError leaves the record unchanged. Queues the summoner is not
        placed in get ``None`` for both division and rank.
        """
        data = LeagueOfLegends._get_json(f"https://euw1.api.riotgames.com/lol/summoner/v3/summoners/{self.summoner_id}?api_key={config['League of Legends']['riot_api_key']}")
        positions = LeagueOfLegends._get_json(f"https://euw1.api.riotgames.com/lol/league/v3/positions/by-summoner/{self.summoner_id}?api_key={config['League of Legends']['riot_api_key']}")
        # API queue type -> column name prefix
        queues = {
            "RANKED_SOLO_5x5": "solo",
            "RANKED_FLEX_SR": "flex",
            "RANKED_FLEX_TT": "twtr",
        }
        placed = {}
        for league in positions:
            prefix = queues.get(league["queueType"])
            if prefix is not None:
                placed[prefix] = league
        self.summoner_id = data["id"]
        self.summoner_name = data["name"]
        self.level = data["summonerLevel"]
        for prefix in queues.values():
            league = placed.get(prefix)
            if league is not None:
                setattr(self, f"{prefix}_division", LeagueOfLegendsRanks[league["tier"]])
                setattr(self, f"{prefix}_rank", RomanNumerals[league["rank"]])
            else:
                setattr(self, f"{prefix}_division", None)
                setattr(self, f"{prefix}_rank", None)
2017-10-09 11:45:42 +00:00
class Osu(Base):
    """An osu! account linked to a Royal, with raw pp for each of the four game modes."""

    __tablename__ = "osu"

    royal_id = Column(Integer, ForeignKey("royals.id"), nullable=False)
    royal = relationship("Royal")

    osu_id = Column(Integer, primary_key=True)
    osu_name = Column(String, nullable=False)

    std_pp = Column(Float)
    taiko_pp = Column(Float)
    catch_pp = Column(Float)
    mania_pp = Column(Float)

    @staticmethod
    def _fetch_modes(osu_name):
        """Fetch the user once per mode (m=0..3) and return the four user dicts in mode order.

        Raises:
            RequestError: any of the four requests answered with a non-200 status.
            NotFoundError: any response holds an empty list (no such user).
        """
        # All four requests are sent before any status check, so the error
        # message can report every status code at once.
        responses = [
            requests.get(f"https://osu.ppy.sh/api/get_user?k={config['Osu!']['ppy_api_key']}&u={osu_name}&m={mode}")
            for mode in range(4)
        ]
        if any(r.status_code != 200 for r in responses):
            codes = " ".join(str(r.status_code) for r in responses)
            raise RequestError(f"Osu! API returned an error ({codes})")
        users = []
        for r in responses:
            found = r.json()
            if not found:
                raise NotFoundError("No Osu! user exists with that name")
            users.append(found[0])
        return users

    @staticmethod
    def create(session: Session, royal_id, osu_name):
        """Build an Osu record for ``osu_name`` owned by ``royal_id``.

        The returned object is not added to the session.

        Raises:
            AlreadyExistingError: a record with this osu_name is already stored.
            RequestError, NotFoundError: see ``_fetch_modes``.
        """
        o = session.query(Osu).filter(Osu.osu_name == osu_name).first()
        if o is not None:
            raise AlreadyExistingError(repr(o))
        j0, j1, j2, j3 = Osu._fetch_modes(osu_name)
        new_record = Osu(royal_id=royal_id,
                         osu_id=j0["user_id"],
                         osu_name=j0["username"],
                         std_pp=j0["pp_raw"],
                         taiko_pp=j1["pp_raw"],
                         catch_pp=j2["pp_raw"],
                         mania_pp=j3["pp_raw"])
        return new_record

    def update(self):
        """Refresh the username and the four pp values from the osu! API.

        Raises RequestError / NotFoundError as described in ``_fetch_modes``;
        the record is left untouched when either is raised.
        """
        j0, j1, j2, j3 = Osu._fetch_modes(self.osu_name)
        self.osu_name = j0["username"]
        self.std_pp = j0["pp_raw"]
        self.taiko_pp = j1["pp_raw"]
        self.catch_pp = j2["pp_raw"]
        self.mania_pp = j3["pp_raw"]
2017-10-17 18:49:28 +00:00
class Discord(Base):
    """A Discord account linked to a Royal."""

    __tablename__ = "discord"
    # __table_args__ must be a tuple CONTAINING the constraint; tuple(constraint)
    # iterates the constraint instead and never registers it on the table.
    __table_args__ = (UniqueConstraint("name", "discriminator"),)

    royal_id = Column(Integer, ForeignKey("royals.id"), nullable=False)
    royal = relationship("Royal")

    discord_id = Column(BigInteger, primary_key=True)
    name = Column(String, nullable=False)
    discriminator = Column(Integer, nullable=False)
    avatar_hex = Column(String)

    def __str__(self):
        # The column is called "name"; this model has no "username" attribute.
        return f"{self.name}#{self.discriminator}"

    def __repr__(self):
        return f"<Discord user {self.discord_id}>"

    @staticmethod
    def create(session: Session, royal_username, discord_user: DiscordUser):
        """Build a Discord record linking ``discord_user`` to the named Royal.

        The returned object is not added to the session.

        Raises:
            AlreadyExistingError: the Discord account is already stored, or
                the Royal already has a Discord account linked.
            NotFoundError: no Royal has the username ``royal_username``.
        """
        d = session.query(Discord).filter(Discord.discord_id == discord_user.id).first()
        if d is not None:
            raise AlreadyExistingError(repr(d))
        r = session.query(Royal).filter(Royal.username == royal_username).first()
        if r is None:
            raise NotFoundError("No Royal exists with that username")
        # A Royal may be linked to at most one Discord account.
        d = session.query(Discord).filter(Discord.royal_id == r.id).first()
        if d is not None:
            raise AlreadyExistingError(repr(d))
        d = Discord(royal=r,
                    discord_id=discord_user.id,
                    name=discord_user.name,
                    discriminator=discord_user.discriminator,
                    avatar_hex=discord_user.avatar)
        return d

    def mention(self):
        """Return the Discord mention markup for this user."""
        return f"<@{self.discord_id}>"

    def avatar_url(self, size=256):
        """Return the CDN URL of the user's avatar, or a fixed default image when none is stored."""
        if self.avatar_hex is None:
            return "https://discordapp.com/assets/6debd47ed13483642cf09e832ed0bc1b.png"
        return f"https://cdn.discordapp.com/avatars/{self.discord_id}/{self.avatar_hex}.png?size={size}"
class Overwatch(Base):
    """An Overwatch (Battle.net) account linked to a Royal, with stats taken from OWAPI."""

    __tablename__ = "overwatch"

    royal_id = Column(Integer, ForeignKey("royals.id"), nullable=False)
    royal = relationship("Royal")

    # The battletag name and its numeric discriminator together form the primary key.
    battletag = Column(String, primary_key=True)
    discriminator = Column(Integer, primary_key=True)
    icon = Column(String, nullable=False)

    level = Column(Integer, nullable=False)
    rank = Column(Integer)

    def __str__(self, separator="#"):
        return f"{self.battletag}{separator}{self.discriminator}"

    def __repr__(self):
        return f"<Overwatch {self}>"

    @staticmethod
    def _overall_stats(battletag, discriminator):
        """Request the player's stats from OWAPI and return the EU quickplay ``overall_stats`` dict.

        Raises RequestError on a non-200 status, or when walking the response
        raises TypeError.
        """
        request_headers = {
            "User-Agent": "Royal-Bot/4.0",
            "From": "ste.pigozzi@gmail.com"
        }
        response = requests.get(f"https://owapi.net/api/v3/u/{battletag}-{discriminator}/stats", headers=request_headers)
        if response.status_code != 200:
            raise RequestError(f"OWAPI.net returned {response.status_code}")
        try:
            return response.json()["eu"]["stats"]["quickplay"]["overall_stats"]
        except TypeError:
            raise RequestError("Something went wrong when retrieving the stats.")

    @staticmethod
    def create(session: Session, royal_id, battletag, discriminator=None):
        """Build an Overwatch record owned by ``royal_id``.

        When ``discriminator`` is omitted, ``battletag`` is split on its first
        "#" into name and discriminator. The returned object is not added to
        the session.

        Raises:
            AlreadyExistingError: this battletag/discriminator pair is already stored.
            RequestError: see ``_overall_stats``.
        """
        if discriminator is None:
            battletag, discriminator = battletag.split("#", 1)
        existing = session.query(Overwatch).filter_by(battletag=battletag, discriminator=discriminator).first()
        if existing is not None:
            raise AlreadyExistingError(repr(existing))
        stats = Overwatch._overall_stats(battletag, discriminator)
        return Overwatch(royal_id=royal_id,
                         battletag=battletag,
                         discriminator=discriminator,
                         icon=re.search(r"https://.+\.cloudfront\.net/game/unlocks/(0x[0-9A-F]+)\.png", stats["avatar"]).group(1),
                         level=stats["prestige"] * 100 + stats["level"],
                         rank=stats["comprank"])

    def icon_url(self):
        """Return the image URL rebuilt from the stored icon code."""
        return f"https://d1u1mce87gyfbn.cloudfront.net/game/unlocks/{self.icon}.png"

    def update(self):
        """Refresh icon, level and rank from OWAPI; raises RequestError as ``_overall_stats`` does."""
        stats = Overwatch._overall_stats(self.battletag, self.discriminator)
        icon_match = re.search(r"https://.+\.cloudfront\.net/game/unlocks/(0x[0-9A-F]+)\.png", stats["avatar"])
        self.icon = icon_match.group(1)
        # Each prestige counts as 100 levels.
        self.level = stats["prestige"] * 100 + stats["level"]
        self.rank = stats["comprank"]
2017-10-19 16:28:28 +00:00
class Diario(Base):
    """A diary entry: a piece of text with a timestamp, an optional author and an optional saver."""

    __tablename__ = "diario"

    id = Column(Integer, primary_key=True)

    timestamp = Column(DateTime, nullable=False)
    # BigInteger to match the type of the referenced telegram.telegram_id column.
    saver_id = Column(BigInteger, ForeignKey("telegram.telegram_id"))
    saver = relationship("Telegram", foreign_keys=saver_id)
    author_id = Column(BigInteger, ForeignKey("telegram.telegram_id"))
    author = relationship("Telegram", foreign_keys=author_id)
    text = Column(String)

    def __repr__(self):
        return f"<Diario {self.id}>"

    def __str__(self):
        return f"{self.id} - {self.timestamp} - {self.author}: {self.text}"

    @staticmethod
    def import_from_json(file):
        """Import diary entries from the JSON file at path ``file`` and commit them.

        The file must hold a list of objects with "timestamp" and "text" keys
        and an optional "sender" key. A sender is matched to a stored Telegram
        account by username (leading "@" stripped); unmatched or missing
        senders give an entry without an author. Each entry is printed as it
        is added. Nothing is committed if any entry fails.
        """
        import json
        # Read the whole file first so the handle is closed before touching the db.
        with open(file, "r") as source:
            entries = json.load(source)
        session = Session()
        try:
            for entry in entries:
                if "sender" in entry:
                    author = session.query(Telegram).filter_by(username=entry["sender"].lstrip("@")).first()
                else:
                    author = None
                d = Diario(timestamp=datetime.datetime.fromtimestamp(float(entry["timestamp"])),
                           author=author,
                           text=entry["text"])
                print(d)
                session.add(d)
            session.commit()
        finally:
            # Always release the connection, even when an entry is malformed.
            session.close()
2017-10-19 16:28:28 +00:00
2017-11-01 18:23:11 +00:00
class CVMusic(Base):
    """A music URL queued by a Royal, with the times it was enqueued and started."""

    __tablename__ = "cvmusic"

    id = Column(BigInteger, primary_key=True)
    url = Column(String, nullable=False)
    enqueued = Column(DateTime, nullable=False)
    started = Column(DateTime, nullable=False)

    user_id = Column(Integer, ForeignKey("royals.id"))
    user = relationship("Royal")

    @staticmethod
    def create_and_add(url: str, user: Royal, enqueued: datetime.datetime, started: datetime.datetime):
        """Create a CVMusic row for ``user`` and commit it using a short-lived session."""
        session = Session()
        try:
            session.add(CVMusic(url=url,
                                enqueued=enqueued,
                                started=started,
                                user_id=user.id))
            session.commit()
        finally:
            # Release the connection even if the insert or commit fails.
            session.close()
2017-10-04 16:51:40 +00:00
# If run as script, create all the tables in the db.
# create_all() is called on the metadata of Base, so it covers every model
# class declared on Base in this module, using the engine configured above.
if __name__ == " __main__ " :
2017-10-30 09:46:37 +00:00
Base . metadata . create_all ( bind = engine )