2020-05-10 22:46:12 +00:00
from typing import *
2020-03-19 15:26:11 +00:00
import asyncio
import logging
import aiohttp
2020-07-11 00:14:38 +00:00
from royalnet . backpack import tables as rbt
2020-07-21 21:12:14 +00:00
import royalnet . commands as rc
import royalnet . utils as ru
2020-05-10 22:46:12 +00:00
from sqlalchemy import or_ , and_
2020-07-11 00:14:38 +00:00
from . abstract . linker import LinkerCommand
2020-03-23 23:02:55 +00:00
from . . tables import Steam , Brawlhalla , BrawlhallaDuo
2020-07-11 00:14:38 +00:00
from . . types import BrawlhallaRank , BrawlhallaMetal , BrawlhallaTier , Updatable
2020-03-19 15:26:11 +00:00
log = logging . getLogger ( __name__ )
2020-07-11 00:14:38 +00:00
class BrawlhallaCommand(LinkerCommand):
    """Show and keep up to date the Brawlhalla statistics of a user.

    Brawlhalla accounts are never linked by hand: they are looked up through
    the Brawlhalla API starting from the Steam accounts of the user, so the
    objects handled by the linker are ``Steam`` rows and the Brawlhalla data
    hangs off their ``brawlhalla`` attribute.
    """

    name: str = "brawlhalla"

    aliases = ["bh", "bruhalla", "bruhlalla"]

    description: str = "Visualizza le tue statistiche di Brawlhalla."

    syntax: str = ""

    def token(self):
        """Return the Brawlhalla API key stored in the command configuration."""
        return self.config['brawlhalla']['token']

    async def get_updatables_of_user(self, session, user: rbt.User) -> List[Steam]:
        """Return the Steam accounts of ``user``; each one may carry a Brawlhalla profile."""
        return user.steam

    async def get_updatables(self, session) -> List[Steam]:
        """Return every Steam account stored in the database."""
        return await ru.asyncify(session.query(self.alchemy.get(Steam)).all)

    async def create(self,
                     session,
                     user: rbt.User,
                     args: rc.CommandArgs,
                     data: Optional[rc.CommandData] = None) -> Optional[Brawlhalla]:
        """Refuse manual linking.

        :raises rc.InvalidInputError: Always, since accounts are linked from Steam.
        """
        raise rc.InvalidInputError("Brawlhalla accounts are automatically linked from Steam.")

    @staticmethod
    def _parse_rank(tier_string: str) -> BrawlhallaRank:
        """Convert an API tier string such as ``"Gold 3"`` into a :class:`BrawlhallaRank`.

        The string is split on the first space into a metal name and a tier number.

        NOTE(review): a tier string without a space makes the unpacking raise
        :exc:`ValueError`; confirm whether the API can return one (e.g. for the
        highest metal) and whether it should be handled here.
        """
        metal_name, tier_name = tier_string.split(" ", 1)
        metal = BrawlhallaMetal[metal_name.upper()]
        tier = BrawlhallaTier(int(tier_name))
        return BrawlhallaRank(metal=metal, tier=tier)

    async def update(self, session, obj, change: Callable[[str, Any], Awaitable[None]]):
        """Refresh the Brawlhalla data attached to the Steam account ``obj``.

        If the Steam account has no Brawlhalla profile yet, the API is searched
        by SteamID and the profile row is created when one is found. Then the
        ranked data is downloaded and the 1v1 and 2v2 ratings and ranks are
        updated.

        :param session: The database session used for queries and inserts.
        :param obj: The Steam account to update (it is read through its
                    ``brawlhalla`` and ``steamid`` attributes).
        :param change: Not used here: the values being changed live on the
                       ``Brawlhalla`` / ``BrawlhallaDuo`` rows rather than on
                       ``obj``, so ``self._change`` is called with those rows.
        :raises rc.ExternalError: If the Brawlhalla API does not answer with status 200.
        """
        BrawlhallaT = self.alchemy.get(Brawlhalla)
        DuoT = self.alchemy.get(BrawlhallaDuo)

        log.info("Updating: %s", obj)

        async with aiohttp.ClientSession() as hcs:
            bh: Optional[Brawlhalla] = obj.brawlhalla

            if bh is None:
                log.debug("Checking if player has an account...")
                search_url = (
                    f"https://api.brawlhalla.com/search"
                    f"?steamid={obj.steamid.as_64}&api_key={self.token()}"
                )
                async with hcs.get(search_url) as response:
                    if response.status != 200:
                        raise rc.ExternalError(f"Brawlhalla API /search returned {response.status}!")
                    j = await response.json()
                # An empty (or null) body means that no profile matches this SteamID.
                if not j:
                    log.debug("No account found.")
                    return
                bh = BrawlhallaT(
                    steam=obj,
                    brawlhalla_id=j["brawlhalla_id"],
                    name=j["name"]
                )
                session.add(bh)
                # Flush off the event loop, like every other session call in this method.
                await ru.asyncify(session.flush)

            ranked_url = (
                f"https://api.brawlhalla.com/player/{bh.brawlhalla_id}/ranked"
                f"?api_key={self.token()}"
            )
            async with hcs.get(ranked_url) as response:
                if response.status != 200:
                    raise rc.ExternalError(f"Brawlhalla API /ranked returned {response.status}!")
                j = await response.json()

        if not j:
            log.debug("No ranked info found.")
            return

        # The rating is written before the rank, so that the rank notifications
        # can display the fresh rating.
        await self._change(session=session, obj=bh, attribute="rating_1v1", new=j["rating"])
        await self._change(session=session, obj=bh, attribute="rank_1v1", new=self._parse_rank(j["tier"]))

        for jduo in j.get("2v2", []):
            id_one = jduo["brawlhalla_id_one"]
            id_two = jduo["brawlhalla_id_two"]

            # A duo is the same regardless of the order in which the two players are listed.
            bhduo: Optional[BrawlhallaDuo] = await ru.asyncify(
                session.query(DuoT)
                .filter(
                    or_(
                        and_(DuoT.id_one == id_one, DuoT.id_two == id_two),
                        and_(DuoT.id_one == id_two, DuoT.id_two == id_one)
                    )
                )
                .one_or_none
            )

            if bhduo is None:
                other_id = id_two if bh.brawlhalla_id == id_one else id_one
                otherbh: Optional[Brawlhalla] = await ru.asyncify(
                    session.query(BrawlhallaT).get, other_id
                )
                # Duos with a teammate that is not in the database are skipped.
                if otherbh is None:
                    continue
                bhduo = DuoT(
                    one=bh,
                    two=otherbh,
                )
                session.add(bhduo)

            await self._change(session=session, obj=bhduo, attribute="rating_2v2", new=jduo["rating"])
            await self._change(session=session, obj=bhduo, attribute="rank_2v2", new=self._parse_rank(jduo["tier"]))

    async def on_increase(self, session, obj: Union[Brawlhalla, BrawlhallaDuo], attribute: str, old: Any, new: Any) -> None:
        """Notify that a 1v1 or 2v2 rank went up; other attributes are ignored.

        Presumably invoked by the inherited change-tracking code - confirm in ``LinkerCommand``.
        """
        if attribute == "rank_1v1":
            await self.notify(f"📈 [b]{obj.steam.user}[/b] è salito a [b]{new}[/b] ({obj.rating_1v1} MMR) in 1v1 su Brawlhalla! Congratulazioni!")
        elif attribute == "rank_2v2":
            await self.notify(f"📈 [b]{obj.one.steam.user}[/b] e [b]{obj.two.steam.user}[/b] sono saliti a [b]{new}[/b] ({obj.rating_2v2} MMR) in 2v2 su Brawlhalla! Congratulazioni!")

    async def on_unchanged(self, session, obj: Union[Brawlhalla, BrawlhallaDuo], attribute: str, old: Any, new: Any) -> None:
        """Do nothing: unchanged values are not announced."""
        pass

    async def on_decrease(self, session, obj: Union[Brawlhalla, BrawlhallaDuo], attribute: str, old: Any, new: Any) -> None:
        """Notify that a 1v1 or 2v2 rank went down; other attributes are ignored.

        Presumably invoked by the inherited change-tracking code - confirm in ``LinkerCommand``.
        """
        if attribute == "rank_1v1":
            await self.notify(f"📉 [b]{obj.steam.user}[/b] è sceso a [b]{new}[/b] ({obj.rating_1v1} MMR) in 1v1 su Brawlhalla.")
        elif attribute == "rank_2v2":
            await self.notify(f"📉 [b]{obj.one.steam.user}[/b] e [b]{obj.two.steam.user}[/b] sono scesi a [b]{new}[/b] ({obj.rating_2v2} MMR) in 2v2 su Brawlhalla.")

    async def on_first(self, session, obj: Union[Brawlhalla, BrawlhallaDuo], attribute: str, old: None, new: Any) -> None:
        """Notify that a 1v1 or 2v2 rank was obtained for the first time; other attributes are ignored.

        Presumably invoked by the inherited change-tracking code - confirm in ``LinkerCommand``.
        """
        if attribute == "rank_1v1":
            await self.notify(f"🌟 [b]{obj.steam.user}[/b] si è classificato a [b]{new}[/b] ({obj.rating_1v1} MMR) in 1v1 su Brawlhalla!")
        elif attribute == "rank_2v2":
            await self.notify(f"🌟 [b]{obj.one.steam.user}[/b] e [b]{obj.two.steam.user}[/b] si sono classificati a [b]{new}[/b] ({obj.rating_2v2} MMR) in 2v2 su Brawlhalla!")

    async def on_reset(self, session, obj: Union[Brawlhalla, BrawlhallaDuo], attribute: str, old: Any, new: None) -> None:
        """Notify that a 1v1 or 2v2 rank was lost; other attributes are ignored.

        Presumably invoked by the inherited change-tracking code - confirm in ``LinkerCommand``.
        """
        if attribute == "rank_1v1":
            await self.notify(f"⬜️ [b]{obj.steam.user}[/b] non ha più un rank su Brawlhalla.")
        elif attribute == "rank_2v2":
            await self.notify(f"⬜️ [b]{obj.one.steam.user}[/b] e [b]{obj.two.steam.user}[/b] non hanno più un rank su Brawlhalla.")

    def describe(self, obj: Steam) -> str:
        """Build the multi-line text describing the Brawlhalla profile of a Steam account.

        The text contains the profile name, the 1v1 rank (if there is one) and
        one line per duo, sorted by descending 2v2 rating.

        NOTE(review): ``obj.brawlhalla`` is used without a ``None`` check, while
        :meth:`update` can leave a Steam account without a profile; presumably
        callers only describe linked accounts - confirm.
        """
        bh = obj.brawlhalla

        lines = [f"ℹ️ [b]{bh.name}[/b]", ""]

        if bh.rank_1v1:
            lines.append("👤 [b]1v1[/b]")
            lines.append(f"[b]{bh.rank_1v1}[/b] ({bh.rating_1v1} MMR)")
            lines.append("")

        duos = bh.duos
        if duos:
            lines.append("👥 [b]2v2[/b]")
            for duo in sorted(duos, key=lambda d: d.rating_2v2, reverse=True):
                other = duo.other(bh)
                lines.append(f"Con [b]{other.steam.user}[/b]: [b]{duo.rank_2v2}[/b] ({duo.rating_2v2} MMR)")
            lines.append("")

        return "\n".join(lines)