1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-23 19:44:20 +00:00
royalnet/royalpack/commands/brawlhalla.py

170 lines
8.2 KiB
Python
Raw Permalink Normal View History

2020-05-10 22:46:12 +00:00
from typing import *
2020-03-19 15:26:11 +00:00
import asyncio
import logging
import aiohttp
2020-07-11 00:14:38 +00:00
from royalnet.backpack import tables as rbt
2020-07-21 21:12:14 +00:00
import royalnet.commands as rc
import royalnet.utils as ru
2020-05-10 22:46:12 +00:00
from sqlalchemy import or_, and_
2020-07-11 00:14:38 +00:00
from .abstract.linker import LinkerCommand
2020-03-23 23:02:55 +00:00
from ..tables import Steam, Brawlhalla, BrawlhallaDuo
2020-07-11 00:14:38 +00:00
from ..types import BrawlhallaRank, BrawlhallaMetal, BrawlhallaTier, Updatable
2020-03-19 15:26:11 +00:00
# Module-level logger, named after this module through __name__.
log = logging.getLogger(__name__)
2020-07-11 00:14:38 +00:00
class BrawlhallaCommand(LinkerCommand):
    """Linker command that refreshes and displays Brawlhalla ranked statistics.

    The objects this command iterates over are Steam rows: a Brawlhalla row is
    never created by hand (see :meth:`create`), it is looked up through the
    Brawlhalla API starting from the SteamID the first time :meth:`update` runs.
    """

    # Primary name of the command.
    name: str = "brawlhalla"

    # Alternative names of the command.
    aliases = ["bh", "bruhalla", "bruhlalla"]

    # User-facing description (Italian, shown as-is).
    description: str = "Visualizza le tue statistiche di Brawlhalla."

    # The command takes no arguments.
    syntax: str = ""

    def token(self):
        """Return the Brawlhalla API key stored at ``config['brawlhalla']['token']``."""
        return self.config['brawlhalla']['token']

    async def get_updatables_of_user(self, session, user: rbt.User) -> List[Steam]:
        """Return the Steam rows attached to ``user`` (``user.steam``).

        :param session: The database session (unused here).
        :param user: The user whose Steam accounts should be returned.
        """
        return user.steam

    async def get_updatables(self, session) -> List[Steam]:
        """Return every Steam row found by querying ``session``.

        :param session: The database session used to run the query.
        """
        return await ru.asyncify(session.query(self.alchemy.get(Steam)).all)

    async def create(self,
                     session,
                     user: rbt.User,
                     args: rc.CommandArgs,
                     data: Optional[rc.CommandData] = None) -> Optional[Brawlhalla]:
        """Always refuse manual linking.

        :raises rc.InvalidInputError: Unconditionally; Brawlhalla rows are only
            created by :meth:`update`, starting from a Steam row.
        """
        raise rc.InvalidInputError("Brawlhalla accounts are automatically linked from Steam.")

    @staticmethod
    async def _get_json(hcs, url: str, endpoint: str) -> Any:
        """GET ``url`` with the HTTP client session ``hcs`` and return the decoded JSON body.

        :param hcs: The ``aiohttp.ClientSession`` to send the request with.
        :param url: The full request URL, query string included.
        :param endpoint: The endpoint label used in the error message (e.g. ``"/search"``).
        :raises rc.ExternalError: If the response status is not 200.
        """
        async with hcs.get(url) as response:
            if response.status != 200:
                raise rc.ExternalError(f"Brawlhalla API {endpoint} returned {response.status}!")
            return await response.json()

    @staticmethod
    def _parse_rank(tier_string: str) -> BrawlhallaRank:
        """Convert an API tier string of the form ``"<metal> <number>"`` into a rank.

        The metal name is upper-cased and looked up by name in ``BrawlhallaMetal``;
        the number is looked up by value in ``BrawlhallaTier``.

        NOTE(review): a tier string without a numeric part would fail to unpack
        here, exactly as in the previous inline code; confirm whether the API
        can return such strings.
        """
        metal_name, tier_name = tier_string.split(" ", 1)
        return BrawlhallaRank(
            metal=BrawlhallaMetal[metal_name.upper()],
            tier=BrawlhallaTier(int(tier_name)),
        )

    async def update(self, session, obj, change: Callable[[str, Any], Awaitable[None]]):
        """Refresh the Brawlhalla data reachable from the Steam row ``obj``.

        Steps:

        1. If ``obj.brawlhalla`` is ``None``, search the Brawlhalla API by SteamID;
           when nothing is found the method returns without touching the database,
           otherwise a new Brawlhalla row is added to ``session`` and flushed.
        2. Fetch the ranked data of the player and apply the 1v1 rating and rank.
        3. For each 2v2 entry, find the matching duo row (in either player order)
           or create it if the teammate has a Brawlhalla row, then apply the 2v2
           rating and rank.

        All value changes go through ``self._change`` so that the ``on_*`` handlers
        of this class can react to them.

        :param session: The database session to query and add rows to.
        :param obj: The Steam row to update.
        :param change: Unused by this implementation, which calls ``self._change``
            with an explicit target object instead.
        :raises rc.ExternalError: If a Brawlhalla API request does not return 200.
        """
        BrawlhallaT = self.alchemy.get(Brawlhalla)
        DuoT = self.alchemy.get(BrawlhallaDuo)

        log.info("Updating: %s", obj)

        async with aiohttp.ClientSession() as hcs:
            bh: Optional[Brawlhalla] = obj.brawlhalla

            if bh is None:
                log.debug("Checking if player has an account...")
                found = await self._get_json(
                    hcs,
                    f"https://api.brawlhalla.com/search?steamid={obj.steamid.as_64}&api_key={self.token()}",
                    "/search",
                )
                if not found:
                    log.debug("No account found.")
                    return
                bh = BrawlhallaT(
                    steam=obj,
                    brawlhalla_id=found["brawlhalla_id"],
                    name=found["name"],
                )
                session.add(bh)
                # Send the pending INSERT to the database right away.
                session.flush()

            ranked = await self._get_json(
                hcs,
                f"https://api.brawlhalla.com/player/{bh.brawlhalla_id}/ranked?api_key={self.token()}",
                "/ranked",
            )

        # An empty payload carries neither 1v1 nor 2v2 data: nothing left to do.
        if not ranked:
            log.debug("No ranked info found.")
            return

        await self._change(session=session, obj=bh, attribute="rating_1v1", new=ranked["rating"])
        await self._change(session=session, obj=bh, attribute="rank_1v1",
                           new=self._parse_rank(ranked["tier"]))

        for jduo in ranked.get("2v2", []):
            id_one = jduo["brawlhalla_id_one"]
            id_two = jduo["brawlhalla_id_two"]

            # The same pair may be stored with the two players in either order.
            bhduo: Optional[BrawlhallaDuo] = await ru.asyncify(
                session.query(DuoT)
                .filter(
                    or_(
                        and_(DuoT.id_one == id_one, DuoT.id_two == id_two),
                        and_(DuoT.id_one == id_two, DuoT.id_two == id_one),
                    )
                )
                .one_or_none
            )

            if bhduo is None:
                # The teammate is whichever of the two ids is not this player's.
                other_id = id_two if bh.brawlhalla_id == id_one else id_one
                otherbh: Optional[Brawlhalla] = await ru.asyncify(
                    session.query(BrawlhallaT).get, other_id
                )
                if otherbh is None:
                    # The teammate has no Brawlhalla row: skip this duo.
                    continue
                bhduo = DuoT(
                    one=bh,
                    two=otherbh,
                )
                session.add(bhduo)

            await self._change(session=session, obj=bhduo, attribute="rating_2v2", new=jduo["rating"])
            await self._change(session=session, obj=bhduo, attribute="rank_2v2",
                               new=self._parse_rank(jduo["tier"]))

    async def on_increase(self, session, obj: Union[Brawlhalla, BrawlhallaDuo], attribute: str, old: Any, new: Any) -> None:
        """Send a notification when ``rank_1v1`` or ``rank_2v2`` went up; ignore other attributes."""
        if attribute == "rank_1v1":
            await self.notify(f"📈 [b]{obj.steam.user}[/b] è salito a [b]{new}[/b] ({obj.rating_1v1} MMR) in 1v1 su Brawlhalla! Congratulazioni!")
        elif attribute == "rank_2v2":
            await self.notify(f"📈 [b]{obj.one.steam.user}[/b] e [b]{obj.two.steam.user}[/b] sono saliti a [b]{new}[/b] ({obj.rating_2v2} MMR) in 2v2 su Brawlhalla! Congratulazioni!")

    async def on_unchanged(self, session, obj: Union[Brawlhalla, BrawlhallaDuo], attribute: str, old: Any, new: Any) -> None:
        """Do nothing: unchanged values are not announced."""
        pass

    async def on_decrease(self, session, obj: Union[Brawlhalla, BrawlhallaDuo], attribute: str, old: Any, new: Any) -> None:
        """Send a notification when ``rank_1v1`` or ``rank_2v2`` went down; ignore other attributes."""
        if attribute == "rank_1v1":
            await self.notify(f"📉 [b]{obj.steam.user}[/b] è sceso a [b]{new}[/b] ({obj.rating_1v1} MMR) in 1v1 su Brawlhalla.")
        elif attribute == "rank_2v2":
            await self.notify(f"📉 [b]{obj.one.steam.user}[/b] e [b]{obj.two.steam.user}[/b] sono scesi a [b]{new}[/b] ({obj.rating_2v2} MMR) in 2v2 su Brawlhalla.")

    async def on_first(self, session, obj: Union[Brawlhalla, BrawlhallaDuo], attribute: str, old: None, new: Any) -> None:
        """Send a notification when ``rank_1v1`` or ``rank_2v2`` is set for the first time."""
        if attribute == "rank_1v1":
            await self.notify(f"🌟 [b]{obj.steam.user}[/b] si è classificato a [b]{new}[/b] ({obj.rating_1v1} MMR) in 1v1 su Brawlhalla!")
        elif attribute == "rank_2v2":
            await self.notify(f"🌟 [b]{obj.one.steam.user}[/b] e [b]{obj.two.steam.user}[/b] si sono classificati a [b]{new}[/b] ({obj.rating_2v2} MMR) in 2v2 su Brawlhalla!")

    async def on_reset(self, session, obj: Union[Brawlhalla, BrawlhallaDuo], attribute: str, old: Any, new: None) -> None:
        """Send a notification when ``rank_1v1`` or ``rank_2v2`` was cleared."""
        if attribute == "rank_1v1":
            await self.notify(f"⬜️ [b]{obj.steam.user}[/b] non ha più un rank su Brawlhalla.")
        elif attribute == "rank_2v2":
            await self.notify(f"⬜️ [b]{obj.one.steam.user}[/b] e [b]{obj.two.steam.user}[/b] non hanno più un rank su Brawlhalla.")

    def describe(self, obj: Steam) -> str:
        """Build the multi-line text summary of the Brawlhalla account linked to ``obj``.

        The summary contains the account name, the 1v1 rank and rating (when a
        1v1 rank is set) and one line per 2v2 duo, highest rating first.

        NOTE(review): ``obj.brawlhalla`` stays ``None`` when :meth:`update` finds
        no account for the Steam row; in that case this method raises
        ``AttributeError``. Confirm that callers never reach it in that state.

        :param obj: The Steam row whose Brawlhalla account should be described.
        :return: The summary, with lines joined by ``"\\n"``.
        """
        bh = obj.brawlhalla
        lines = [f" [b]{bh.name}[/b]", ""]

        if bh.rank_1v1:
            lines.append("👤 [b]1v1[/b]")
            lines.append(f"[b]{bh.rank_1v1}[/b] ({bh.rating_1v1} MMR)")
            lines.append("")

        duos = bh.duos
        if len(duos) != 0:
            lines.append("👥 [b]2v2[/b]")
            # Negated key (rather than reverse=True) keeps ties in their original order.
            for duo in sorted(duos, key=lambda d: -d.rating_2v2):
                other = duo.other(bh)
                lines.append(f"Con [b]{other.steam.user}[/b]: [b]{duo.rank_2v2}[/b] ({duo.rating_2v2} MMR)")
            lines.append("")

        return "\n".join(lines)